Skip to content

Commit

Permalink
Make Eio.Promise lock-free
Browse files Browse the repository at this point in the history
This is slightly faster and simplifies the code.
  • Loading branch information
talex5 committed Jan 9, 2023
1 parent a62f7b8 commit e93a042
Showing 1 changed file with 26 additions and 75 deletions.
101 changes: 26 additions & 75 deletions lib_eio/core/promise.ml
Original file line number Diff line number Diff line change
@@ -1,41 +1,10 @@
(* Note on thread-safety
Promises can be shared between domains, so everything here must be thread-safe.
Wrapping everything in a mutex would be one way to do that, but that makes reads
slow, and only one domain would be able to read at a time.
Instead, we use an Atomic to hold the state, plus an additional mutex for the waiters
while in the Unresolved state. This makes resolved promises faster (at the cost of
making operations on unresolved promises a bit slower). It also makes reasoning about
the code more fun.
We can think of atomics and mutexes as "boxes", containing values and
invariants. To use them, you open the box to get access to the contents,
then close the box afterwards, restoring the invariant. For mutexes,
open/close is lock/unlock. For atomics, every operation implicitly opens and
closes the box. Any number of callers can share a reference to the box
itself; the runtime ensures a box can only be opened by one user at a time.
We can hold a full reference to something (meaning no-one else has access to it
and we can mutate it), or a fraction (giving us read-only access but also
ensuring that no-one else can mutate it either). *)

type 'a state =
| Resolved of 'a
| Unresolved of 'a Waiters.t * Mutex.t
(* The Unresolved state's mutex box contains:
- Full access to the Waiters.
- Half access to the promise's state.
- The invariant that if the promise is resolved then the waiters list is empty. *)
| Unresolved of Broadcast.t

type !'a promise = {
id : Ctf.id;

state : 'a state Atomic.t;
(* This atomic box contains either:
- A non-zero share of the reference to the Resolved state.
- A half-share of the reference to the Unresolved state. *)
state : 'a state Atomic.t; (* Note: we always switch to Resolved before broadcasting *)
}

type +!'a t
Expand All @@ -51,7 +20,7 @@ let of_public_resolver : 'a u -> 'a promise = Obj.magic
let create_with_id id =
let t = {
id;
state = Atomic.make (Unresolved (Waiters.create (), Mutex.create ()));
state = Atomic.make (Unresolved (Broadcast.create ()));
} in
to_public_promise t, to_public_resolver t

Expand All @@ -68,33 +37,33 @@ let create_resolved x =
let await t =
let t = of_public_promise t in
match Atomic.get t.state with
(* If the atomic is resolved, we take a share of that reference and return
the remainder to the atomic (which will still be non-zero). We can then
continue to know that the promise is resolved after the [Atomic.get]. *)
| Resolved x ->
Ctf.note_read t.id;
x
| Unresolved (q, mutex) ->
(* We discovered that the promise was unresolved, but we can't be sure it still is,
since we had to return the half-share reference to the atomic. So the [get] is
just to get access to the mutex. *)
Ctf.note_try_read t.id;
Mutex.lock mutex;
(* Having opened the mutex, we have:
- Access to the waiters.
- Half access to the promise's state (so we know it can't change until we close the mutex).
- The mutex invariant. *)
| Unresolved b ->
Suspend.enter (fun ctx enqueue ->
match Broadcast.suspend b (fun () -> enqueue (Ok ())) with
| None -> () (* We got resumed immediately *)
| Some request ->
match Atomic.get t.state with
| Resolved _ ->
(* The promise was resolved as we were suspending.
Resume now if we haven't already done so. *)
if Broadcast.cancel request then enqueue (Ok ())
| Unresolved _ ->
(* We observed the promise to be still unresolved after registering a waiter.
Therefore any resolution must happen after we were registered and we will be notified. *)
Ctf.note_try_read t.id;
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
if Broadcast.cancel request then enqueue (Error ex)
(* else already resumed *)
)
);
match Atomic.get t.state with
| Unresolved _ ->
(* The promise is unresolved, and can't change while we hold the mutex.
It's therefore safe to add a new waiter (and let [Waiters.await] close the mutex). *)
Waiters.await ~mutex:(Some mutex) q t.id
(* Otherwise, the promise was resolved by the time we took the lock.
Release the lock (which is fine, as we didn't change anything). *)
| Resolved x ->
Mutex.unlock mutex;
Ctf.note_read t.id;
x
| Unresolved _ -> assert false

let await_exn t =
match await t with
Expand All @@ -105,30 +74,12 @@ let resolve t v =
let rec resolve' t v =
match Atomic.get t.state with
| Resolved _ -> invalid_arg "Can't resolve already-resolved promise"
| Unresolved (q, mutex) as prev ->
(* The above [get] just gets us access to the mutex;
By the time we get here, the promise may have become resolved. *)
Mutex.lock mutex;
(* Having opened the mutex, we have:
- Access to the waiters.
- Half access to the promise's state (so we know it can't change until we close the mutex).
- The mutex invariant.
Now we open the atomic again, getting the other half access. Together,
this gives us full access to the state (i.e. no-one else can be using
it), allowing us to change it.
Note: we don't actually need an atomic CAS here, just a get and a set
would do, but this seems simplest. *)
| Unresolved b as prev ->
if Atomic.compare_and_set t.state prev (Resolved v) then (
(* The atomic now has half-access to the fullfilled state (which counts
as non-zero), and we have the other half. Now we need to restore the
mutex invariant by clearing the wakers. *)
Ctf.note_resolved t.id ~ex:None;
Waiters.wake_all q v;
Mutex.unlock mutex
Broadcast.resume_all b
) else (
(* Otherwise, the promise was already resolved when we opened the mutex.
Close it without any changes and retry. *)
Mutex.unlock mutex;
(* Otherwise, the promise was already resolved. Retry (to get the error). *)
resolve' t v
)
in
Expand Down

0 comments on commit e93a042

Please sign in to comment.