From 68e934c6cfbabab75679242ddb81de9a047ed1a8 Mon Sep 17 00:00:00 2001 From: Rudi Grinberg Date: Sat, 6 Apr 2024 22:00:53 +0100 Subject: [PATCH] feature(fiber): add [Fiber.map_reduce_seq] (#14) Map reduce a sequence in parallel without intermediate data structures Signed-off-by: Rudi Grinberg --- CHANGES.md | 2 ++ fiber/src/core.ml | 15 ++++++++ fiber/src/fiber.mli | 19 ++++++---- fiber/test/map_reduce_tests.ml | 65 ++++++++++++++++++++++++++++++++++ 4 files changed, 95 insertions(+), 6 deletions(-) create mode 100644 fiber/test/map_reduce_tests.ml diff --git a/CHANGES.md b/CHANGES.md index aeda906..c376d54 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,7 @@ # Unreleased +- Add [Fiber.map_reduce_seq] (#14, @rgrinberg) + - Make [Fiber.both] concurrent (#32, @rgrinberg) - Add [Fiber.Lazy] (#36, #rleshchinskiy) diff --git a/fiber/src/core.ml b/fiber/src/core.ml index 472ce5b..c429000 100644 --- a/fiber/src/core.ml +++ b/fiber/src/core.ml @@ -350,6 +350,21 @@ let rec sequential_iter_seq (seq : _ Seq.t) ~f = sequential_iter_seq seq ~f ;; +let map_reduce_seq (seq : _ Seq.t) ~f ~empty ~commutative_combine k = + match seq () with + | Seq.Nil -> k empty + | Cons (x, xs) -> + let current = ref empty in + let running = ref 1 in + let f a = + f a (fun b -> + current := commutative_combine !current b; + decr running; + if !running = 0 then k !current else end_of_fiber) + in + nfork_seq running x xs f +;; + let parallel_iter_set (type a s) (module S : Set.S with type elt = a and type t = s) diff --git a/fiber/src/fiber.mli b/fiber/src/fiber.mli index 5020dce..c77e4b2 100644 --- a/fiber/src/fiber.mli +++ b/fiber/src/fiber.mli @@ -278,6 +278,13 @@ end val repeat_while : f:('a -> 'a option t) -> init:'a -> unit t +val map_reduce_seq + : 'a Seq.t + -> f:('a -> 'm t) + -> empty:'m + -> commutative_combine:('m -> 'm -> 'm) + -> 'm t + module Stream : sig (** Destructive streams that can be composed to pipelines. @@ -459,29 +466,29 @@ end module Lazy : sig (** An asynchronous computation which is executed once only when forced. *) type 'a t - + (** Create an already evaluated lazy computation. *) val of_value : 'a -> 'a t - + (** An already evaluated lazy computation of unit type (a more efficient shortcut for [of_value ()]. *) val unit : unit t - + (** Create a lazy computation from a thunk which will only be executed when forced. *) val create : (unit -> 'a fiber) -> 'a t - + (** Check if a lazy computation has successfully finished. Note that this does not force the computation and a [false] result does not guarantee that the computation hasn't finished. *) val is_value : 'a t -> bool - + (** Force the lazy computation and return its result or reraise its exceptions. *) val force : 'a t -> 'a fiber (** Concurrently force multiple lazy computation and wait until they all finish, reraising any exceptions. *) val force_all_unit : unit t list -> unit fiber -end +end module Expert : sig (** This module offers no safety protections. It is only needed for maximizing diff --git a/fiber/test/map_reduce_tests.ml b/fiber/test/map_reduce_tests.ml new file mode 100644 index 0000000..ae466ba --- /dev/null +++ b/fiber/test/map_reduce_tests.ml @@ -0,0 +1,65 @@ +open Stdune +open Fiber.O + +let printf = Printf.printf +let print_dyn dyn = Dyn.to_string dyn |> print_endline +let () = Printexc.record_backtrace false + +module Scheduler = struct + let t = Test_scheduler.create () + let yield () = Test_scheduler.yield t + let run f = Test_scheduler.run t f +end + +let%expect_test "map_reduce_seq" = + let test = + let+ res = + Fiber.map_reduce_seq + (List.to_seq [ 1; 2; 3 ]) + ~f:(fun x -> + printfn "x: %d" x; + Fiber.return x) + ~empty:0 + ~commutative_combine:( + ) + in + printfn "final: %d" res + in + Scheduler.run test; + [%expect {| + x: 1 + x: 2 + x: 3 + final: 6 |}]; + let test = + let ivars = List.init 3 ~f:(fun _ -> Fiber.Ivar.create ()) in + Fiber.fork_and_join_unit + (fun () -> + let+ res = + Fiber.map_reduce_seq + (List.to_seq ivars) + ~f:(fun ivar -> + let+ x = Fiber.Ivar.read ivar in + printfn "x: %d" x; + x) + ~empty:0 + ~commutative_combine:( + ) + in + printfn "final: %d" res) + (fun () -> + let i = ref 0 in + Fiber.parallel_iter ivars ~f:(fun ivar -> + incr i; + printfn "filling ivar %d" !i; + Fiber.Ivar.fill ivar !i)) + in + Scheduler.run test; + [%expect + {| + filling ivar 1 + filling ivar 2 + filling ivar 3 + x: 1 + x: 2 + x: 3 + final: 6 |}] +;;