Skip to content

Commit

Permalink
Merge pull request #318 from haesbaert/fwrite
Browse files Browse the repository at this point in the history
Implement Flow.write and write_from
  • Loading branch information
haesbaert authored Oct 6, 2022
2 parents 4c743ba + 4995c2c commit 0f9d577
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 2 deletions.
10 changes: 8 additions & 2 deletions lib_eio/flow.ml
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,14 @@ let string_source s : source =
len
end

class virtual sink = object (_ : <Generic.t; ..>)
class virtual sink = object (self : <Generic.t; ..>)
method probe _ = None
method virtual copy : 'a. (#source as 'a) -> unit
method write bufs = self#copy (cstruct_source bufs)
end

let write (t : #sink) (bufs : Cstruct.t list) = t#write bufs

let copy (src : #source) (dst : #sink) = dst#copy src

let copy_string s = copy (string_source s)
Expand All @@ -91,10 +94,13 @@ let buffer_sink b =
Buffer.add_string b (Cstruct.to_string ~len:got buf)
done
with End_of_file -> ()

method! write bufs =
List.iter (fun buf -> Buffer.add_bytes b (Cstruct.to_bytes buf)) bufs
end

class virtual two_way = object (_ : <source; sink; ..>)
method probe _ = None
inherit sink
method read_methods = []

method virtual shutdown : shutdown_command -> unit
Expand Down
9 changes: 9 additions & 0 deletions lib_eio/flow.mli
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,17 @@ type read_method += Read_source_buffer of ((Cstruct.t list -> int) -> unit)
class virtual sink : object
inherit Generic.t
method virtual copy : 'a. (#source as 'a) -> unit

method write : Cstruct.t list -> unit
(** The default implementation is [copy (cstruct_source ...)], but it can be overridden for speed. *)
end

val write : #sink -> Cstruct.t list -> unit
(** [write dst bufs] writes all bytes from [bufs].
This is a low level API, consider using {!copy} if possible as it
may allow optimizations. *)

val copy : #source -> #sink -> unit
(** [copy src dst] copies data from [src] to [dst] until end-of-file. *)

Expand Down
2 changes: 2 additions & 0 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,8 @@ let flow fd =

method read_methods = []

method write bufs = Low_level.writev fd bufs

method copy src =
match get_fd_opt src with
| Some src -> fast_copy_try_splice src fd
Expand Down
8 changes: 8 additions & 0 deletions lib_eio_luv/eio_luv.ml
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,10 @@ let flow fd = object (_ : <source; sink; ..>)

method read_methods = []

method write bufs =
let bufs = List.map Cstruct.to_bigarray bufs in
File.write fd bufs |> or_raise

method copy src =
let buf = Luv.Buffer.create 4096 in
try
Expand Down Expand Up @@ -624,6 +628,10 @@ let socket sock = object
let buf = Cstruct.to_bigarray buf in
Stream.read_into sock buf

method! write bufs =
let bufs = List.map Cstruct.to_bigarray bufs in
Stream.write sock bufs

method copy src =
let buf = Luv.Buffer.create 4096 in
try
Expand Down
11 changes: 11 additions & 0 deletions tests/flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,14 @@ Copying from src using `Read_source_buffer`:
+dst: wrote (rsb) ["bar"]
- : unit = ()
```

## write

```ocaml
# run @@ fun () ->
let dst = Eio_mock.Flow.make "dst" in
Eio_mock.Flow.on_copy_bytes dst [`Return 6];
Eio.Flow.write dst [Cstruct.of_string "foobar"];;
+dst: wrote "foobar"
- : unit = ()
```
30 changes: 30 additions & 0 deletions tests/network.md
Original file line number Diff line number Diff line change
Expand Up @@ -695,3 +695,33 @@ Both attempts time out:
+mock time is now 20
Exception: Eio__Net.Connection_failure Eio__Time.Timeout.
```

## read/write on SOCK_DGRAM

```ocaml
# Eio_main.run @@ fun _ ->
Switch.run @@ fun sw ->
let a, b = Eio_unix.socketpair ~sw ~domain:Unix.PF_UNIX ~ty:Unix.SOCK_DGRAM () in
ignore (Eio_unix.FD.peek a : Unix.file_descr);
ignore (Eio_unix.FD.peek b : Unix.file_descr);
let l = [ "foo"; "bar"; "foobar"; "cellar door" ] in
let buf = Cstruct.create 32 in
let write bufs = Eio.Flow.write a (List.map Cstruct.of_string bufs) in
let read () =
let n = Eio.Flow.read b buf in
traceln "Got: %d bytes: %S" n Cstruct.(to_string (sub buf 0 n))
in
List.iter (fun sbuf -> write [sbuf]) l;
List.iter (fun _ -> read ()) l;
write ["abaca"; "bb"];
read ();
Eio.Flow.close a;
Eio.Flow.close b;;
+Got: 3 bytes: "foo"
+Got: 3 bytes: "bar"
+Got: 6 bytes: "foobar"
+Got: 11 bytes: "cellar door"
+Got: 7 bytes: "abacabb"
- : unit = ()
```

0 comments on commit 0f9d577

Please sign in to comment.