diff --git a/lib_eio/core/promise.ml b/lib_eio/core/promise.ml index 9fee1b542..3b2fdcaa4 100644 --- a/lib_eio/core/promise.ml +++ b/lib_eio/core/promise.ml @@ -1,41 +1,10 @@ -(* Note on thread-safety - - Promises can be shared between domains, so everything here must be thread-safe. - - Wrapping everything in a mutex would be one way to do that, but that makes reads - slow, and only one domain would be able to read at a time. - - Instead, we use an Atomic to hold the state, plus an additional mutex for the waiters - while in the Unresolved state. This makes resolved promises faster (at the cost of - making operations on unresolved promises a bit slower). It also makes reasoning about - the code more fun. - - We can think of atomics and mutexes as "boxes", containing values and - invariants. To use them, you open the box to get access to the contents, - then close the box afterwards, restoring the invariant. For mutexes, - open/close is lock/unlock. For atomics, every operation implicitly opens and - closes the box. Any number of callers can share a reference to the box - itself; the runtime ensures a box can only be opened by one user at a time. - - We can hold a full reference to something (meaning no-one else has access to it - and we can mutate it), or a fraction (giving us read-only access but also - ensuring that no-one else can mutate it either). *) - type 'a state = | Resolved of 'a - | Unresolved of 'a Waiters.t * Mutex.t - (* The Unresolved state's mutex box contains: - - Full access to the Waiters. - - Half access to the promise's state. - - The invariant that if the promise is resolved then the waiters list is empty. *) + | Unresolved of Broadcast.t type !'a promise = { id : Ctf.id; - - state : 'a state Atomic.t; - (* This atomic box contains either: - - A non-zero share of the reference to the Resolved state. - - A half-share of the reference to the Unresolved state. *) + state : 'a state Atomic.t; (* Note: we always switch to Resolved before broadcasting *) } type +!'a t @@ -51,7 +20,7 @@ let of_public_resolver : 'a u -> 'a promise = Obj.magic let create_with_id id = let t = { id; - state = Atomic.make (Unresolved (Waiters.create (), Mutex.create ())); + state = Atomic.make (Unresolved (Broadcast.create ())); } in to_public_promise t, to_public_resolver t @@ -68,33 +37,33 @@ let create_resolved x = let await t = let t = of_public_promise t in match Atomic.get t.state with - (* If the atomic is resolved, we take a share of that reference and return - the remainder to the atomic (which will still be non-zero). We can then - continue to know that the promise is resolved after the [Atomic.get]. *) | Resolved x -> Ctf.note_read t.id; x - | Unresolved (q, mutex) -> - (* We discovered that the promise was unresolved, but we can't be sure it still is, - since we had to return the half-share reference to the atomic. So the [get] is - just to get access to the mutex. *) - Ctf.note_try_read t.id; - Mutex.lock mutex; - (* Having opened the mutex, we have: - - Access to the waiters. - - Half access to the promise's state (so we know it can't change until we close the mutex). - - The mutex invariant. *) + | Unresolved b -> + Suspend.enter (fun ctx enqueue -> + match Broadcast.suspend b (fun () -> enqueue (Ok ())) with + | None -> () (* We got resumed immediately *) + | Some request -> + match Atomic.get t.state with + | Resolved _ -> + (* The promise was resolved as we were suspending. + Resume now if we haven't already done so. *) + if Broadcast.cancel request then enqueue (Ok ()) + | Unresolved _ -> + (* We observed the promise to be still unresolved after registering a waiter. + Therefore any resolution must happen after we were registered and we will be notified. *) + Ctf.note_try_read t.id; + Cancel.Fiber_context.set_cancel_fn ctx (fun ex -> + if Broadcast.cancel request then enqueue (Error ex) + (* else already resumed *) + ) + ); match Atomic.get t.state with - | Unresolved _ -> - (* The promise is unresolved, and can't change while we hold the mutex. - It's therefore safe to add a new waiter (and let [Waiters.await] close the mutex). *) - Waiters.await ~mutex:(Some mutex) q t.id - (* Otherwise, the promise was resolved by the time we took the lock. - Release the lock (which is fine, as we didn't change anything). *) | Resolved x -> - Mutex.unlock mutex; Ctf.note_read t.id; x + | Unresolved _ -> assert false let await_exn t = match await t with @@ -105,30 +74,12 @@ let resolve t v = let rec resolve' t v = match Atomic.get t.state with | Resolved _ -> invalid_arg "Can't resolve already-resolved promise" - | Unresolved (q, mutex) as prev -> - (* The above [get] just gets us access to the mutex; - By the time we get here, the promise may have become resolved. *) - Mutex.lock mutex; - (* Having opened the mutex, we have: - - Access to the waiters. - - Half access to the promise's state (so we know it can't change until we close the mutex). - - The mutex invariant. - Now we open the atomic again, getting the other half access. Together, - this gives us full access to the state (i.e. no-one else can be using - it), allowing us to change it. - Note: we don't actually need an atomic CAS here, just a get and a set - would do, but this seems simplest. *) + | Unresolved b as prev -> if Atomic.compare_and_set t.state prev (Resolved v) then ( - (* The atomic now has half-access to the fullfilled state (which counts - as non-zero), and we have the other half. Now we need to restore the - mutex invariant by clearing the wakers. *) Ctf.note_resolved t.id ~ex:None; - Waiters.wake_all q v; - Mutex.unlock mutex + Broadcast.resume_all b ) else ( - (* Otherwise, the promise was already resolved when we opened the mutex. - Close it without any changes and retry. *) - Mutex.unlock mutex; + (* Otherwise, the promise was already resolved. Retry (to get the error). *) resolve' t v ) in