Skip to content

Commit

Permalink
Improve stream benchmark
Browse files Browse the repository at this point in the history
Previously the multi-domain test only had one producer and one consumer.
It now tries with various combinations up to 4 producers with 4
consumers. Each producer also submits items from many fibers at the same
time. This is more like a typical use of a stream, where IO domains are
handling many requests by submitting CPU-intensive jobs to a pool of
batch processing domains.

It now also includes some spinner fibers, to avoid measuring domain
resume time. That previously dominated the results (though it happens
less anyway now that the senders run multiple fibers).
  • Loading branch information
talex5 committed Jan 23, 2023
1 parent 494d1d1 commit fa9987a
Showing 1 changed file with 69 additions and 26 deletions.
95 changes: 69 additions & 26 deletions bench/bench_stream.ml
Original file line number Diff line number Diff line change
@@ -1,44 +1,87 @@
(* Some sender domains each run a bunch of fibers submitting items to a stream.
Some receiver domains each run a single fiber accepting items from the stream.
It also tests the single-domain case. *)

open Eio.Std

let run_sender ~n_iters stream =
for i = 1 to n_iters do
Eio.Stream.add stream i
(* Number of fibers that send concurrently inside each sending domain.
[run_bench] passes this to [run_sender] as [~n_fibers]. *)
let n_sender_fibers = 10

(* A fiber body that yields forever and never returns. It stands in for
   unrelated work going on in the same domain and stops the domain from going
   to sleep; without it the benchmark would mostly time how long the OS takes
   to wake a sleeping thread. *)
let spin () =
  let rec loop () =
    Fiber.yield ();
    loop ()
  in
  loop ()

(* Start [n_fibers] fibers under one switch; each of them adds the values
   [1..n_iters] to [stream], in order. A [spin] fiber is forked as a daemon
   alongside them to keep the domain busy while they run. *)
let run_sender ~n_fibers ~n_iters stream =
  (* Work done by a single sending fiber. *)
  let send_all () =
    for item = 1 to n_iters do
      Eio.Stream.add stream item
    done
  in
  Switch.run (fun sw ->
    Fiber.fork_daemon ~sw spin;
    for _ = 1 to n_fibers do
      Fiber.fork ~sw send_all
    done)

let run_bench ~domain_mgr ~clock ~use_domains ~n_iters ~capacity =
(* Take [n_iters] values from [stream], summing them locally, and add the sum
   to the atomic counter [total] in a single step once they have all arrived.
   A [spin] fiber is forked as a daemon to keep the domain busy meanwhile. *)
let run_recv ~n_iters ~total stream =
  (* [sum_next remaining acc] takes [remaining] more items, adding them to [acc]. *)
  let rec sum_next remaining acc =
    if remaining = 0 then acc
    else sum_next (remaining - 1) (acc + Eio.Stream.take stream)
  in
  Switch.run (fun sw ->
    Fiber.fork_daemon ~sw spin;
    let received = sum_next n_iters 0 in
    ignore (Atomic.fetch_and_add total received : int))

