Skip to content

Commit

Permalink
eio_linux: call submit as needed
Browse files Browse the repository at this point in the history
We may fail to submit a job because the SQE queue is full. Previously
we would wait until some existing request completed, but that might
never happen. Instead, we just flush the SQE queue and retry.

Fixes ocaml-multicore#409.
  • Loading branch information
talex5 committed Jan 31, 2023
1 parent 789c6d2 commit 2601ce4
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 20 deletions.
2 changes: 1 addition & 1 deletion dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
(logs (>= 0.7.0))
(fmt (>= 0.8.9))
(cmdliner (and (>= 1.1.0) :with-test))
(uring (>= 0.4))))
(uring (>= 0.5))))
(package
(name eio_luv)
(synopsis "Eio implementation using luv (libuv)")
Expand Down
2 changes: 1 addition & 1 deletion eio_linux.opam
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ depends: [
"logs" {>= "0.7.0"}
"fmt" {>= "0.8.9"}
"cmdliner" {>= "1.1.0" & with-test}
"uring" {>= "0.4"}
"uring" {>= "0.5"}
"odoc" {with-doc}
]
build: [
Expand Down
15 changes: 11 additions & 4 deletions examples/hello/main.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
let main ~stdout =
Eio.Flow.copy_string "Hello, world!\n" stdout
open Eio.Std

let () =
Eio_main.run @@ fun env ->
main ~stdout:(Eio.Stdenv.stdout env)
Eio_linux.run ~queue_depth:4 @@ fun _stdenv ->
Switch.run @@ fun sw ->
for _ = 1 to 10 do
Fiber.fork ~sw (fun () ->
let r, _w = Eio_unix.pipe sw in
ignore (Eio.Flow.single_read r (Cstruct.create 1) : int);
assert false
)
done;
raise Exit
35 changes: 21 additions & 14 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,18 @@ type _ Effect.t += Enter : (t -> 'a Suspended.t -> unit) -> 'a Effect.t
type _ Effect.t += Cancel : io_job Uring.job -> unit Effect.t
let enter fn = Effect.perform (Enter fn)

(* [enqueue_job t fn] calls [fn ()] to try to queue a job on [t.uring].
   If [fn] returns [None] (no job could be queued, e.g. because the
   submission queue is full), it calls [Uring.submit] to flush the queue
   and tries again, for as long as each submit reports a positive count
   (presumably the number of entries handed to the kernel - confirm
   against the Uring documentation).
   @return [Some job] as soon as [fn] succeeds, or [None] if [fn] failed
           and the following submit reported nothing submitted. *)
let rec enqueue_job t fn =
  match fn () with
  | Some _ as job -> job
  | None ->
    (* Flushing may free up slots; only retry if it made progress,
       otherwise we would loop forever. *)
    let submitted = Uring.submit t.uring in
    if submitted > 0 then enqueue_job t fn else None

(* Cancellations always come from the same domain, so no need to send wake events here. *)
let rec enqueue_cancel job st =
let rec enqueue_cancel job t =
Ctf.label "cancel";
match Uring.cancel st.uring job Cancel_job with
| None -> Queue.push (fun st -> enqueue_cancel job st) st.io_q
match enqueue_job t (fun () -> Uring.cancel t.uring job Cancel_job) with
| None -> Queue.push (fun t -> enqueue_cancel job t) t.io_q
| Some _ -> ()

let cancel job = Effect.perform (Cancel job)
Expand All @@ -261,15 +268,15 @@ let cancel job = Effect.perform (Cancel job)
If the operation completes before Linux processes the cancellation, we get [ENOENT], which we ignore. *)

(* [with_cancel_hook ~action st fn] calls [fn] to create a job,
(* [with_cancel_hook ~action t fn] calls [fn] to create a job,
then sets the fiber's cancel function to cancel it.
If [action] is already cancelled, it schedules [action] to be discontinued.
@return Whether to retry the operation later, once there is space. *)
let with_cancel_hook ~action st fn =
let with_cancel_hook ~action t fn =
match Fiber_context.get_error action.Suspended.fiber with
| Some ex -> enqueue_failed_thread st action ex; false
| Some ex -> enqueue_failed_thread t action ex; false
| None ->
match fn () with
match enqueue_job t fn with
| None -> true
| Some job ->
Fiber_context.set_cancel_fn action.fiber (fun _ -> cancel job);
Expand Down Expand Up @@ -364,11 +371,11 @@ let rec enqueue_poll_add_unix fd poll_mask st action cb =
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_poll_add_unix fd poll_mask st action cb) st.io_q

let rec enqueue_close st action fd =
let rec enqueue_close t action fd =
Ctf.label "close";
let subm = Uring.close st.uring fd (Job_no_cancel action) in
let subm = enqueue_job t (fun () -> Uring.close t.uring fd (Job_no_cancel action)) in
if subm = None then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_close st action fd) st.io_q
Queue.push (fun t -> enqueue_close t action fd) t.io_q

let enqueue_write st action (file_offset,fd,buf,len) =
let file_offset =
Expand Down Expand Up @@ -483,12 +490,12 @@ let rec enqueue_accept fd client_addr st action =
Queue.push (fun st -> enqueue_accept fd client_addr st action) st.io_q
)

let rec enqueue_noop st action =
let rec enqueue_noop t action =
Ctf.label "noop";
let retry = (Uring.noop st.uring (Job_no_cancel action) = None) in
if retry then (
let job = enqueue_job t (fun () -> Uring.noop t.uring (Job_no_cancel action)) in
if job = None then (
(* wait until an sqe is available *)
Queue.push (fun st -> enqueue_noop st action) st.io_q
Queue.push (fun t -> enqueue_noop t action) t.io_q
)

let submit_pending_io st =
Expand Down
16 changes: 16 additions & 0 deletions lib_eio_linux/tests/test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,21 @@ let test_iovec () =
);
Alcotest.(check string) "Transfer correct" "Got [foo] and [bar]" (Cstruct.to_string message)

(* Regression test for running out of submission-queue entries.
   The event loop is started with [~queue_depth:4], but 8 fibers are forked
   and each one starts a read, so more jobs are requested than the queue
   depth allows and the backend has to submit early to make room.
   Nothing ever writes to the pipes, so the reads are not expected to
   return (hence the [assert false]); the test finishes by raising [Exit]
   from the switch body and catching it outside the event loop. *)
let test_no_sqe () =
  (* A fiber body that creates a fresh pipe and reads from its read end. *)
  let blocked_reader sw () =
    let r, _w = Eio_unix.pipe sw in
    let buf = Cstruct.create 1 in
    let (_ : int) = Eio.Flow.single_read r buf in
    assert false
  in
  match
    Eio_linux.run ~queue_depth:4 @@ fun _stdenv ->
    Switch.run @@ fun sw ->
    for _ = 1 to 8 do
      Fiber.fork ~sw (blocked_reader sw)
    done;
    raise Exit
  with
  | () -> ()
  | exception Exit -> ()

let () =
let open Alcotest in
run "eio_linux" [
Expand All @@ -119,5 +134,6 @@ let () =
test_case "poll_add" `Quick test_poll_add;
test_case "poll_add_busy" `Quick test_poll_add_busy;
test_case "iovec" `Quick test_iovec;
test_case "no-sqe" `Quick test_no_sqe;
];
]

0 comments on commit 2601ce4

Please sign in to comment.