Skip to content

Commit

Permalink
Add [Fiber.Lazy]
Browse files Browse the repository at this point in the history
Signed-off-by: Rudi Grinberg <me@rgrinberg.com>
  • Loading branch information
rleshchinskiy authored and rgrinberg committed Apr 2, 2024
1 parent 8e0d4a6 commit a9afdf8
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 0 deletions.
1 change: 1 addition & 0 deletions fiber/src/fiber.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module Svar = Svar
module Throttle = Throttle
module Mutex = Mutex
module Scheduler = Scheduler
module Lazy = Lazy

let run =
let rec loop ~iter (s : _ Scheduler.step) =
Expand Down
28 changes: 28 additions & 0 deletions fiber/src/fiber.mli
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,34 @@ module Cancel : sig
-> ('a * 'b outcome) fiber
end

(** {1 Lazy fibers} *)
module Lazy : sig
(** An asynchronous computation which is executed once only when forced. *)
type 'a t

(** Create an already evaluated lazy computation. *)
val of_value : 'a -> 'a t

(** An already evaluated lazy computation of unit type (a more efficient shortcut for
[of_value ()]. *)
val unit : unit t

(** Create a lazy computation from a thunk which will only be executed when forced. *)
val create : (unit -> 'a fiber) -> 'a t

(** Check if a lazy computation has successfully finished. Note that this does not
force the computation and a [false] result does not guarantee that the computation
hasn't finished. *)
val is_value : 'a t -> bool

(** Force the lazy computation and return its result or reraise its exceptions. *)
val force : 'a t -> 'a fiber

(** Concurrently force multiple lazy computation and wait until they all finish,
reraising any exceptions. *)
val force_all_unit : unit t list -> unit fiber
end

module Expert : sig
(** This module offers no safety protections. It is only needed for maximizing
performance in certain situations *)
Expand Down
104 changes: 104 additions & 0 deletions fiber/src/lazy.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
open! Stdune
open Core
open Core.O

(** State of a lazy computation. Note that for efficiency, we have a separate variant for
successes ([Done]). We don't have one for errors since it doesn't seem worth it; we
get those from the [Ivar] instead. *)
type 'a state =
| (* Finished successfully *)
Done of 'a
| (* Might still be running *)
Running of ('a, Exns.t) Result.t Ivar.t
| (* Hasn't been forced yet *)
Init of (unit -> 'a Core.t)

type 'a t = 'a state ref

let of_value x = ref (Done x)
let unit = ref (Done ())
let create f = ref (Init f)

let prep t =
let v = Ivar.create () in
t := Running v;
v
;;

let execute t v f =
let* r = collect_errors_appendable_list f in
(match r with
| Ok x -> t := Done x
| Error _ -> ());
Ivar.fill v r
;;

let read v =
let* r = Ivar.read v in
match r with
| Ok x -> return x
| Error exns ->
(* Subsequent computations will always force the appendable list into a proper list
and therefore not share this work. For now, this doesn't matter to us. *)
Exns.reraise_all exns
;;

let force t =
let* () = return () in
match !t with
| Done x -> return x
| Running v -> read v
| Init f ->
let v = prep t in
let* () = execute t v f in
read v
;;

let is_value t =
match !t with
| Done _ -> true
| Running _ | Init _ -> false
;;

let force_all_unit =
let stop () = end_of_fiber in
(* Fork all computations that haven't been forced yet. Note that this should be
substantially more efficient that [parallel_map ~f:force] since we ignore
computations which have already been forced. *)
let start ts =
sequential_iter ts ~f:(fun t ->
match !t with
| Done _ | Running _ -> return ()
| Init f ->
let v = prep t in
fork (fun () -> (execute t v f) stop))
in
(* Wait for all computations, collecting all exceptions. *)
(* CR-someday rgrinberg: use [Appendable.t] for [acc] rather than [Appendable.t option]. *)
let rec collect acc = function
| [] -> return acc
| t :: ts ->
(match !t with
| Done _ -> collect acc ts
| Running v ->
let* r = Ivar.read v in
(match r with
| Ok _ -> collect acc ts
| Error exns ->
let exns =
match acc with
| None -> exns
| Some exns' -> Exns.combine exns exns'
in
collect (Some exns) ts)
| Init _ ->
(* We forked all computations previously so this should be impossible. *)
assert false)
in
fun ts ->
let* () = start ts in
let* r = collect None ts in
match r with
| None -> return ()
| Some exns -> Exns.reraise_all exns
;;

0 comments on commit a9afdf8

Please sign in to comment.