# A lock-free hash table for OCaml
Vesa Karvonen
---

## Outline

1. Motivation
2. API Design
3. Implementation
4. Performance

---

## Motivation

---

### Use cases in Picos

Wanted a parallelism-safe hash table for:

- Reference counting FDs
- Randomized scheduler
- `Awaitable` — atomic + futex

So, I wrote one. Took a couple of days initially.
([Picos](https://ocaml-multicore.github.io/picos/) — Interoperable effects based concurrency)
---

### Why a _lock-free_ htbl?

_[Lock-free](https://en.wikipedia.org/wiki/Non-blocking_algorithm) is a progress condition._
"at least one of the threads makes progress"
Can use in contexts where blocking is not an option.
_"Empirically 90% of ops are reads."_
With a non-blocking htbl, reads can scale ~perfectly.
With separate chaining, a write can be just 1 CAS and 1 fetch-and-add (for the size counter).
Expect reasonably good performance.
Scalable lock-based htbl is not trivial either.
---

## API Design

---

### API (1/3)

```ocaml [|3-6|8-9|11|]
type (!'k, !'v) t

val create:
  ?hashed_type:(module HashedType with type t = 'k) ->
  unit ->
  ('k, 'v) t

val find_exn: ('k, 'v) t -> 'k -> 'v
val mem: ('k, 'v) t -> 'k -> bool

val find_random_exn: ('k, 'v) t -> 'k
```
`*_exn` functions raise `Not_found` to avoid allocating an option.
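For illustration, a minimal calling-pattern sketch (not part of the API): wrap `find_exn` back into an option only when the caller actually wants one.

```ocaml
(* Sketch only: recover an option at the call site, paying for the
   allocation only when it is wanted. *)
let find_opt t key =
  match Htbl.find_exn t key with
  | value -> Some value
  | exception Not_found -> None
```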
---

### API (2/3)

```ocaml [|1|3-5|7-9|5|]
val try_add: ('k, 'v) t -> 'k -> 'v -> bool

val try_compare_and_set: ('k, 'v) t -> 'k -> 'v -> 'v -> bool
val try_set: ('k, 'v) t -> 'k -> 'v -> bool
val set_exn: ('k, 'v) t -> 'k -> 'v -> 'v (* exchange *)

val try_compare_and_remove: ('k, 'v) t -> 'k -> 'v -> bool
val try_remove: ('k, 'v) t -> 'k -> bool
val remove_exn: ('k, 'v) t -> 'k -> 'v
```
First-order operations with high consensus numbers.
Allows atomic update algorithms to be written on top, e.g. a lock-free randomized bag.
---

### API (3/3)

```ocaml
val to_seq: ('k, 'v) t -> ('k * 'v) Seq.t
val remove_all: ('k, 'v) t -> ('k * 'v) Seq.t

val copy: ('k, 'v) t -> ('k, 'v) t
```
Supports atomic snapshotting based on the internal resizing algorithm.
**Q:** _What about `length`, `iter`, `add_seq`, `seeded_hash`, ...?_
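One possible answer, sketched with hypothetical helpers rather than library code: several of these can be layered on `to_seq`, at the cost of an O(n) traversal of a snapshot.

```ocaml
(* Hypothetical helpers derived from [to_seq]; each one traverses a
   single atomic snapshot of the table. *)
let length t = Seq.length (Htbl.to_seq t)
let iter f t = Seq.iter (fun (k, v) -> f k v) (Htbl.to_seq t)
```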
---

### Example: A lock-free bag (1/2)

```ocaml
module type Bag = sig
  type !'a t

  val create : unit -> 'a t
  val push : 'a t -> 'a -> unit
  val pop_exn : 'a t -> 'a
end
```
`pop_exn` removes and returns a random element.
---

### Example: A lock-free bag (2/2)

```ocaml [|6|7|8|10|11|12|4|]
module Bag : Bag = struct
  type 'a t = (int, 'a) Htbl.t

  let create () = Htbl.create ~hashed_type:(module Int) () (* Fun.id *)

  let rec push t elem =
    let key = Random.bits () in
    if not (Htbl.try_add t key elem) then push t elem

  let rec pop_exn t =
    let key = Htbl.find_random_exn t in
    try Htbl.remove_exn t key with Not_found -> pop_exn t
end
```
Familiar lock-free retry loops. See [docs](https://ocaml-multicore.github.io/picos/doc/picos_aux/Picos_aux_htbl/index.html#examples) for more examples.
**Pop quiz**: _Why no backoff?_
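For completeness, a small usage sketch of the bag (my example; the linked docs have more):

```ocaml
(* Push a couple of elements and pop them back in an unspecified
   (randomized) order. *)
let () =
  let bag : string Bag.t = Bag.create () in
  Bag.push bag "hello";
  Bag.push bag "world";
  Printf.printf "%s %s\n" (Bag.pop_exn bag) (Bag.pop_exn bag)
```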
---

## Implementation

---

### Basic features

_Simple separate chaining_

_Size maintained in a non-linearizable manner_
Can be adapted to use linearizable size.
Scalable, but expensive to read.
_Automatic resizing_
Checks for need to resize probabilistically.
Doesn't block readers ⇒ wait-free.
Writers either "miss" or "hit" and help in parallel.
---

### Data structure
Let's go from left to right...
---

### Types (1/3)

```ocaml
type ('k, 'v) t = ('k, 'v) state Atomic.t
```

```ocaml [|2,4,7,8|3|5|6|]
type ('k, 'v) state = {
  hash : 'k -> int;
  buckets : ('k, 'v) bucket Atomic_array.t;
  equal : 'k -> 'k -> bool;
  non_linearizable_size : int Atomic.t array;
  pending : ('k, 'v) pending;
  min_buckets : int;
  max_buckets : int;
}
```
The whole state can be updated with a single atomic op.
---

### Types (2/3)

```ocaml [|2-8|9-12|]
type ('k, 'v, _) tdt =
  | Nil : ('k, 'v, [> `Nil ]) tdt
  | Cons : {
      key : 'k;
      value : 'v;
      rest : ('k, 'v, [ `Nil | `Cons ]) tdt;
    }
      -> ('k, 'v, [> `Cons ]) tdt
  | Resize : {
      mutable spine : ('k, 'v, [ `Nil | `Cons ]) tdt;
    }
      -> ('k, 'v, [> `Resize ]) tdt
```

```ocaml
type ('k, 'v) bucket =
  | B : ('k, 'v, [< `Nil | `Cons | `Resize ]) tdt -> ('k, 'v) bucket
[@@unboxed]
```

---

### Types (3/3)

```ocaml [|2|3-6|]
type ('k, 'v) pending =
  | Nothing
  | Resize of {
      buckets : ('k, 'v) bucket Atomic_array.t;
      non_linearizable_size : int Atomic.t array;
    }
```

---

### Lookup

```ocaml [|2-4|5|6|7-9|10|]
let find_exn t key =
  let r = Atomic.get t in
  let mask = Atomic_array.length r.buckets - 1 in
  let i = r.hash key land mask in
  match Atomic_array.unsafe_fenceless_get r.buckets i with
  | B Nil -> raise_notrace Not_found
  | B (Cons cons_r) ->
    if r.equal cons_r.key key then cons_r.value
    else assoc r.equal key cons_r.rest
  | B (Resize resize_r) -> assoc r.equal key resize_r.spine
```

```ocaml
let rec assoc t key = function
  | Nil -> raise_notrace Not_found
  | Cons r -> if t r.key key then r.value else assoc t key r.rest
```
Wait-free. Flat `Atomic_array`.
The initial `Atomic.get t` on the table root prevents the subsequent (fenceless) reads from being reordered on the thread.
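For reference, the subset of the `Atomic_array` interface these slides rely on, reconstructed from the calls shown rather than copied from the library:

```ocaml
(* Assumed signature; only the operations used in these slides. *)
module type Atomic_array = sig
  type 'a t

  val make : int -> 'a -> 'a t
  val length : 'a t -> int

  (* Read without an acquire fence; ordering is provided by the
     preceding [Atomic.get] on the table root. *)
  val unsafe_fenceless_get : 'a t -> int -> 'a
  val unsafe_compare_and_set : 'a t -> int -> 'a -> 'a -> bool
end
```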
---

### Update (1 of 3)

```ocaml [|2-4|5|7|8-9|10|11|]
let rec try_add t key value backoff =
  let r = Atomic.get t in
  let mask = Atomic_array.length r.buckets - 1 in
  let i = r.hash key land mask in
  match Atomic_array.unsafe_fenceless_get r.buckets i with
  | B Nil ->
    let after = Cons { key; value; rest = Nil } in
    if Atomic_array.unsafe_compare_and_set r.buckets i (B Nil)
         (B after)
    then adjust_size t r mask 1 true
    else try_add t key value (Backoff.once backoff)
(* 1 of 3: Empty bucket - add a new binding and adjust size *)
```

---

### Update (2 of 3)

```ocaml [|7|8|10-13|]
let rec try_add t key value backoff =
  let r = Atomic.get t in
  let mask = Atomic_array.length r.buckets - 1 in
  let i = r.hash key land mask in
  match Atomic_array.unsafe_fenceless_get r.buckets i with
  | B (Cons _ as before) ->
    if exists r.equal key before then false
    else
      let after = Cons { key; value; rest = before } in
      if Atomic_array.unsafe_compare_and_set r.buckets i (B before)
           (B after)
      then adjust_size t r mask 1 true
      else try_add t key value (Backoff.once backoff)
(* 2 of 3: Non-empty bucket - check before adding *)
```

---

### Update (3 of 3)

```ocaml [|7|8|9|]
let rec try_add t key value backoff =
  let r = Atomic.get t in
  let mask = Atomic_array.length r.buckets - 1 in
  let i = r.hash key land mask in
  match Atomic_array.unsafe_fenceless_get r.buckets i with
  | B (Resize _) ->
    let _ = finish t (Atomic.get t) in
    try_add t key value Backoff.default
(* 3 of 3: Resize - finish resize, then retry *)
```

---

### Adjusting size (1 of 4)

```ocaml [|2|3|]
let rec adjust_size t r mask delta result =
  let i = Multicore_magic.instantaneous_domain_index () in
  let n = Array.length r.non_linearizable_size in
  if i < n then begin
    (* Increment counter *)
  end
  else
    (* Double counters *)
```
`instantaneous_domain_index ()` ∊ `[0, num_live_domains[`
---

### Adjusting size (2 of 4)
Original counter array (length `n`) on the left; the doubled one (length `2n + 1`) on the right.
---

### Adjusting size (3 of 4)

```ocaml [|3-5|4|5|7-9|]
(* Double counters *)
let new_cs =
  Array.init (n + n + 1) @@ fun i ->
  if i < n then Array.unsafe_get r.non_linearizable_size i
  else Atomic.make_contended 0
in
let new_r = { r with non_linearizable_size = new_cs } in
let r = if Atomic.compare_and_set t r new_r then new_r else Atomic.get t in
adjust_size t r mask delta result
```
Reusing the old counters allows other threads to potentially update them in parallel.
---

### Adjusting size (4 of 4)

```ocaml [|2|3-5|7|9-10|11-13|]
(* Increment counter *)
Atomic.fetch_and_add (Array.unsafe_get r.non_linearizable_size i) delta |> ignore;
if r.pending == Nothing
   && Int64.to_int (Random.bits64 ()) land mask = 0
   && Atomic.get t == r
then begin
  let estimated_size = non_linearizable_size r in
  let capacity = Atomic_array.length r.buckets in
  if capacity < estimated_size && capacity < r.max_buckets then
    try_resize t r (capacity + capacity) ~clear:false |> ignore
  else if r.min_buckets < capacity
          && estimated_size + estimated_size + estimated_size < capacity
  then try_resize t r (capacity lsr 1) ~clear:false |> ignore
end;
result
```
Size estimate is computed only occasionally, because it is expensive (contention).
`non_linearizable_size` just sums the counters.
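As a sketch, assuming the `state` type above, the estimate is just a fold over the counter array:

```ocaml
(* Sketch: sum the per-index counters.  The reads do not form a
   consistent snapshot, hence the size is only an estimate. *)
let non_linearizable_size r =
  Array.fold_left (fun sum c -> sum + Atomic.get c) 0 r.non_linearizable_size
```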
---

### Resizing

---
Table is getting full.
---
We create a double-size `Atomic_array` and switch `pending` to `Resize { ... }`.
---
We _mark_ original buckets with `Resize` nodes. Recall the `Resize` case in `try_add`.
---
We _split_ buckets on the new top hash bit into the new array.
---
We switch `pending` back to `Nothing` and we're done.
---

### Actually...

The mark and split are done incrementally, per bucket, rather than for all buckets at once.

Halving the buckets _marks two buckets_ and _merges_ them.

Copying _marks_ a bucket and _copies_ it.

We need to check, before an update, whether the op is still pending; otherwise we could disturb the _next_ resize.

Updaters help in parallel.

Buckets are processed with a random _coprime_ (i.e. odd) increment.

---

```ocaml [|2|3|10|11|12|]
let try_resize t r new_capacity ~clear =
  let resize_avoid_aba = if clear then B Nil else B (Resize { spine = Nil }) in
  let buckets = Atomic_array.make new_capacity resize_avoid_aba in
  let non_linearizable_size =
    if clear then
      Array.init (Array.length r.non_linearizable_size) @@ fun _ ->
      Atomic.make_contended 0
    else r.non_linearizable_size
  in
  let new_r = { r with pending = Resize { buckets; non_linearizable_size } } in
  Atomic.compare_and_set t r new_r
  && begin finish t new_r |> ignore; true end
```
(`Resize { spine }` is mutable.)
---

```ocaml [|6|7-8|9-10|11-12|14-15|]
let rec finish t r =
  match r.pending with
  | Resize { buckets; non_linearizable_size } ->
    let high_source = Atomic_array.length r.buckets in
    let high_target = Atomic_array.length buckets in
    let step = Int64.to_int (Random.bits64 ()) lor 1 in
    if
      if high_source < high_target then split_all r buckets 0 t step (* grow *)
      else if high_target < high_source then merge_all r buckets 0 t step (* shrink *)
      else copy_all r buckets 0 t step (* copy *)
    then
      let new_r = { r with buckets; non_linearizable_size; pending = Nothing } in
      if Atomic.compare_and_set t r new_r then new_r
      else finish t (Atomic.get t)
    else finish t (Atomic.get t)
  | Nothing -> r
```

---

```ocaml [|2-3|5-6|9|]
let rec take_at backoff bs i =
  match Atomic_array.unsafe_fenceless_get bs i with
  | B ((Nil | Cons _) as spine) ->
    if
      Atomic_array.unsafe_compare_and_set bs i (B spine)
        (B (Resize { spine }))
    then spine
    else take_at (Backoff.once backoff) bs i
  | B (Resize spine_r) -> spine_r.spine
```
i.e. marking a source bucket.
Source buckets never change after marking.
---

```ocaml [|2|3|4|5|8-10|14|]
let rec copy_all r target i t step =
  let i = (i + step) land (Atomic_array.length target - 1) in
  let spine = take_at Backoff.default r.buckets i in
  let (B before) = Atomic_array.unsafe_fenceless_get target i in
  Atomic.get t == r (* resize still ongoing? *)
  && begin
       begin match before with (* [Resize _] value is unique to resize *)
       | Resize _ ->
         Atomic_array.unsafe_compare_and_set target i (B before) (B spine)
         |> ignore
       | Nil | Cons _ -> ()
       end;
       i = 0
       || copy_all r target i t step
     end
```
`split_all` and `merge_all` are very similar, but don't fit on a slide. 😅
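To give an idea of the missing part, here is a simplified, hypothetical sketch of how one source spine can be split by the new top hash bit, using the `tdt` type from the Types slides. The real `split_all` additionally marks the source bucket, installs both results into the target array with compare-and-set, and rechecks that the resize is still pending.

```ocaml
(* Hypothetical sketch, not the actual [split_all]: distribute the
   bindings of one source spine over the two target buckets
   selected by the new top hash bit. *)
let rec split_spine (hash : 'k -> int) new_bit
    (spine : ('k, 'v, [ `Nil | `Cons ]) tdt) :
    ('k, 'v, [ `Nil | `Cons ]) tdt * ('k, 'v, [ `Nil | `Cons ]) tdt =
  match spine with
  | Nil -> (Nil, Nil)
  | Cons r ->
    let lo, hi = split_spine hash new_bit r.rest in
    if hash r.key land new_bit = 0 then
      (Cons { key = r.key; value = r.value; rest = lo }, hi)
    else (lo, Cons { key = r.key; value = r.value; rest = hi })
```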
---

### Recap

1. Init target buckets with a unique value.
2. Mark source bucket(s).
3. Read target bucket.
4. Check resize is still ongoing.
5. Update target bucket comparing to previous value.
Steps 2 to 5 can be done in parallel in any bucket order.
Profit.

---

## Performance

---

### vs `Stdlib` (AMD Zen1)

| mix   |           Picos |          Stdlib |
| :---: | --------------: | --------------: |
| 90% R | **_38.16 M/s_** |       33.87 M/s |
| 50% R |       21.24 M/s | **_23.01 M/s_** |
| 10% R |       17.51 M/s | **_21.97 M/s_** |
Reads are faster? 👀
Writes are slightly slower as expected.
---

### vs `Stdlib` (M3 Max)

| mix   |           Picos |          Stdlib |
| :---: | --------------: | --------------: |
| 90% R | **_76.56 M/s_** |       72.66 M/s |
| 50% R |       48.35 M/s | **_49.35 M/s_** |
| 10% R |       45.13 M/s | **_48.92 M/s_** |
Again, reads are faster? 👀
Writes are slightly slower as expected.
---

### Scalability (AMD Zen1)

| mix   |  1 domain | 2 domains | 4 domains | 8 domains |
| :---: | --------: | --------: | --------: | --------: |
| 90% R | 38.16 M/s | 44.62 M/s | 85.17 M/s | 150.4 M/s |
| 50% R | 21.24 M/s | 20.79 M/s | 35.51 M/s | 66.44 M/s |
| 10% R | 17.51 M/s | 14.51 M/s | 25.62 M/s | 49.41 M/s |
For the write-heavy mixes there is a drop going from 1 to 2 domains. Core topology? 🤔
---

### Scalability (M3 Max)

| mix   |  1 domain |  2 domains |  4 domains |  8 domains |
| :---: | --------: | ---------: | ---------: | ---------: |
| 90% R | 76.56 M/s | 143.16 M/s | 266.94 M/s | 362.86 M/s |
| 50% R | 48.35 M/s |  85.28 M/s | 157.66 M/s | 179.22 M/s |
| 10% R | 45.13 M/s |  75.27 M/s | 118.60 M/s | 125.70 M/s |
Two clusters of 6 P cores and one cluster of 4 E cores.
---

## Take home

- It is not super hard, so why not
- Progress guarantees are nice
- Pretty good performance
  - Very fast reads
  - Can you get anywhere close with locks?
- It should be possible to do even better
  - My implementation is simple

---

## Questions?