(* Run one benchmark configuration and print a single result row.
[n_send_domains] is the number of additional domains used to send (0 to send
and receive in a single domain). There are always [max 1 n_send_domains]
receivers: one runs in the calling domain and each of the others is started
with [Eio.Domain_manager.run].
[n_iters] is the number of items sent by each sending fiber, and [capacity]
is the argument given to [Eio.Stream.create].
NOTE(review): this listing looks like a rendered diff with the +/- markers
stripped. The lines that mention [use_domains] or [Fiber.both], and the ones
that divide by [n_iters] alone, appear to be the removed side of the change;
confirm against the repository before relying on them. *)
let run_bench ~domain_mgr ~clock ~n_send_domains ~n_iters ~capacity =
let stream = Eio.Stream.create capacity in
let total = Atomic.make 0 in (* Total received (sanity check at the end) *)
(* Even with no extra sending domains there is still one sender and one receiver. *)
let n_senders = max 1 n_send_domains in
let n_iters_total = (* Total number of items to be sent through [stream] *)
n_iters * n_sender_fibers * n_senders
in
(* Run a full major collection first, then snapshot the promoted-words counter and the clock. *)
Gc.full_major ();
let _minor0, prom0, _major0 = Gc.counters () in
let t0 = Eio.Time.now clock in
Fiber.both
(fun () ->
if use_domains then (
Eio.Domain_manager.run domain_mgr @@ fun () ->
run_sender ~n_iters stream
) else (
run_sender ~n_iters stream
)
)
(fun () ->
for i = 1 to n_iters do
let j = Eio.Stream.take stream in
assert (i = j)
done
Switch.run (fun sw ->
let run_sender () = run_sender ~n_fibers:n_sender_fibers ~n_iters stream in
(* Senders: one per extra domain, or a single one in the calling domain when [n_send_domains = 0]. *)
if n_send_domains > 0 then (
for _ = 1 to n_send_domains do
Fiber.fork ~sw (fun () -> Eio.Domain_manager.run domain_mgr run_sender)
done
) else (
Fiber.fork ~sw run_sender
);
(* Each receiver takes one sender's worth of items, so the [n_senders] receivers together drain [n_iters_total] items. *)
let run_recv () = run_recv ~n_iters:(n_iters * n_sender_fibers) ~total stream in
(* All receivers but one are started in a domain of their own... *)
for _ = 1 to n_senders - 1 do
Fiber.fork ~sw @@ fun () ->
Eio.Domain_manager.run domain_mgr run_recv
done;
(* ...and the remaining one runs in the calling domain. *)
Fiber.fork ~sw run_recv
);
let t1 = Eio.Time.now clock in
let total = Atomic.get total in
(* Every sending fiber sends 1..n_iters, whose sum is [n_iters * (1 + n_iters) / 2]. *)
let expected_total = n_senders * n_sender_fibers * (n_iters * (1 + n_iters) / 2) in
assert (total = expected_total);
let time_total = t1 -. t0 in
let time_per_iter = time_total /. float n_iters in
(* This second binding shadows the one above and averages over every item sent. *)
let time_per_iter = time_total /. float n_iters_total in
let _minor1, prom1, _major1 = Gc.counters () in
let prom = prom1 -. prom0 in
Printf.printf "%11b, %8d, %8d, %7.2f, %13.4f\n%!" use_domains n_iters capacity (1e9 *. time_per_iter) (prom /. float n_iters)
(* Columns: sending domains, total items, stream capacity, nanoseconds per item, promoted words per item. *)
Printf.printf "%14d, %8d, %8d, %7.2f, %13.4f\n%!" n_send_domains n_iters_total capacity (1e9 *. time_per_iter) (prom /. float n_iters_total)

(* Print the column header, then call [run_bench] once for every pairing of an
[(n_send_domains, n_iters)] entry with a stream capacity of 0, 1 or 100.
[n_iters] here is the per-fiber count; [run_bench] prints the total number of
items in the n_iters column.
NOTE(review): this listing looks like a rendered diff with the +/- markers
stripped; the header line and the lists that use [use_domains] appear to be
the removed side of the change. Confirm against the repository. *)
let main ~domain_mgr ~clock =
Printf.printf "use_domains, n_iters, capacity, ns/iter, promoted/iter\n%!";
[false, 10_000_000;
true, 1_000_000]
|> List.iter (fun (use_domains, n_iters) ->
[0; 1; 10; 100; 1000] |> List.iter (fun capacity ->
run_bench ~domain_mgr ~clock ~use_domains ~n_iters ~capacity
Printf.printf "n_send_domains, n_iters, capacity, ns/iter, promoted/iter\n%!";
(* Each entry is (number of extra sending domains, items sent per fiber). *)
[0, 100_000;
1, 100_000;
2, 100_000;
4, 100_000;
]
|> List.iter (fun (n_send_domains, n_iters) ->
[0; 1; 100] |> List.iter (fun capacity ->
run_bench ~domain_mgr ~clock ~n_send_domains ~n_iters ~capacity
)
)

Expand Down

0 comments on commit fa9987a

Please sign in to comment.