forked from ocaml/dune
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fiber.mli
258 lines (180 loc) · 7.4 KB
/
fiber.mli
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
(** Concurrency library *)
open! Stdune
(** {1 Generals} *)
(** Type of fiber. A fiber represent a suspended computation. Note that using
the same fiber twice will execute it twice, which is probably not what you
want. To share the result of a fiber, use an [Ivar.t]. *)
type 'a t
(** Create a fiber that has already terminated. *)
val return : 'a -> 'a t
(** Converts a thunk to a fiber, making sure the thunk runs in the context of
the fiber (rather than applied in the current context).
Equivalent to [(>>=) (return ())], but more explicit. *)
val of_thunk : (unit -> 'a t) -> 'a t
(** Fiber that never completes. *)
val never : 'a t
module O : sig
(** [>>>] is a sequencing operator. [a >>> b] is the fiber that first executes
[a] and then [b]. *)
val ( >>> ) : unit t -> 'a t -> 'a t
(** [>>=] is similar to [>>>] except that the result of the first fiber is
used to create the second one. *)
val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t
(** [t >>| f] is the same as [t >>= fun x -> return (f x)] but slightly more
efficient. *)
val ( >>| ) : 'a t -> ('a -> 'b) -> 'b t
val ( let* ) : 'a t -> ('a -> 'b t) -> 'b t
val ( let+ ) : 'a t -> ('a -> 'b) -> 'b t
end
val map : 'a t -> f:('a -> 'b) -> 'b t
val bind : 'a t -> f:('a -> 'b t) -> 'b t
(** {1 Forking execution} *)
module Future : sig
type 'a fiber
(** A future represent a promise that will eventually yield a value. It is
used to represent the result of a fiber running in the background. *)
type 'a t
(** Wait for the given future to yield a value. *)
val wait : 'a t -> 'a fiber
(** Return [Some x] if [t] has already returned. *)
val peek : 'a t -> 'a option
end
with type 'a fiber := 'a t
(** [fork f] creates a sub-fiber and return a [Future.t] to wait its result. *)
val fork : (unit -> 'a t) -> 'a Future.t t
(** [nfork l] is similar to [fork] but creates [n] sub-fibers. *)
val nfork : (unit -> 'a t) list -> 'a Future.t list t
(** [nfork_map l ~f] is the same as [nfork (List.map l ~f:(fun x () -> f x))]
but more efficient. *)
val nfork_map : 'a list -> f:('a -> 'b t) -> 'b Future.t list t
(** {1 Joining} *)
(** The following combinators are helpers to combine the result of several
fibers into one. Note that they do not introduce parallelism. *)
val both : 'a t -> 'b t -> ('a * 'b) t
val sequential_map : 'a list -> f:('a -> 'b t) -> 'b list t
val sequential_iter : 'a list -> f:('a -> unit t) -> unit t
(** {1 Forking + joining} *)
(** The following functions combine forking 2 or more fibers followed by joining
the results. For every function, we give an equivalent implementation using
the more basic functions as documentation. Note however that these functions
are implemented as primitives and so are more efficient that the suggested
implementation. *)
(** For two fibers and wait for their results:
{[
let fork_and_join f g =
fork f >>= fun a ->
fork g >>= fun b -> both (Future.wait a) (Future.wait b)
]} *)
val fork_and_join : (unit -> 'a t) -> (unit -> 'b t) -> ('a * 'b) t
(** Same but assume the first fiber returns [unit]:
{[
let fork_and_join_unit f g =
fork f >>= fun a ->
fork g >>= fun b -> Future.wait a >>> Future.wait b
]} *)
val fork_and_join_unit : (unit -> unit t) -> (unit -> 'a t) -> 'a t
(** Map a list in parallel:
{[
let parallel_map l ~f =
nfork_map l ~f >>= fun futures -> all (List.map futures ~f:Future.wait)
]} *)
val parallel_map : 'a list -> f:('a -> 'b t) -> 'b list t
(** Iter over a list in parallel:
{[
let parallel_iter l ~f =
nfork_map l ~f >>= fun futures ->
all_unit (List.map futures ~f:Future.wait)
]} *)
val parallel_iter : 'a list -> f:('a -> unit t) -> unit t
(** {1 Execute once fibers} *)
module Once : sig
type 'a fiber = 'a t
type 'a t
val create : (unit -> 'a fiber) -> 'a t
(** [get t] returns the value of [t]. If [get] was never called before on this
[t], it is executed at this point, otherwise returns a fiber that waits
for the fiber from the first call to [get t] to terminate. *)
val get : 'a t -> 'a fiber
(** [peek t] returns [Some v] if [get t] has already been called and has
yielded a value [v]. *)
val peek : 'a t -> 'a option
val peek_exn : 'a t -> 'a
end
with type 'a fiber := 'a t
(** {1 Local storage} *)
(** Variables local to a fiber *)
module Var : sig
type 'a fiber = 'a t
type 'a t
(** Create a new variable *)
val create : unit -> 'a t
(** [get var] reads the value of [var]. *)
val get : 'a t -> 'a option
(** Same as [get] but raises if [var] is unset. *)
val get_exn : 'a t -> 'a
(** [set var value fiber] sets [var] to [value] during the execution of
[fiber].
For instance, the following fiber always evaluate to [true]:
{[ set v x (get_exn v >>| fun y -> x = y) ]} *)
val set : 'a t -> 'a -> (unit -> 'b fiber) -> 'b fiber
val set_sync : 'a t -> 'a -> (unit -> 'b) -> 'b
val unset : 'a t -> (unit -> 'b fiber) -> 'b fiber
val unset_sync : 'a t -> (unit -> 'b) -> 'b
end
with type 'a fiber := 'a t
(** {1 Error handling} *)
(** [with_error_handler f ~on_error] calls [on_error] for every exception raised
during the execution of [f]. This include exceptions raised when calling
[f ()] or during the execution of fibers after [f ()] has returned.
Exceptions raised by [on_error] are passed on to the parent error handler.
It is guaranteed that after the fiber has returned a value, [on_error] will
never be called. *)
val with_error_handler :
(unit -> 'a t) -> on_error:(Exn_with_backtrace.t -> unit) -> 'a t
(** [fold_errors f ~init ~on_error] calls [on_error] for every exception raised
during the execution of [f]. This include exceptions raised when calling
[f ()] or during the execution of fibers after [f ()] has returned.
Exceptions raised by [on_error] are passed on to the parent error handler. *)
val fold_errors :
(unit -> 'a t)
-> init:'b
-> on_error:(Exn_with_backtrace.t -> 'b -> 'b)
-> ('a, 'b) Result.t t
(** [collect_errors f] is:
[fold_errors f ~init:\[\] ~on_error:(fun e l -> e :: l)] *)
val collect_errors :
(unit -> 'a t) -> ('a, Exn_with_backtrace.t list) Result.t t
(** [finalize f ~finally] runs [finally] after [f ()] has terminated, whether it
fails or succeeds. *)
val finalize : (unit -> 'a t) -> finally:(unit -> unit t) -> 'a t
(** {1 Synchronization} *)
(** Write once variables *)
module Ivar : sig
type 'a fiber = 'a t
(** A ivar is a synchronization variable that can be written only once. *)
type 'a t
(** Create a new empty ivar. *)
val create : unit -> 'a t
(** Read the contents of the ivar. *)
val read : 'a t -> 'a fiber
(** Fill the ivar with the following value. This can only be called once for a
given ivar. *)
val fill : 'a t -> 'a -> unit fiber
(** Return [Some x] is [fill t x] has been called previously. *)
val peek : 'a t -> 'a option
end
with type 'a fiber := 'a t
module Mutex : sig
type 'a fiber = 'a t
type t
val create : unit -> t
val with_lock : t -> (unit -> 'a fiber) -> 'a fiber
end
with type 'a fiber := 'a t
(** {1 Running fibers} *)
(** Wait for one iteration of the scheduler *)
val yield : unit -> unit t
(** [run t] runs a fiber until it (and all the fibers it forked) terminate.
Returns the result if it's determined in the end, otherwise raises [Never]. *)
val run : 'a t -> 'a
exception Never