Skip to content

Commit

Permalink
Various tweaks to improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Jan 27, 2024
1 parent 89ab563 commit 1c98c25
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 129 deletions.
215 changes: 88 additions & 127 deletions src_lockfree/ws_deque.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* Copyright (c) 2015, KC Sivaramakrishnan <sk826@cl.cam.ac.uk>
* Copyright (c) 2017, Nicolas ASSOUAD <nicolas.assouad@ens.fr>
* Copyright (c) 2021, Tom Kelly <ctk21@cl.cam.ac.uk>
* Copyright (c) 2024, Vesa Karvonen <vesa.a.j.k@gmail.com>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
Expand All @@ -27,6 +28,8 @@
* https://dl.acm.org/doi/abs/10.1145/2442516.2442524
*)

module Atomic = Transparent_atomic

module type S = sig
type 'a t

Expand All @@ -38,162 +41,120 @@ module type S = sig
val steal_opt : 'a t -> 'a option
end

module CArray = struct
type 'a t = 'a array

let rec log2 n = if n <= 1 then 0 else 1 + log2 (n asr 1)

let create sz v =
(* [sz] must be a power of two. *)
assert (0 < sz && sz = Int.shift_left 1 (log2 sz));
assert (Int.logand sz (sz - 1) == 0);
Array.make sz v

let[@inline] size t = Array.length t
let[@inline] mask t = size t - 1

let[@inline] index i t =
(* Because [size t] is a power of two, [i mod (size t)] is the same as
[i land (size t - 1)], that is, [i land (mask t)]. *)
Int.logand i (mask t)

let[@inline] get t i = Array.unsafe_get t (index i t)
let[@inline] put t i v = Array.unsafe_set t (index i t) v

let[@inline] transfer src dst top num =
ArrayExtra.blit_circularly (* source array and index: *)
src
(index top src) (* target array and index: *)
dst
(index top dst) (* number of elements: *)
num

let grow t top bottom =
let sz = size t in
assert (bottom - top = sz);
let dst = create (2 * sz) (Obj.magic ()) in
transfer t dst top sz;
dst

let shrink t top bottom =
let sz = size t in
assert (bottom - top <= sz / 2);
let dst = create (sz / 2) (Obj.magic ()) in
transfer t dst top (bottom - top);
dst
end

module M : S = struct
let min_size = 32
let shrink_const = 3
(** This must be a power of two. *)
let min_capacity = 16

type 'a t = {
top : int Atomic.t;
bottom : int Atomic.t;
tab : 'a ref CArray.t Atomic.t;
mutable next_shrink : int;
mutable tab : 'a ref array;
}

let create () =
let top = Atomic.make 1 |> Multicore_magic.copy_as_padded in
let bottom = Atomic.make 1 |> Multicore_magic.copy_as_padded in
let tab =
Atomic.make (CArray.create min_size (Obj.magic ()))
|> Multicore_magic.copy_as_padded
in
let next_shrink = 0 in
{ top; bottom; tab; next_shrink } |> Multicore_magic.copy_as_padded

let set_next_shrink q =
let sz = CArray.size (Atomic.get q.tab) in
if sz <= min_size then q.next_shrink <- 0
else q.next_shrink <- sz / shrink_const

let grow q t b =
Atomic.set q.tab (CArray.grow (Atomic.get q.tab) t b);
set_next_shrink q
let top = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let bottom = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let tab = Array.make min_capacity (Obj.magic ()) in
{ top; bottom; tab } |> Multicore_magic.copy_as_padded

let grow a t b =
let sz = b - t in
let dst = Array.make (sz lsl 1) (Obj.magic ()) in
ArrayExtra.blit_circularly a
(t land (sz - 1))
dst
(t land ((sz * 2) - 1))
sz;
dst

let size q =
let b = Atomic.get q.bottom in
let t = Atomic.get q.top in
b - t
let shrink a t b =
let sz = Array.length a in
let dst = Array.make (sz lsr 1) (Obj.magic ()) in
ArrayExtra.blit_circularly a
(t land (sz - 1))
dst
(t land ((sz lsr 1) - 1))
(b - t);
dst

let push q v =
let v' = ref v in
let b = Atomic.get q.bottom in
let v = ref v in
(* Read of [bottom] by the owner simply does not require a fence as the
[bottom] is only mutated by the owner. *)
let b = Atomic.fenceless_get q.bottom in
let t = Atomic.get q.top in
let a = Atomic.get q.tab in
let a = q.tab in
let size = b - t in
let a =
if size = CArray.size a then begin
grow q t b;
Atomic.get q.tab
end
else a
in
CArray.put a b v';
Atomic.set q.bottom (b + 1)
let capacity = Array.length a in
if size < capacity then begin
Array.unsafe_set a (b land (capacity - 1)) v;
Atomic.incr q.bottom
end
else
let a = grow a t b in
Array.unsafe_set a (b land (Array.length a - 1)) v;
q.tab <- a;
Atomic.incr q.bottom

