Skip to content

Commit

Permalink
cherry pick commits from PR nanomsg#1828
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed May 30, 2024
1 parent ac1da00 commit 0b45092
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 505 deletions.
28 changes: 1 addition & 27 deletions src/core/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,28 +70,6 @@ static int nni_aio_expire_q_cnt;
// operations from starting, without waiting for any existing one to
// complete, call nni_aio_close.

// In some places we want to check that an aio is not in use.
// Technically if these checks pass, then they should not need
// to be done with a lock, because the caller should have the only
// references to them. However, race detectors can't necessarily
// know about this semantic, and may complain about potential data
// races. To suppress false positives, define NNG_RACE_DETECTOR.
// Note that this will cause extra locks to be acquired, affecting
// performance, so don't use it in production.
#ifdef __has_feature
#if __has_feature(thread_sanitizer)
#define NNG_RACE_DETECTOR
#endif
#endif

#ifdef NNG_RACE_DETECTOR
#define aio_safe_lock(l) nni_mtx_lock(l)
#define aio_safe_unlock(l) nni_mtx_unlock(l)
#else
#define aio_safe_lock(l) ((void) 1)
#define aio_safe_unlock(l) ((void) 1)
#endif

static nni_reap_list aio_reap_list = {
.rl_offset = offsetof(nni_aio, a_reap_node),
.rl_func = (nni_cb) nni_aio_free,
Expand Down Expand Up @@ -350,8 +328,7 @@ nni_aio_begin(nni_aio *aio)
// checks may wish ignore or suppress these checks.
nni_aio_expire_q *eq = aio->a_expire_q;

aio_safe_lock(&eq->eq_mtx);

nni_mtx_lock(&eq->eq_mtx);
NNI_ASSERT(!nni_aio_list_active(aio));
NNI_ASSERT(aio->a_cancel_fn == NULL);
NNI_ASSERT(!nni_list_node_active(&aio->a_expire_node));
Expand All @@ -365,9 +342,6 @@ nni_aio_begin(nni_aio *aio)
aio->a_count = 0;
aio->a_cancel_fn = NULL;

aio_safe_unlock(&eq->eq_mtx);

nni_mtx_lock(&eq->eq_mtx);
// We should not reschedule anything at this point.
if (aio->a_stop) {
aio->a_result = NNG_ECANCELED;
Expand Down
2 changes: 1 addition & 1 deletion src/core/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ nni_pipe_bump_error(nni_pipe *p, int err)
{
if (p->p_dialer != NULL) {
nni_dialer_bump_error(p->p_dialer, err);
} else {
} else if (p->p_listener != NULL) {
nni_listener_bump_error(p->p_listener, err);
}
}
11 changes: 2 additions & 9 deletions src/core/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ struct nni_socket {

bool s_closing; // Socket is closing
bool s_closed; // Socket closed, protected by global lock
bool s_ctxwait; // Waiting for contexts to close.

nni_mtx s_pipe_cbs_mtx;
nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM];
Expand Down Expand Up @@ -732,7 +731,6 @@ nni_sock_shutdown(nni_sock *sock)
// a chance to do so gracefully.

while (!nni_list_empty(&sock->s_ctxs)) {
sock->s_ctxwait = true;
nni_cv_wait(&sock->s_close_cv);
}
nni_mtx_unlock(&sock_lk);
Expand Down Expand Up @@ -796,7 +794,6 @@ nni_sock_close(nni_sock *s)

// Wait for all other references to drop. Note that we
// have a reference already (from our caller).
s->s_ctxwait = true;
while ((s->s_ref > 1) || (!nni_list_empty(&s->s_ctxs))) {
nni_cv_wait(&s->s_close_cv);
}
Expand Down Expand Up @@ -1305,9 +1302,7 @@ nni_ctx_rele(nni_ctx *ctx)
// tries to avoid ID reuse.
nni_id_remove(&ctx_ids, ctx->c_id);
nni_list_remove(&sock->s_ctxs, ctx);
if (sock->s_closed || sock->s_ctxwait) {
nni_cv_wake(&sock->s_close_cv);
}
nni_cv_wake(&sock->s_close_cv);
nni_mtx_unlock(&sock_lk);

nni_ctx_destroy(ctx);
Expand Down Expand Up @@ -1771,9 +1766,7 @@ nni_pipe_remove(nni_pipe *p)
d->d_pipe = NULL;
dialer_timer_start_locked(d); // Kick the timer to redial.
}
if (s->s_closing) {
nni_cv_wake(&s->s_cv);
}
nni_cv_wake(&s->s_cv);
nni_mtx_unlock(&s->s_mx);
}

