diff --git a/include/uv.h b/include/uv.h
index c3c68cbb5f..2777e16ef9 100644
--- a/include/uv.h
+++ b/include/uv.h
@@ -654,6 +654,31 @@ UV_EXTERN int uv_is_readable(const uv_stream_t* handle);
 UV_EXTERN int uv_is_writable(const uv_stream_t* handle);
 
 
+/*
+ * Enable or disable blocking mode for a stream.
+ *
+ * When blocking mode is enabled, all writes complete synchronously. The
+ * interface remains unchanged otherwise, e.g. completion or failure of the
+ * operation will still be reported through a callback which is made
+ * asynchronously.
+ *
+ * Relying too much on this API is not recommended. It is likely to change
+ * significantly in the future.
+ *
+ * On Windows this currently works only for uv_pipe_t instances. On Unix it
+ * works for tcp, pipe and tty instances. Be aware that changing the blocking
+ * mode on Unix sets or clears the O_NONBLOCK bit. If you are sharing a handle
+ * with another process, the other process is affected by the change too,
+ * which can lead to unexpected results.
+ *
+ * Also libuv currently makes no ordering guarantee when the blocking mode
+ * is changed after write requests have already been submitted. Therefore it is
+ * recommended to set the blocking mode immediately after opening or creating
+ * the stream.
+ */
+UV_EXTERN int uv_stream_set_blocking(uv_stream_t* handle, int blocking);
+
+
 /*
  * Used to determine whether a stream is closing or closed.
  *
diff --git a/src/unix/stream.c b/src/unix/stream.c
index ad70eadacb..54bbc15b29 100644
--- a/src/unix/stream.c
+++ b/src/unix/stream.c
@@ -1409,3 +1409,8 @@ void uv__stream_close(uv_stream_t* handle) {
 
   assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
 }
+
+
+int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
+  assert(0 && "implement me");
+}
diff --git a/src/win/internal.h b/src/win/internal.h
index 9920f704ab..56c43e7859 100644
--- a/src/win/internal.h
+++ b/src/win/internal.h
@@ -58,6 +58,7 @@
 #define UV_HANDLE_SYNC_BYPASS_IOCP 0x00040000
 #define UV_HANDLE_ZERO_READ 0x00080000
 #define UV_HANDLE_EMULATE_IOCP 0x00100000
+#define UV_HANDLE_BLOCKING_WRITES 0x00200000
 
 /* Only used by uv_tcp_t handles. */
 #define UV_HANDLE_IPV6 0x01000000
diff --git a/src/win/pipe.c b/src/win/pipe.c
index 0fb70eae32..8946ccc29a 100644
--- a/src/win/pipe.c
+++ b/src/win/pipe.c
@@ -1112,6 +1112,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
     if (handle->ipc_header_write_req.type != UV_WRITE) {
       ipc_header_req = (uv_write_t*)&handle->ipc_header_write_req;
     } else {
+      /* TODO: track this malloc and free after request completes */
       ipc_header_req = (uv_write_t*)malloc(sizeof(uv_write_t));
       if (!ipc_header_req) {
         uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
@@ -1128,6 +1129,13 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
     /* Write the header or the whole frame. */
     memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped));
 
+    /* Using overlapped IO, but wait for completion before returning.
+       This write is blocking because ipc_frame is on stack. */
+    ipc_header_req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
+    if (!ipc_header_req->overlapped.hEvent) {
+      uv_fatal_error(GetLastError(), "CreateEvent");
+    }
+
     result = WriteFile(handle->handle,
                        &ipc_frame,
                        ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
@@ -1136,18 +1144,22 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
                        &ipc_header_req->overlapped);
     if (!result && GetLastError() != ERROR_IO_PENDING) {
       uv__set_sys_error(loop, GetLastError());
+      CloseHandle(ipc_header_req->overlapped.hEvent);
       return -1;
     }
 
-    if (result) {
-      /* Request completed immediately. */
-      ipc_header_req->queued_bytes = 0;
-    } else {
-      /* Request queued by the kernel. */
-      ipc_header_req->queued_bytes = ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
-        sizeof(ipc_frame) : sizeof(ipc_frame.header);
-      handle->write_queue_size += ipc_header_req->queued_bytes;
+    if (!result) {
+      /* Request not completed immediately. Wait for it. */
+      if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) !=
+          WAIT_OBJECT_0) {
+        uv__set_sys_error(loop, GetLastError());
+        CloseHandle(ipc_header_req->overlapped.hEvent);
+        return -1;
+      }
     }
+    ipc_header_req->queued_bytes = 0;
+    CloseHandle(ipc_header_req->overlapped.hEvent);
+    ipc_header_req->overlapped.hEvent = NULL;
 
     REGISTER_HANDLE_REQ(loop, handle, ipc_header_req);
     handle->reqs_pending++;
@@ -1159,7 +1171,30 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
     }
   }
 
-  if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
+  if ((handle->flags &
+      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
+      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
+    DWORD bytes;
+    result = WriteFile(handle->handle,
+                       bufs[0].base,
+                       bufs[0].len,
+                       &bytes,
+                       NULL);
+
+    if (!result) {
+      uv__set_sys_error(loop, GetLastError());
+      return -1;
+    } else {
+      /* Request completed immediately. */
+      req->queued_bytes = 0;
+    }
+
+    REGISTER_HANDLE_REQ(loop, handle, req);
+    handle->reqs_pending++;
+    handle->write_reqs_pending++;
+    POST_COMPLETION_FOR_REQ(loop, req);
+    return 0;
+  } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
     req->write_buffer = bufs[0];
     uv_insert_non_overlapped_write_req(handle, req);
     if (handle->write_reqs_pending == 0) {
@@ -1169,6 +1204,44 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
     /* Request queued by the kernel. */
     req->queued_bytes = uv_count_bufs(bufs, bufcnt);
     handle->write_queue_size += req->queued_bytes;
+  } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
+    /* Using overlapped IO, but wait for completion before returning */
+    req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
+    if (!req->overlapped.hEvent) {
+      uv_fatal_error(GetLastError(), "CreateEvent");
+    }
+
+    result = WriteFile(handle->handle,
+                       bufs[0].base,
+                       bufs[0].len,
+                       NULL,
+                       &req->overlapped);
+
+    if (!result && GetLastError() != ERROR_IO_PENDING) {
+      uv__set_sys_error(loop, GetLastError());
+      CloseHandle(req->overlapped.hEvent);
+      return -1;
+    }
+
+    if (result) {
+      /* Request completed immediately. */
+      req->queued_bytes = 0;
+    } else {
+      /* Request queued by the kernel. */
+      if (WaitForSingleObject(req->overlapped.hEvent, INFINITE) !=
+          WAIT_OBJECT_0) {
+        uv__set_sys_error(loop, GetLastError());
+        CloseHandle(req->overlapped.hEvent);
+        return -1;
+      }
+    }
+    CloseHandle(req->overlapped.hEvent);
+
+    REGISTER_HANDLE_REQ(loop, handle, req);
+    handle->reqs_pending++;
+    handle->write_reqs_pending++;
+    POST_COMPLETION_FOR_REQ(loop, req);
+    return 0;
   } else {
     result = WriteFile(handle->handle,
                        bufs[0].base,
diff --git a/src/win/stream.c b/src/win/stream.c
index edc5407cf5..26485851e9 100644
--- a/src/win/stream.c
+++ b/src/win/stream.c
@@ -196,3 +196,13 @@ int uv_is_readable(const uv_stream_t* handle) {
 int uv_is_writable(const uv_stream_t* handle) {
   return !!(handle->flags & UV_HANDLE_WRITABLE);
 }
+
+
+int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
+  if (blocking != 0)
+    handle->flags |= UV_HANDLE_BLOCKING_WRITES;
+  else
+    handle->flags &= ~UV_HANDLE_BLOCKING_WRITES;
+
+  return 0;
+}
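
For context, below is a minimal caller-side sketch (not part of the patch) of how the new API is meant to be used: enable blocking mode right after opening the stream, before any write requests are submitted. It assumes file descriptor 1 is actually a pipe (e.g. redirected stdout), which is the only case the Windows path supports so far, and it omits error checking. Completion is still delivered through the usual write callback, which is why the loop is run at the end.

#include <uv.h>

static void write_cb(uv_write_t* req, int status) {
  /* The write itself finished before uv_write() returned, but completion
   * is still reported asynchronously through this callback. */
  (void) req;
  (void) status;
}

int main(void) {
  static char msg[] = "hello\n";
  uv_loop_t* loop = uv_default_loop();
  uv_pipe_t out;
  uv_write_t req;
  uv_buf_t buf;

  uv_pipe_init(loop, &out, 0);   /* 0: not an IPC pipe */
  uv_pipe_open(&out, 1);         /* wrap fd 1, assumed to be a pipe */

  /* Enable blocking writes immediately after opening the stream. */
  uv_stream_set_blocking((uv_stream_t*) &out, 1);

  buf = uv_buf_init(msg, sizeof(msg) - 1);
  uv_write(&req, (uv_stream_t*) &out, &buf, 1, write_cb);

  /* Run the loop so the completion callback is delivered. */
  uv_run(loop, UV_RUN_DEFAULT);
  return 0;
}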