Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a "lazy" queue #145

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions bench/bench_lazy_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
open Multicore_bench
module Queue = Saturn_lockfree.Lazy_queue

let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
let t = Queue.create () in

let op push = if push then Queue.push t 101 else Queue.pop_opt t |> ignore in

let init _ =
assert (Queue.is_empty t);
Util.generate_push_and_pop_sequence n_msgs
in
let work _ bits = Util.Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2)
?(n_msgs = 50 * Util.iter_factor) () =
let n_domains = n_adders + n_takers in

let t = Queue.create () in

let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in

let init _ =
assert (Queue.is_empty t);
Atomic.set n_msgs_to_take n_msgs;
Atomic.set n_msgs_to_add n_msgs
in
let work i () =
if i < n_adders then
let rec work () =
let n = Util.alloc n_msgs_to_add in
if 0 < n then begin
for i = 1 to n do
Queue.push t i
done;
work ()
end
in
work ()
else
let rec work () =
let n = Util.alloc n_msgs_to_take in
if n <> 0 then
let rec loop n =
if 0 < n then begin
match Queue.pop_opt t with
| None ->
Domain.cpu_relax ();
loop n
| Some _ -> loop (n - 1)
end
else work ()
in
loop n
in
work ()
in

let config =
let format role n =
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
in
Printf.sprintf "%s, %s"
(format "nb adder" n_adders)
(format "nb taker" n_takers)
in

