diff --git a/Makefile b/Makefile index 8934f79a0..7a851b86f 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,7 @@ test_luv: EIO_BACKEND=luv dune runtest dscheck: + dune exec -- ./lib_eio/tests/dscheck/test_sync.exe dune exec -- ./lib_eio/tests/dscheck/test_semaphore.exe dune exec -- ./lib_eio/tests/dscheck/test_cells.exe diff --git a/lib_eio/stream.ml b/lib_eio/stream.ml index fdce7688c..974cfa3b7 100644 --- a/lib_eio/stream.ml +++ b/lib_eio/stream.ml @@ -1,128 +1,136 @@ -type 'a t = { - mutex : Mutex.t; - - id : Ctf.id; - - capacity : int; - items : 'a Queue.t; - - (* Readers suspended because [items] is empty. *) - readers : 'a Waiters.t; - - (* Writers suspended because [items] is at capacity. *) - writers : unit Waiters.t; -} - -let with_mutex t f = - Mutex.lock t.mutex; - match f () with - | x -> Mutex.unlock t.mutex; x - | exception ex -> Mutex.unlock t.mutex; raise ex - -(* Invariants *) -let _validate t = - with_mutex t @@ fun () -> - assert (Queue.length t.items <= t.capacity); - assert (Waiters.is_empty t.readers || Queue.is_empty t.items); - assert (Waiters.is_empty t.writers || Queue.length t.items = t.capacity) - -let create capacity = - assert (capacity >= 0); - let id = Ctf.mint_id () in - Ctf.note_created id Ctf.Stream; - { - mutex = Mutex.create (); - id; - capacity; - items = Queue.create (); - readers = Waiters.create (); - writers = Waiters.create (); +module Locking = struct + type 'a t = { + mutex : Mutex.t; + + id : Ctf.id; + + capacity : int; (* [capacity > 0] *) + items : 'a Queue.t; + + (* Readers suspended because [items] is empty. *) + readers : 'a Waiters.t; + + (* Writers suspended because [items] is at capacity. *) + writers : unit Waiters.t; } -let add t item = - Mutex.lock t.mutex; - match Waiters.wake_one t.readers item with - | `Ok -> Mutex.unlock t.mutex - | `Queue_empty -> - (* No-one is waiting for an item. Queue it. 
*) - if Queue.length t.items < t.capacity then ( - Queue.add item t.items; - Mutex.unlock t.mutex - ) else ( - (* The queue is full. Wait for our turn first. *) - Suspend.enter_unchecked @@ fun ctx enqueue -> - Waiters.await_internal ~mutex:(Some t.mutex) t.writers t.id ctx (fun r -> - (* This is called directly from [wake_one] and so we have the lock. - We're still running in [wake_one]'s domain here. *) - if Result.is_ok r then ( - (* We get here immediately when called by [take], either: - 1. after removing an item, so there is space, or - 2. if [capacity = 0]; [take] will immediately remove the new item. *) - Queue.add item t.items; - ); - enqueue r - ) - ) - -let take t = - Mutex.lock t.mutex; - match Queue.take_opt t.items with - | None -> - (* There aren't any items, so we probably need to wait for one. - However, there's also the special case of a zero-capacity queue to deal with. - [is_empty writers || capacity = 0] *) - begin match Waiters.wake_one t.writers () with - | `Queue_empty -> - Waiters.await ~mutex:(Some t.mutex) t.readers t.id - | `Ok -> - (* [capacity = 0] (this is the only way we can get waiters and no items). - [wake_one] has just added an item to the queue; remove it to restore - the invariant before closing the mutex. *) - let x = Queue.take t.items in - Mutex.unlock t.mutex; - x - end - | Some v -> - (* If anyone was waiting for space, let the next one go. - [is_empty writers || length items = t.capacity - 1] *) - begin match Waiters.wake_one t.writers () with - | `Ok (* [length items = t.capacity] again *) - | `Queue_empty -> () (* [is_empty writers] *) - end; - Mutex.unlock t.mutex; - v - -let take_nonblocking t = - Mutex.lock t.mutex; - match Queue.take_opt t.items with - | None -> - (* There aren't any items. - However, there's also the special case of a zero-capacity queue to deal with. 
- [is_empty writers || capacity = 0] *) - begin match Waiters.wake_one t.writers () with - | `Queue_empty -> Mutex.unlock t.mutex; None - | `Ok -> - (* [capacity = 0] (this is the only way we can get waiters and no items). - [wake_one] has just added an item to the queue; remove it to restore - the invariant before closing the mutex. *) - let x = Queue.take t.items in - Mutex.unlock t.mutex; - Some x - end - | Some v -> - (* If anyone was waiting for space, let the next one go. - [is_empty writers || length items = t.capacity - 1] *) - begin match Waiters.wake_one t.writers () with - | `Ok (* [length items = t.capacity] again *) - | `Queue_empty -> () (* [is_empty writers] *) - end; + let with_mutex t f = + Mutex.lock t.mutex; + match f () with + | x -> Mutex.unlock t.mutex; x + | exception ex -> Mutex.unlock t.mutex; raise ex + + (* Invariants *) + let _validate t = + with_mutex t @@ fun () -> + assert (Queue.length t.items <= t.capacity); + assert (Waiters.is_empty t.readers || Queue.is_empty t.items); + assert (Waiters.is_empty t.writers || Queue.length t.items = t.capacity) + + let create capacity = + assert (capacity > 0); + let id = Ctf.mint_id () in + Ctf.note_created id Ctf.Stream; + { + mutex = Mutex.create (); + id; + capacity; + items = Queue.create (); + readers = Waiters.create (); + writers = Waiters.create (); + } + + let add t item = + Mutex.lock t.mutex; + match Waiters.wake_one t.readers item with + | `Ok -> Mutex.unlock t.mutex + | `Queue_empty -> + (* No-one is waiting for an item. Queue it. *) + if Queue.length t.items < t.capacity then ( + Queue.add item t.items; + Mutex.unlock t.mutex + ) else ( + (* The queue is full. Wait for our turn first. *) + Suspend.enter_unchecked @@ fun ctx enqueue -> + Waiters.await_internal ~mutex:(Some t.mutex) t.writers t.id ctx (fun r -> + (* This is called directly from [wake_one] and so we have the lock. + We're still running in [wake_one]'s domain here. 
*) + if Result.is_ok r then ( + (* We get here immediately when called by [take], after removing an item, + so there is space *) + Queue.add item t.items; + ); + enqueue r + ) + ) + + let take t = + Mutex.lock t.mutex; + match Queue.take_opt t.items with + | None -> + (* There aren't any items, so we need to wait for one. *) + Waiters.await ~mutex:(Some t.mutex) t.readers t.id + | Some v -> + (* If anyone was waiting for space, let the next one go. + [is_empty writers || length items = t.capacity - 1] *) + begin match Waiters.wake_one t.writers () with + | `Ok (* [length items = t.capacity] again *) + | `Queue_empty -> () (* [is_empty writers] *) + end; + Mutex.unlock t.mutex; + v + + let take_nonblocking t = + Mutex.lock t.mutex; + match Queue.take_opt t.items with + | None -> Mutex.unlock t.mutex; None (* There aren't any items. *) + | Some v -> + (* If anyone was waiting for space, let the next one go. + [is_empty writers || length items = t.capacity - 1] *) + begin match Waiters.wake_one t.writers () with + | `Ok (* [length items = t.capacity] again *) + | `Queue_empty -> () (* [is_empty writers] *) + end; + Mutex.unlock t.mutex; + Some v + + let length t = + Mutex.lock t.mutex; + let len = Queue.length t.items in Mutex.unlock t.mutex; - Some v + len + + let dump f t = + Fmt.pf f "<Locking stream: %d/%d items>" (length t) t.capacity +end + +type 'a t = + | Sync of 'a Sync.t + | Locking of 'a Locking.t + +let create = function + | 0 -> Sync (Sync.create ()) + | capacity -> Locking (Locking.create capacity) -let length t = - Mutex.lock t.mutex; - let len = Queue.length t.items in - Mutex.unlock t.mutex; - len +let add t v = + match t with + | Sync x -> Sync.put x v + | Locking x -> Locking.add x v + +let take = function + | Sync x -> Sync.take x + | Locking x -> Locking.take x + +let take_nonblocking = function + | Sync x -> Sync.take_nonblocking x + | Locking x -> Locking.take_nonblocking x + +let length = function + | Sync _ -> 0 + | Locking x -> Locking.length x let is_empty t = (length t 
= 0) + +let dump f = function + | Sync x -> Sync.dump f x + | Locking x -> Locking.dump f x diff --git a/lib_eio/stream.mli b/lib_eio/stream.mli index 68e97c646..6554cac1a 100644 --- a/lib_eio/stream.mli +++ b/lib_eio/stream.mli @@ -45,3 +45,6 @@ val length : 'a t -> int val is_empty : 'a t -> bool (** [is_empty t] is [length t = 0]. *) + +val dump : 'a t Fmt.t +(** For debugging. *) diff --git a/lib_eio/sync.ml b/lib_eio/sync.ml new file mode 100644 index 000000000..ee78c62a7 --- /dev/null +++ b/lib_eio/sync.ml @@ -0,0 +1,448 @@ +(* A lock-free synchronous channel with cancellation, using Cells. + + Producers and consumers are paired off and then the producer transfers its + value to the consumer. This is effectively a bounded queue with a capacity + of zero. + + Both producers and consumers can cancel while waiting. + + There is an atomic int ([balance]), plus two queues ([consumers] and + [producers]) made using Cells. When [balance] is positive, it is the number + of producers waiting with values that no one is yet responsible for + resuming. When negative, it is the (negative) number of waiting consumers + that no one is responsible for resuming. + + To put an item: + + 1. The producer increments [balance]. + 2. If it was negative, the producer resumes one waiting consumer on the [consumers] queue. + Otherwise, it suspends itself on the [producers] queue. + + To take an item: + + 1. The consumer decrements [balance]. + 2. If it was positive, the consumer resumes one waiting producer on the [producers] queue. + Otherwise, it suspends itself on the [consumers] queue. + + Therefore, we never try to resume on a queue unless another party has + started the process of suspending on it. + + The system will not become idle while a client is responsible for resuming + something. Therefore, when idle: + + - If [balance <= 0] then there are no waiting producers. + - If [balance >= 0] then there are no waiting consumers. 
+ - So, we never have waiting consumers and producers at the same time. + + As usual with Cells, either party may get to the new cell first. Whichever party + arrives first writes a callback, which the other party will then call when they arrive. + + Note on terminology: + + - The "suspender" of a cell is the party that incremented the queue's suspend index, + and the "resumer" of a cell is the party that incremented the resume index. + + - Whether "suspending" or "resuming" a cell, you may still have to suspend + your fiber and resume it later. + + States + + There are four cell states: + + - [In_transition] indicates that the cell is still being initialised, or might be + getting cancelled. Either way, the suspending party is actively working to + change the cell's state. + + - [Item] indicates that the producer is ready to provide an item. + + - [Slot] indicates that the consumer is ready to receive an item. + + - [Finished] indicates that the cell is no longer being used (the value has + been consumed or the cell has finished being cancelled). + + The possible sequences of states on the [producers] queue are: + + In_transition -C> Slot -P> Finished (consumer arrives first) + `P> Item -C> Finished (producer arrives first) + `P> In_transition -P> Finished (producer cancels) + `C> Slot -P> Finished (cancellation interrupted) + + Only the producer can cancel here. For the [consumers] queue it's the + opposite - the consumer can cancel its [Slot]. + + Cancellation + + Note that there are two kinds of cancellation here: + + 1. A cancelled cell is not considered part of its queue. Anyone seeing one + (due to a race) will skip over it and use the next cell. + + 2. After a consumer and producer have been paired off (and the cell removed + from its queue), the consumer callback may reject the value. If this + happens, the producer must start all over again to find another consumer. 
+ + Whenever a consumer sets its callback to reject values, it should then start + the process of cancelling its cell (if acting as a suspender) so that the + cell can be GC'd. + + A consumer can only cancel its cell when it's on the [consumers] queue. + If it's on [producers], it knows a wake up will be coming shortly anyway. + A consumer cancels its cell as follows: + + 1. The consumer sets its cell in [consumers] to [In_transition]. + 2. It increments [balance] (from a negative value). It is now committed to cancelling. + 3. It sets its cell to [Finished]. + + (1) will fail if the cell got resumed first. In that case the consumer just + rejects the cancellation attempt. + + (2) will fail if [balance >= 0]. In that case the consumer has not cancelled, + and is about to be resumed instead. It tries to return to the [Slot] state. + If that fails, the cell now contains an Item and the consumer takes it. + + (3) will fail if a producer arrived after the consumer committed to cancelling. + In that case, the consumer passes the Item on to the next consumer (there + must be another one, since both the consumer and producer incremented + [balance] from a negative value). + + Cancelling a producer is very similar to cancelling a consumer, just with the + [producers] queue and decrementing the balance from a positive value. + + Non-blocking take + + To perform a non-blocking take: + + 1. The consumer decrements [balance] from a positive number. + 2. The consumer takes the next resume cell from [producers]. + 3. The consumer takes the [Item] from the cell, setting it to [Finished]. + + (1) will fail if there are no unassigned items available. + Then the [take_nonblocking] returns [None], as there are no items waiting. + + (3) will fail if the producer is initialising or cancelling. In either case, + the consumer sets its cell to a request with a dummy callback that rejects + all values and continues immediately. 
+ + The exchange + + Once a producer and consumer have been paired off (and so their cell is now Finished), + the producer's value is passed to the consumer's callback. If the consumer accepts it, + then both fibers are resumed. If not, the producer starts again (incrementing [balance] + again) and waits for another consumer. + + The above has not been formally verified (exercise for reader!). *) + +(* Import these directly because we copy this file for the dscheck tests. *) +module Fiber_context = Eio__core.Private.Fiber_context +module Suspend = Eio__core.Private.Suspend +module Cancel = Eio__core.Cancel + +type 'a item = { + v : 'a; + kp : (bool, exn) result -> unit; (* [Ok false] means consumer refused the item; retry. *) + cancel : [ + | `Resuming (* In the process of resuming, so can't cancel. *) + | `Suspended of (unit -> bool) (* Call this function to attempt to leave the queue. *) + | `Cancelled of exn (* Already cancelled. *) + ] Atomic.t; +} + +type 'a cell = + | In_transition + | Slot of ('a -> bool) + | Item of 'a item + | Finished + +module Cell = struct + type 'a t = 'a cell + + let init = In_transition + + let segment_order = 2 + + let dump f = function + | In_transition -> Fmt.string f "In_transition" + | Slot _ -> Fmt.string f "Slot" + | Item _ -> Fmt.string f "Item" + | Finished -> Fmt.string f "Finished" +end + +module Q = Cells.Make(Cell) + +type 'a t = { + balance : int Atomic.t; + consumers : 'a Q.t; + producers : 'a Q.t; +} + +type 'a loc = + | Short of 'a Cell.t Atomic.t (* Acting as resumer of cell *) + | Long of ('a Q.segment * 'a Cell.t Atomic.t) (* Acting as suspender of cell; can cancel *) + +let dump f t = + Fmt.pf f "@[Sync (balance=%d)@,@[Consumers:@,%a@]@,@[Producers:@,%a@]@]" + (Atomic.get t.balance) + Q.dump t.consumers + Q.dump t.producers + +(* Give [item] to consumer [kc]. [item]'s cell is now Finished. *) +let exchange item kc = item.kp (Ok (kc item.v)) + +(* Add [value] to [cell]. 
+ If the cell is in transition, place [value] there and let the other party handle it later. + If the peer's value is already present, do the exchange. + If the peer cancelled the cell then try the next one on the given resume queue (if we're adding + to a suspend queue then it can't be cancelled, because the caller controls cancellation). + This is only used when our fiber is already suspended, + since we can't create [value] before we have the continuation. *) +let rec add_to_cell queue value cell = + match Atomic.get cell, value with + | Finished, _ -> add_to_cell queue value (Q.next_resume queue) (* Cancelled - skip *) + | (Slot kc as old), Item item + | (Item item as old), Slot kc -> + if Atomic.compare_and_set cell old Finished then exchange item kc + else add_to_cell queue value cell + | In_transition, _ -> + if Atomic.compare_and_set cell In_transition value then () + else add_to_cell queue value cell + | (Slot _ | Item _), _ -> assert false + +(* Cancelling *) + +let rec decr_balance_if_positive t = + let cur = Atomic.get t.balance in + if cur > 0 then ( + if Atomic.compare_and_set t.balance cur (cur - 1) then true + else decr_balance_if_positive t + ) else false + +let rec incr_balance_if_negative t = + let cur = Atomic.get t.balance in + if cur < 0 then ( + if Atomic.compare_and_set t.balance cur (cur + 1) then true + else incr_balance_if_negative t + ) else false + +(* Cancel [cell] on our suspend queue. + This function works for both consumers and producers, as we can tell from + the value what our role is (and if there isn't a value, we're finished anyway). + Neither party will try to cancel before writing its own value. + Returns [true] if the caller cancelled successfully, + or [false] if it must wait (as it's being resumed). *) +let cancel t (segment, cell) = + let cancel2 update_balance ~old = + if Atomic.compare_and_set cell old In_transition then ( + if update_balance t then ( + (* At this point, we are committed to cancelling. 
*) + begin match Atomic.exchange cell Finished with + | Finished -> assert false + | In_transition -> Q.cancel_cell segment + | Item request -> add_to_cell t.consumers (Item request) (Q.next_resume t.consumers) + | Slot kc -> add_to_cell t.producers (Slot kc) (Q.next_resume t.producers) + end; + true + ) else ( + (* We decided not to cancel. We know a resume is coming. *) + if Atomic.compare_and_set cell In_transition old then false + else ( + match old, Atomic.get cell with + | Slot kc, Item request + | Item request, Slot kc -> + Atomic.set cell Finished; + exchange request kc; + false + | _ -> assert false + ) + ) + ) else false (* The peer resumed us first *) + in + match Atomic.get cell with + | Finished -> false (* The peer resumed us first *) + | Slot _ as old -> cancel2 incr_balance_if_negative ~old (* We are a consumer *) + | Item _ as old -> cancel2 decr_balance_if_positive ~old (* We are a producer *) + | In_transition -> + (* Either we're initialising the cell, in which case we haven't told the + application how to cancel this location yet, or we're already + cancelling, but cancelling twice isn't permitted. *) + assert false + +(* A producer can't cancel if it is resuming on the [consumers] queue, and will instead + just wait for the slot in that case, which will arrive soon. However, after getting + a slot the producer may be rejected and be asked to start again on the [producers] queue, + so we need to remember that we were cancelled to prevent that. It's also possible that + we're already restarting but haven't got around to updating [request.cancel] yet; we'll + notice the new [`Cancelled] state when we do. *) +let cancel_put request ex = + match Atomic.exchange request.cancel (`Cancelled ex) with + | `Cancelled _ -> failwith "Already cancelled!" + | `Resuming -> false (* Cancellation fails for now, but we remember we wanted to cancel. *) + | `Suspended cancel -> cancel () + +(* Putting. 
*) + +(* Like [add_to_cell], but we haven't created our value yet as we haven't suspended the fiber. *) +let rec producer_resume_cell t ~success ~in_transition cell = + match Atomic.get (cell : _ Cell.t Atomic.t) with + | Item _ -> assert false + | In_transition -> in_transition cell + | Finished -> producer_resume_cell t ~success ~in_transition (Q.next_resume t.consumers) + | Slot k as old -> + if Atomic.compare_and_set cell old Finished then success k + else producer_resume_cell t ~success ~in_transition cell + +(* This is essentially the main [put] function, but parameterised so it can be shared with + the rejoin-after-rejection case. *) +let producer_join (t : _ t) ~success ~suspend = + let old = Atomic.fetch_and_add t.balance (+1) in + if old < 0 then ( + let cell = Q.next_resume t.consumers in + producer_resume_cell t cell + ~success + ~in_transition:(fun cell -> suspend (Short cell)) + ) else ( + suspend (Long (Q.next_suspend t.producers)) + ) + +(* Called when a consumer took our value but then rejected it. + We start the put operation again, except that our fiber is already suspended + so no need to do that again. We're probably running in the consumer's domain + (unless the consumer provided their callback while we were cancelling). *) +let put_already_suspended t request = + producer_join t + ~success:(exchange request) + ~suspend:(fun loc -> + let Short cell | Long (_, cell) = loc in + add_to_cell t.consumers (Item request) cell; + let rec aux () = + match Atomic.get request.cancel, loc with + | (`Suspended _ | `Resuming as prev), Long loc -> + (* We might be suspended for a while. Update the cancel function with the new location. 
*) + let cancel_fn () = cancel t loc in + if not (Atomic.compare_and_set request.cancel prev (`Suspended cancel_fn)) then aux () + | `Cancelled ex, Long loc -> + (* We got cancelled after the peer removed our cell and before we updated the + cancel function with the new location, or we were cancelled while doing a + (non-cancellable) resume. Deal with it now. *) + if cancel t loc then request.kp (Error ex); + (* else we got resumed first *) + | _, Short _ -> + (* We can't cancel while in the process of resuming a cell on the [consumers] queue. + We could set [cancel] to [`Resuming] here, but there's no need as trying to use the + old cancel function will find the old cell is cancelled and set [request.cancel] + to [`Cancelled]), as required. *) + () + in aux () + ) + +(* We tried to [put] and no slot was immediately available. + Suspend the fiber and use the continuation to finish initialising the cell. + Note that we may be suspending the fiber even when using the "resume" queue, + if the consumer is still in the process of writing its slot. *) +let put_suspend t v loc = + Suspend.enter_unchecked @@ fun ctx enqueue -> + let cancel = + match loc with + | Short _ -> `Resuming (* Can't cancel this *) + | Long loc -> `Suspended (fun () -> cancel t loc) + in + let rec item = { + v; + cancel = Atomic.make cancel; + kp = function + | Error _ as e -> enqueue e (* Cancelled by [put_already_suspended]. *) + | Ok true -> enqueue (Ok ()) (* Success! *) + | Ok false -> put_already_suspended t item (* Consumer rejected value. Restart. 
*) + } in + let Short cell | Long (_, cell) = loc in + add_to_cell t.consumers (Item item) cell; + (* Set up the cancel handler in either case because we might change queues later: *) + match Fiber_context.get_error ctx with + | Some ex -> + if cancel_put item ex then enqueue (Error ex); + (* else being resumed *) + | None -> + Fiber_context.set_cancel_fn ctx (fun ex -> + if cancel_put item ex then enqueue (Error ex) + (* else being resumed *) + ) + +let rec put (t : _ t) v = + producer_join t + ~success:(fun kc -> if kc v then () else put t v) + ~suspend:(put_suspend t v) + +(* Taking. *) + +(* Mirror of [producer_resume_cell]. *) +let rec consumer_resume_cell t ~success ~in_transition cell = + match Atomic.get (cell : _ Cell.t Atomic.t) with + | Slot _ -> assert false + | In_transition -> in_transition cell + | Finished -> consumer_resume_cell t ~success ~in_transition (Q.next_resume t.producers) + | Item req as old -> + if Atomic.compare_and_set cell old Finished then success req + else consumer_resume_cell t ~success ~in_transition cell + +let take_suspend t loc = + Suspend.enter_unchecked @@ fun ctx enqueue -> + let Short cell | Long (_, cell) = loc in + let kc v = enqueue (Ok v); true in + add_to_cell t.producers (Slot kc) cell; + match loc with + | Short _ -> () + | Long loc -> + match Fiber_context.get_error ctx with + | Some ex -> + if cancel t loc then enqueue (Error ex); + (* else being resumed *) + | None -> + Fiber_context.set_cancel_fn ctx (fun ex -> + if cancel t loc then enqueue (Error ex) + (* else being resumed *) + ) + +let take (t : _ t) = + let old = Atomic.fetch_and_add t.balance (-1) in + if old > 0 then ( + let cell = Q.next_resume t.producers in + consumer_resume_cell t cell + ~success:(fun item -> item.kp (Ok true); item.v) + ~in_transition:(fun cell -> take_suspend t (Short cell)) + ) else ( + take_suspend t (Long (Q.next_suspend t.consumers)) + ) + +let reject = Slot (fun _ -> false) + +let take_nonblocking (t : _ t) = + if 
decr_balance_if_positive t then ( + let rec aux cell = + consumer_resume_cell t cell + ~success:(fun item -> + item.kp (Ok true); (* Always accept the item *) + Some item.v + ) + ~in_transition:(fun cell -> + (* Our producer is still in the process of writing its [Item], but + we're non-blocking and can't wait. We're always acting as the + resumer, so we can't cancel the cell. Instead, we provide a + consumer callback that always rejects. + todo: could spin for a bit here first - the Item will probably arrive soon, + and that would avoid making the producer start again. *) + Domain.cpu_relax (); (* Brief wait to encourage producer to finish *) + if Atomic.compare_and_set cell In_transition reject then None + else aux cell + ) + in aux (Q.next_resume t.producers) + ) else None (* No waiting producers for us *) + +(* Creation and status. *) + +let create () = + { + consumers = Q.make (); + producers = Q.make (); + balance = Atomic.make 0; + } + +let balance t = Atomic.get t.balance diff --git a/lib_eio/sync.mli b/lib_eio/sync.mli new file mode 100644 index 000000000..bb304b508 --- /dev/null +++ b/lib_eio/sync.mli @@ -0,0 +1,52 @@ +(* A lock-free synchronous channel with cancellation, using Cells. + + Producers and consumers are paired off and then the producer transfers its + value to the consumer. This is effectively a bounded queue with a capacity + of zero. + + Both producers and consumers can cancel while waiting. *) + +type 'a t +(** A lock-free synchronous channel. *) + +val create : unit -> 'a t +(** [create ()] is a fresh channel with a balance of 0. *) + +val put : 'a t -> 'a -> unit +(** [put t x] gives [x] to a waiting consumer. + + If no consumer is available, it waits until one comes along and accepts [x]. + + Note: Producers are mostly handled fairly, in the order in which they arrive, + but consumers can cancel or reject values so this isn't guaranteed. 
*) + +val take : 'a t -> 'a +(** [take t] waits until a producer is available with an item and then returns it. + + Note: Consumers are mostly handled fairly, in the order in which they arrive, + but producers can cancel so this isn't guaranteed if [t] is shared between + domains. *) + +val take_nonblocking : 'a t -> 'a option +(** [take_nonblocking t] is like {!take}, but returns [None] if no producer is immediately available. + + Note: When [t] is shared between domains, it is possible that a producer may be assigned but still be + in the process of writing its value to [t]. + In this case, [take_nonblocking] will cancel and retry with a new producer, + causing the old producer to lose its place in the queue and have to rejoin at the end. + Since the producer reached the head of the queue while it was still joining, + the queue is presumably very short in this case anyway. *) + +val balance : 'a t -> int +(** [balance t] is the number of waiting producers minus the number of waiting consumers. + + If the balance is non-negative then it is the number of waiting producers. + If non-positive, it is the number of waiting consumers. + There cannot be waiting producers and waiting consumers at the same time. + + If [t] is shared between domains then the value may already be out-of-date + by the time this function returns, so this is mostly useful for debugging + or reporting metrics. *) + +val dump : 'a t Fmt.t +(** [dump] formats the internal state of a channel, for testing and debugging. *) diff --git a/lib_eio/tests/dscheck/dune b/lib_eio/tests/dscheck/dune index ab707d76a..671607c3d 100644 --- a/lib_eio/tests/dscheck/dune +++ b/lib_eio/tests/dscheck/dune @@ -1,16 +1,22 @@ ; We copy cells.ml here so we can build it using TracedAtomic instead of the default one. 
(copy_files# (files ../../core/cells.ml)) (copy_files# (files ../../sem_state.ml)) +(copy_files# (files ../../sync.ml)) (executables - (names test_cells test_semaphore) - (libraries dscheck optint fmt)) + (names test_cells test_semaphore test_sync) + (libraries dscheck optint fmt eio)) (rule (alias dscheck) (package eio) (action (run %{exe:test_cells.exe}))) +(rule + (alias dscheck) + (package eio) + (action (run %{exe:test_sync.exe}))) + (rule (alias dscheck) (package eio) diff --git a/lib_eio/tests/dscheck/fake_sched.ml b/lib_eio/tests/dscheck/fake_sched.ml new file mode 100644 index 000000000..d240df9c2 --- /dev/null +++ b/lib_eio/tests/dscheck/fake_sched.ml @@ -0,0 +1,22 @@ +let cancel ctx = Eio.Cancel.cancel ctx (Failure "test cancellation") + +let run fn = + let module Fiber_context = Eio__core.Private.Fiber_context in + let continue_result k = function + | Ok x -> Effect.Deep.continue k x + | Error x -> Effect.Deep.discontinue k x + in + let ctx = ref None in + Effect.Deep.try_with fn () + { effc = fun (type a) (e : a Effect.t) : ((a, 'b) Effect.Deep.continuation -> 'b) option -> + match e with + | Eio.Private.Effects.Suspend fn -> + Some (fun cont -> + assert (!ctx = None); + let c = Fiber_context.make_root () in + fn c (continue_result cont); + ctx := Some (Fiber_context.cancellation_context c) + ) + | _ -> None + }; + !ctx diff --git a/lib_eio/tests/dscheck/fake_sched.mli b/lib_eio/tests/dscheck/fake_sched.mli new file mode 100644 index 000000000..aebdd3da8 --- /dev/null +++ b/lib_eio/tests/dscheck/fake_sched.mli @@ -0,0 +1,8 @@ +val run : (unit -> unit) -> Eio.Cancel.t option +(** [run fn] runs [fn ()] in a new fiber and returns its context so it can be cancelled. + + [fn] may suspend at most once. + If it doesn't suspend then [run] returns [None] after it finishes. *) + +val cancel : Eio.Cancel.t -> unit +(** [cancel ctx] cancels the context with a suitable dummy exception. 
*) diff --git a/lib_eio/tests/dscheck/test_sync.ml b/lib_eio/tests/dscheck/test_sync.ml new file mode 100644 index 000000000..c42efca8c --- /dev/null +++ b/lib_eio/tests/dscheck/test_sync.ml @@ -0,0 +1,94 @@ +let debug = false + +module T = Sync + +(* Create a synchronous channel. [prod] producers write values to it and [cons] consumers take values. + Both producers and consumers try to cancel if they can. + [take_nonblocking] additional consumers also perform a single non-blocking take. + At the end, we check that: + - We received the expected values. + - No processes are still queued up (since everything tries to cancel before finishing). + *) +let test ~prod ~cons ~take_nonblocking () = + let messages = ref [] in + let log fmt = (fmt ^^ "@.") |> Format.kasprintf @@ fun msg -> messages := msg :: !messages in + if debug then log "== start =="; + let t = T.create () in + let finished_producers = ref 0 in + let expected_total = ref 0 in + let received = ref 0 in + let cancelled_consumers = ref 0 in + let cancelled_producers = ref 0 in + let run_consumer l = + Fake_sched.run + (fun () -> + match T.take t with + | v -> + if debug then log "c%d: Recv %d" l v; + received := !received + v + | exception Eio__core.Cancel.Cancelled _ -> + if debug then log "c%d: Cancelled" l; + incr cancelled_consumers + ) + |> Option.iter (fun ctx -> + if debug then log "c%d: Suspended" l; + Fake_sched.cancel ctx; + ) + in + let run_producer v = + Fake_sched.run + (fun () -> + match T.put t v with + | () -> + if debug then log "p%d: Sent" v; + expected_total := !expected_total + v; + incr finished_producers + | exception Eio__core.Cancel.Cancelled _ -> + if debug then log "p%d: Cancelled" v; + incr finished_producers; + incr cancelled_producers + ) + |> Option.iter (fun ctx -> + if debug then log "p%d: Suspended sending" v; + Fake_sched.cancel ctx + ) + in + for i = 1 to prod do + Atomic.spawn (fun () -> run_producer i) + done; + for i = 1 to cons do + Atomic.spawn (fun () -> run_consumer 
i) + done; + for i = 1 to take_nonblocking do + Atomic.spawn (fun () -> + match T.take_nonblocking t with + | None -> + if debug then log "nb%d: found nothing" i; + incr cancelled_consumers; + | Some v -> + if debug then log "nb%d: took %d" i v; + received := !received + v + ) + done; + Atomic.final (fun () -> + if debug then ( + List.iter print_string (List.rev !messages); + Fmt.pr "%a@." T.dump t; + Fmt.pr "Received total = %d/%d (%d/%d cancelled consumers)@." + !received !expected_total + !cancelled_consumers (cons + take_nonblocking); + Fmt.pr "Finished producers = %d/%d (incl %d cancelled)@." + !finished_producers prod + !cancelled_producers; + ); + assert (!finished_producers = prod); + (* Everyone finishes by trying to cancel (if they didn't succeed immediately), + so there shouldn't be any balance at the end. *) + assert (T.balance t = 0); + assert (!received = !expected_total); + ) + +let () = + Atomic.trace (test ~prod:1 ~cons:1 ~take_nonblocking:1); + Atomic.trace (test ~prod:2 ~cons:1 ~take_nonblocking:0); + Atomic.trace (test ~prod:1 ~cons:2 ~take_nonblocking:0); diff --git a/lib_eio/tests/dune b/lib_eio/tests/dune index 089661d57..48fdb1987 100644 --- a/lib_eio/tests/dune +++ b/lib_eio/tests/dune @@ -1,3 +1,6 @@ (mdx (package eio) - (deps (package eio))) + (deps + (package eio) + (file ./dscheck/fake_sched.ml) + (file ./dscheck/fake_sched.mli))) diff --git a/lib_eio/tests/sync.md b/lib_eio/tests/sync.md new file mode 100644 index 000000000..63ebb1749 --- /dev/null +++ b/lib_eio/tests/sync.md @@ -0,0 +1,334 @@ +```ocaml +# #require "eio";; +# #mod_use "dscheck/fake_sched.ml";; +module Fake_sched : + sig + val cancel : Eio.Cancel.t -> unit + val run : (unit -> unit) -> Eio.Cancel.t option + end +``` +```ocaml +module T = Eio__Sync +module Fiber_context = Eio.Private.Fiber_context + +let show t = Fmt.pr "%a@." T.dump t + +let put t v = + Fake_sched.run + (fun () -> + match T.put t v with + | () -> Fmt.pr "Sent %s@." 
v + | exception (Eio.Cancel.Cancelled _) -> Fmt.pr "Send of %s was cancelled@." v + ) + |> Option.map (fun ctx -> + Fmt.pr "Waiting for a consumer for %s@." v; + ctx + ) + +let take t label = + Fake_sched.run + (fun () -> + match T.take t with + | v -> Fmt.pr "%s: Took %s@." label v + | exception (Eio.Cancel.Cancelled _) -> Fmt.pr "%s: Take cancelled@." label + ) + |> Option.map (fun ctx -> + Fmt.pr "%s: Waiting for producer@." label; + ctx + ) +``` + +Initially there are no consumers or producers: + +```ocaml +# let t : string T.t = T.create ();; +val t : string T.t = <abstr> +# show t;; +Sync (balance=0) + Consumers: + Segment 0 (prev=None, pointers=2, cancelled=0): + In_transition (suspend) (resume) + In_transition + In_transition + In_transition + End + Producers: + Segment 0 (prev=None, pointers=2, cancelled=0): + In_transition (suspend) (resume) + In_transition + In_transition + In_transition + End +- : unit = () +``` + +Adding one consumer makes the balance go negative: + +```ocaml +# take t "cons1";; +cons1: Waiting for producer +- : Eio.Cancel.t option = Some <abstr> + +# show t;; +Sync (balance=-1) + Consumers: + Segment 0 (prev=None, pointers=2, cancelled=0): + Slot (resume) + In_transition (suspend) + In_transition + In_transition + End + Producers: + Segment 0 (prev=None, pointers=2, cancelled=0): + In_transition (suspend) (resume) + In_transition + In_transition + In_transition + End +- : unit = () +``` + +Sending a value wakes it: + +```ocaml +# put t "A";; +cons1: Took A +Sent A +- : Eio.Cancel.t option = None + +# show t;; +Sync (balance=0) + Consumers: + Segment 0 (prev=None, pointers=2, cancelled=0): + Finished + In_transition (suspend) (resume) + In_transition + In_transition + End + Producers: + Segment 0 (prev=None, pointers=2, cancelled=0): + In_transition (suspend) (resume) + In_transition + In_transition + In_transition + End +- : unit = () +``` + +Trying to send a second value waits on the producers queue, setting the balance to 1: + +```ocaml +# put t
"B";; +Waiting for a consumer for B +- : Eio.Cancel.t option = Some <abstr> + +# show t;; +Sync (balance=1) + Consumers: + Segment 0 (prev=None, pointers=2, cancelled=0): + Finished + In_transition (suspend) (resume) + In_transition + In_transition + End + Producers: + Segment 0 (prev=None, pointers=2, cancelled=0): + Item (resume) + In_transition (suspend) + In_transition + In_transition + End +- : unit = () +``` + +Sending a third value must also wait: +```ocaml +# put t "C";; +Waiting for a consumer for C +- : Eio.Cancel.t option = Some <abstr> + +# show t;; +Sync (balance=2) + Consumers: + Segment 0 (prev=None, pointers=2, cancelled=0): + Finished + In_transition (suspend) (resume) + In_transition + In_transition + End + Producers: + Segment 0 (prev=None, pointers=2, cancelled=0): + Item (resume) + Item + In_transition (suspend) + In_transition + End +- : unit = () +``` + +The next consumer reads the first value and wakes the first producer: +```ocaml +# take t "cons2";; +Sent B +cons2: Took B +- : Eio.Cancel.t option = None + +# show t;; +Sync (balance=1) + Consumers: + Segment 0 (prev=None, pointers=2, cancelled=0): + Finished + In_transition (suspend) (resume) + In_transition + In_transition + End + Producers: + Segment 0 (prev=None, pointers=2, cancelled=0): + Finished + Item (resume) + In_transition (suspend) + In_transition + End +- : unit = () +``` + +Finally, we collect the last value: +```ocaml +# take t "cons3";; +Sent C +cons3: Took C +- : Eio.Cancel.t option = None +``` + +## Cancellation + +Cancelling a consumer restores the balance: +```ocaml +# let t : string T.t = T.create ();; +val t : string T.t = <abstr> +# let request = take t "cons1" |> Option.get;; +cons1: Waiting for producer +val request : Eio.Cancel.t = <abstr> + +# show t;; +Sync (balance=-1) + Consumers: + Segment 0 (prev=None, pointers=2, cancelled=0): + Slot (resume) + In_transition (suspend) + In_transition + In_transition + End + Producers: + Segment 0 (prev=None, pointers=2, cancelled=0): + In_transition
(suspend) (resume) + In_transition + In_transition + In_transition + End +- : unit = () +``` + +```ocaml +# Fake_sched.cancel request;; +cons1: Take cancelled +- : unit = () + +# show t;; +Sync (balance=0) + Consumers: + Segment 0 (prev=None, pointers=2, cancelled=1): + Finished (resume) + In_transition (suspend) + In_transition + In_transition + End + Producers: + Segment 0 (prev=None, pointers=2, cancelled=0): + In_transition (suspend) (resume) + In_transition + In_transition + In_transition + End +- : unit = () +``` + +Cancelling a producer restores the balance count: + +```ocaml +# let t : string T.t = T.create ();; +val t : string T.t = <abstr> +# let a = put t "A" |> Option.get;; +Waiting for a consumer for A +val a : Eio.Cancel.t = <abstr> +# put t "B" |> Option.get;; +Waiting for a consumer for B +- : Eio.Cancel.t = <abstr> + +# show t;; +Sync (balance=2) + Consumers: + Segment 0 (prev=None, pointers=2, cancelled=0): + In_transition (suspend) (resume) + In_transition + In_transition + In_transition + End + Producers: + Segment 0 (prev=None, pointers=2, cancelled=0): + Item (resume) + Item + In_transition (suspend) + In_transition + End +- : unit = () + +# Fake_sched.cancel a;; +Send of A was cancelled +- : unit = () + +# show t;; +Sync (balance=1) + Consumers: + Segment 0 (prev=None, pointers=2, cancelled=0): + In_transition (suspend) (resume) + In_transition + In_transition + In_transition + End + Producers: + Segment 0 (prev=None, pointers=2, cancelled=1): + Finished (resume) + Item + In_transition (suspend) + In_transition + End +- : unit = () +``` + +The next consumer sees the second value: + +```ocaml +# take t "cons4";; +Sent B +cons4: Took B +- : Eio.Cancel.t option = None + +# show t;; +Sync (balance=0) + Consumers: + Segment 0 (prev=None, pointers=2, cancelled=0): + In_transition (suspend) (resume) + In_transition + In_transition + In_transition + End + Producers: + Segment 0 (prev=None, pointers=2, cancelled=1): + Finished + Finished + In_transition (suspend) (resume) +
In_transition + End +- : unit = () +```