Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Flow.write and write_from #318

Merged
merged 1 commit into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions lib_eio/flow.ml
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,14 @@ let string_source s : source =
len
end

class virtual sink = object (_ : <Generic.t; ..>)
class virtual sink = object (self : <Generic.t; ..>)
method probe _ = None
method virtual copy : 'a. (#source as 'a) -> unit
method write bufs = self#copy (cstruct_source bufs)
end

let write (t : #sink) (bufs : Cstruct.t list) = t#write bufs

let copy (src : #source) (dst : #sink) = dst#copy src

let copy_string s = copy (string_source s)
Expand All @@ -91,10 +94,13 @@ let buffer_sink b =
Buffer.add_string b (Cstruct.to_string ~len:got buf)
done
with End_of_file -> ()

method! write bufs =
List.iter (fun buf -> Buffer.add_bytes b (Cstruct.to_bytes buf)) bufs
end

class virtual two_way = object (_ : <source; sink; ..>)
method probe _ = None
inherit sink
method read_methods = []

method virtual shutdown : shutdown_command -> unit
Expand Down
9 changes: 9 additions & 0 deletions lib_eio/flow.mli
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,17 @@ type read_method += Read_source_buffer of ((Cstruct.t list -> int) -> unit)
class virtual sink : object
inherit Generic.t
method virtual copy : 'a. (#source as 'a) -> unit

method write : Cstruct.t list -> unit
(** The default implementation is [copy (cstruct_source ...)], but it can be overridden for speed. *)
end

val write : #sink -> Cstruct.t list -> unit
(** [write dst bufs] writes all bytes from [bufs].

This is a low level API, consider using {!copy} if possible as it
may allow optimizations. *)

val copy : #source -> #sink -> unit
(** [copy src dst] copies data from [src] to [dst] until end-of-file. *)

Expand Down
2 changes: 2 additions & 0 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,8 @@ let flow fd =

method read_methods = []

method write bufs = Low_level.writev fd bufs

method copy src =
match get_fd_opt src with
| Some src -> fast_copy_try_splice src fd
Expand Down
8 changes: 8 additions & 0 deletions lib_eio_luv/eio_luv.ml
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,10 @@ let flow fd = object (_ : <source; sink; ..>)

method read_methods = []

method write bufs =
let bufs = List.map Cstruct.to_bigarray bufs in
File.write fd bufs |> or_raise

method copy src =
let buf = Luv.Buffer.create 4096 in
try
Expand Down Expand Up @@ -624,6 +628,10 @@ let socket sock = object
let buf = Cstruct.to_bigarray buf in
Stream.read_into sock buf

method! write bufs =
let bufs = List.map Cstruct.to_bigarray bufs in
Stream.write sock bufs

method copy src =
let buf = Luv.Buffer.create 4096 in
try
Expand Down
11 changes: 11 additions & 0 deletions tests/flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,14 @@ Copying from src using `Read_source_buffer`:
+dst: wrote (rsb) ["bar"]
- : unit = ()
```

## write

```ocaml
# run @@ fun () ->
let dst = Eio_mock.Flow.make "dst" in
Eio_mock.Flow.on_copy_bytes dst [`Return 6];
Eio.Flow.write dst [Cstruct.of_string "foobar"];;
+dst: wrote "foobar"
- : unit = ()
```
30 changes: 30 additions & 0 deletions tests/network.md
Original file line number Diff line number Diff line change
Expand Up @@ -695,3 +695,33 @@ Both attempts time out:
+mock time is now 20
Exception: Eio__Net.Connection_failure Eio__Time.Timeout.
```

## read/write on SOCK_DGRAM

```ocaml
# Eio_main.run @@ fun _ ->
Switch.run @@ fun sw ->
let a, b = Eio_unix.socketpair ~sw ~domain:Unix.PF_UNIX ~ty:Unix.SOCK_DGRAM () in
ignore (Eio_unix.FD.peek a : Unix.file_descr);
ignore (Eio_unix.FD.peek b : Unix.file_descr);
let l = [ "foo"; "bar"; "foobar"; "cellar door" ] in
let buf = Cstruct.create 32 in
let write bufs = Eio.Flow.write a (List.map Cstruct.of_string bufs) in
let read () =
let n = Eio.Flow.read b buf in
traceln "Got: %d bytes: %S" n Cstruct.(to_string (sub buf 0 n))
in
List.iter (fun sbuf -> write [sbuf]) l;
List.iter (fun _ -> read ()) l;
write ["abaca"; "bb"];
read ();
Eio.Flow.close a;
Eio.Flow.close b;;
+Got: 3 bytes: "foo"
+Got: 3 bytes: "bar"
+Got: 6 bytes: "foobar"
+Got: 11 bytes: "cellar door"
+Got: 7 bytes: "abacabb"
- : unit = ()
```