Times.record ~budgetf ~n_domains ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
run_one_domain ~budgetf ()
@ (Util.cross [ 1; 2 ] [ 1; 2 ]
|> List.concat_map @@ fun (n_adders, n_takers) ->
run_one ~budgetf ~n_adders ~n_takers ())
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ let benchmarks =
[
("Saturn Relaxed_queue", Bench_relaxed_queue.run_suite);
("Saturn_lockfree Queue", Bench_queue.run_suite);
("Saturn_lockfree Lazy_queue", Bench_lazy_queue.run_suite);
("Saturn_lockfree Single_prod_single_cons_queue", Bench_spsc_queue.run_suite);
("Saturn_lockfree Size", Bench_size.run_suite);
("Saturn_lockfree Skiplist", Bench_skiplist.run_suite);
Expand Down
83 changes: 83 additions & 0 deletions src_lockfree/lazy_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
exception Empty

type ('a, _) result =
| Option : ('a, 'a option) result
| Value : ('a, 'a) result

type ('a, _) tdt =
| Nil : ('a, [> `Nil ]) tdt
| Cons : { value : 'a; mutable next : 'a spine } -> ('a, [> `Cons ]) tdt

and 'a spine = S : ('a, [< `Nil | `Cons ]) tdt -> 'a spine [@@unboxed]

type 'a cons = ('a, [ `Cons ]) tdt

type 'a state =
| Zero
| Queue of { head : 'a cons; tail : 'a cons }
| Snoc of { head : 'a cons; tail : 'a cons; cons : 'a cons }

type 'a t = 'a state Atomic.t

let create () = Atomic.make Zero |> Multicore_magic.copy_as_padded

let rec push t cons backoff =
let before = Atomic.get t in
let after =
match before with
| Zero -> Queue { head = cons; tail = cons }
| Queue q -> Snoc { head = q.head; tail = q.tail; cons }
| Snoc s ->
let (Cons tl) = s.tail in
if tl.next == S Nil then tl.next <- S s.cons;
Snoc { head = s.head; tail = s.cons; cons }
in
if not (Atomic.compare_and_set t before after) then
push t cons (Backoff.once backoff)

let push t value = push t (Cons { value; next = S Nil }) Backoff.default

let rec pop_as : type a r. a t -> (a, r) result -> _ -> r =
fun t result backoff ->
match Atomic.get t with
| Zero -> begin
match result with Option -> None | Value -> raise_notrace Empty
end
| Queue q as before ->
let (Cons hd) = q.head in
let after =
match hd.next with
| S Nil -> Zero
| S (Cons _ as head) -> Queue { head; tail = q.tail }
in
if Atomic.compare_and_set t before after then
match result with Value -> hd.value | Option -> Some hd.value
else pop_as t result (Backoff.once backoff)
| Snoc s as before ->
let (Cons tl) = s.tail in
if tl.next == S Nil then tl.next <- S s.cons;
let (Cons hd) = s.head in
let open struct
external as_cons : 'a spine -> 'a cons = "%identity"
end in
let after = Queue { head = as_cons hd.next; tail = s.cons } in
if Atomic.compare_and_set t before after then
match result with Value -> hd.value | Option -> Some hd.value
else pop_as t result (Backoff.once backoff)

let peek_as : type a r. a t -> (a, r) result -> r =
fun t result ->
match Atomic.get t with
| Zero -> begin
match result with Option -> None | Value -> raise_notrace Empty
end
| Queue { head = Cons { value; _ }; _ } | Snoc { head = Cons { value; _ }; _ }
-> begin
match result with Value -> value | Option -> Some value
end

let is_empty t = Zero == Atomic.get t
let[@inline] pop_exn t = pop_as t Value Backoff.default
let[@inline] pop_opt t = pop_as t Option Backoff.default
let[@inline] peek_exn t = peek_as t Value
let[@inline] peek_opt t = peek_as t Option
28 changes: 28 additions & 0 deletions src_lockfree/lazy_queue.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
(** *)

type !'a t
(** *)

val create : unit -> 'a t
(** *)

val is_empty : 'a t -> bool
(** *)

val push : 'a t -> 'a -> unit
(** *)

exception Empty
(** *)

val pop_exn : 'a t -> 'a
(** *)

val pop_opt : 'a t -> 'a option
(** *)

val peek_exn : 'a t -> 'a
(** *)

val peek_opt : 'a t -> 'a option
(** *)
3 changes: 2 additions & 1 deletion src_lockfree/saturn_lockfree.ml
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ Copyright (c) 2017, Nicolas ASSOUAD <nicolas.assouad@ens.fr>

module Queue = Michael_scott_queue
module Queue_unsafe = Michael_scott_queue_unsafe
module Lazy_queue = Lazy_queue
module Stack = Treiber_stack
module Work_stealing_deque = Ws_deque
module Single_prod_single_cons_queue = Spsc_queue
module Single_prod_single_cons_queue_unsafe = Spsc_queue_unsafe
module Single_consumer_queue = Mpsc_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Size = Size
module Skiplist = Skiplist
module Size = Size
1 change: 1 addition & 0 deletions src_lockfree/saturn_lockfree.mli
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Copyright (c) 2017, Nicolas ASSOUAD <nicolas.assouad@ens.fr>

module Queue = Michael_scott_queue
module Queue_unsafe = Michael_scott_queue_unsafe
module Lazy_queue = Lazy_queue
module Stack = Treiber_stack
module Work_stealing_deque = Ws_deque
module Single_prod_single_cons_queue = Spsc_queue
Expand Down
7 changes: 7 additions & 0 deletions test/lazy_queue/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
(test
(package saturn_lockfree)
(name stm_lazy_queue)
(modules stm_lazy_queue)
(libraries saturn_lockfree qcheck-core qcheck-stm.stm stm_run)
(enabled_if
(= %{arch_sixtyfour} true)))
61 changes: 61 additions & 0 deletions test/lazy_queue/stm_lazy_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
open QCheck
open STM
module Queue = Saturn_lockfree.Lazy_queue

module Spec = struct
type cmd = Push of int | Pop | Peek | Is_empty

let show_cmd c =
match c with
| Push i -> "Push " ^ string_of_int i
| Pop -> "Pop"
| Peek -> "Peek"
| Is_empty -> "Is_empty"

type state = int list
type sut = int Queue.t

let arb_cmd _s =
let int_gen = Gen.nat in
QCheck.make ~print:show_cmd
(Gen.oneof
[
Gen.map (fun i -> Push i) int_gen;
Gen.return Pop;
Gen.return Peek;
Gen.return Is_empty;
])

let init_state = []
let init_sut () = Queue.create ()
let cleanup _ = ()

let next_state c s =
match c with
| Push i -> i :: s
| Pop -> begin match List.rev s with [] -> s | _ :: s' -> List.rev s' end
| Peek | Is_empty -> s

let precond _ _ = true

let run c d =
match c with
| Push i -> Res (unit, Queue.push d i)
| Pop -> Res (option int, Queue.pop_opt d)
| Peek -> Res (option int, Queue.peek_opt d)
| Is_empty -> Res (bool, Queue.is_empty d)

let postcond c (s : state) res =
match (c, res) with
| Push _, Res ((Unit, _), _) -> true
| (Pop | Peek), Res ((Option Int, _), res) -> begin
match List.rev s with [] -> res = None | j :: _ -> res = Some j
end
| Is_empty, Res ((Bool, _), res) -> res = (s = [])
| _, _ -> false
end

let () =
Stm_run.run ~count:500 ~verbose:true ~name:"Saturn_lockfree.Lazy_queue"
(module Spec)
|> exit
Loading