Skip to content

Commit

Permalink
poll: performance improvements, simplifications
Browse files Browse the repository at this point in the history
We preallocate the arrays used for pollfds, based on what the
system can tolerate (tunable with NNG_MAX_OPEN), and we change
the code for inserting and removing pollfds from the list so
that it can run without acquiring the locks during the main loop,
only when adding or removing files.

The poll() implementation is very nearly lock free in the hot
code path, and soon will be.
  • Loading branch information
gdamore committed Dec 19, 2024
1 parent 5e18eb4 commit 60f6355
Showing 1 changed file with 84 additions and 85 deletions.
169 changes: 84 additions & 85 deletions src/platform/posix/posix_pollq_poll.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <poll.h>
#include <stdlib.h>
#include <string.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
Expand All @@ -32,26 +33,18 @@
// nni_posix_pollq is a work structure used by the poller thread, that keeps
// track of all the underlying pipe handles and so forth being used by poll().

// Locking strategy: We use the pollq lock to guard the lists on the pollq,
// the nfds (which counts the number of items in the pollq), the pollq
// shutdown flags (pq->closing and pq->closed) and the cv on each pfd. We
// use a lock on the pfd only to protect the the events field (which we treat
// as an atomic bitfield), and the cb and arg pointers. Note that the pfd
// lock is therefore a leaf lock, which is sometimes acquired while holding
// the pq lock. Every reasonable effort is made to minimize holding locks.
// (Btw, pfd->fd is not guarded, because it is set at pfd creation and
// persists until the pfd is destroyed.)

typedef struct nni_posix_pollq {
nni_mtx mtx;
int nfds;
int wakewfd; // write side of waker pipe
int wakerfd; // read side of waker pipe
bool closing; // request for worker to exit
bool closed;
nni_thr thr; // worker thread
nni_list pollq; // armed nodes
nni_list reapq;
nni_mtx mtx;
int wakewfd; // write side of waker pipe
int wakerfd; // read side of waker pipe
nni_thr thr; // worker thread
nni_list pollq; // armed nodes
nni_list reapq;
nni_list addq;
struct pollfd *fds;
nni_posix_pfd **pfds;
int nalloc;
bool closed;
} nni_posix_pollq;

static nni_posix_pollq nni_posix_global_pollq;
Expand Down Expand Up @@ -87,16 +80,9 @@ nni_posix_pfd_init(nni_posix_pfd **pfdp, int fd)
pfd->arg = NULL;
pfd->pq = pq;
nni_mtx_lock(&pq->mtx);
if (pq->closing) {
nni_mtx_unlock(&pq->mtx);
nni_cv_fini(&pfd->cv);
nni_mtx_fini(&pfd->mtx);
NNI_FREE_STRUCT(pfd);
return (NNG_ECLOSED);
}
nni_list_append(&pq->pollq, pfd);
pq->nfds++;
nni_list_append(&pq->addq, pfd);
nni_mtx_unlock(&pq->mtx);
nni_plat_pipe_raise(pq->wakewfd);
*pfdp = pfd;
return (0);
}
Expand Down Expand Up @@ -132,7 +118,6 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd)
nni_mtx_lock(&pq->mtx);
if (nni_list_active(&pq->pollq, pfd)) {
nni_list_remove(&pq->pollq, pfd);
pq->nfds--;
}

if ((!nni_thr_is_self(&pq->thr)) && (!pq->closed)) {
Expand Down Expand Up @@ -169,45 +154,42 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events)
return (0);
}

static void
posix_poll_add(nni_posix_pollq *pq)
{
nni_posix_pfd *pfd;
// Also lets reap anything that was in the reaplist!
while ((pfd = nni_list_first(&pq->addq)) != NULL) {
nni_list_remove(&pq->addq, pfd);
nni_list_append(&pq->pollq, pfd);
}
}

static void
posix_poll_reap(nni_posix_pollq *pq, unsigned events)
{
nni_posix_pfd *pfd;
if (events & POLLHUP) {
pq->closed = true;
}
while ((pfd = nni_list_first(&pq->reapq)) != NULL) {
nni_list_remove(&pq->reapq, pfd);
nni_cv_wake(&pfd->cv);
}
}