type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly

let[@inline] release : type a r. a ref -> (a, r) poly -> r =
fun ptr poly ->
let res = !ptr in
(* we know this ptr will never be dereferenced, but want to
break the reference to ensure that the contents of the
deque array get garbage collected *)
ptr := Obj.magic ();
match poly with Option -> Some res | Value -> res

let pop_as : type a r. a t -> (a, r) poly -> r =
fun q poly ->
if size q = 0 then match poly with Option -> None | Value -> raise Exit
let b = Atomic.fetch_and_add q.bottom (-1) - 1 in
(* Read of [top] at this point requires no fence as we simply need to ensure
that the read happens after updating [bottom]. *)
let t = Atomic.fenceless_get q.top in
let size = b - t in
if 0 < size then begin
let a = q.tab in
let mask = Array.length a - 1 in
let out = Array.unsafe_get a (b land mask) in
let res = !out in
out := Obj.magic ();
if size + size + size <= mask - min_capacity then q.tab <- shrink a t b;
match poly with Option -> Some res | Value -> res
end
else if size < 0 then begin
(* This write of [bottom] requires no fence. The deque is empty and
remains so until the next [push]. *)
Atomic.fenceless_set q.bottom (b + 1);
match poly with Option -> None | Value -> raise_notrace Exit
end
else
let b = Atomic.get q.bottom - 1 in
Atomic.set q.bottom b;
let t = Atomic.get q.top in
let a = Atomic.get q.tab in
let size = b - t in
if size < 0 then begin
(* empty queue *)
Atomic.set q.bottom (b + 1);
match poly with Option -> None | Value -> raise Exit
let got = Atomic.compare_and_set q.top t (t + 1) in
(* This write of [bottom] requires no fence. The deque is empty and
remains so until the next [push]. *)
Atomic.fenceless_set q.bottom (b + 1);
let a = q.tab in
let out = Array.unsafe_get a (b land (Array.length a - 1)) in
if got then begin
let res = !out in
out := Obj.magic ();
match poly with Option -> Some res | Value -> res
end
else
let out = CArray.get a b in
if b = t then
(* single last element *)
if Atomic.compare_and_set q.top t (t + 1) then begin
Atomic.set q.bottom (b + 1);
release out poly
end
else begin
Atomic.set q.bottom (b + 1);
match poly with Option -> None | Value -> raise Exit
end
else begin
(* non-empty queue *)
if q.next_shrink > size then begin
Atomic.set q.tab (CArray.shrink a t b);
set_next_shrink q
end;
release out poly
end
else match poly with Option -> None | Value -> raise_notrace Exit

let pop q = pop_as q Value
let pop_opt q = pop_as q Option

let rec steal_as : type a r. a t -> Backoff.t -> (a, r) poly -> r =
fun q backoff poly ->
let t = Atomic.get q.top in
(* Read of [top] does not require a fence at this point, but the read of
[top] must happen before the read of [bottom]. The write of [top] later
has no effect in case we happened to read an old value of [top]. *)
let t = Atomic.fenceless_get q.top in
let b = Atomic.get q.bottom in
let size = b - t in
if size <= 0 then match poly with Option -> None | Value -> raise Exit
else
let a = Atomic.get q.tab in
let out = CArray.get a t in
if Atomic.compare_and_set q.top t (t + 1) then release out poly
if 0 < size then
let a = q.tab in
let out = Array.unsafe_get a (t land (Array.length a - 1)) in
if Atomic.compare_and_set q.top t (t + 1) then begin
let res = !out in
out := Obj.magic ();
match poly with Option -> Some res | Value -> res
end
else steal_as q (Backoff.once backoff) poly
else match poly with Option -> None | Value -> raise_notrace Exit

let steal q = steal_as q Backoff.default Value
let steal_opt q = steal_as q Backoff.default Option
Expand Down
8 changes: 7 additions & 1 deletion test/ws_deque/dune
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@
(test
(package saturn_lockfree)
(name ws_deque_dscheck)
(libraries atomic dscheck alcotest backoff multicore-magic)
(libraries
atomic
dscheck
alcotest
backoff
transparent_atomic
multicore-magic)
(modules ArrayExtra ws_deque ws_deque_dscheck))

(test
Expand Down
3 changes: 2 additions & 1 deletion test/ws_deque/stm_ws_deque.ml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ let () =
agree_test_par_asym ~count
~name:"STM Saturn_lockfree.Ws_deque test parallel";
(* Note: this can generate, e.g., pop commands/actions in different threads, thus violating the spec. *)
(*
WSDT_dom.neg_agree_test_par ~count
~name:"STM Saturn_lockfree.Ws_deque test parallel, negative";
~name:"STM Saturn_lockfree.Ws_deque test parallel, negative"; *)
]

0 comments on commit 1c98c25

Please sign in to comment.