From b240bf548cf7dcb17dc3f90a6142d23fa789061f Mon Sep 17 00:00:00 2001 From: Hannes Mehnert Date: Thu, 21 Dec 2023 18:20:03 +0100 Subject: [PATCH 1/6] move Mirage_flow.stats (and pp_stats) to Mirage_flow_combinators --- combinators/mirage_flow_combinators.ml | 52 ++++++++++++++++++++----- combinators/mirage_flow_combinators.mli | 18 ++++++++- src/mirage_flow.ml | 34 ---------------- src/mirage_flow.mli | 14 ------- 4 files changed, 59 insertions(+), 59 deletions(-) diff --git a/combinators/mirage_flow_combinators.ml b/combinators/mirage_flow_combinators.ml index 0fb1eae..4da4205 100644 --- a/combinators/mirage_flow_combinators.ml +++ b/combinators/mirage_flow_combinators.ml @@ -21,6 +21,40 @@ open Lwt.Infix let src = Logs.Src.create "mirage-flow-combinators" module Log = (val Logs.src_log src : Logs.LOG) +type stats = { + read_bytes: int64; + read_ops: int64; + write_bytes: int64; + write_ops: int64; + duration: int64; +} + +let kib = 1024L +let ( ** ) = Int64.mul +let mib = kib ** 1024L +let gib = mib ** 1024L +let tib = gib ** 1024L + +let suffix = [ + kib, "KiB"; + mib, "MiB"; + gib, "GiB"; + tib, "TiB"; +] + +let add_suffix x = + List.fold_left (fun acc (y, label) -> + if Int64.div x y > 0L + then Printf.sprintf "%.1f %s" Int64.((to_float x) /. (to_float y)) label + else acc + ) (Printf.sprintf "%Ld bytes" x) suffix + +let pp_stats ppf s = + Fmt.pf ppf "%s bytes at %s/nanosec and %Lu IOPS/nanosec" + (add_suffix s.read_bytes) + (add_suffix Int64.(div s.read_bytes s.duration)) + (Int64.div s.read_ops s.duration) + module type CONCRETE = Mirage_flow.S with type error = [ `Msg of string ] and type write_error = [ Mirage_flow.write_error | `Msg of string ] @@ -55,7 +89,7 @@ end type time = int64 -type 'a stats = { +type 'a stats_lwt = { read_bytes: int64 ref; read_ops: int64 ref; write_bytes: int64 ref; @@ -66,15 +100,15 @@ type 'a stats = { t: (unit, 'a) result Lwt.t; } -let stats t = +let stats_lwt t = let duration : int64 = match !(t.finish) with | None -> Int64.sub (t.time ()) t.start | Some x -> Int64.sub x t.start in { - Mirage_flow.read_bytes = !(t.read_bytes); - read_ops = !(t.read_ops); - write_bytes = !(t.write_bytes); - write_ops = !(t.write_ops); + read_bytes = !(t.read_bytes); + read_ops = !(t.read_ops); + write_bytes = !(t.write_bytes); + write_ops = !(t.write_ops); duration; } @@ -131,7 +165,7 @@ struct let copy ~src:a ~dst:b = let t = start a b in wait t >|= function - | Ok () -> Ok (stats t) + | Ok () -> Ok (stats_lwt t) | Error e -> Error e end @@ -161,7 +195,7 @@ struct A_to_B.wait t >>= fun result -> A.shutdown a `read >>= fun () -> B.shutdown b `write >|= fun () -> - let stats = stats t in + let stats = stats_lwt t in match result with | Ok () -> Ok stats | Error e -> Error e @@ -171,7 +205,7 @@ struct B_to_A.wait t >>= fun result -> B.shutdown b `read >>= fun () -> A.shutdown a `write >|= fun () -> - let stats = stats t in + let stats = stats_lwt t in match result with | Ok () -> Ok stats | Error e -> Error e diff --git a/combinators/mirage_flow_combinators.mli b/combinators/mirage_flow_combinators.mli index 52ad341..2fa82b0 100644 --- a/combinators/mirage_flow_combinators.mli +++ b/combinators/mirage_flow_combinators.mli @@ -22,6 +22,20 @@ {e Release %%VERSION%% } *) +(** {1 Copy stats} *) + +type stats = { + read_bytes: int64; + read_ops: int64; + write_bytes: int64; + write_ops: int64; + duration: int64; +} +(** The type for I/O statistics from a copy operation. *) + +val pp_stats: stats Fmt.t +(** [pp_stats] is the pretty-printer for flow stats. *) + (** [CONCRETE] expose the private row as [`Msg str] errors, using [pp_error] and [pp_write_error]. *) module type CONCRETE = Mirage_flow.S @@ -40,7 +54,7 @@ module Copy (Clock: Mirage_clock.MCLOCK) (A: Mirage_flow.S) (B: Mirage_flow.S): val pp_error: error Fmt.t (** [pp_error] pretty-prints errors. *) - val copy: src:A.flow -> dst:B.flow -> (Mirage_flow.stats, error) result Lwt.t + val copy: src:A.flow -> dst:B.flow -> (stats, error) result Lwt.t (** [copy source destination] copies data from [source] to [destination] using the clock to compute a transfer rate. On successful completion, some statistics are returned. On failure we @@ -58,7 +72,7 @@ sig (** [pp_error] pretty-prints errors. *) val proxy: A.flow -> B.flow -> - ((Mirage_flow.stats * Mirage_flow.stats), error) result Lwt.t + ((stats * stats), error) result Lwt.t (** [proxy a b] proxies data between [a] and [b] until both sides close. If either direction encounters an error then so will [proxy]. If both directions succeed, then return I/O diff --git a/src/mirage_flow.ml b/src/mirage_flow.ml index 2ff9178..42485bb 100644 --- a/src/mirage_flow.ml +++ b/src/mirage_flow.ml @@ -39,37 +39,3 @@ module type S = sig val shutdown : flow -> [ `read | `write | `read_write ] -> unit Lwt.t val close: flow -> unit Lwt.t end - -type stats = { - read_bytes: int64; - read_ops: int64; - write_bytes: int64; - write_ops: int64; - duration: int64; -} - -let kib = 1024L -let ( ** ) = Int64.mul -let mib = kib ** 1024L -let gib = mib ** 1024L -let tib = gib ** 1024L - -let suffix = [ - kib, "KiB"; - mib, "MiB"; - gib, "GiB"; - tib, "TiB"; -] - -let add_suffix x = - List.fold_left (fun acc (y, label) -> - if Int64.div x y > 0L - then Printf.sprintf "%.1f %s" Int64.((to_float x) /. (to_float y)) label - else acc - ) (Printf.sprintf "%Ld bytes" x) suffix - -let pp_stats ppf s = - Fmt.pf ppf "%s bytes at %s/nanosec and %Lu IOPS/nanosec" - (add_suffix s.read_bytes) - (add_suffix Int64.(div s.read_bytes s.duration)) - (Int64.div s.read_ops s.duration) diff --git a/src/mirage_flow.mli b/src/mirage_flow.mli index 378e261..c41c87d 100644 --- a/src/mirage_flow.mli +++ b/src/mirage_flow.mli @@ -96,17 +96,3 @@ module type S = sig will not do anything (esp. not raising an exception), but it may log an error. *) end - -(** {1 Copy stats} *) - -type stats = { - read_bytes: int64; - read_ops: int64; - write_bytes: int64; - write_ops: int64; - duration: int64; -} -(** The type for I/O statistics from a copy operation. *) - -val pp_stats: stats Fmt.t -(** [pp_stats] is the pretty-printer for flow stats. *) From 205304f0c4d474686e394f83260c9e2fdb718d42 Mon Sep 17 00:00:00 2001 From: Hannes Mehnert Date: Thu, 21 Dec 2023 18:33:12 +0100 Subject: [PATCH 2/6] adjust documentation after discussion with @reynir @dinosaure (1) close/shutdown is recursive (2) read/write errors do not lead to closed sockets --- src/mirage_flow.mli | 66 ++++++++++++++++++++++++++------------------- 1 file changed, 39 insertions(+), 27 deletions(-) diff --git a/src/mirage_flow.mli b/src/mirage_flow.mli index c41c87d..8ac7f3d 100644 --- a/src/mirage_flow.mli +++ b/src/mirage_flow.mli @@ -50,49 +50,61 @@ module type S = sig (** [pp_write_error] is the pretty-printer for write errors. *) type flow - (** The type for flows. A flow represents the state of a single - reliable stream that is connected to an endpoint. *) + (** The type for flows. A flow represents the state of a single reliable + stream that is connected to an endpoint. *) val read: flow -> (Cstruct.t or_eof, error) result Lwt.t - (** [read flow] blocks until some data is available and returns a - fresh buffer containing it. + (** [read flow] blocks until some data is available and returns a fresh buffer + containing it. The returned buffer will be of a size convenient to the flow implementation, but will always have at least 1 byte. - If the remote endpoint calls [close] then calls to [read] will - keep returning data until all the in-flight data has been read. - [read flow] will return [`Eof] when the remote endpoint has - called [close] and when there is no more in-flight data. - *) + When [read] returns [`Eof] or an error, the [flow] should be [close]d (or + [shutdown]ed) by the client. Once [read] returned [`Eof] or an error, no + subsequent [read] call will be successful. *) val write: flow -> Cstruct.t -> (unit, write_error) result Lwt.t - (** [write flow buffer] writes a buffer to the flow. There is no - indication when the buffer has actually been read and, therefore, - it must not be reused. The contents may be transmitted in - separate packets, depending on the underlying transport. The - result [Ok ()] indicates success, [Error `Closed] indicates that the - connection is now closed and therefore the data could not be - written. Other errors are possible. *) + (** [write flow buffer] writes a buffer to the flow. There is no indication + when the buffer has actually been read and, therefore, it must not be + reused. The contents may be transmitted in separate packets, depending on + the underlying transport. The result [Ok ()] indicates success, + [Error `Closed] indicates that the connection is now closed and therefore + the data could not be written. Other errors are possible. + + If [write] returns an error, [close] (or [shutdown]) should be called on + the [flow] by the client. Once [write] returned an error, no subsequent + [write] or [writev] call will be successful. *) val writev: flow -> Cstruct.t list -> (unit, write_error) result Lwt.t - (** [writev flow buffers] writes a sequence of buffers to the flow. - There is no indication when the buffers have actually been read and, - therefore, they must not be reused. The - result [Ok ()] indicates success, [Error `Closed] indicates that the - connection is now closed and therefore the data could not be - written. Other errors are possible. *) + (** [writev flow buffers] writes a sequence of buffers to the flow. There is + no indication when the buffers have actually been read and, therefore, + they must not be reused. The result [Ok ()] indicates success, + [Error `Closed] indicates that the connection is now closed and therefore + the data could not be written. Other errors are possible. + + If [writev] returns an error, [close] (or [shutdown]) should be called on + the [flow] by the client. Once [writev] returned an error, no subsequent + [writev] or [write] call will be successful. *) val shutdown : flow -> [ `read | `write | `read_write ] -> unit Lwt.t (** [shutdown flow mode] shuts down the [flow] for the specific [mode]: - A flow which is [shutdown `read] (or [`read_write] will never be [read] - again (future calls will return [`Eof]); a flow which is [shutdown `write] - (or [`read_write]) flushes all pending writes and signals the remote - endpoint there won't be any future [write]. In TCP, a FIN is sent. *) + A flow which is [shutdown `read] (or [`read_write]) will never be [read] + again (subsequent calls will return [`Eof]); a flow which is + [shutdown `write] (or [`read_write]) flushes all pending writes and + signals the remote endpoint there won't be any future [write] or [writev] + calls (subsequent calls will return [`Closed]). E.g. in TCP, the + signalling is done by sending a segment with the FIN flag. + + If this [flow] is layered upon another [flow'] (e.g. TLS over TCP), + [shutdown] on the underlying [flow'] is executed. *) val close: flow -> unit Lwt.t (** [close flow] terminates the [flow] and frees all associated data. Any subsequent [read] or [write] will return an error. A subsequent [close] will not do anything (esp. not raising an exception), but it may log an - error. *) + error. + + If this [flow] is layered upon another [flow'] (e.g. TLS over TCP), + [close] on the underlying [flow'] is executed. *) end From 7a4eb61795da33ab2613dd3e0aa9fb992bcf7567 Mon Sep 17 00:00:00 2001 From: Hannes Mehnert Date: Thu, 21 Dec 2023 18:37:07 +0100 Subject: [PATCH 3/6] tweaks --- src/mirage_flow.mli | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/mirage_flow.mli b/src/mirage_flow.mli index 8ac7f3d..28b8389 100644 --- a/src/mirage_flow.mli +++ b/src/mirage_flow.mli @@ -60,18 +60,22 @@ module type S = sig The returned buffer will be of a size convenient to the flow implementation, but will always have at least 1 byte. - When [read] returns [`Eof] or an error, the [flow] should be [close]d (or - [shutdown]ed) by the client. Once [read] returned [`Eof] or an error, no - subsequent [read] call will be successful. *) + When [read] returns [`Eof] or an error, [close] (or [shutdown]) should be + called on the [flow] by the client. Once [read] returned [`Eof] or an + error, no subsequent [read] call will be successful. *) val write: flow -> Cstruct.t -> (unit, write_error) result Lwt.t (** [write flow buffer] writes a buffer to the flow. There is no indication - when the buffer has actually been read and, therefore, it must not be + when the buffer has actually been sent and, therefore, it must not be reused. The contents may be transmitted in separate packets, depending on the underlying transport. The result [Ok ()] indicates success, [Error `Closed] indicates that the connection is now closed and therefore the data could not be written. Other errors are possible. + The call to [write] blocks until the buffer has been accepted by the + implementation (if a partial write occured, write will wait until the + remainder of the buffer has been accepted by the implementation). + If [write] returns an error, [close] (or [shutdown]) should be called on the [flow] by the client. Once [write] returned an error, no subsequent [write] or [writev] call will be successful. *) From e8b4b90976e28bd55e28dcb286538f8ab58baf21 Mon Sep 17 00:00:00 2001 From: Hannes Mehnert Date: Fri, 22 Dec 2023 13:32:40 +0100 Subject: [PATCH 4/6] remove the shutdown being done on the underlying layer as well, since that's not what e.g. ssh does --- src/mirage_flow.mli | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/mirage_flow.mli b/src/mirage_flow.mli index 28b8389..e40b92e 100644 --- a/src/mirage_flow.mli +++ b/src/mirage_flow.mli @@ -98,10 +98,7 @@ module type S = sig [shutdown `write] (or [`read_write]) flushes all pending writes and signals the remote endpoint there won't be any future [write] or [writev] calls (subsequent calls will return [`Closed]). E.g. in TCP, the - signalling is done by sending a segment with the FIN flag. - - If this [flow] is layered upon another [flow'] (e.g. TLS over TCP), - [shutdown] on the underlying [flow'] is executed. *) + signalling is done by sending a segment with the FIN flag. *) val close: flow -> unit Lwt.t (** [close flow] terminates the [flow] and frees all associated data. Any From 167452f475275a4d3acf2ba7c61418a9a48044cb Mon Sep 17 00:00:00 2001 From: Hannes Mehnert Date: Thu, 8 Feb 2024 12:01:54 +0100 Subject: [PATCH 5/6] add the partial write to writev as well --- src/mirage_flow.mli | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/mirage_flow.mli b/src/mirage_flow.mli index e40b92e..4c7993a 100644 --- a/src/mirage_flow.mli +++ b/src/mirage_flow.mli @@ -73,7 +73,7 @@ module type S = sig the data could not be written. Other errors are possible. The call to [write] blocks until the buffer has been accepted by the - implementation (if a partial write occured, write will wait until the + implementation (if a partial write occured, [write] will wait until the remainder of the buffer has been accepted by the implementation). If [write] returns an error, [close] (or [shutdown]) should be called on @@ -82,11 +82,15 @@ module type S = sig val writev: flow -> Cstruct.t list -> (unit, write_error) result Lwt.t (** [writev flow buffers] writes a sequence of buffers to the flow. There is - no indication when the buffers have actually been read and, therefore, + no indication when the buffers have actually been sent and, therefore, they must not be reused. The result [Ok ()] indicates success, [Error `Closed] indicates that the connection is now closed and therefore the data could not be written. Other errors are possible. + The call to [writev] blocks until the buffers have been accepted by the + implementation (if a partial write occured, [writev] will wait until all + buffers have been accepted by the implementation). + If [writev] returns an error, [close] (or [shutdown]) should be called on the [flow] by the client. Once [writev] returned an error, no subsequent [writev] or [write] call will be successful. *) From 6c5b1dfca8fddf5135a45b55703264af74936b80 Mon Sep 17 00:00:00 2001 From: Hannes Mehnert Date: Thu, 8 Feb 2024 12:19:15 +0100 Subject: [PATCH 6/6] tweak (address @reynir comment) --- src/mirage_flow.mli | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/mirage_flow.mli b/src/mirage_flow.mli index 4c7993a..8bf565c 100644 --- a/src/mirage_flow.mli +++ b/src/mirage_flow.mli @@ -72,7 +72,7 @@ module type S = sig [Error `Closed] indicates that the connection is now closed and therefore the data could not be written. Other errors are possible. - The call to [write] blocks until the buffer has been accepted by the + The promise is resolved when the buffer has been accepted by the implementation (if a partial write occured, [write] will wait until the remainder of the buffer has been accepted by the implementation). @@ -87,7 +87,7 @@ module type S = sig [Error `Closed] indicates that the connection is now closed and therefore the data could not be written. Other errors are possible. - The call to [writev] blocks until the buffers have been accepted by the + The promise is resolved when the buffers have been accepted by the implementation (if a partial write occured, [writev] will wait until all buffers have been accepted by the implementation). @@ -102,7 +102,11 @@ module type S = sig [shutdown `write] (or [`read_write]) flushes all pending writes and signals the remote endpoint there won't be any future [write] or [writev] calls (subsequent calls will return [`Closed]). E.g. in TCP, the - signalling is done by sending a segment with the FIN flag. *) + signalling is done by sending a segment with the FIN flag. + + If this [flow] is layered upon another [flow'] (e.g. TLS over TCP), + and the internal state after [shutdown] is [`Closed], [close] on the + underlying [flow'] is executed. *) val close: flow -> unit Lwt.t (** [close flow] terminates the [flow] and frees all associated data. Any