diff --git a/lib_eio/core/eio__core.ml b/lib_eio/core/eio__core.ml index 2c8f20536..3418d944d 100644 --- a/lib_eio/core/eio__core.ml +++ b/lib_eio/core/eio__core.ml @@ -7,6 +7,7 @@ module Private = struct module Suspend = Suspend module Cells = Cells module Broadcast = Broadcast + module Single_waiter = Single_waiter module Trace = Trace module Fiber_context = Cancel.Fiber_context module Debug = Debug diff --git a/lib_eio/core/eio__core.mli b/lib_eio/core/eio__core.mli index 65e2f41e4..734d9f1c7 100644 --- a/lib_eio/core/eio__core.mli +++ b/lib_eio/core/eio__core.mli @@ -606,6 +606,7 @@ module Private : sig module Cells = Cells module Broadcast = Broadcast + module Single_waiter = Single_waiter (** Every fiber has an associated context. *) module Fiber_context : sig diff --git a/lib_eio_linux/low_level.ml b/lib_eio_linux/low_level.ml index 2db0f2fe6..6967fb80a 100644 --- a/lib_eio_linux/low_level.ml +++ b/lib_eio_linux/low_level.ml @@ -207,11 +207,34 @@ let write ?file_offset:off fd buf len = raise @@ Err.wrap (Uring.error_of_errno res) "write" "" ) -let alloc_fixed () = Effect.perform Sched.Alloc - -let alloc_fixed_or_wait () = Effect.perform Sched.Alloc_or_wait - -let free_fixed buf = Effect.perform (Sched.Free buf) +let alloc_fixed () = + let s = Sched.get () in + match s.mem with + | None -> None + | Some mem -> + match Uring.Region.alloc mem with + | buf -> Some buf + | exception Uring.Region.No_space -> None + +let alloc_fixed_or_wait () = + let s = Sched.get () in + match s.mem with + | None -> failwith "No fixed buffer available" + | Some mem -> + match Uring.Region.alloc mem with + | buf -> buf + | exception Uring.Region.No_space -> + let id = Eio.Private.Trace.mint_id () in + let trigger = Eio.Private.Single_waiter.create () in + Queue.push trigger s.mem_q; + (* todo: remove protect; but needs to remove from queue on cancel *) + Eio.Private.Single_waiter.await_protect trigger "alloc_fixed_or_wait" id + +let free_fixed buf = + let s = Sched.get () in + match Queue.take_opt s.mem_q with + | None -> Uring.Region.free buf + | Some k -> Eio.Private.Single_waiter.wake k (Ok buf) let splice src ~dst ~len = Fd.use_exn "splice-src" src @@ fun src -> diff --git a/lib_eio_linux/sched.ml b/lib_eio_linux/sched.ml index 4d5f1a901..7d9e4b27b 100644 --- a/lib_eio_linux/sched.ml +++ b/lib_eio_linux/sched.ml @@ -50,7 +50,7 @@ type t = { uring: io_job Uring.t; mem: Uring.Region.t option; io_q: (t -> unit) Queue.t; (* waiting for room on [uring] *) - mem_q : Uring.Region.chunk Suspended.t Queue.t; + mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Queue.t; (* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *) run_q : runnable Lf_queue.t; @@ -74,9 +74,9 @@ type t = { type _ Effect.t += | Enter : (t -> 'a Suspended.t -> unit) -> 'a Effect.t | Cancel : io_job Uring.job -> unit Effect.t - | Alloc : Uring.Region.chunk option Effect.t - | Alloc_or_wait : Uring.Region.chunk Effect.t - | Free : Uring.Region.chunk -> unit Effect.t + | Get : t Effect.t + +let get () = Effect.perform Get let wake_buffer = let b = Bytes.create 8 in @@ -339,21 +339,6 @@ and complete_rw_req st ({len; cur_off; action; _} as req) res = | _, Exactly len -> Suspended.continue action len | n, Upto _ -> Suspended.continue action n -let alloc_buf_or_wait st k = - match st.mem with - | None -> Suspended.discontinue k (Failure "No fixed buffer available") - | Some mem -> - match Uring.Region.alloc mem with - | buf -> Suspended.continue k buf - | exception Uring.Region.No_space -> - Queue.push k st.mem_q; - schedule st - -let free_buf st buf = - match Queue.take_opt st.mem_q with - | None -> Uring.Region.free buf - | Some k -> enqueue_thread st k buf - let rec enqueue_poll_add fd poll_mask st action = Trace.log "poll_add"; let retry = with_cancel_hook ~action st (fun () -> @@ -411,8 +396,9 @@ let run ~extra_effects st main arg = Fiber_context.destroy fiber; Printexc.raise_with_backtrace ex (Printexc.get_raw_backtrace ()) ); - effc = fun (type a) (e : a Effect.t) -> + effc = fun (type a) (e : a Effect.t) : ((a, _) continuation -> _) option -> match e with + | Get -> Some (fun k -> continue k st) | Enter fn -> Some (fun k -> match Fiber_context.get_error fiber with | Some e -> discontinue k e @@ -467,22 +453,6 @@ let run ~extra_effects st main arg = Eio_unix.Private.Thread_pool.submit st.thread_pool ~ctx:fiber ~enqueue fn; schedule st ) - | Alloc -> Some (fun k -> - match st.mem with - | None -> continue k None - | Some mem -> - match Uring.Region.alloc mem with - | buf -> continue k (Some buf) - | exception Uring.Region.No_space -> continue k None - ) - | Alloc_or_wait -> Some (fun k -> - let k = { Suspended.k; fiber } in - alloc_buf_or_wait st k - ) - | Free buf -> Some (fun k -> - free_buf st buf; - continue k () - ) | e -> extra_effects.effc e } in