From 0f48f27e05a2e1468391776b7987155ea6b8a4f7 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Wed, 4 Sep 2024 14:18:40 +0100 Subject: [PATCH] eio_linux: allow alloc_fixed_or_wait to be cancelled --- lib_eio_linux/low_level.ml | 11 ++++++---- lib_eio_linux/sched.ml | 6 +++--- lib_eio_linux/tests/test.ml | 42 ++++++++++++++++++++++++++++--------- 3 files changed, 42 insertions(+), 17 deletions(-) diff --git a/lib_eio_linux/low_level.ml b/lib_eio_linux/low_level.ml index 6967fb80a..b51e97238 100644 --- a/lib_eio_linux/low_level.ml +++ b/lib_eio_linux/low_level.ml @@ -226,13 +226,16 @@ let alloc_fixed_or_wait () = | exception Uring.Region.No_space -> let id = Eio.Private.Trace.mint_id () in let trigger = Eio.Private.Single_waiter.create () in - Queue.push trigger s.mem_q; - (* todo: remove protect; but needs to remove from queue on cancel *) - Eio.Private.Single_waiter.await_protect trigger "alloc_fixed_or_wait" id + let node = Lwt_dllist.add_r trigger s.mem_q in + try + Eio.Private.Single_waiter.await trigger "alloc_fixed_or_wait" id + with ex -> + Lwt_dllist.remove node; + raise ex let free_fixed buf = let s = Sched.get () in - match Queue.take_opt s.mem_q with + match Lwt_dllist.take_opt_l s.mem_q with | None -> Uring.Region.free buf | Some k -> Eio.Private.Single_waiter.wake k (Ok buf) diff --git a/lib_eio_linux/sched.ml b/lib_eio_linux/sched.ml index 7d9e4b27b..80fc63f4a 100644 --- a/lib_eio_linux/sched.ml +++ b/lib_eio_linux/sched.ml @@ -50,7 +50,7 @@ type t = { uring: io_job Uring.t; mem: Uring.Region.t option; io_q: (t -> unit) Queue.t; (* waiting for room on [uring] *) - mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Queue.t; + mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Lwt_dllist.t; (* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *) run_q : runnable Lf_queue.t; @@ -247,7 +247,7 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] = ) else if timeout = None && Uring.active_ops uring = 0 then ( (* Nothing further can happen at this point. If there are no events in progress but also still no memory available, something has gone wrong! *) - assert (Queue.length mem_q = 0); + assert (Lwt_dllist.length mem_q = 0); Lf_queue.close st.run_q; (* Just to catch bugs if something tries to enqueue later *) `Exit_scheduler ) else ( @@ -536,7 +536,7 @@ let with_sched ?(fallback=no_fallback) config fn = Lf_queue.push run_q IO; let sleep_q = Zzz.create () in let io_q = Queue.create () in - let mem_q = Queue.create () in + let mem_q = Lwt_dllist.create () in with_eventfd @@ fun eventfd -> let thread_pool = Eio_unix.Private.Thread_pool.create ~sleep_q in fn { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q; thread_pool } diff --git a/lib_eio_linux/tests/test.ml b/lib_eio_linux/tests/test.ml index 7c7e31aad..5ebadce91 100644 --- a/lib_eio_linux/tests/test.ml +++ b/lib_eio_linux/tests/test.ml @@ -211,19 +211,41 @@ let test_signal_race () = (fun () -> Eio.Condition.await_no_mutex cond) (fun () -> ignore (Unix.setitimer ITIMER_REAL { it_interval = 0.; it_value = 0.001 } : Unix.interval_timer_status)) +let test_alloc_fixed_or_wait () = + Eio_linux.run ~n_blocks:1 @@ fun _env -> + let block = Eio_linux.Low_level.alloc_fixed_or_wait () in + (* We have to wait for the block, but get cancelled while waiting. *) + begin + try + Fiber.both + (fun () -> ignore (Eio_linux.Low_level.alloc_fixed_or_wait ())) + (fun () -> raise Exit); + with Exit -> () + end; + (* We have to wait for the block, and get it when the old one is freed. *) + Fiber.both + (fun () -> + let x = Eio_linux.Low_level.alloc_fixed_or_wait () in + Eio_linux.Low_level.free_fixed x + ) + (fun () -> + Eio_linux.Low_level.free_fixed block + ) + let () = let open Alcotest in run "eio_linux" [ "io", [ - test_case "copy" `Quick test_copy; - test_case "direct_copy" `Quick test_direct_copy; - test_case "poll_add" `Quick test_poll_add; - test_case "poll_add_busy" `Quick test_poll_add_busy; - test_case "iovec" `Quick test_iovec; - test_case "no_sqe" `Quick test_no_sqe; - test_case "read_exact" `Quick test_read_exact; - test_case "expose_backend" `Quick test_expose_backend; - test_case "statx" `Quick test_statx; - test_case "signal_race" `Quick test_signal_race; + test_case "copy" `Quick test_copy; + test_case "direct_copy" `Quick test_direct_copy; + test_case "poll_add" `Quick test_poll_add; + test_case "poll_add_busy" `Quick test_poll_add_busy; + test_case "iovec" `Quick test_iovec; + test_case "no_sqe" `Quick test_no_sqe; + test_case "read_exact" `Quick test_read_exact; + test_case "expose_backend" `Quick test_expose_backend; + test_case "statx" `Quick test_statx; + test_case "signal_race" `Quick test_signal_race; + test_case "alloc-fixed-or-wait" `Quick test_alloc_fixed_or_wait; ]; ]