Skip to content

Commit

Permalink
Implement caching of the thief side index
Browse files Browse the repository at this point in the history
This reduces contention as operations can be performed on either side in
parallel without interference from the other side.
  • Loading branch information
polytypic committed Mar 3, 2024
1 parent ac6d6e6 commit 204ae86
Showing 1 changed file with 27 additions and 15 deletions.
42 changes: 27 additions & 15 deletions src_lockfree/ws_deque.ml
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,16 @@ module M : S = struct
type 'a t = {
top : int Atomic.t;
bottom : int Atomic.t;
top_cache : int ref;
mutable tab : 'a ref array;
}

let create () =
let top = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let bottom = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let tab = Array.make min_capacity (Obj.magic ()) in
{ top; bottom; tab } |> Multicore_magic.copy_as_padded
let bottom = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let top_cache = ref 0 |> Multicore_magic.copy_as_padded in
{ top; bottom; top_cache; tab } |> Multicore_magic.copy_as_padded

let realloc a t b sz new_sz =
let new_a = Array.make new_sz (Obj.magic ()) in
Expand All @@ -71,16 +73,22 @@ module M : S = struct
(* Read of [bottom] by the owner simply does not require a fence as the
[bottom] is only mutated by the owner. *)
let b = Atomic.fenceless_get q.bottom in
let t = Atomic.get q.top in
let t_cache = !(q.top_cache) in
let a = q.tab in
let size = b - t in
let size = b - t_cache in
let capacity = Array.length a in
if size < capacity then begin
if
size < capacity
||
let t = Atomic.get q.top in
q.top_cache := t;
t != t_cache
then begin
Array.unsafe_set a (b land (capacity - 1)) v;
Atomic.incr q.bottom
end
else
let a = realloc a t b capacity (capacity lsl 1) in
let a = realloc a t_cache b capacity (capacity lsl 1) in
Array.unsafe_set a (b land (Array.length a - 1)) v;
q.tab <- a;
Atomic.incr q.bottom
Expand All @@ -104,25 +112,29 @@ module M : S = struct
q.tab <- realloc a t b capacity (capacity lsr 1);
match poly with Option -> Some res | Value -> res
end
else if size < 0 then begin
(* This write of [bottom] requires no fence. The deque is empty and
remains so until the next [push]. *)
Atomic.fenceless_set q.bottom (b + 1);
match poly with Option -> None | Value -> raise_notrace Exit
end
else
else if b = t then begin
(* Whether or not the [compare_and_set] below succeeds, [top_cache] can be
updated, because in either case [top] has been incremented. *)
q.top_cache := t + 1;
let got = Atomic.compare_and_set q.top t (t + 1) in
(* This write of [bottom] requires no fence. The deque is empty and
remains so until the next [push]. *)
Atomic.fenceless_set q.bottom (b + 1);
let a = q.tab in
let out = Array.unsafe_get a (b land (Array.length a - 1)) in
if got then begin
let a = q.tab in
let out = Array.unsafe_get a (b land (Array.length a - 1)) in
let res = !out in
out := Obj.magic ();
match poly with Option -> Some res | Value -> res
end
else match poly with Option -> None | Value -> raise_notrace Exit
end
else begin
(* This write of [bottom] requires no fence. The deque is empty and
remains so until the next [push]. *)
Atomic.fenceless_set q.bottom (b + 1);
match poly with Option -> None | Value -> raise_notrace Exit
end

let pop q = pop_as q Value
let pop_opt q = pop_as q Option
Expand Down

0 comments on commit 204ae86

Please sign in to comment.