Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

documentation updates #51

Merged
merged 6 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 43 additions & 9 deletions combinators/mirage_flow_combinators.ml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,40 @@ open Lwt.Infix
let src = Logs.Src.create "mirage-flow-combinators"
module Log = (val Logs.src_log src : Logs.LOG)

type stats = {
read_bytes: int64;
read_ops: int64;
write_bytes: int64;
write_ops: int64;
duration: int64;
}

let kib = 1024L
let ( ** ) = Int64.mul
let mib = kib ** 1024L
let gib = mib ** 1024L
let tib = gib ** 1024L

let suffix = [
kib, "KiB";
mib, "MiB";
gib, "GiB";
tib, "TiB";
]

let add_suffix x =
List.fold_left (fun acc (y, label) ->
if Int64.div x y > 0L
then Printf.sprintf "%.1f %s" Int64.((to_float x) /. (to_float y)) label
else acc
) (Printf.sprintf "%Ld bytes" x) suffix

let pp_stats ppf s =
Fmt.pf ppf "%s bytes at %s/nanosec and %Lu IOPS/nanosec"
(add_suffix s.read_bytes)
(add_suffix Int64.(div s.read_bytes s.duration))
(Int64.div s.read_ops s.duration)

module type CONCRETE = Mirage_flow.S
with type error = [ `Msg of string ]
and type write_error = [ Mirage_flow.write_error | `Msg of string ]
Expand Down Expand Up @@ -55,7 +89,7 @@ end

type time = int64

type 'a stats = {
type 'a stats_lwt = {
read_bytes: int64 ref;
read_ops: int64 ref;
write_bytes: int64 ref;
Expand All @@ -66,15 +100,15 @@ type 'a stats = {
t: (unit, 'a) result Lwt.t;
}

let stats t =
let stats_lwt t =
let duration : int64 = match !(t.finish) with
| None -> Int64.sub (t.time ()) t.start
| Some x -> Int64.sub x t.start
in {
Mirage_flow.read_bytes = !(t.read_bytes);
read_ops = !(t.read_ops);
write_bytes = !(t.write_bytes);
write_ops = !(t.write_ops);
read_bytes = !(t.read_bytes);
read_ops = !(t.read_ops);
write_bytes = !(t.write_bytes);
write_ops = !(t.write_ops);
duration;
}

Expand Down Expand Up @@ -131,7 +165,7 @@ struct
let copy ~src:a ~dst:b =
let t = start a b in
wait t >|= function
| Ok () -> Ok (stats t)
| Ok () -> Ok (stats_lwt t)
| Error e -> Error e

end
Expand Down Expand Up @@ -161,7 +195,7 @@ struct
A_to_B.wait t >>= fun result ->
A.shutdown a `read >>= fun () ->
B.shutdown b `write >|= fun () ->
let stats = stats t in
let stats = stats_lwt t in
match result with
| Ok () -> Ok stats
| Error e -> Error e
Expand All @@ -171,7 +205,7 @@ struct
B_to_A.wait t >>= fun result ->
B.shutdown b `read >>= fun () ->
A.shutdown a `write >|= fun () ->
let stats = stats t in
let stats = stats_lwt t in
match result with
| Ok () -> Ok stats
| Error e -> Error e
Expand Down
18 changes: 16 additions & 2 deletions combinators/mirage_flow_combinators.mli
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@

{e Release %%VERSION%% } *)

(** {1 Copy stats} *)

type stats = {
read_bytes: int64;
read_ops: int64;
write_bytes: int64;
write_ops: int64;
duration: int64;
}
(** The type for I/O statistics from a copy operation. *)

val pp_stats: stats Fmt.t
(** [pp_stats] is the pretty-printer for flow stats. *)

