From 30fafa9f101186f9900960ad72f00c08a1684692 Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Tue, 9 Jul 2024 10:53:11 +0300 Subject: [PATCH] Add a "lazy" queue This essentially uses a "lazy" semi-immutable queue implemented using a mutable spine that is updated incrementally such that every operation on the queue is O(1). The lazy queue is then wrapped as a single atomic. This results in a relatively space efficient concurrent queue. Performance also seems relatively good with the obvious caveat that as only a single atomic is used, that atomic is a contention point, which somewhat limits scalability compared to queues that have separate mutable head and tail atomics. --- bench/bench_lazy_queue.ml | 79 +++++++++++++++++++++++++++++ bench/main.ml | 1 + src_lockfree/lazy_queue.ml | 83 +++++++++++++++++++++++++++++++ src_lockfree/lazy_queue.mli | 28 +++++++++++ src_lockfree/saturn_lockfree.ml | 3 +- src_lockfree/saturn_lockfree.mli | 1 + test/lazy_queue/dune | 7 +++ test/lazy_queue/stm_lazy_queue.ml | 61 +++++++++++++++++++++++ 8 files changed, 262 insertions(+), 1 deletion(-) create mode 100644 bench/bench_lazy_queue.ml create mode 100644 src_lockfree/lazy_queue.ml create mode 100644 src_lockfree/lazy_queue.mli create mode 100644 test/lazy_queue/dune create mode 100644 test/lazy_queue/stm_lazy_queue.ml diff --git a/bench/bench_lazy_queue.ml b/bench/bench_lazy_queue.ml new file mode 100644 index 00000000..e7a8e995 --- /dev/null +++ b/bench/bench_lazy_queue.ml @@ -0,0 +1,79 @@ +open Multicore_bench +module Queue = Saturn_lockfree.Lazy_queue + +let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () = + let t = Queue.create () in + + let op push = if push then Queue.push t 101 else Queue.pop_opt t |> ignore in + + let init _ = + assert (Queue.is_empty t); + Util.generate_push_and_pop_sequence n_msgs + in + let work _ bits = Util.Bits.iter op bits in + + Times.record ~budgetf ~n_domains:1 ~init ~work () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain" + +let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) + ?(n_msgs = 50 * Util.iter_factor) () = + let n_domains = n_adders + n_takers in + + let t = Queue.create () in + + let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in + let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in + + let init _ = + assert (Queue.is_empty t); + Atomic.set n_msgs_to_take n_msgs; + Atomic.set n_msgs_to_add n_msgs + in + let work i () = + if i < n_adders then + let rec work () = + let n = Util.alloc n_msgs_to_add in + if 0 < n then begin + for i = 1 to n do + Queue.push t i + done; + work () + end + in + work () + else + let rec work () = + let n = Util.alloc n_msgs_to_take in + if n <> 0 then + let rec loop n = + if 0 < n then begin + match Queue.pop_opt t with + | None -> + Domain.cpu_relax (); + loop n + | Some _ -> loop (n - 1) + end + else work () + in + loop n + in + work () + in + + let config = + let format role n = + Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s") + in + Printf.sprintf "%s, %s" + (format "nb adder" n_adders) + (format "nb taker" n_takers) + in + + Times.record ~budgetf ~n_domains ~init ~work () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config + +let run_suite ~budgetf = + run_one_domain ~budgetf () + @ (Util.cross [ 1; 2 ] [ 1; 2 ] + |> List.concat_map @@ fun (n_adders, n_takers) -> + run_one ~budgetf ~n_adders ~n_takers ()) diff --git a/bench/main.ml b/bench/main.ml index accc3fff..ae15c80f 100644 --- a/bench/main.ml +++ b/bench/main.ml @@ -2,6 +2,7 @@ let benchmarks = [ ("Saturn Relaxed_queue", Bench_relaxed_queue.run_suite); ("Saturn_lockfree Queue", Bench_queue.run_suite); + ("Saturn_lockfree Lazy_queue", Bench_lazy_queue.run_suite); ("Saturn_lockfree Single_prod_single_cons_queue", Bench_spsc_queue.run_suite); ("Saturn_lockfree Size", Bench_size.run_suite); ("Saturn_lockfree Skiplist", Bench_skiplist.run_suite); diff --git a/src_lockfree/lazy_queue.ml b/src_lockfree/lazy_queue.ml new file mode 100644 index 00000000..21bd6502 --- /dev/null +++ b/src_lockfree/lazy_queue.ml @@ -0,0 +1,83 @@ +exception Empty + +type ('a, _) result = + | Option : ('a, 'a option) result + | Value : ('a, 'a) result + +type ('a, _) tdt = + | Nil : ('a, [> `Nil ]) tdt + | Cons : { value : 'a; mutable next : 'a spine } -> ('a, [> `Cons ]) tdt + +and 'a spine = S : ('a, [< `Nil | `Cons ]) tdt -> 'a spine [@@unboxed] + +type 'a cons = ('a, [ `Cons ]) tdt + +type 'a state = + | Zero + | Queue of { head : 'a cons; tail : 'a cons } + | Snoc of { head : 'a cons; tail : 'a cons; cons : 'a cons } + +type 'a t = 'a state Atomic.t + +let create () = Atomic.make Zero |> Multicore_magic.copy_as_padded + +let rec push t cons backoff = + let before = Atomic.get t in + let after = + match before with + | Zero -> Queue { head = cons; tail = cons } + | Queue q -> Snoc { head = q.head; tail = q.tail; cons } + | Snoc s -> + let (Cons tl) = s.tail in + if tl.next == S Nil then tl.next <- S s.cons; + Snoc { head = s.head; tail = s.cons; cons } + in + if not (Atomic.compare_and_set t before after) then + push t cons (Backoff.once backoff) + +let push t value = push t (Cons { value; next = S Nil }) Backoff.default + +let rec pop_as : type a r. a t -> (a, r) result -> _ -> r = + fun t result backoff -> + match Atomic.get t with + | Zero -> begin + match result with Option -> None | Value -> raise_notrace Empty + end + | Queue q as before -> + let (Cons hd) = q.head in + let after = + match hd.next with + | S Nil -> Zero + | S (Cons _ as head) -> Queue { head; tail = q.tail } + in + if Atomic.compare_and_set t before after then + match result with Value -> hd.value | Option -> Some hd.value + else pop_as t result (Backoff.once backoff) + | Snoc s as before -> + let (Cons tl) = s.tail in + if tl.next == S Nil then tl.next <- S s.cons; + let (Cons hd) = s.head in + let open struct + external as_cons : 'a spine -> 'a cons = "%identity" + end in + let after = Queue { head = as_cons hd.next; tail = s.cons } in + if Atomic.compare_and_set t before after then + match result with Value -> hd.value | Option -> Some hd.value + else pop_as t result (Backoff.once backoff) + +let peek_as : type a r. a t -> (a, r) result -> r = + fun t result -> + match Atomic.get t with + | Zero -> begin + match result with Option -> None | Value -> raise_notrace Empty + end + | Queue { head = Cons { value; _ }; _ } | Snoc { head = Cons { value; _ }; _ } + -> begin + match result with Value -> value | Option -> Some value + end + +let is_empty t = Zero == Atomic.get t +let[@inline] pop_exn t = pop_as t Value Backoff.default +let[@inline] pop_opt t = pop_as t Option Backoff.default +let[@inline] peek_exn t = peek_as t Value +let[@inline] peek_opt t = peek_as t Option diff --git a/src_lockfree/lazy_queue.mli b/src_lockfree/lazy_queue.mli new file mode 100644 index 00000000..33355ad3 --- /dev/null +++ b/src_lockfree/lazy_queue.mli @@ -0,0 +1,28 @@ +(** *) + +type !'a t +(** *) + +val create : unit -> 'a t +(** *) + +val is_empty : 'a t -> bool +(** *) + +val push : 'a t -> 'a -> unit +(** *) + +exception Empty +(** *) + +val pop_exn : 'a t -> 'a +(** *) + +val pop_opt : 'a t -> 'a option +(** *) + +val peek_exn : 'a t -> 'a +(** *) + +val peek_opt : 'a t -> 'a option +(** *) diff --git a/src_lockfree/saturn_lockfree.ml b/src_lockfree/saturn_lockfree.ml index 904aaec1..f3533846 100644 --- a/src_lockfree/saturn_lockfree.ml +++ b/src_lockfree/saturn_lockfree.ml @@ -28,11 +28,12 @@ Copyright (c) 2017, Nicolas ASSOUAD module Queue = Michael_scott_queue module Queue_unsafe = Michael_scott_queue_unsafe +module Lazy_queue = Lazy_queue module Stack = Treiber_stack module Work_stealing_deque = Ws_deque module Single_prod_single_cons_queue = Spsc_queue module Single_prod_single_cons_queue_unsafe = Spsc_queue_unsafe module Single_consumer_queue = Mpsc_queue module Relaxed_queue = Mpmc_relaxed_queue -module Size = Size module Skiplist = Skiplist +module Size = Size diff --git a/src_lockfree/saturn_lockfree.mli b/src_lockfree/saturn_lockfree.mli index 145f7d98..763e988e 100644 --- a/src_lockfree/saturn_lockfree.mli +++ b/src_lockfree/saturn_lockfree.mli @@ -32,6 +32,7 @@ Copyright (c) 2017, Nicolas ASSOUAD module Queue = Michael_scott_queue module Queue_unsafe = Michael_scott_queue_unsafe +module Lazy_queue = Lazy_queue module Stack = Treiber_stack module Work_stealing_deque = Ws_deque module Single_prod_single_cons_queue = Spsc_queue diff --git a/test/lazy_queue/dune b/test/lazy_queue/dune new file mode 100644 index 00000000..081980b2 --- /dev/null +++ b/test/lazy_queue/dune @@ -0,0 +1,7 @@ +(test + (package saturn_lockfree) + (name stm_lazy_queue) + (modules stm_lazy_queue) + (libraries saturn_lockfree qcheck-core qcheck-stm.stm stm_run) + (enabled_if + (= %{arch_sixtyfour} true))) diff --git a/test/lazy_queue/stm_lazy_queue.ml b/test/lazy_queue/stm_lazy_queue.ml new file mode 100644 index 00000000..ef69bedf --- /dev/null +++ b/test/lazy_queue/stm_lazy_queue.ml @@ -0,0 +1,61 @@ +open QCheck +open STM +module Queue = Saturn_lockfree.Lazy_queue + +module Spec = struct + type cmd = Push of int | Pop | Peek | Is_empty + + let show_cmd c = + match c with + | Push i -> "Push " ^ string_of_int i + | Pop -> "Pop" + | Peek -> "Peek" + | Is_empty -> "Is_empty" + + type state = int list + type sut = int Queue.t + + let arb_cmd _s = + let int_gen = Gen.nat in + QCheck.make ~print:show_cmd + (Gen.oneof + [ + Gen.map (fun i -> Push i) int_gen; + Gen.return Pop; + Gen.return Peek; + Gen.return Is_empty; + ]) + + let init_state = [] + let init_sut () = Queue.create () + let cleanup _ = () + + let next_state c s = + match c with + | Push i -> i :: s + | Pop -> begin match List.rev s with [] -> s | _ :: s' -> List.rev s' end + | Peek | Is_empty -> s + + let precond _ _ = true + + let run c d = + match c with + | Push i -> Res (unit, Queue.push d i) + | Pop -> Res (option int, Queue.pop_opt d) + | Peek -> Res (option int, Queue.peek_opt d) + | Is_empty -> Res (bool, Queue.is_empty d) + + let postcond c (s : state) res = + match (c, res) with + | Push _, Res ((Unit, _), _) -> true + | (Pop | Peek), Res ((Option Int, _), res) -> begin + match List.rev s with [] -> res = None | j :: _ -> res = Some j + end + | Is_empty, Res ((Bool, _), res) -> res = (s = []) + | _, _ -> false +end + +let () = + Stm_run.run ~count:500 ~verbose:true ~name:"Saturn_lockfree.Lazy_queue" + (module Spec) + |> exit