diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c index 41b3c5080..b2136a89e 100644 --- a/src/platform/windows/win_ipcconn.c +++ b/src/platform/windows/win_ipcconn.c @@ -46,48 +46,46 @@ ipc_recv_start(ipc_conn *c) DWORD len; int rv; - if (c->closed) { - while ((aio = nni_list_first(&c->recv_aios)) != NULL) { - nni_list_remove(&c->recv_aios, aio); + while ((aio = nni_list_first(&c->recv_aios)) != NULL) { + if (c->closed) { + nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECLOSED); + continue; } - nni_cv_wake(&c->cv); - } -again: - if ((aio = nni_list_first(&c->recv_aios)) == NULL) { - return; - } - nni_aio_get_iov(aio, &naiov, &aiov); + nni_aio_get_iov(aio, &naiov, &aiov); - idx = 0; - while ((idx < naiov) && (aiov[idx].iov_len == 0)) { - idx++; - } - NNI_ASSERT(idx < naiov); - // Now start a transfer. We assume that only one send can be - // outstanding on a pipe at a time. This is important to avoid - // scrambling the data anyway. Note that Windows named pipes do - // not appear to support scatter/gather, so we have to process - // each element in turn. - buf = aiov[idx].iov_buf; - len = (DWORD) aiov[idx].iov_len; - NNI_ASSERT(buf != NULL); - NNI_ASSERT(len != 0); - - // We limit ourselves to writing 16MB at a time. Named Pipes - // on Windows have limits of between 31 and 64MB. - if (len > 0x1000000) { - len = 0x1000000; - } + idx = 0; + while ((idx < naiov) && (aiov[idx].iov_len == 0)) { + idx++; + } + NNI_ASSERT(idx < naiov); + // Now start a transfer. We assume that only one send can be + // outstanding on a pipe at a time. This is important to avoid + // scrambling the data anyway. Note that Windows named pipes + // do not appear to support scatter/gather, so we have to + // process each element in turn. + buf = aiov[idx].iov_buf; + len = (DWORD) aiov[idx].iov_len; + NNI_ASSERT(buf != NULL); + NNI_ASSERT(len != 0); + + // We limit ourselves to writing 16MB at a time. Named Pipes + // on Windows have limits of between 31 and 64MB. + if (len > 0x1000000) { + len = 0x1000000; + } - if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) && - ((rv = GetLastError()) != ERROR_IO_PENDING)) { - // Synchronous failure. - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, nni_win_error(rv)); - goto again; + if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, nni_win_error(rv)); + } else { + return; + } } + nni_cv_wake(&c->cv); } static void @@ -96,11 +94,8 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num) nni_aio *aio; ipc_conn *c = io->ptr; nni_mtx_lock(&c->mtx); - if ((aio = nni_list_first(&c->recv_aios)) == NULL) { - // Should indicate that it was closed. - nni_mtx_unlock(&c->mtx); - return; - } + aio = nni_list_first(&c->recv_aios); + NNI_ASSERT(aio != NULL); if (c->recv_rv != 0) { rv = c->recv_rv; c->recv_rv = 0; @@ -110,11 +105,7 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num) rv = NNG_ECONNSHUT; } nni_aio_list_remove(aio); - if (c->closed) { - nni_cv_wake(&c->cv); - } else { - ipc_recv_start(c); - } + ipc_recv_start(c); nni_mtx_unlock(&c->mtx); nni_aio_finish_sync(aio, rv, num); @@ -128,10 +119,16 @@ ipc_recv_cancel(nni_aio *aio, void *arg, int rv) if (aio == nni_list_first(&c->recv_aios)) { c->recv_rv = rv; CancelIoEx(c->f, &c->recv_io.olpd); - } else if (nni_aio_list_active(aio)) { - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, rv); - nni_cv_wake(&c->cv); + } else { + nni_aio *srch; + NNI_LIST_FOREACH (&c->recv_aios, srch) { + if (srch == aio) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_cv_wake(&c->cv); + break; + } + } } nni_mtx_unlock(&c->mtx); } @@ -174,48 +171,41 @@ ipc_send_start(ipc_conn *c) DWORD len; int rv; - if (c->closed) { - while ((aio = nni_list_first(&c->send_aios)) != NULL) { - nni_list_remove(&c->send_aios, aio); - nni_aio_finish_error(aio, NNG_ECLOSED); - } - nni_cv_wake(&c->cv); - } -again: - if ((aio = nni_list_first(&c->send_aios)) == NULL) { - return; - } + while ((aio = nni_list_first(&c->send_aios)) != NULL) { - nni_aio_get_iov(aio, &naiov, &aiov); + nni_aio_get_iov(aio, &naiov, &aiov); - idx = 0; - while ((idx < naiov) && (aiov[idx].iov_len == 0)) { - idx++; - } - NNI_ASSERT(idx < naiov); - // Now start a transfer. We assume that only one send can be - // outstanding on a pipe at a time. This is important to avoid - // scrambling the data anyway. Note that Windows named pipes do - // not appear to support scatter/gather, so we have to process - // each element in turn. - buf = aiov[idx].iov_buf; - len = (DWORD) aiov[idx].iov_len; - NNI_ASSERT(buf != NULL); - NNI_ASSERT(len != 0); - - // We limit ourselves to writing 16MB at a time. Named Pipes - // on Windows have limits of between 31 and 64MB. - if (len > 0x1000000) { - len = 0x1000000; - } + idx = 0; + while ((idx < naiov) && (aiov[idx].iov_len == 0)) { + idx++; + } + NNI_ASSERT(idx < naiov); + // Now start a transfer. We assume that only one send can be + // outstanding on a pipe at a time. This is important to avoid + // scrambling the data anyway. Note that Windows named pipes + // do not appear to support scatter/gather, so we have to + // process each element in turn. + buf = aiov[idx].iov_buf; + len = (DWORD) aiov[idx].iov_len; + NNI_ASSERT(buf != NULL); + NNI_ASSERT(len != 0); + + // We limit ourselves to writing 16MB at a time. Named Pipes + // on Windows have limits of between 31 and 64MB. + if (len > 0x1000000) { + len = 0x1000000; + } - if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) && - ((rv = GetLastError()) != ERROR_IO_PENDING)) { - // Synchronous failure. - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, nni_win_error(rv)); - goto again; + if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, nni_win_error(rv)); + } else { + return; + } } + nni_cv_wake(&c->cv); } static void @@ -224,20 +214,14 @@ ipc_send_cb(nni_win_io *io, int rv, size_t num) nni_aio *aio; ipc_conn *c = io->ptr; nni_mtx_lock(&c->mtx); - if ((aio = nni_list_first(&c->send_aios)) == NULL) { - // Should indicate that it was closed. - nni_mtx_unlock(&c->mtx); - return; - } + aio = nni_list_first(&c->send_aios); + NNI_ASSERT(aio != NULL); + nni_aio_list_remove(aio); if (c->send_rv != 0) { rv = c->send_rv; c->send_rv = 0; } - nni_aio_list_remove(aio); ipc_send_start(c); - if (c->closed) { - nni_cv_wake(&c->cv); - } nni_mtx_unlock(&c->mtx); nni_aio_finish_sync(aio, rv, num); @@ -251,10 +235,16 @@ ipc_send_cancel(nni_aio *aio, void *arg, int rv) if (aio == nni_list_first(&c->send_aios)) { c->send_rv = rv; CancelIoEx(c->f, &c->send_io.olpd); - } else if (nni_aio_list_active(aio)) { - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, rv); - nni_cv_wake(&c->cv); + } else { + nni_aio *srch; + NNI_LIST_FOREACH (&c->recv_aios, srch) { + if (srch == aio) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_cv_wake(&c->cv); + break; + } + } } nni_mtx_unlock(&c->mtx); } @@ -269,11 +259,6 @@ ipc_send(void *arg, nni_aio *aio) return; } nni_mtx_lock(&c->mtx); - if (c->closed) { - nni_mtx_unlock(&c->mtx); - nni_aio_finish_error(aio, NNG_ECLOSED); - return; - } if ((rv = nni_aio_schedule(aio, ipc_send_cancel, c)) != 0) { nni_mtx_unlock(&c->mtx); nni_aio_finish_error(aio, rv); @@ -292,12 +277,13 @@ ipc_close(void *arg) ipc_conn *c = arg; nni_mtx_lock(&c->mtx); if (!c->closed) { + HANDLE f = c->f; c->closed = true; - CancelIoEx(c->f, NULL); // cancel all requests + c->f = INVALID_HANDLE_VALUE; - if (c->f != INVALID_HANDLE_VALUE) { - // NB: closing the pipe is dangerous at this point. - DisconnectNamedPipe(c->f); + if (f != INVALID_HANDLE_VALUE) { + DisconnectNamedPipe(f); + CloseHandle(f); } } nni_mtx_unlock(&c->mtx);