Expand Down
2 changes: 0 additions & 2 deletions src/platform/windows/win_ipcconn.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ typedef struct ipc_conn {
HANDLE f;
nni_win_io recv_io;
nni_win_io send_io;
nni_win_io conn_io;
nni_list recv_aios;
nni_list send_aios;
nni_aio *conn_aio;
Expand Down Expand Up @@ -319,7 +318,6 @@ ipc_conn_reap(void *arg)

nni_win_io_fini(&c->recv_io);
nni_win_io_fini(&c->send_io);
nni_win_io_fini(&c->conn_io);

if (c->f != INVALID_HANDLE_VALUE) {
CloseHandle(c->f);
Expand Down
24 changes: 11 additions & 13 deletions src/sp/transport/ipc/ipc.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
Expand Down Expand Up @@ -67,7 +67,7 @@ struct ipc_ep {
nni_aio * time_aio;
nni_list busy_pipes; // busy pipes -- ones passed to socket
nni_list wait_pipes; // pipes waiting to match to socket
nni_list neg_pipes; // pipes busy negotiating
nni_list nego_pipes; // pipes busy negotiating
nni_reap_node reap;
#ifdef NNG_ENABLE_STATS
nni_stat_item st_rcv_max;
Expand All @@ -78,7 +78,7 @@ static void ipc_pipe_send_start(ipc_pipe *p);
static void ipc_pipe_recv_start(ipc_pipe *p);
static void ipc_pipe_send_cb(void *);
static void ipc_pipe_recv_cb(void *);
static void ipc_pipe_neg_cb(void *);
static void ipc_pipe_nego_cb(void *);
static void ipc_pipe_fini(void *);
static void ipc_ep_fini(void *);

Expand Down Expand Up @@ -152,10 +152,10 @@ ipc_pipe_fini(void *arg)
}
nni_mtx_unlock(&ep->mtx);
}
nng_stream_free(p->conn);
nni_aio_fini(&p->rx_aio);
nni_aio_fini(&p->tx_aio);
nni_aio_fini(&p->neg_aio);
nng_stream_free(p->conn);
if (p->rx_msg) {
nni_msg_free(p->rx_msg);
}
Expand All @@ -167,9 +167,6 @@ static void
ipc_pipe_reap(ipc_pipe *p)
{
if (!nni_atomic_flag_test_and_set(&p->reaped)) {
if (p->conn != NULL) {
nng_stream_close(p->conn);
}
nni_reap(&ipc_pipe_reap_list, p);
}
}
Expand All @@ -185,7 +182,7 @@ ipc_pipe_alloc(ipc_pipe **pipe_p)
nni_mtx_init(&p->mtx);
nni_aio_init(&p->tx_aio, ipc_pipe_send_cb, p);
nni_aio_init(&p->rx_aio, ipc_pipe_recv_cb, p);
nni_aio_init(&p->neg_aio, ipc_pipe_neg_cb, p);
nni_aio_init(&p->neg_aio, ipc_pipe_nego_cb, p);
nni_aio_list_init(&p->send_q);
nni_aio_list_init(&p->recv_q);
nni_atomic_flag_reset(&p->reaped);
Expand All @@ -212,7 +209,7 @@ ipc_ep_match(ipc_ep *ep)
}

static void
ipc_pipe_neg_cb(void *arg)
ipc_pipe_nego_cb(void *arg)
{
ipc_pipe *p = arg;
ipc_ep * ep = p->ep;
Expand Down Expand Up @@ -263,7 +260,7 @@ ipc_pipe_neg_cb(void *arg)

// We are ready now. We put this in the wait list, and
// then try to run the matcher.
nni_list_remove(&ep->neg_pipes, p);
nni_list_remove(&ep->nego_pipes, p);
nni_list_append(&ep->wait_pipes, p);

ipc_ep_match(ep);
Expand All @@ -278,6 +275,7 @@ ipc_pipe_neg_cb(void *arg)
if (rv == NNG_ECLOSED) {
rv = NNG_ECONNSHUT;
}
nni_list_remove(&ep->nego_pipes, p);
nng_stream_close(p->conn);
// If we are waiting to negotiate on a client side, then a failure
// here has to be passed to the user app.
Expand Down Expand Up @@ -647,7 +645,7 @@ ipc_pipe_start(ipc_pipe *p, nng_stream *conn, ipc_ep *ep)
iov.iov_len = 8;
iov.iov_buf = &p->tx_head[0];
nni_aio_set_iov(&p->neg_aio, 1, &iov);
nni_list_append(&ep->neg_pipes, p);
nni_list_append(&ep->nego_pipes, p);

nni_aio_set_timeout(&p->neg_aio, 10000); // 10 sec timeout to negotiate
nng_stream_send(p->conn, &p->neg_aio);
Expand All @@ -668,7 +666,7 @@ ipc_ep_close(void *arg)
if (ep->listener != NULL) {
nng_stream_listener_close(ep->listener);
}
NNI_LIST_FOREACH (&ep->neg_pipes, p) {
NNI_LIST_FOREACH (&ep->nego_pipes, p) {
ipc_pipe_close(p);
}
NNI_LIST_FOREACH (&ep->wait_pipes, p) {
Expand Down Expand Up @@ -824,7 +822,7 @@ ipc_ep_init(ipc_ep **epp, nni_sock *sock)
nni_mtx_init(&ep->mtx);
NNI_LIST_INIT(&ep->busy_pipes, ipc_pipe, node);
NNI_LIST_INIT(&ep->wait_pipes, ipc_pipe, node);
NNI_LIST_INIT(&ep->neg_pipes, ipc_pipe, node);
NNI_LIST_INIT(&ep->nego_pipes, ipc_pipe, node);

ep->proto = nni_sock_proto_id(sock);

Expand Down
Loading

0 comments on commit 0b45092

Please sign in to comment.