Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POSIX poller: add support for select, and for choosing the poller #1996

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions src/platform/posix/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ if (NNG_PLATFORM_POSIX)
nng_check_sym(port_create port.h NNG_HAVE_PORT_CREATE)
nng_check_sym(epoll_create sys/epoll.h NNG_HAVE_EPOLL)
nng_check_sym(epoll_create1 sys/epoll.h NNG_HAVE_EPOLL_CREATE1)
nng_check_sym(poll poll.h NNG_HAVE_POLL)
nng_check_sym(select sys/select.h NNG_HAVE_SELECT)
nng_check_sym(getpeereid unistd.h NNG_HAVE_GETPEEREID)
nng_check_sym(SO_PEERCRED sys/socket.h NNG_HAVE_SOPEERCRED)
nng_check_struct_member(sockpeercred uid sys/socket.h NNG_HAVE_SOCKPEERCRED)
Expand Down Expand Up @@ -101,14 +103,48 @@ if (NNG_PLATFORM_POSIX)
posix_udp.c
)

if (NNG_HAVE_PORT_CREATE)
nng_sources(posix_pollq_port.c)
set(NNG_POLLQ_POLLER "auto" CACHE STRING "Poller used for multiplexing I/O")
set_property(CACHE NNG_POLLQ_POLLER PROPERTY STRINGS auto ports kqueue epoll poll select)
mark_as_advanced(NNG_POLLQ_POLLER)
if (NNG_POLLQ_POLLER STREQUAL "ports")
nng_defines(NNG_POLLQ_PORTS)
elseif (NNG_POLLQ_POLLER STREQUAL "kqueue")
nng_defines(NNG_POLLQ_KQUEUE)
elseif (NNG_POLLQ_POLLER STREQUAL "epoll")
nng_defines(NNG_POLLQ_EPOLL)
elseif (NNG_POLLQ_POLLER STREQUAL "poll")
nng_defines(NNG_POLLQ_POLL)
elseif (NNG_POLLQ_POLLER STREQUAL "select")
set(NNG_POLLQ_SELECT ON)
elseif (NNG_HAVE_PORT_CREATE)
set(NNG_POLLQ_PORTS ON)
elseif (NNG_HAVE_KQUEUE)
nng_sources(posix_pollq_kqueue.c)
set(NNG_POLLQ_KQUEUE ON)
elseif (NNG_HAVE_EPOLL AND NNG_HAVE_EVENTFD)
set(NNG_POLLQ_EPOLL ON)
elseif (NNG_HAVE_POLL)
set(NNG_POLLQ_POLL ON)
elseif (NNG_HAVE_SELECT)
set(NNG_POLLQ_SELECT TRUE)
endif()

if (NNG_POLLQ_PORTS)
message(STATUS "Using port events for multiplexing I/O.")
nng_sources(posix_pollq_port.c)
elseif (NNG_POLLQ_KQUEUE)
message(STATUS "Using kqueue for multiplexing I/O.")
nng_sources(posix_pollq_kqueue.c)
elseif (NNG_POLLQ_EPOLL)
message(DEBUG "Using epoll for multiplexing I/O.")
nng_sources(posix_pollq_epoll.c)
else ()
elseif (NNG_POLLQ_POLL)
message(STATUS "Using poll for multiplexing I/O.")
nng_sources(posix_pollq_poll.c)
elseif (NNG_POLLQ_SELECT)
message(STATUS "Using select for multiplexing I/O.")
nng_sources(posix_pollq_select.c)
else()
message(FATAL_ERROR "No suitable poller found for multiplexing I/O.")
endif ()

if (NNG_HAVE_ARC4RANDOM)
Expand Down
4 changes: 2 additions & 2 deletions src/platform/posix/posix_ipcconn.c
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ ipc_send(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->writeq) == aio) {
nni_posix_pfd_arm(c->pfd, POLLOUT);
nni_posix_pfd_arm(c->pfd, NNI_POLL_OUT);
}
}
nni_mtx_unlock(&c->mtx);
Expand Down Expand Up @@ -298,7 +298,7 @@ ipc_recv(void *arg, nni_aio *aio)
// means we didn't finish the job, so arm the poller to
// complete us.
if (nni_list_first(&c->readq) == aio) {
nni_posix_pfd_arm(c->pfd, POLLIN);
nni_posix_pfd_arm(c->pfd, NNI_POLL_IN);
}
}
nni_mtx_unlock(&c->mtx);
Expand Down
9 changes: 9 additions & 0 deletions src/platform/posix/posix_pollq.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,20 @@ extern int nni_posix_pfd_fd(nni_posix_pfd *);
extern void nni_posix_pfd_close(nni_posix_pfd *);
extern void nni_posix_pfd_set_cb(nni_posix_pfd *, nni_posix_pfd_cb, void *);

#ifdef POLLIN
#define NNI_POLL_IN ((unsigned) POLLIN)
#define NNI_POLL_OUT ((unsigned) POLLOUT)
#define NNI_POLL_HUP ((unsigned) POLLHUP)
#define NNI_POLL_ERR ((unsigned) POLLERR)
#define NNI_POLL_INVAL ((unsigned) POLLNVAL)
#else
// maybe using select
#define NNI_POLL_IN (0x0001)
#define NNI_POLL_OUT (0x0010)
#define NNI_POLL_HUP (0x0004)
#define NNI_POLL_ERR (0x0008)
#define NNI_POLL_INVAL (0x0020)
#endif // POLLIN

#endif // NNG_PLATFORM_POSIX

Expand Down
10 changes: 5 additions & 5 deletions src/platform/posix/posix_pollq_kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,10 @@
return (0);
}

if (events & POLLIN) {
if (events & NNI_POLL_IN) {
EV_SET(&ev[nev++], pf->fd, EVFILT_READ, flags, 0, 0, pf);
}
if (events & POLLOUT) {
if (events & NNI_POLL_OUT) {
EV_SET(&ev[nev++], pf->fd, EVFILT_WRITE, flags, 0, 0, pf);
}
while (kevent(pq->kq, ev, nev, NULL, 0, NULL) != 0) {
Expand Down Expand Up @@ -254,10 +254,10 @@

switch (ev->filter) {
case EVFILT_READ:
revents = POLLIN;
revents = NNI_POLL_IN;
break;
case EVFILT_WRITE:
revents = POLLOUT;
revents = NNI_POLL_OUT;
break;
}
if (ev->udata == NULL) {
Expand All @@ -267,7 +267,7 @@
}
pf = (void *) ev->udata;
if (ev->flags & EV_ERROR) {
revents |= POLLHUP;
revents |= NNI_POLL_HUP;

Check warning on line 270 in src/platform/posix/posix_pollq_kqueue.c

View check run for this annotation

Codecov / codecov/patch

src/platform/posix/posix_pollq_kqueue.c#L270

Added line #L270 was not covered by tests
}

nni_mtx_lock(&pf->mtx);
Expand Down
Loading
Loading