Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

windows: Enable blocking pipe for stdio. #830

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions include/uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,31 @@ UV_EXTERN int uv_is_readable(const uv_stream_t* handle);
UV_EXTERN int uv_is_writable(const uv_stream_t* handle);


/*
* Enable or disable blocking mode for a stream.
*
* When blocking mode is enabled all writes complete synchronously. The
* interface remains unchanged otherwise, e.g. completion or failure of the
* operation will still be reported through a callback which is made
* asychronously.
*
* Relying too much on this API is not recommended. It is likely to change
* significantly in the future.
*
* On windows this currently works only for uv_pipe_t instances. On unix it
* works for tcp, pipe and tty instances. Be aware that changing the blocking
* mode on unix sets or clears the O_NONBLOCK bit. If you are sharing a handle
* with another process, the other process is affected by the change too,
* which can lead to unexpected results.
*
* Also libuv currently makes no ordering guarantee when the blocking mode
* is changed after write requests have already been submitted. Therefore it is
* recommended to set the blocking mode immediately after opening or creating
* the stream.
*/
UV_EXTERN int uv_stream_set_blocking(uv_stream_t* handle, int blocking);


/*
* Used to determine whether a stream is closing or closed.
*
Expand Down
5 changes: 5 additions & 0 deletions src/unix/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -1409,3 +1409,8 @@ void uv__stream_close(uv_stream_t* handle) {

assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
}


int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
assert(0 && "implement me");
}
1 change: 1 addition & 0 deletions src/win/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
#define UV_HANDLE_SYNC_BYPASS_IOCP 0x00040000
#define UV_HANDLE_ZERO_READ 0x00080000
#define UV_HANDLE_EMULATE_IOCP 0x00100000
#define UV_HANDLE_BLOCKING_WRITES 0x00200000

/* Only used by uv_tcp_t handles. */
#define UV_HANDLE_IPV6 0x01000000
Expand Down
90 changes: 81 additions & 9 deletions src/win/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
if (handle->ipc_header_write_req.type != UV_WRITE) {
ipc_header_req = (uv_write_t*)&handle->ipc_header_write_req;
} else {
/* TODO: track this malloc and free after request completes */
ipc_header_req = (uv_write_t*)malloc(sizeof(uv_write_t));
if (!ipc_header_req) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
Expand All @@ -1128,6 +1129,13 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
/* Write the header or the whole frame. */
memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped));

/* Using overlapped IO, but wait for completion before returning.
This write is blocking because ipc_frame is on stack. */
ipc_header_req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
if (!ipc_header_req->overlapped.hEvent) {
uv_fatal_error(GetLastError(), "CreateEvent");
}

result = WriteFile(handle->handle,
&ipc_frame,
ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
Expand All @@ -1136,18 +1144,22 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
&ipc_header_req->overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) {
uv__set_sys_error(loop, GetLastError());
CloseHandle(ipc_header_req->overlapped.hEvent);
return -1;
}

if (result) {
/* Request completed immediately. */
ipc_header_req->queued_bytes = 0;
} else {
/* Request queued by the kernel. */
ipc_header_req->queued_bytes = ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
sizeof(ipc_frame) : sizeof(ipc_frame.header);
handle->write_queue_size += ipc_header_req->queued_bytes;
if (!result) {
/* Request not completed immediately. Wait for it.*/
if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) !=
WAIT_OBJECT_0) {
uv__set_sys_error(loop, GetLastError());
CloseHandle(ipc_header_req->overlapped.hEvent);
return -1;
}
}
ipc_header_req->queued_bytes = 0;
CloseHandle(ipc_header_req->overlapped.hEvent);
ipc_header_req->overlapped.hEvent = NULL;

REGISTER_HANDLE_REQ(loop, handle, ipc_header_req);
handle->reqs_pending++;
Expand All @@ -1159,7 +1171,29 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
}
}

if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
if ((handle->flags &
(UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
(UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
DWORD bytes;
result = WriteFile(handle->handle,
bufs[0].base,
bufs[0].len,
&bytes,
NULL);

if (!result) {
return uv__set_sys_error(loop, GetLastError());
} else {
/* Request completed immediately. */
req->queued_bytes = 0;
}

REGISTER_HANDLE_REQ(loop, handle, req);
handle->reqs_pending++;
handle->write_reqs_pending++;
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
} else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
req->write_buffer = bufs[0];
uv_insert_non_overlapped_write_req(handle, req);
if (handle->write_reqs_pending == 0) {
Expand All @@ -1169,6 +1203,44 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
/* Request queued by the kernel. */
req->queued_bytes = uv_count_bufs(bufs, bufcnt);
handle->write_queue_size += req->queued_bytes;
} else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
/* Using overlapped IO, but wait for completion before returning */
req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
if (!req->overlapped.hEvent) {
uv_fatal_error(GetLastError(), "CreateEvent");
}

result = WriteFile(handle->handle,
bufs[0].base,
bufs[0].len,
NULL,
&req->overlapped);

if (!result && GetLastError() != ERROR_IO_PENDING) {
uv__set_sys_error(loop, GetLastError());
CloseHandle(req->overlapped.hEvent);
return -1;
}

if (result) {
/* Request completed immediately. */
req->queued_bytes = 0;
} else {
/* Request queued by the kernel. */
if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) !=
WAIT_OBJECT_0) {
uv__set_sys_error(loop, GetLastError());
CloseHandle(ipc_header_req->overlapped.hEvent);
return -1;
}
}
CloseHandle(req->overlapped.hEvent);

REGISTER_HANDLE_REQ(loop, handle, req);
handle->reqs_pending++;
handle->write_reqs_pending++;
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
} else {
result = WriteFile(handle->handle,
bufs[0].base,
Expand Down
10 changes: 10 additions & 0 deletions src/win/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,13 @@ int uv_is_readable(const uv_stream_t* handle) {
int uv_is_writable(const uv_stream_t* handle) {
return !!(handle->flags & UV_HANDLE_WRITABLE);
}


int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
if (blocking != 0)
handle->flags |= UV_HANDLE_BLOCKING_WRITES;
else
handle->flags &= ~UV_HANDLE_BLOCKING_WRITES;

return 0;
}