From 2601ce43efc977959c4e6474d0fcfa2ad55be937 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Tue, 31 Jan 2023 09:57:36 +0000 Subject: [PATCH] eio_linux: call submit as needed We may fail to submit a job because the SQE queue is full. Previously we would wait until some existing request completed, but that might never happen. Instead, we just flush the SQE queue and retry. Fixes #409. --- dune-project | 2 +- eio_linux.opam | 2 +- examples/hello/main.ml | 15 +++++++++++---- lib_eio_linux/eio_linux.ml | 35 +++++++++++++++++++++-------------- lib_eio_linux/tests/test.ml | 16 ++++++++++++++++ 5 files changed, 50 insertions(+), 20 deletions(-) diff --git a/dune-project b/dune-project index 678cabfb4..a2efcaee2 100644 --- a/dune-project +++ b/dune-project @@ -39,7 +39,7 @@ (logs (>= 0.7.0)) (fmt (>= 0.8.9)) (cmdliner (and (>= 1.1.0) :with-test)) - (uring (>= 0.4)))) + (uring (>= 0.5)))) (package (name eio_luv) (synopsis "Eio implementation using luv (libuv)") diff --git a/eio_linux.opam b/eio_linux.opam index 184b12174..6cdc8612c 100644 --- a/eio_linux.opam +++ b/eio_linux.opam @@ -16,7 +16,7 @@ depends: [ "logs" {>= "0.7.0"} "fmt" {>= "0.8.9"} "cmdliner" {>= "1.1.0" & with-test} - "uring" {>= "0.4"} + "uring" {>= "0.5"} "odoc" {with-doc} ] build: [ diff --git a/examples/hello/main.ml b/examples/hello/main.ml index 3603f2118..592f9a2b8 100644 --- a/examples/hello/main.ml +++ b/examples/hello/main.ml @@ -1,6 +1,13 @@ -let main ~stdout = - Eio.Flow.copy_string "Hello, world!\n" stdout +open Eio.Std let () = - Eio_main.run @@ fun env -> - main ~stdout:(Eio.Stdenv.stdout env) + Eio_linux.run ~queue_depth:4 @@ fun _stdenv -> + Switch.run @@ fun sw -> + for _ = 1 to 10 do + Fiber.fork ~sw (fun () -> + let r, _w = Eio_unix.pipe sw in + ignore (Eio.Flow.single_read r (Cstruct.create 1) : int); + assert false + ) + done; + raise Exit diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index e951f5fab..07398d583 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -234,11 +234,18 @@ type _ Effect.t += Enter : (t -> 'a Suspended.t -> unit) -> 'a Effect.t type _ Effect.t += Cancel : io_job Uring.job -> unit Effect.t let enter fn = Effect.perform (Enter fn) +let rec enqueue_job t fn = + match fn () with + | Some _ as r -> r + | None -> + if Uring.submit t.uring > 0 then enqueue_job t fn + else None + (* Cancellations always come from the same domain, so no need to send wake events here. *) -let rec enqueue_cancel job st = +let rec enqueue_cancel job t = Ctf.label "cancel"; - match Uring.cancel st.uring job Cancel_job with - | None -> Queue.push (fun st -> enqueue_cancel job st) st.io_q + match enqueue_job t (fun () -> Uring.cancel t.uring job Cancel_job) with + | None -> Queue.push (fun t -> enqueue_cancel job t) t.io_q | Some _ -> () let cancel job = Effect.perform (Cancel job) @@ -261,15 +268,15 @@ let cancel job = Effect.perform (Cancel job) If the operation completes before Linux processes the cancellation, we get [ENOENT], which we ignore. *) -(* [with_cancel_hook ~action st fn] calls [fn] to create a job, +(* [with_cancel_hook ~action t fn] calls [fn] to create a job, then sets the fiber's cancel function to cancel it. If [action] is already cancelled, it schedules [action] to be discontinued. @return Whether to retry the operation later, once there is space. *) -let with_cancel_hook ~action st fn = +let with_cancel_hook ~action t fn = match Fiber_context.get_error action.Suspended.fiber with - | Some ex -> enqueue_failed_thread st action ex; false + | Some ex -> enqueue_failed_thread t action ex; false | None -> - match fn () with + match enqueue_job t fn with | None -> true | Some job -> Fiber_context.set_cancel_fn action.fiber (fun _ -> cancel job); @@ -364,11 +371,11 @@ let rec enqueue_poll_add_unix fd poll_mask st action cb = if retry then (* wait until an sqe is available *) Queue.push (fun st -> enqueue_poll_add_unix fd poll_mask st action cb) st.io_q -let rec enqueue_close st action fd = +let rec enqueue_close t action fd = Ctf.label "close"; - let subm = Uring.close st.uring fd (Job_no_cancel action) in + let subm = enqueue_job t (fun () -> Uring.close t.uring fd (Job_no_cancel action)) in if subm = None then (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_close st action fd) st.io_q + Queue.push (fun t -> enqueue_close t action fd) t.io_q let enqueue_write st action (file_offset,fd,buf,len) = let file_offset = @@ -483,12 +490,12 @@ let rec enqueue_accept fd client_addr st action = Queue.push (fun st -> enqueue_accept fd client_addr st action) st.io_q ) -let rec enqueue_noop st action = +let rec enqueue_noop t action = Ctf.label "noop"; - let retry = (Uring.noop st.uring (Job_no_cancel action) = None) in - if retry then ( + let job = enqueue_job t (fun () -> Uring.noop t.uring (Job_no_cancel action)) in + if job = None then ( (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_noop st action) st.io_q + Queue.push (fun t -> enqueue_noop t action) t.io_q ) let submit_pending_io st = diff --git a/lib_eio_linux/tests/test.ml b/lib_eio_linux/tests/test.ml index 8f7b265e8..273e1438b 100644 --- a/lib_eio_linux/tests/test.ml +++ b/lib_eio_linux/tests/test.ml @@ -110,6 +110,21 @@ let test_iovec () = ); Alcotest.(check string) "Transfer correct" "Got [foo] and [bar]" (Cstruct.to_string message) +(* We fill the SQE buffer and need to submit early. *) +let test_no_sqe () = + try + Eio_linux.run ~queue_depth:4 @@ fun _stdenv -> + Switch.run @@ fun sw -> + for _ = 1 to 8 do + Fiber.fork ~sw (fun () -> + let r, _w = Eio_unix.pipe sw in + ignore (Eio.Flow.single_read r (Cstruct.create 1) : int); + assert false + ) + done; + raise Exit + with Exit -> () + let () = let open Alcotest in run "eio_linux" [ @@ -119,5 +134,6 @@ let () = test_case "poll_add" `Quick test_poll_add; test_case "poll_add_busy" `Quick test_poll_add_busy; test_case "iovec" `Quick test_iovec; + test_case "no-sqe" `Quick test_no_sqe; ]; ]