Implement Flow.write and write_from #318
Conversation
lib_eio/flow.ml (Outdated)

```ocaml
let write (t : #sink) (buf : Cstruct.t) = t#write_from buf
```
With the change above, we can provide both `write_all` and `write_single` here, like in `File`.
My first version had both, ha, I'll add it back.
The problem is that libuv doesn't give us a way to do `write_single`: it will queue the next write and attempt to write all of them.
If you're curious, it's in:
`stream.c`: `uv_write() -> uv_write2() -> uv__write() -> uv__try_write() -> SHORTCOUNT ? -> uv__io_start() ...`:

```c
n = uv__try_write(stream,
                  &(req->bufs[req->write_index]),
                  req->nbufs - req->write_index,
                  req->send_handle);

/* Ensure the handle isn't sent again in case this is a partial write. */
if (n >= 0) {
  req->send_handle = NULL;

  if (uv__write_req_update(stream, req, n)) {  /* haesbaert: true if NOT a shortcount */
    uv__write_req_finish(req);
    return;  /* TODO(bnoordhuis) Start trying to write the next request. */
  }
} else if (n != UV_EAGAIN)
  break;

/* If this is a blocking stream, try again. */
if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
  continue;

/* We're not done. haesbaert: shortcount falls here */
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
```
> `int uv_write(uv_write_t *req, uv_stream_t *handle, const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb)`
>
> Write data to stream. Buffers are written in order. Example:

```c
void cb(uv_write_t* req, int status) {
  /* Logic which handles the write result */
}

uv_buf_t a[] = {
  { .base = "1", .len = 1 },
  { .base = "2", .len = 1 }
};

uv_buf_t b[] = {
  { .base = "3", .len = 1 },
  { .base = "4", .len = 1 }
};

uv_write_t req1;
uv_write_t req2;

/* writes "1234" */
uv_write(&req1, stream, a, 2, cb);
uv_write(&req2, stream, b, 2, cb);
```

> NOTE: The memory pointed to by the buffers must remain valid until the callback gets called. This also holds for `uv_write2()`.
And no, it doesn't return bytes written, it's just 0 or error :/
So the option is to either lie (`write_single` = `write_all`) in libuv, and god knows how many writes were done, or not provide a `write_single`.
Should `copy` have a default implementation in terms of `write_from`?
libuv is callback based. I think the `return 0` just means the write operation was scheduled. The callback gets the number of bytes written.
https://github.com/aantron/luv/blob/master/src/stream.mli says:

> The second argument passed to the callback is the number of bytes written. libuv has an internal queue of writes, in part to implement retry. This means that writes can be partial at the libuv (and Luv) API level, so it is possible to receive both an [Error] result, and for some data to have been successfully written.
However, for files it looks like `write` behaves normally (either returns an error or a byte count, but not both).

So, yeah, maybe we shouldn't provide `write_single`. But then we don't need a write method at all - `Flow.write flow buf` can just be `Flow.copy (cstruct_source [buf]) flow`.

Possibly we just need to add `exception Write_error of int * exn` to report how many bytes got written, in case anyone cares.
> libuv is callback based. I think the `return 0` just means the write operation was scheduled. The callback gets the number of bytes written.
Even though it's callback based, the code tries to do a write before setting up an event, so we can reason about that only:

`uv_write() -> uv_write2() -> empty_queue -> uv__write() -> uv__try_write() -> uv__writev() -> writev(2)`
> https://github.com/aantron/luv/blob/master/src/stream.mli says:
>
> The second argument passed to the callback is the number of bytes written. libuv has an internal queue of writes, in part to implement retry. This means that writes can be partial at the libuv (and Luv) API level, so it is possible to receive both an [Error] result, and for some data to have been successfully written.
This is a bit badly worded; what he means is that you can get an error and also get back how many bytes were successfully written by previous writes, but the callback is only called once. There is no way to distinguish, on success, whether the requested amount was written in one or N calls.
With this I stand by my previous assessment (humbly): luv does not give us an API to do something in one `write` or give us control back in-between writes. In a sense they really treat a stream as a stream.
In fact, in order to retrieve the amount of partial bytes on write, he must be reaching into what libuv calls "private", cause the callback for write is not like read:

```c
typedef void (*uv_write_cb)(uv_write_t *req, int status)
```

> Callback called after data was written on a stream. status will be 0 in case of success, < 0 otherwise.

```c
typedef void (*uv_read_cb)(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
```

> Callback called when data was read on a stream.

http://docs.libuv.org/en/v1.x/stream.html#c.uv_write_cb
I can dig up luv later but he is probably getting it from:

```c
struct uv_write_s {  /* haesbaert: this is write_cb_t without the useless typedef */
  UV_REQ_FIELDS
  uv_write_cb cb;
  uv_stream_t* send_handle;  /* TODO: make private and unix-only in v2.x. */
  uv_stream_t* handle;
  UV_WRITE_PRIVATE_FIELDS
};

#define UV_WRITE_PRIVATE_FIELDS                                               \
  void* queue[2];                                                             \
  unsigned int write_index;  /* haesbaert: likely the second argument */      \
  uv_buf_t* bufs;                                                             \
  unsigned int nbufs;                                                         \
  int error;                                                                  \
  uv_buf_t bufsml[4];
```
> However, for files it looks like `write` behaves normally (either returns an error or a byte count, but not both). So, yeah, maybe we shouldn't provide `write_single`. But then we don't need a write method at all - `Flow.write flow buf` can just be `Flow.copy (cstruct_source [buf]) flow`. Possibly we just need to add `exception Write_error of int * exn` to report how many bytes got written, in case anyone cares.
I'd be against that, I like the idea of `write_all` + `write_single`: one means "I don't care how" and the other means "I need it to be in one".
Let me rephrase that: I'm fine with not having single + all, but having the `Write_error` is not necessary imho. If it's a real stream, no one cares how much was written; if it's a datagram-like FD, that never happens and we get EMSGSIZE, ENOBUFS or ENOMEM.
If we don't care about the extra allocations, I agree `Flow.write` can be `Flow.copy (cstruct_source)...`

I still think we need something like it, because `copy` doesn't really guarantee that the source is "atomic": `fallback_copy` can end up splitting one chunk into multiple smaller chunks of 4k. `Flow.write(16k)` being broken into 4x `Flow.write(4k)` is wrong if the sink is a TAP interface, for example.

So `Flow.write` guarantees we try all in one syscall, but also writes the remaining bytes if we get a short count (like `copy` does).
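That contract can be sketched in a few lines. This is only an illustration: `single_write` here is a hypothetical primitive (not part of the Eio or luv API) that issues one writev-style syscall and returns how many bytes it wrote.

```ocaml
(* Sketch only: [single_write] is a hypothetical primitive that issues one
   syscall on [t] and returns the number of bytes actually written. *)
let rec write t buf =
  let n = single_write t buf in
  if n < Cstruct.length buf then
    (* Short count: retry with the remainder, like [copy] does. *)
    write t (Cstruct.shift buf n)
```

The point is that the first attempt hands the whole buffer to one syscall; looping only happens on a short count, which a stream sink may legitimately produce.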
Yeah, no need for `Write_error` (or maybe `Partial_write`?) in this PR. I've never needed it, so we'll wait and see if anyone else does.

Note: `fallback_copy` won't be used if the source is a `cstruct_source` (it'll use `copy_with_rsb` instead, which behaves the way you want already). Though the luv backend doesn't do this optimisation yet.

I think it's fine to add a `write` method as an optimisation for this (would be interesting to know what the speed difference is).
lib_eio/flow.ml (Outdated)

```diff
@@ -78,6 +81,9 @@ let buffer_sink b =
         Buffer.add_string b (Cstruct.to_string ~len:got buf)
       done
     with End_of_file -> ()
+
+  method write_from buf =
+    Buffer.add_bytes b (Cstruct.to_bytes buf) (* XXX slow *)
```
I don't think there's a faster way to do it given the `Buffer` API. We can just note in the mli that this might not be very efficient (in fact, it already points you in the direction of `Buf_read`).
Given that `Buffer` doesn't expose useful functions for that, it might be a good idea to also expose a better implementation of it (`{ mutable buf : bytes; mutable len : int }` basically), if only to get a `string` out of a `Flow.source` easily.
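A minimal sketch of that `{ mutable buf; mutable len }` idea, for illustration only (the names `create`, `add_bytes` and `contents` are hypothetical, not existing Eio functions):

```ocaml
(* Growable byte buffer exposing its backing store, unlike Stdlib.Buffer. *)
type t = { mutable buf : Bytes.t; mutable len : int }

let create initial = { buf = Bytes.create initial; len = 0 }

(* Grow the backing store (at least doubling) so [extra] more bytes fit. *)
let ensure t extra =
  let needed = t.len + extra in
  if needed > Bytes.length t.buf then begin
    let buf = Bytes.create (max needed (2 * Bytes.length t.buf)) in
    Bytes.blit t.buf 0 buf 0 t.len;
    t.buf <- buf
  end

(* Append directly into the backing bytes: one blit, no intermediate copy. *)
let add_bytes t src off len =
  ensure t len;
  Bytes.blit src off t.buf t.len len;
  t.len <- t.len + len

let contents t = Bytes.sub_string t.buf 0 t.len
```

Because the backing `bytes` is directly reachable, a `write_from` on such a sink could blit straight from the `Cstruct` without the `Cstruct.to_bytes` copy that `Buffer` forces.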
> @avsm is thinking about changing Cstruct to be backed by `bytes` rather than bigarray. If that happens, `Eio.Buf_read` will work for this case (see #140).
Aye, I checked that. I've been thinking of implementing something like `Mbuf.t`, which could be backed by a chain of Bytes or Cstruct, a bit inspired by how mbuf(9) works. With that we can guarantee that the backing Bytes is always > wosize, so it goes directly in the major heap, and also, if needed, swap to Cstruct transparently in the future if things change in the GC.

It would be something like "allocate a fixed bytes size, say 256; start writing at say offset 63 so future prepends are free; appending a few words would just write to the existing chunk; appending a longer chunk would link it into the chain". Maybe have a big (>1500) and a little (<512) size.

We could then make full use of it by writing a backing method in Eio to turn that into the iovecs.

I don't think Cstruct or Bytes directly is a good way for people to build packets.
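For illustration, a rough sketch of the chained-chunks idea with headroom for free prepends. All names and sizes here are hypothetical; this is not a concrete API proposal:

```ocaml
(* Hypothetical Mbuf-style buffer: a chain of fixed-size chunks, with
   leading headroom so prepending a header usually needs no copy. *)
type chunk = { data : Bytes.t; mutable off : int; mutable len : int }
type t = { mutable chunks : chunk list }   (* head of the chain *)

let chunk_size = 256   (* the "little" size from the description above *)
let headroom = 63      (* initial write offset, leaving room to prepend *)

let create () =
  { chunks = [ { data = Bytes.create chunk_size; off = headroom; len = 0 } ] }

(* Prepend into the headroom of the first chunk when it fits;
   otherwise link a fresh chunk onto the front of the chain. *)
let prepend t src =
  let n = String.length src in
  match t.chunks with
  | c :: _ when n <= c.off ->
    Bytes.blit_string src 0 c.data (c.off - n) n;
    c.off <- c.off - n;
    c.len <- c.len + n
  | _ ->
    let c = { data = Bytes.create (max chunk_size n); off = 0; len = n } in
    Bytes.blit_string src 0 c.data 0 n;
    t.chunks <- c :: t.chunks
```

Turning such a chain into iovecs for `writev` is then just a walk over the chunks, taking `(data, off, len)` from each.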
Note: we already have `Buf_write`, which is similar (though it doesn't allocate room for prepending). It does allow appending cstructs without copying, which results in using `writev` automatically.
lib_eio/flow.mli (Outdated)

```ocaml
val write : #sink -> Cstruct.t -> unit
(** [write dst buf] writes one or more bytes from [buf] *)
```
Might be good to note here that this is a low-level API and that using `copy` may allow optimisations in some cases.

I'm not sure I properly understood what you meant, so I'm keeping write as returning unit, changed `write_from` -> `write`, and addressed the comments you mentioned.
Looks good. How about taking a list (`writev`), since both backends already support that? I guess it's needed for datagram sockets, where writing a vector of two buffers isn't the same as doing two separate writes.
lib_eio/flow.ml (Outdated)

```diff
@@ -73,8 +73,11 @@ let string_source s : source =
 class virtual sink = object (_ : <Generic.t; ..>)
   method probe _ = None
   method virtual copy : 'a. (#source as 'a) -> unit
+  method virtual write : Cstruct.t -> unit
```
We can provide a default implementation here in terms of `copy`, then nothing else needs to change except as an optimisation.
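A sketch of what that default could look like, assuming the `cstruct_source` defined earlier in `flow.ml` (the final merged code may differ in detail):

```ocaml
class virtual sink = object (self : <Generic.t; ..>)
  method probe _ = None
  method virtual copy : 'a. (#source as 'a) -> unit
  (* Non-virtual default: a plain write is just a copy from a single-buffer
     source. Backends can override this as an optimisation. *)
  method write buf = self#copy (cstruct_source [buf])
end
```

With this, existing sinks (including the mocks) keep working unchanged, and only backends that can do better need to override `write`.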
lib_eio/mock/eio_mock.mli (Outdated)

```diff
@@ -106,6 +107,9 @@ module Flow : sig
   val on_read : t -> string Handler.actions -> unit
   (** [on_read t actions] configures the values to return from the mock's [read] function. *)
+
+  val on_write : t -> unit Handler.actions -> unit
```
I don't think we need this. We can just use the default behaviour of calling `copy`.

This provides an optimised alternative to `copy` in the case where you are writing from a buffer. The default implementation simply calls `copy`.

Co-authored-by: Thomas Leonard <talex5@gmail.com>
I squashed and rebased the commits, and also marked `write` as non-virtual in the interface. If you're happy with that, it's ready for merging.

yep 😄
CHANGES:

Changes:

- Update to OCaml 5.0.0~beta1 (@anmonteiro @talex5 ocaml-multicore/eio#346).
- Add API for seekable read/writes (@nojb ocaml-multicore/eio#307).
- Add `Flow.write` (@haesbaert ocaml-multicore/eio#318). This provides an optimised alternative to `copy` in the case where you are writing from a buffer.
- Add `Net.with_tcp_connect` (@bikallem ocaml-multicore/eio#302). Convenience function for opening a TCP connection.
- Add `Eio.Time.Timeout` (@talex5 ocaml-multicore/eio#320). Makes it easier to pass timeouts around.
- Add `Eio_mock.Clock` (@talex5 ocaml-multicore/eio#328). Control time in tests.
- Add `Buf_read.take_while1` and `skip_while1` (@bikallem ocaml-multicore/eio#309). These fail if no characters match.
- Make the type parameter for `Promise.t` covariant (@anmonteiro ocaml-multicore/eio#300).
- Move list functions into a dedicated submodule (@raphael-proust ocaml-multicore/eio#315).
- Direct implementation of `Flow.source_string` (@c-cube ocaml-multicore/eio#317). Slightly faster.

Bug fixes:

- `Condition.broadcast` must interlock as well (@haesbaert ocaml-multicore/eio#324).
- Split the reads into no more than 2^32-1 for luv (@haesbaert @talex5 @EduardoRFS ocaml-multicore/eio#343). Luv uses a 32 bit int for buffer sizes and wraps if the value passed is too big.
- eio_luv: allow `Net.connect` to be cancelled (@talex5 @nojb ocaml-multicore/eio#311).
- eio_main: Use dedicated log source (@anmonteiro ocaml-multicore/eio#326).
- linux_eio: fix kernel version number in log message (@talex5 @nojb ocaml-multicore/eio#314).
- Account for stack differences in the socketpair test (issue ocaml-multicore/eio#312) (@haesbaert ocaml-multicore/eio#313).

Documentation:

- Add Best Practices section to README (@talex5 ocaml-multicore/eio#299).
- Documentation improvements (@talex5 ocaml-multicore/eio#295 ocaml-multicore/eio#337).
These are the corollary of `Flow.read` and `read_into`.

Not super sure about the `write_from` naming, but it's the closest to the opposite of `read_into` I can think of.