Skip to content

Commit

Permalink
Merge pull request ocaml-multicore#752 from talex5/linux-get-sched
Browse files Browse the repository at this point in the history
eio_linux: refactor fixed buffer code
  • Loading branch information
talex5 authored Sep 5, 2024
2 parents d47b5e2 + cc2cd3d commit 0f6b65d
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 41 deletions.
1 change: 1 addition & 0 deletions lib_eio/core/eio__core.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module Private = struct
module Suspend = Suspend
module Cells = Cells
module Broadcast = Broadcast
module Single_waiter = Single_waiter
module Trace = Trace
module Fiber_context = Cancel.Fiber_context
module Debug = Debug
Expand Down
1 change: 1 addition & 0 deletions lib_eio/core/eio__core.mli
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ module Private : sig

module Cells = Cells
module Broadcast = Broadcast
module Single_waiter = Single_waiter

(** Every fiber has an associated context. *)
module Fiber_context : sig
Expand Down
33 changes: 28 additions & 5 deletions lib_eio_linux/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,34 @@ let write ?file_offset:off fd buf len =
raise @@ Err.wrap (Uring.error_of_errno res) "write" ""
)

let alloc_fixed () = Effect.perform Sched.Alloc

let alloc_fixed_or_wait () = Effect.perform Sched.Alloc_or_wait

let free_fixed buf = Effect.perform (Sched.Free buf)
let alloc_fixed () =
let s = Sched.get () in
match s.mem with
| None -> None
| Some mem ->
match Uring.Region.alloc mem with
| buf -> Some buf
| exception Uring.Region.No_space -> None

let alloc_fixed_or_wait () =
let s = Sched.get () in
match s.mem with
| None -> failwith "No fixed buffer available"
| Some mem ->
match Uring.Region.alloc mem with
| buf -> buf
| exception Uring.Region.No_space ->
let id = Eio.Private.Trace.mint_id () in
let trigger = Eio.Private.Single_waiter.create () in
Queue.push trigger s.mem_q;
(* todo: remove protect; but needs to remove from queue on cancel *)
Eio.Private.Single_waiter.await_protect trigger "alloc_fixed_or_wait" id

let free_fixed buf =
let s = Sched.get () in
match Queue.take_opt s.mem_q with
| None -> Uring.Region.free buf
| Some k -> Eio.Private.Single_waiter.wake k (Ok buf)

let splice src ~dst ~len =
Fd.use_exn "splice-src" src @@ fun src ->
Expand Down
42 changes: 6 additions & 36 deletions lib_eio_linux/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type t = {
uring: io_job Uring.t;
mem: Uring.Region.t option;
io_q: (t -> unit) Queue.t; (* waiting for room on [uring] *)
mem_q : Uring.Region.chunk Suspended.t Queue.t;
mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Queue.t;

(* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *)
run_q : runnable Lf_queue.t;
Expand All @@ -74,9 +74,9 @@ type t = {
type _ Effect.t +=
| Enter : (t -> 'a Suspended.t -> unit) -> 'a Effect.t
| Cancel : io_job Uring.job -> unit Effect.t
| Alloc : Uring.Region.chunk option Effect.t
| Alloc_or_wait : Uring.Region.chunk Effect.t
| Free : Uring.Region.chunk -> unit Effect.t
| Get : t Effect.t

let get () = Effect.perform Get

let wake_buffer =
let b = Bytes.create 8 in
Expand Down Expand Up @@ -339,21 +339,6 @@ and complete_rw_req st ({len; cur_off; action; _} as req) res =
| _, Exactly len -> Suspended.continue action len
| n, Upto _ -> Suspended.continue action n

let alloc_buf_or_wait st k =
match st.mem with
| None -> Suspended.discontinue k (Failure "No fixed buffer available")
| Some mem ->
match Uring.Region.alloc mem with
| buf -> Suspended.continue k buf
| exception Uring.Region.No_space ->
Queue.push k st.mem_q;
schedule st

let free_buf st buf =
match Queue.take_opt st.mem_q with
| None -> Uring.Region.free buf
| Some k -> enqueue_thread st k buf

let rec enqueue_poll_add fd poll_mask st action =
Trace.log "poll_add";
let retry = with_cancel_hook ~action st (fun () ->
Expand Down Expand Up @@ -411,8 +396,9 @@ let run ~extra_effects st main arg =
Fiber_context.destroy fiber;
Printexc.raise_with_backtrace ex (Printexc.get_raw_backtrace ())
);
effc = fun (type a) (e : a Effect.t) ->
effc = fun (type a) (e : a Effect.t) : ((a, _) continuation -> _) option ->
match e with
| Get -> Some (fun k -> continue k st)
| Enter fn -> Some (fun k ->
match Fiber_context.get_error fiber with
| Some e -> discontinue k e
Expand Down Expand Up @@ -467,22 +453,6 @@ let run ~extra_effects st main arg =
Eio_unix.Private.Thread_pool.submit st.thread_pool ~ctx:fiber ~enqueue fn;
schedule st
)
| Alloc -> Some (fun k ->
match st.mem with
| None -> continue k None
| Some mem ->
match Uring.Region.alloc mem with
| buf -> continue k (Some buf)
| exception Uring.Region.No_space -> continue k None
)
| Alloc_or_wait -> Some (fun k ->
let k = { Suspended.k; fiber } in
alloc_buf_or_wait st k
)
| Free buf -> Some (fun k ->
free_buf st buf;
continue k ()
)
| e -> extra_effects.effc e
}
in
Expand Down

0 comments on commit 0f6b65d

Please sign in to comment.