(** [CONCRETE] expose the private row as [`Msg str] errors, using
[pp_error] and [pp_write_error]. *)
module type CONCRETE = Mirage_flow.S
Expand All @@ -40,7 +54,7 @@ module Copy (Clock: Mirage_clock.MCLOCK) (A: Mirage_flow.S) (B: Mirage_flow.S):
val pp_error: error Fmt.t
(** [pp_error] pretty-prints errors. *)

val copy: src:A.flow -> dst:B.flow -> (Mirage_flow.stats, error) result Lwt.t
val copy: src:A.flow -> dst:B.flow -> (stats, error) result Lwt.t
(** [copy source destination] copies data from [source] to
[destination] using the clock to compute a transfer rate. On
successful completion, some statistics are returned. On failure we
Expand All @@ -58,7 +72,7 @@ sig
(** [pp_error] pretty-prints errors. *)

val proxy: A.flow -> B.flow ->
((Mirage_flow.stats * Mirage_flow.stats), error) result Lwt.t
((stats * stats), error) result Lwt.t
(** [proxy a b] proxies data between [a] and [b] until both
sides close. If either direction encounters an error then so
will [proxy]. If both directions succeed, then return I/O
Expand Down
34 changes: 0 additions & 34 deletions src/mirage_flow.ml
Original file line number Diff line number Diff line change
Expand Up @@ -39,37 +39,3 @@ module type S = sig
val shutdown : flow -> [ `read | `write | `read_write ] -> unit Lwt.t
val close: flow -> unit Lwt.t
end

type stats = {
read_bytes: int64;
read_ops: int64;
write_bytes: int64;
write_ops: int64;
duration: int64;
}

let kib = 1024L
let ( ** ) = Int64.mul
let mib = kib ** 1024L
let gib = mib ** 1024L
let tib = gib ** 1024L

let suffix = [
kib, "KiB";
mib, "MiB";
gib, "GiB";
tib, "TiB";
]

let add_suffix x =
List.fold_left (fun acc (y, label) ->
if Int64.div x y > 0L
then Printf.sprintf "%.1f %s" Int64.((to_float x) /. (to_float y)) label
else acc
) (Printf.sprintf "%Ld bytes" x) suffix

let pp_stats ppf s =
Fmt.pf ppf "%s bytes at %s/nanosec and %Lu IOPS/nanosec"
(add_suffix s.read_bytes)
(add_suffix Int64.(div s.read_bytes s.duration))
(Int64.div s.read_ops s.duration)
89 changes: 48 additions & 41 deletions src/mirage_flow.mli
Original file line number Diff line number Diff line change
Expand Up @@ -50,63 +50,70 @@ module type S = sig
(** [pp_write_error] is the pretty-printer for write errors. *)

type flow
(** The type for flows. A flow represents the state of a single
reliable stream that is connected to an endpoint. *)
(** The type for flows. A flow represents the state of a single reliable
stream that is connected to an endpoint. *)

val read: flow -> (Cstruct.t or_eof, error) result Lwt.t
(** [read flow] blocks until some data is available and returns a
fresh buffer containing it.
(** [read flow] blocks until some data is available and returns a fresh buffer
containing it.

The returned buffer will be of a size convenient to the flow
implementation, but will always have at least 1 byte.

If the remote endpoint calls [close] then calls to [read] will
keep returning data until all the in-flight data has been read.
[read flow] will return [`Eof] when the remote endpoint has
called [close] and when there is no more in-flight data.
*)
When [read] returns [`Eof] or an error, [close] (or [shutdown]) should be
called on the [flow] by the client. Once [read] returned [`Eof] or an
error, no subsequent [read] call will be successful. *)

val write: flow -> Cstruct.t -> (unit, write_error) result Lwt.t
(** [write flow buffer] writes a buffer to the flow. There is no
indication when the buffer has actually been read and, therefore,
it must not be reused. The contents may be transmitted in
separate packets, depending on the underlying transport. The
result [Ok ()] indicates success, [Error `Closed] indicates that the
connection is now closed and therefore the data could not be
written. Other errors are possible. *)
(** [write flow buffer] writes a buffer to the flow. There is no indication
when the buffer has actually been sent and, therefore, it must not be
reused. The contents may be transmitted in separate packets, depending on
the underlying transport. The result [Ok ()] indicates success,
[Error `Closed] indicates that the connection is now closed and therefore
the data could not be written. Other errors are possible.

The promise is resolved when the buffer has been accepted by the
implementation (if a partial write occured, [write] will wait until the
remainder of the buffer has been accepted by the implementation).

If [write] returns an error, [close] (or [shutdown]) should be called on
the [flow] by the client. Once [write] returned an error, no subsequent
[write] or [writev] call will be successful. *)

val writev: flow -> Cstruct.t list -> (unit, write_error) result Lwt.t
(** [writev flow buffers] writes a sequence of buffers to the flow.
There is no indication when the buffers have actually been read and,
therefore, they must not be reused. The
result [Ok ()] indicates success, [Error `Closed] indicates that the
connection is now closed and therefore the data could not be
written. Other errors are possible. *)
(** [writev flow buffers] writes a sequence of buffers to the flow. There is
no indication when the buffers have actually been sent and, therefore,
they must not be reused. The result [Ok ()] indicates success,
[Error `Closed] indicates that the connection is now closed and therefore
the data could not be written. Other errors are possible.

The promise is resolved when the buffers have been accepted by the
implementation (if a partial write occured, [writev] will wait until all
buffers have been accepted by the implementation).

If [writev] returns an error, [close] (or [shutdown]) should be called on
the [flow] by the client. Once [writev] returned an error, no subsequent
[writev] or [write] call will be successful. *)

val shutdown : flow -> [ `read | `write | `read_write ] -> unit Lwt.t
(** [shutdown flow mode] shuts down the [flow] for the specific [mode]:
A flow which is [shutdown `read] (or [`read_write] will never be [read]
again (future calls will return [`Eof]); a flow which is [shutdown `write]
(or [`read_write]) flushes all pending writes and signals the remote
endpoint there won't be any future [write]. In TCP, a FIN is sent. *)
A flow which is [shutdown `read] (or [`read_write]) will never be [read]
again (subsequent calls will return [`Eof]); a flow which is
[shutdown `write] (or [`read_write]) flushes all pending writes and
signals the remote endpoint there won't be any future [write] or [writev]
calls (subsequent calls will return [`Closed]). E.g. in TCP, the
signalling is done by sending a segment with the FIN flag.

If this [flow] is layered upon another [flow'] (e.g. TLS over TCP),
and the internal state after [shutdown] is [`Closed], [close] on the
underlying [flow'] is executed. *)

val close: flow -> unit Lwt.t
(** [close flow] terminates the [flow] and frees all associated data. Any
subsequent [read] or [write] will return an error. A subsequent [close]
will not do anything (esp. not raising an exception), but it may log an
error. *)
end

(** {1 Copy stats} *)
error.

type stats = {
read_bytes: int64;
read_ops: int64;
write_bytes: int64;
write_ops: int64;
duration: int64;
}
(** The type for I/O statistics from a copy operation. *)

val pp_stats: stats Fmt.t
(** [pp_stats] is the pretty-printer for flow stats. *)
If this [flow] is layered upon another [flow'] (e.g. TLS over TCP),
[close] on the underlying [flow'] is executed. *)
hannesm marked this conversation as resolved.
Show resolved Hide resolved
end