From 4995c2c8288333616d539adef4e775c7a79db0a1 Mon Sep 17 00:00:00 2001 From: Christiano Haesbaert Date: Thu, 8 Sep 2022 17:43:00 +0200 Subject: [PATCH] Add Flow.write This provides an optimised alternative to copy in the case where you are writing from a buffer. The default implementation simply call copy. Co-authored-by: Thomas Leonard --- lib_eio/flow.ml | 10 ++++++++-- lib_eio/flow.mli | 9 +++++++++ lib_eio_linux/eio_linux.ml | 2 ++ lib_eio_luv/eio_luv.ml | 8 ++++++++ tests/flow.md | 11 +++++++++++ tests/network.md | 30 ++++++++++++++++++++++++++++++ 6 files changed, 68 insertions(+), 2 deletions(-) diff --git a/lib_eio/flow.ml b/lib_eio/flow.ml index 862d2b93f..035f58430 100644 --- a/lib_eio/flow.ml +++ b/lib_eio/flow.ml @@ -70,11 +70,14 @@ let string_source s : source = len end -class virtual sink = object (_ : ) +class virtual sink = object (self : ) method probe _ = None method virtual copy : 'a. (#source as 'a) -> unit + method write bufs = self#copy (cstruct_source bufs) end +let write (t : #sink) (bufs : Cstruct.t list) = t#write bufs + let copy (src : #source) (dst : #sink) = dst#copy src let copy_string s = copy (string_source s) @@ -91,10 +94,13 @@ let buffer_sink b = Buffer.add_string b (Cstruct.to_string ~len:got buf) done with End_of_file -> () + + method! write bufs = + List.iter (fun buf -> Buffer.add_bytes b (Cstruct.to_bytes buf)) bufs end class virtual two_way = object (_ : ) - method probe _ = None + inherit sink method read_methods = [] method virtual shutdown : shutdown_command -> unit diff --git a/lib_eio/flow.mli b/lib_eio/flow.mli index ed2e4286b..5a8e52485 100644 --- a/lib_eio/flow.mli +++ b/lib_eio/flow.mli @@ -62,8 +62,17 @@ type read_method += Read_source_buffer of ((Cstruct.t list -> int) -> unit) class virtual sink : object inherit Generic.t method virtual copy : 'a. (#source as 'a) -> unit + + method write : Cstruct.t list -> unit + (** The default implementation is [copy (cstruct_source ...)], but it can be overridden for speed. *) end +val write : #sink -> Cstruct.t list -> unit +(** [write dst bufs] writes all bytes from [bufs]. + + This is a low level API, consider using {!copy} if possible as it + may allow optimizations. *) + val copy : #source -> #sink -> unit (** [copy src dst] copies data from [src] to [dst] until end-of-file. *) diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index c6821e905..dd1caf7aa 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -1044,6 +1044,8 @@ let flow fd = method read_methods = [] + method write bufs = Low_level.writev fd bufs + method copy src = match get_fd_opt src with | Some src -> fast_copy_try_splice src fd diff --git a/lib_eio_luv/eio_luv.ml b/lib_eio_luv/eio_luv.ml index a562ed3ce..442a91d1d 100644 --- a/lib_eio_luv/eio_luv.ml +++ b/lib_eio_luv/eio_luv.ml @@ -597,6 +597,10 @@ let flow fd = object (_ : ) method read_methods = [] + method write bufs = + let bufs = List.map Cstruct.to_bigarray bufs in + File.write fd bufs |> or_raise + method copy src = let buf = Luv.Buffer.create 4096 in try @@ -624,6 +628,10 @@ let socket sock = object let buf = Cstruct.to_bigarray buf in Stream.read_into sock buf + method! write bufs = + let bufs = List.map Cstruct.to_bigarray bufs in + Stream.write sock bufs + method copy src = let buf = Luv.Buffer.create 4096 in try diff --git a/tests/flow.md b/tests/flow.md index 07196bc32..8cf8caf8d 100644 --- a/tests/flow.md +++ b/tests/flow.md @@ -107,3 +107,14 @@ Copying from src using `Read_source_buffer`: +dst: wrote (rsb) ["bar"] - : unit = () ``` + +## write + +```ocaml +# run @@ fun () -> + let dst = Eio_mock.Flow.make "dst" in + Eio_mock.Flow.on_copy_bytes dst [`Return 6]; + Eio.Flow.write dst [Cstruct.of_string "foobar"];; ++dst: wrote "foobar" +- : unit = () +``` diff --git a/tests/network.md b/tests/network.md index aceae0495..9b5f7c205 100644 --- a/tests/network.md +++ b/tests/network.md @@ -695,3 +695,33 @@ Both attempts time out: +mock time is now 20 Exception: Eio__Net.Connection_failure Eio__Time.Timeout. ``` + +## read/write on SOCK_DGRAM + +```ocaml +# Eio_main.run @@ fun _ -> + Switch.run @@ fun sw -> + let a, b = Eio_unix.socketpair ~sw ~domain:Unix.PF_UNIX ~ty:Unix.SOCK_DGRAM () in + ignore (Eio_unix.FD.peek a : Unix.file_descr); + ignore (Eio_unix.FD.peek b : Unix.file_descr); + let l = [ "foo"; "bar"; "foobar"; "cellar door" ] in + let buf = Cstruct.create 32 in + let write bufs = Eio.Flow.write a (List.map Cstruct.of_string bufs) in + let read () = + let n = Eio.Flow.read b buf in + traceln "Got: %d bytes: %S" n Cstruct.(to_string (sub buf 0 n)) + in + List.iter (fun sbuf -> write [sbuf]) l; + List.iter (fun _ -> read ()) l; + write ["abaca"; "bb"]; + read (); + Eio.Flow.close a; + Eio.Flow.close b;; ++Got: 3 bytes: "foo" ++Got: 3 bytes: "bar" ++Got: 6 bytes: "foobar" ++Got: 11 bytes: "cellar door" ++Got: 7 bytes: "abacabb" +- : unit = () +``` +