diff --git a/src/platform/posix/CMakeLists.txt b/src/platform/posix/CMakeLists.txt index 6f3dfcafa..aeedddb9c 100644 --- a/src/platform/posix/CMakeLists.txt +++ b/src/platform/posix/CMakeLists.txt @@ -60,6 +60,8 @@ if (NNG_PLATFORM_POSIX) nng_check_sym(port_create port.h NNG_HAVE_PORT_CREATE) nng_check_sym(epoll_create sys/epoll.h NNG_HAVE_EPOLL) nng_check_sym(epoll_create1 sys/epoll.h NNG_HAVE_EPOLL_CREATE1) + nng_check_sym(poll poll.h NNG_HAVE_POLL) + nng_check_sym(select sys/select.h NNG_HAVE_SELECT) nng_check_sym(getpeereid unistd.h NNG_HAVE_GETPEEREID) nng_check_sym(SO_PEERCRED sys/socket.h NNG_HAVE_SOPEERCRED) nng_check_struct_member(sockpeercred uid sys/socket.h NNG_HAVE_SOCKPEERCRED) @@ -101,14 +103,48 @@ if (NNG_PLATFORM_POSIX) posix_udp.c ) - if (NNG_HAVE_PORT_CREATE) - nng_sources(posix_pollq_port.c) + set(NNG_POLLQ_POLLER "auto" CACHE STRING "Poller used for multiplexing I/O") + set_property(CACHE NNG_POLLQ_POLLER PROPERTY STRINGS auto ports kqueue epoll poll select) + mark_as_advanced(NNG_POLLQ_POLLER) + if (NNG_POLLQ_POLLER STREQUAL "ports") + nng_defines(NNG_POLLQ_PORTS) + elseif (NNG_POLLQ_POLLER STREQUAL "kqueue") + nng_defines(NNG_POLLQ_KQUEUE) + elseif (NNG_POLLQ_POLLER STREQUAL "epoll") + nng_defines(NNG_POLLQ_EPOLL) + elseif (NNG_POLLQ_POLLER STREQUAL "poll") + nng_defines(NNG_POLLQ_POLL) + elseif (NNG_POLLQ_POLLER STREQUAL "select") + set(NNG_POLLQ_SELECT ON) + elseif (NNG_HAVE_PORT_CREATE) + set(NNG_POLLQ_PORTS ON) elseif (NNG_HAVE_KQUEUE) - nng_sources(posix_pollq_kqueue.c) + set(NNG_POLLQ_KQUEUE ON) elseif (NNG_HAVE_EPOLL AND NNG_HAVE_EVENTFD) + set(NNG_POLLQ_EPOLL ON) + elseif (NNG_HAVE_POLL) + set(NNG_POLLQ_POLL ON) + elseif (NNG_HAVE_SELECT) + set(NNG_POLLQ_SELECT TRUE) + endif() + + if (NNG_POLLQ_PORTS) + message(STATUS "Using port events for multiplexing I/O.") + nng_sources(posix_pollq_port.c) + elseif (NNG_POLLQ_KQUEUE) + message(STATUS "Using kqueue for multiplexing I/O.") + nng_sources(posix_pollq_kqueue.c) + elseif (NNG_POLLQ_EPOLL) + message(DEBUG "Using epoll for multiplexing I/O.") nng_sources(posix_pollq_epoll.c) - else () + elseif (NNG_POLLQ_POLL) + message(STATUS "Using poll for multiplexing I/O.") nng_sources(posix_pollq_poll.c) + elseif (NNG_POLLQ_SELECT) + message(STATUS "Using select for multiplexing I/O.") + nng_sources(posix_pollq_select.c) + else() + message(FATAL_ERROR "No suitable poller found for multiplexing I/O.") endif () if (NNG_HAVE_ARC4RANDOM) diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c index 8b11b64fa..a198b87f4 100644 --- a/src/platform/posix/posix_ipcconn.c +++ b/src/platform/posix/posix_ipcconn.c @@ -264,7 +264,7 @@ ipc_send(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->writeq) == aio) { - nni_posix_pfd_arm(c->pfd, POLLOUT); + nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT); } } nni_mtx_unlock(&c->mtx); @@ -298,7 +298,7 @@ ipc_recv(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->readq) == aio) { - nni_posix_pfd_arm(c->pfd, POLLIN); + nni_posix_pfd_arm(c->pfd, NNI_POLL_IN); } } nni_mtx_unlock(&c->mtx); diff --git a/src/platform/posix/posix_pollq.h b/src/platform/posix/posix_pollq.h index 76105acc4..8dc92fb1d 100644 --- a/src/platform/posix/posix_pollq.h +++ b/src/platform/posix/posix_pollq.h @@ -32,11 +32,20 @@ extern int nni_posix_pfd_fd(nni_posix_pfd *); extern void nni_posix_pfd_close(nni_posix_pfd *); extern void nni_posix_pfd_set_cb(nni_posix_pfd *, nni_posix_pfd_cb, void *); +#ifdef POLLIN #define NNI_POLL_IN ((unsigned) POLLIN) #define NNI_POLL_OUT ((unsigned) POLLOUT) #define NNI_POLL_HUP ((unsigned) POLLHUP) #define NNI_POLL_ERR ((unsigned) POLLERR) #define NNI_POLL_INVAL ((unsigned) POLLNVAL) +#else +// maybe using select +#define NNI_POLL_IN (0x0001) +#define NNI_POLL_OUT (0x0010) +#define NNI_POLL_HUP (0x0004) +#define NNI_POLL_ERR (0x0008) +#define NNI_POLL_INVAL (0x0020) +#endif // POLLIN #endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c index ae1b2f471..562c888e4 100644 --- a/src/platform/posix/posix_pollq_kqueue.c +++ b/src/platform/posix/posix_pollq_kqueue.c @@ -198,10 +198,10 @@ nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events) return (0); } - if (events & POLLIN) { + if (events & NNI_POLL_IN) { EV_SET(&ev[nev++], pf->fd, EVFILT_READ, flags, 0, 0, pf); } - if (events & POLLOUT) { + if (events & NNI_POLL_OUT) { EV_SET(&ev[nev++], pf->fd, EVFILT_WRITE, flags, 0, 0, pf); } while (kevent(pq->kq, ev, nev, NULL, 0, NULL) != 0) { @@ -254,10 +254,10 @@ nni_posix_poll_thr(void *arg) switch (ev->filter) { case EVFILT_READ: - revents = POLLIN; + revents = NNI_POLL_IN; break; case EVFILT_WRITE: - revents = POLLOUT; + revents = NNI_POLL_OUT; break; } if (ev->udata == NULL) { @@ -267,7 +267,7 @@ nni_posix_poll_thr(void *arg) } pf = (void *) ev->udata; if (ev->flags & EV_ERROR) { - revents |= POLLHUP; + revents |= NNI_POLL_HUP; } nni_mtx_lock(&pf->mtx); diff --git a/src/platform/posix/posix_pollq_select.c b/src/platform/posix/posix_pollq_select.c new file mode 100644 index 000000000..3213aa116 --- /dev/null +++ b/src/platform/posix/posix_pollq_select.c @@ -0,0 +1,331 @@ +// +// Copyright 2024 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/defs.h" +#include "core/nng_impl.h" +#include "platform/posix/posix_pollq.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +// POSIX AIO using select(). We use a single poll thread to perform +// I/O operations for the entire system. This is the worst form of +// I/O multiplexing, but short of using threads or spin-polling from +// a single thread, this is our only reasonable solution. +// +// Note that select() is not scalable, and we will be limited to a small +// number of open files/sockets. As such it is is not suitable for use +// on large servers. However, this may be enough for use in constrained +// systems that are not likely to have many open files anyway. +// + +typedef struct nni_posix_pollq nni_posix_pollq; + +struct nni_posix_pollq { + nni_mtx mtx; + int wakewfd; // write side of waker pipe + int wakerfd; // read side of waker pipe + bool closing; // request for worker to exit + bool closed; + nni_thr thr; // worker thread + int maxfd; + struct nni_posix_pfd *pfds[FD_SETSIZE]; +}; + +struct nni_posix_pfd { + nni_posix_pollq *pq; + int fd; + nni_cv cv; + nni_mtx mtx; + unsigned events; + nni_posix_pfd_cb cb; + void *arg; + bool reap; +}; + +static nni_posix_pollq nni_posix_global_pollq; + +int +nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd) +{ + nni_posix_pfd *pfd; + nni_posix_pollq *pq = &nni_posix_global_pollq; + + // Set this is as soon as possible (narrow the close-exec race as + // much as we can; better options are system calls that suppress + // this behavior from descriptor creation.) + (void) fcntl(fd, F_SETFD, FD_CLOEXEC); + (void) fcntl(fd, F_SETFL, O_NONBLOCK); + + if ((pfd = NNI_ALLOC_STRUCT(pfd)) == NULL) { + return (NNG_ENOMEM); + } + if (fd >= FD_SETSIZE) { + return (NNG_EINVAL); + } + nni_mtx_init(&pfd->mtx); + nni_cv_init(&pfd->cv, &pq->mtx); + pfd->fd = fd; + pfd->events = 0; + pfd->cb = NULL; + pfd->arg = NULL; + pfd->pq = pq; + nni_mtx_lock(&pq->mtx); + if (pq->closing) { + nni_mtx_unlock(&pq->mtx); + nni_cv_fini(&pfd->cv); + nni_mtx_fini(&pfd->mtx); + NNI_FREE_STRUCT(pfd); + return (NNG_ECLOSED); + } + pq->pfds[fd] = pfd; + if (fd > pq->maxfd) { + pq->maxfd = fd; + } + nni_mtx_unlock(&pq->mtx); + *pfdp = pfd; + return (0); +} + +void +nni_posix_pfd_set_cb(nni_posix_pfd *pfd, nni_posix_pfd_cb cb, void *arg) +{ + nni_mtx_lock(&pfd->mtx); + pfd->cb = cb; + pfd->arg = arg; + nni_mtx_unlock(&pfd->mtx); +} + +int +nni_posix_pfd_fd(nni_posix_pfd *pfd) +{ + return (pfd->fd); +} + +void +nni_posix_pfd_close(nni_posix_pfd *pfd) +{ + (void) shutdown(pfd->fd, SHUT_RDWR); +} + +void +nni_posix_pfd_fini(nni_posix_pfd *pfd) +{ + nni_posix_pollq *pq = pfd->pq; + int fd = pfd->fd; + + nni_posix_pfd_close(pfd); + + nni_mtx_lock(&pq->mtx); + if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) { + pfd->reap = true; + nni_plat_pipe_raise(pq->wakewfd); + while (pfd->reap) { + nni_cv_wait(&pfd->cv); + } + } else { + pq->pfds[fd] = NULL; + } + nni_mtx_unlock(&pq->mtx); + + // We're exclusive now. + (void) close(fd); + nni_cv_fini(&pfd->cv); + nni_mtx_fini(&pfd->mtx); + NNI_FREE_STRUCT(pfd); +} + +int +nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events) +{ + nni_posix_pollq *pq = pfd->pq; + + nni_mtx_lock(&pq->mtx); + pfd->events |= events; + nni_mtx_unlock(&pq->mtx); + + // If we're running on the callback, then don't bother to kick + // the pollq again. This is necessary because we cannot modify + // the poller while it is polling. + if (!nni_thr_is_self(&pq->thr)) { + nni_plat_pipe_raise(pq->wakewfd); + } + return (0); +} + +static void +nni_posix_poll_thr(void *arg) +{ + nni_posix_pollq *pq = arg; + fd_set rfds; + fd_set wfds; + fd_set efds; + int maxfd; + + for (;;) { + unsigned events; + nni_posix_pfd *pfd; + + FD_ZERO(&rfds); + FD_ZERO(&wfds); + FD_ZERO(&efds); + + // The waker pipe is set up so that we will be woken + // when it is written (this allows us to be signaled). + FD_SET(pq->wakerfd, &rfds); + FD_SET(pq->wakerfd, &efds); + + nni_plat_pipe_clear(pq->wakerfd); + nni_mtx_lock(&pq->mtx); + + // If we're closing down, bail now. This is done *after* we + // have ensured that the reapq is empty. Anything still in + // the pollq is not going to receive further callbacks. + if (pq->closing) { + for (int fd = 0; fd <= pq->maxfd; fd++) { + if ((pfd = pq->pfds[fd]) != NULL) { + pq->pfds[fd] = NULL; + pfd->reap = false; + nni_cv_wake(&pfd->cv); + } + } + pq->closed = true; + nni_mtx_unlock(&pq->mtx); + break; + } + + // Set up the poll list. + maxfd = pq->wakerfd; + for (int fd = 0; fd <= pq->maxfd; fd++) { + if ((pfd = pq->pfds[fd]) == NULL) { + continue; + } + NNI_ASSERT(pfd->fd == fd); + if (pfd->reap) { + pq->pfds[fd] = NULL; + pfd->reap = false; + nni_cv_wake(&pfd->cv); + continue; + } + events = pfd->events; + + if (events != 0) { + if (events & NNI_POLL_IN) { + FD_SET(fd, &rfds); + } + if (events & NNI_POLL_OUT) { + FD_SET(fd, &wfds); + } + FD_SET(fd, &efds); + if (maxfd < fd) { + maxfd = fd; + } + } + } + while (pq->maxfd > 0 && (pq->pfds[pq->maxfd] == NULL)) { + pq->maxfd--; + } + nni_mtx_unlock(&pq->mtx); + + // We could get the result from poll, and avoid iterating + // over the entire set of pollfds, but since on average we + // will be walking half the list, doubling the work we do + // (the condition with a potential pipeline stall) seems like + // adding complexity with no real benefit. It also makes the + // worst case even worse. + (void) select(maxfd + 1, &rfds, &wfds, &efds, NULL); + + nni_mtx_lock(&pq->mtx); + for (int fd = 0; fd <= maxfd; fd++) { + events = 0; + if (FD_ISSET(fd, &rfds)) { + events |= NNI_POLL_IN; + } + if (FD_ISSET(fd, &wfds)) { + events |= NNI_POLL_OUT; + } + if (FD_ISSET(fd, &efds)) { + events |= NNI_POLL_HUP; + } + if (events != 0) { + nni_posix_pfd_cb cb = NULL; + void *arg; + if ((pfd = pq->pfds[fd]) != NULL) { + cb = pfd->cb; + arg = pfd->arg; + pfd->events &= ~events; + } + + if (cb) { + nni_mtx_unlock(&pq->mtx); + cb(pfd, events, arg); + nni_mtx_lock(&pq->mtx); + } + } + } + nni_mtx_unlock(&pq->mtx); + } +} + +static void +nni_posix_pollq_destroy(nni_posix_pollq *pq) +{ + nni_mtx_lock(&pq->mtx); + pq->closing = true; + nni_mtx_unlock(&pq->mtx); + + nni_plat_pipe_raise(pq->wakewfd); + + close(pq->wakewfd); + nni_thr_fini(&pq->thr); + close(pq->wakerfd); + // nni_plat_pipe_close(pq->wakewfd, pq->wakerfd); + nni_mtx_fini(&pq->mtx); +} + +static int +nni_posix_pollq_create(nni_posix_pollq *pq) +{ + int rv; + + pq->closing = false; + pq->closed = false; + + if ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) { + return (rv); + } + if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) { + nni_plat_pipe_close(pq->wakewfd, pq->wakerfd); + return (rv); + } + nni_thr_set_name(&pq->thr, "nng:poll:select"); + nni_mtx_init(&pq->mtx); + nni_thr_run(&pq->thr); + return (0); +} + +int +nni_posix_pollq_sysinit(nng_init_params *params) +{ + NNI_ARG_UNUSED(params); + return (nni_posix_pollq_create(&nni_posix_global_pollq)); +} + +void +nni_posix_pollq_sysfini(void) +{ + nni_posix_pollq_destroy(&nni_posix_global_pollq); +} diff --git a/src/platform/posix/posix_sockfd.c b/src/platform/posix/posix_sockfd.c index 997feae6b..8ccca66c3 100644 --- a/src/platform/posix/posix_sockfd.c +++ b/src/platform/posix/posix_sockfd.c @@ -302,7 +302,7 @@ sfd_send(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->writeq) == aio) { - nni_posix_pfd_arm(c->pfd, POLLOUT); + nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT); } } nni_mtx_unlock(&c->mtx); @@ -336,7 +336,7 @@ sfd_recv(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->readq) == aio) { - nni_posix_pfd_arm(c->pfd, POLLIN); + nni_posix_pfd_arm(c->pfd, NNI_POLL_IN); } } nni_mtx_unlock(&c->mtx); diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c index ce5243b03..d49fc838c 100644 --- a/src/platform/posix/posix_tcpconn.c +++ b/src/platform/posix/posix_tcpconn.c @@ -310,7 +310,7 @@ tcp_send(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->writeq) == aio) { - nni_posix_pfd_arm(c->pfd, POLLOUT); + nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT); } } nni_mtx_unlock(&c->mtx); @@ -344,7 +344,7 @@ tcp_recv(void *arg, nni_aio *aio) // means we didn't finish the job, so arm the poller to // complete us. if (nni_list_first(&c->readq) == aio) { - nni_posix_pfd_arm(c->pfd, POLLIN); + nni_posix_pfd_arm(c->pfd, NNI_POLL_IN); } } nni_mtx_unlock(&c->mtx); diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index a7eb71440..8d0d4a420 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -181,22 +181,21 @@ nni_posix_udp_cb(nni_posix_pfd *pfd, unsigned events, void *arg) NNI_ARG_UNUSED(pfd); nni_mtx_lock(&udp->udp_mtx); - if (events & (unsigned) POLLIN) { + if (events & NNI_POLL_IN) { nni_posix_udp_dorecv(udp); } - if (events & (unsigned) POLLOUT) { + if (events & NNI_POLL_OUT) { nni_posix_udp_dosend(udp); } - if (events & - ((unsigned) POLLHUP | (unsigned) POLLERR | (unsigned) POLLNVAL)) { + if (events & (NNI_POLL_HUP | NNI_POLL_ERR | NNI_POLL_INVAL)) { nni_posix_udp_doclose(udp); } else { events = 0; if (!nni_list_empty(&udp->udp_sendq)) { - events |= (unsigned) POLLOUT; + events |= NNI_POLL_OUT; } if (!nni_list_empty(&udp->udp_recvq)) { - events |= (unsigned) POLLIN; + events |= NNI_POLL_IN; } if (events) { int rv; @@ -299,7 +298,7 @@ nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio) } nni_list_append(&udp->udp_recvq, aio); if (nni_list_first(&udp->udp_recvq) == aio) { - if ((rv = nni_posix_pfd_arm(udp->udp_pfd, POLLIN)) != 0) { + if ((rv = nni_posix_pfd_arm(udp->udp_pfd, NNI_POLL_IN)) != 0) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } @@ -322,7 +321,8 @@ nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio) } nni_list_append(&udp->udp_sendq, aio); if (nni_list_first(&udp->udp_sendq) == aio) { - if ((rv = nni_posix_pfd_arm(udp->udp_pfd, POLLOUT)) != 0) { + if ((rv = nni_posix_pfd_arm(udp->udp_pfd, NNI_POLL_OUT)) != + 0) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); }