static void
nni_posix_poll_thr(void *arg)
{
nni_posix_pollq *pq = arg;
int nalloc = 0;
struct pollfd *fds = NULL;
nni_posix_pfd **pfds = NULL;
nni_posix_pollq *pq = arg;
struct pollfd *fds = pq->fds;
nni_posix_pfd **pfds = pq->pfds;

for (;;) {
int nfds;
unsigned events;
nni_posix_pfd *pfd;

nni_mtx_lock(&pq->mtx);
while (nalloc < (pq->nfds + 1)) {
int n = pq->nfds + 128;

// Drop the lock while we sleep or allocate. This
// allows additional items to be added or removed (!)
// while we wait.
nni_mtx_unlock(&pq->mtx);

// Toss the old ones first; avoids *doubling* memory
// consumption during alloc.
NNI_FREE_STRUCTS(fds, nalloc);
NNI_FREE_STRUCTS(pfds, nalloc);
nalloc = 0;

if ((pfds = NNI_ALLOC_STRUCTS(pfds, n)) == NULL) {
nni_msleep(10); // sleep for a bit, try later
} else if ((fds = NNI_ALLOC_STRUCTS(fds, n)) == NULL) {
NNI_FREE_STRUCTS(pfds, n);
nni_msleep(10);
} else {
nalloc = n;
}
nni_mtx_lock(&pq->mtx);
}

// The waker pipe is set up so that we will be woken
// when it is written (this allows us to be signaled).
fds[0].fd = pq->wakerfd;
Expand All @@ -216,21 +198,6 @@ nni_posix_poll_thr(void *arg)
pfds[pq->wakerfd] = NULL;
nfds = 1;

// Also lets reap anything that was in the reaplist!
while ((pfd = nni_list_first(&pq->reapq)) != NULL) {
nni_list_remove(&pq->reapq, pfd);
nni_cv_wake(&pfd->cv);
}

// If we're closing down, bail now. This is done *after* we
// have ensured that the reapq is empty. Anything still in
// the pollq is not going to receive further callbacks.
if (pq->closing) {
pq->closed = true;
nni_mtx_unlock(&pq->mtx);
break;
}

// Set up the poll list.
NNI_LIST_FOREACH (&pq->pollq, pfd) {

Expand All @@ -246,7 +213,6 @@ nni_posix_poll_thr(void *arg)
nfds++;
}
}
nni_mtx_unlock(&pq->mtx);

// We could get the result from poll, and avoid iterating
// over the entire set of pollfds, but since on average we
Expand All @@ -267,6 +233,10 @@ nni_posix_poll_thr(void *arg)
}
if (pfd == NULL || fd == pq->wakerfd) {
nni_plat_pipe_clear(pq->wakerfd);
nni_mtx_lock(&pq->mtx);
posix_poll_add(pq);
posix_poll_reap(pq, events);
nni_mtx_unlock(&pq->mtx);
if (events & POLLHUP) {
return;
}
Expand All @@ -291,23 +261,25 @@ nni_posix_poll_thr(void *arg)
}
}
}

NNI_FREE_STRUCTS(fds, nalloc);
NNI_FREE_STRUCTS(pfds, nalloc);
}

static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
nni_mtx_lock(&pq->mtx);
pq->closing = true;
nni_mtx_unlock(&pq->mtx);

nni_plat_pipe_raise(pq->wakewfd);

(void) close(pq->wakewfd);
nni_thr_fini(&pq->thr);
nni_plat_pipe_close(pq->wakewfd, pq->wakerfd);
(void) close(pq->wakerfd);
nni_mtx_fini(&pq->mtx);
if (pq->fds != NULL) {
NNI_FREE_STRUCTS(pq->fds, pq->nalloc);
pq->fds = NULL;
}
if (pq->pfds != NULL) {
NNI_FREE_STRUCTS(pq->pfds, pq->nalloc);
pq->pfds = NULL;
}
}

static int
Expand All @@ -317,8 +289,35 @@ nni_posix_pollq_create(nni_posix_pollq *pq)

NNI_LIST_INIT(&pq->pollq, nni_posix_pfd, node);
NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
pq->closing = false;
pq->closed = false;
NNI_LIST_INIT(&pq->addq, nni_posix_pfd, node);

pq->closed = false;
#if NNG_MAX_OPEN
pq->nalloc = NNG_MAX_OPEN;
#else
struct rlimit limits;
pq->nalloc = 0;
if (getrlimit(RLIMIT_NOFILE, &limits) == 0) {
if (limits.rlim_cur != RLIM_INFINITY) {
pq->nalloc = (int) limits.rlim_cur;
} else if (limits.rlim_max != RLIM_INFINITY) {
pq->nalloc = (int) limits.rlim_max;
}
pq->nalloc = (int) limits.rlim_max;
}
#endif
if (pq->nalloc == 0) {
// 5K files default. If you need more, either set
// rlimit properly, or
pq->nalloc = 5000;
}
if (pq->nalloc < 20) { // minimum allowed per POSIX
pq->nalloc = 20;
}
if (((pq->pfds = NNI_ALLOC_STRUCTS(pq->pfds, pq->nalloc)) == NULL) ||
((pq->fds = NNI_ALLOC_STRUCTS(pq->fds, pq->nalloc)) == NULL)) {
return (NNG_ENOMEM);
}

if ((rv = nni_plat_pipe_open(&pq->wakewfd, &pq->wakerfd)) != 0) {
return (rv);
Expand Down

0 comments on commit 60f6355

Please sign in to comment.