Skip to content

Commit

Permalink
NNG_OPT_RECVFD and NNG_OPT_SENDFD converted to functions.
Browse files Browse the repository at this point in the history
These options are removed entirely, and their functionality is now
available via special functions, `nng_socket_get_send_poll_fd` and
`nng_socket_get_recv_poll_fd`, making these first class methods on
the socket.

This eliminates a bit of wasteful code, and provides type safety
for these methods.
  • Loading branch information
gdamore committed Nov 2, 2024
1 parent 9b27984 commit 279180c
Show file tree
Hide file tree
Showing 36 changed files with 321 additions and 528 deletions.
36 changes: 18 additions & 18 deletions docs/ref/migrate/nanomsg.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,24 @@ NNG approach to messages. Likewise there is no `struct nn_cmsghdr` equivalent.

The following options are changed.

| Nanomsg Option | NNG Eqvaivalent | Notes |
| ---------------------- | -------------------- | ------------------------------------------------------- |
| `NN_LINGER` | None | NNG does not support tuning this. |
| `NN_SNDBUF` | `NNG_OPT_SENDBUF` | NNG value is given in messages, not bytes. |
| `NN_RCVBUF` | `NNG_OPT_RECVBUF` | NNG value is given in messages, not bytes. |
| `NN_SNDTIMEO` | `NNG_OPT_SENDTIMEO` |
| `NN_RCVTIMEO` | `NNG_OPT_RECVTIMEO` |
| `NN_RECONNECT_IVL` | `NNG_OPT_RECONNMINT` |
| `NN_RECONNECT_IVL_MAX` | `NNG_OPT_RECONNMAXT` |
| `NN_SNDPRIO` | None | Not supported in NNG yet. |
| `NN_RCVPRIO` | None | Not supported in NNG yet. |
| `NN_RCVFD` | `NNG_OPT_RECVFD` |
| `NN_SNDFD` | `NNG_OPT_SENDFD` |
| `NN_DOMAIN` | None | NNG options are not divided by domain or protocol. |
| `NN_PROTOCOL` | `NNG_OPT_PROTO` | See also `NNG_OPT_PROTONAME`. |
| `NN_IPV4ONLY` | None | Use URL such as `tcp4://` to obtain this functionality. |
| `NN_SOCKET_NAME` | `NNG_OPT_SOCKNAME` |
| `NN_MAXTTL` | `NNG_OPT_MAXTTL` |
| Nanomsg Option | NNG Eqvaivalent | Notes |
| ---------------------- | ------------------------------ | ------------------------------------------------------- |
| `NN_LINGER` | None | NNG does not support tuning this. |
| `NN_SNDBUF` | `NNG_OPT_SENDBUF` | NNG value is given in messages, not bytes. |
| `NN_RCVBUF` | `NNG_OPT_RECVBUF` | NNG value is given in messages, not bytes. |
| `NN_SNDTIMEO` | `NNG_OPT_SENDTIMEO` |
| `NN_RCVTIMEO` | `NNG_OPT_RECVTIMEO` |
| `NN_RECONNECT_IVL` | `NNG_OPT_RECONNMINT` |
| `NN_RECONNECT_IVL_MAX` | `NNG_OPT_RECONNMAXT` |
| `NN_SNDPRIO` | None | Not supported in NNG yet. |
| `NN_RCVPRIO` | None | Not supported in NNG yet. |
| `NN_RCVFD` | `+nng_socket_get_recv_poll_fd` | No longer an option, use a function call. |
| `NN_SNDFD` | `+nng_socket_get_send_poll_fd` | No longer an option, use a function call. |
| `NN_DOMAIN` | None | NNG options are not divided by domain or protocol. |
| `NN_PROTOCOL` | `NNG_OPT_PROTO` | See also `NNG_OPT_PROTONAME`. |
| `NN_IPV4ONLY` | None | Use URL such as `tcp4://` to obtain this functionality. |
| `NN_SOCKET_NAME` | `NNG_OPT_SOCKNAME` |
| `NN_MAXTTL` | `NNG_OPT_MAXTTL` |

## Error Codes

Expand Down
14 changes: 12 additions & 2 deletions include/nng/nng.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,18 @@ NNG_DECL int nng_socket_get_ptr(nng_socket, const char *, void **);
NNG_DECL int nng_socket_get_ms(nng_socket, const char *, nng_duration *);
NNG_DECL int nng_socket_get_addr(nng_socket, const char *, nng_sockaddr *);

// These functions are used to obtain a file descriptor that will poll
// as readable if the socket can receive or send. Applications must never
// read or write to the file descriptor directly, but simply check it
// with poll, epoll, kqueue, or similar functions. This is intended to
// aid in integration NNG with external event loops based on polling I/O.
// Note that using these functions will force NNG to make extra system calls,
// and thus impact performance. The file descriptor pollability is
// level-triggered. These file descriptors will be closed when the socket
// is closed.
NNG_DECL int nng_socket_get_recv_poll_fd(nng_socket id, int *fdp);
NNG_DECL int nng_socket_get_send_poll_fd(nng_socket id, int *fdp);

// Utility function for getting a printable form of the socket address
// for display in logs, etc. It is not intended to be parsed, and the
// display format may change without notice. Generally you should alow
Expand Down Expand Up @@ -722,8 +734,6 @@ NNG_DECL nng_listener nng_pipe_listener(nng_pipe);
#define NNG_OPT_PEERNAME "peer-name"
#define NNG_OPT_RECVBUF "recv-buffer"
#define NNG_OPT_SENDBUF "send-buffer"
#define NNG_OPT_RECVFD "recv-fd"
#define NNG_OPT_SENDFD "send-fd"
#define NNG_OPT_RECVTIMEO "recv-timeout"
#define NNG_OPT_SENDTIMEO "send-timeout"
#define NNG_OPT_LOCADDR "local-address"
Expand Down
8 changes: 7 additions & 1 deletion src/core/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ struct nni_proto_sock_ops {
// Receive a message.
void (*sock_recv)(void *, nni_aio *);

// Return the receive poll FD.
int (*sock_recv_poll_fd)(void *, int *);

// Return the send poll FD.
int (*sock_send_poll_fd)(void *, int *);

// Options. Must not be NULL. Final entry should have NULL name.
nni_option *sock_options;
};
Expand All @@ -124,7 +130,7 @@ struct nni_proto {
uint32_t proto_flags; // Protocol flags
const nni_proto_sock_ops *proto_sock_ops; // Per-socket operations
const nni_proto_pipe_ops *proto_pipe_ops; // Per-pipe operations
const nni_proto_ctx_ops * proto_ctx_ops; // Context operations
const nni_proto_ctx_ops *proto_ctx_ops; // Context operations
};

// We quite intentionally use a signature where the upper word is nonzero,
Expand Down
11 changes: 0 additions & 11 deletions src/core/sock_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,6 @@ test_send_nonblock(void)
NUTS_CLOSE(s1);
}

void
test_readonly_options(void)
{
nng_socket s1;
NUTS_OPEN(s1);
NUTS_FAIL(nng_socket_set_int(s1, NNG_OPT_RECVFD, 0), NNG_EREADONLY);
NUTS_FAIL(nng_socket_set_int(s1, NNG_OPT_SENDFD, 0), NNG_EREADONLY);
NUTS_CLOSE(s1);
}

void
test_socket_base(void)
{
Expand Down Expand Up @@ -596,7 +586,6 @@ NUTS_TESTS = {
{ "recv non-block", test_recv_nonblock },
{ "send timeout", test_send_timeout },
{ "send non-block", test_send_nonblock },
{ "read only options", test_readonly_options },
{ "socket base", test_socket_base },
{ "socket name", test_socket_name },
{ "socket name oversize", test_socket_name_oversize },
Expand Down
58 changes: 22 additions & 36 deletions src/core/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,19 @@ static void nni_ctx_destroy(nni_ctx *);
#define SOCK(s) ((nni_sock *) (s))

static int
sock_get_fd(void *s, unsigned flag, int *fdp)
sock_get_fd(nni_sock *s, unsigned flag, int *fdp)
{
int rv;
nni_pollable *p;

if ((flag & nni_sock_flags(SOCK(s))) == 0) {
if ((flag & nni_sock_flags(s)) == 0) {
return (NNG_ENOTSUP);
}

if (flag == NNI_PROTO_FLAG_SND) {
rv = nni_msgq_get_sendable(SOCK(s)->s_uwq, &p);
rv = nni_msgq_get_sendable(s->s_uwq, &p);
} else {
rv = nni_msgq_get_recvable(SOCK(s)->s_urq, &p);
rv = nni_msgq_get_recvable(s->s_urq, &p);
}

if (rv == 0) {
Expand All @@ -136,30 +136,6 @@ sock_get_fd(void *s, unsigned flag, int *fdp)
return (rv);
}

static int
sock_get_sendfd(void *s, void *buf, size_t *szp, nni_type t)
{
int fd;
int rv;

if ((rv = sock_get_fd(SOCK(s), NNI_PROTO_FLAG_SND, &fd)) != 0) {
return (rv);
}
return (nni_copyout_int(fd, buf, szp, t));
}

static int
sock_get_recvfd(void *s, void *buf, size_t *szp, nni_type t)
{
int fd;
int rv;

if ((rv = sock_get_fd(SOCK(s), NNI_PROTO_FLAG_RCV, &fd)) != 0) {
return (rv);
}
return (nni_copyout_int(fd, buf, szp, t));
}

static int
sock_get_raw(void *s, void *buf, size_t *szp, nni_type t)
{
Expand Down Expand Up @@ -286,14 +262,6 @@ static const nni_option sock_options[] = {
.o_get = sock_get_sendtimeo,
.o_set = sock_set_sendtimeo,
},
{
.o_name = NNG_OPT_RECVFD,
.o_get = sock_get_recvfd,
},
{
.o_name = NNG_OPT_SENDFD,
.o_get = sock_get_sendfd,
},
{
.o_name = NNG_OPT_RECVBUF,
.o_get = sock_get_recvbuf,
Expand Down Expand Up @@ -353,6 +321,24 @@ nni_sock_id(nni_sock *s)
return (s->s_id);
}

int
nni_sock_get_send_fd(nni_sock *s, int *fdp)
{
if (s->s_sock_ops.sock_send_poll_fd != NULL) {
return (s->s_sock_ops.sock_send_poll_fd(s->s_data, fdp));
}
return (sock_get_fd(s, NNI_PROTO_FLAG_SND, fdp));
}

int
nni_sock_get_recv_fd(nni_sock *s, int *fdp)
{
if (s->s_sock_ops.sock_recv_poll_fd != NULL) {
return (s->s_sock_ops.sock_recv_poll_fd(s->s_data, fdp));
}
return (sock_get_fd(s, NNI_PROTO_FLAG_RCV, fdp));
}

// nni_sock_sendq and nni_sock_recvq are called by the protocol to obtain
// the upper read and write queues.
nni_msgq *
Expand Down
2 changes: 2 additions & 0 deletions src/core/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ extern int nni_sock_getopt(
extern void nni_sock_send(nni_sock *, nni_aio *);
extern void nni_sock_recv(nni_sock *, nni_aio *);
extern uint32_t nni_sock_id(nni_sock *);
extern int nni_sock_get_send_fd(nni_sock *s, int *fdp);
extern int nni_sock_get_recv_fd(nni_sock *s, int *fdp);

// These are socket methods that protocol operations can expect to call.
// Note that each of these should be called without any locks held, since
Expand Down
33 changes: 33 additions & 0 deletions src/nng.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "nng/nng.h"
#include "core/nng_impl.h"
#include "core/platform.h"
#include "core/socket.h"

// This file provides the "public" API. This is a thin wrapper around
// internal API functions. We use the public prefix instead of internal,
Expand Down Expand Up @@ -1141,6 +1142,38 @@ nng_socket_get_addr(nng_socket id, const char *n, nng_sockaddr *v)
return (socket_get(id, n, v, NULL, NNI_TYPE_SOCKADDR));
}

int
nng_socket_get_recv_poll_fd(nng_socket id, int *fdp)
{
int rv;
nni_sock *sock;

if (((rv = nni_init()) != 0) ||
((rv = nni_sock_find(&sock, id.id)) != 0)) {
return (rv);

Check warning on line 1153 in src/nng.c

View check run for this annotation

Codecov / codecov/patch

src/nng.c#L1153

Added line #L1153 was not covered by tests
}

rv = nni_sock_get_recv_fd(sock, fdp);
nni_sock_rele(sock);
return (rv);
}

int
nng_socket_get_send_poll_fd(nng_socket id, int *fdp)
{
int rv;
nni_sock *sock;

if (((rv = nni_init()) != 0) ||
((rv = nni_sock_find(&sock, id.id)) != 0)) {
return (rv);

Check warning on line 1169 in src/nng.c

View check run for this annotation

Codecov / codecov/patch

src/nng.c#L1169

Added line #L1169 was not covered by tests
}

rv = nni_sock_get_send_fd(sock, fdp);
nni_sock_rele(sock);
return (rv);
}

int
nng_pipe_notify(nng_socket s, nng_pipe_ev ev, nng_pipe_cb cb, void *arg)
{
Expand Down
66 changes: 24 additions & 42 deletions src/sp/protocol/bus0/bus.c
Original file line number Diff line number Diff line change
Expand Up @@ -363,34 +363,20 @@ bus0_sock_recv(void *arg, nni_aio *aio)
}

static int
bus0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_type t)
bus0_sock_get_send_fd(void *arg, int *fdp)
{
bus0_sock *sock = arg;
int fd;
int rv;
nni_mtx_lock(&sock->mtx);
// BUS sockets are *always* writable (best effort)
nni_pollable_raise(&sock->can_send);
rv = nni_pollable_getfd(&sock->can_send, &fd);
nni_mtx_unlock(&sock->mtx);

if (rv == 0) {
rv = nni_copyout_int(fd, buf, szp, t);
}
return (rv);
return (nni_pollable_getfd(&sock->can_send, fdp));
}

static int
bus0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t)
bus0_sock_get_recv_fd(void *arg, int *fdp)
{
bus0_sock *s = arg;
int rv;
int fd;

if ((rv = nni_pollable_getfd(&s->can_recv, &fd)) == 0) {
rv = nni_copyout_int(fd, buf, szp, t);
}
return (rv);
return (nni_pollable_getfd(&s->can_recv, fdp));
}

static int
Expand Down Expand Up @@ -474,14 +460,6 @@ static nni_proto_pipe_ops bus0_pipe_ops = {
};

static nni_option bus0_sock_options[] = {
{
.o_name = NNG_OPT_SENDFD,
.o_get = bus0_sock_get_send_fd,
},
{
.o_name = NNG_OPT_RECVFD,
.o_get = bus0_sock_get_recv_fd,
},
{
.o_name = NNG_OPT_RECVBUF,
.o_get = bus0_sock_get_recv_buf_len,
Expand All @@ -499,25 +477,29 @@ static nni_option bus0_sock_options[] = {
};

static nni_proto_sock_ops bus0_sock_ops = {
.sock_size = sizeof(bus0_sock),
.sock_init = bus0_sock_init,
.sock_fini = bus0_sock_fini,
.sock_open = bus0_sock_open,
.sock_close = bus0_sock_close,
.sock_send = bus0_sock_send,
.sock_recv = bus0_sock_recv,
.sock_options = bus0_sock_options,
.sock_size = sizeof(bus0_sock),
.sock_init = bus0_sock_init,
.sock_fini = bus0_sock_fini,
.sock_open = bus0_sock_open,
.sock_close = bus0_sock_close,
.sock_send = bus0_sock_send,
.sock_recv = bus0_sock_recv,
.sock_send_poll_fd = bus0_sock_get_send_fd,
.sock_recv_poll_fd = bus0_sock_get_recv_fd,
.sock_options = bus0_sock_options,
};

static nni_proto_sock_ops bus0_sock_ops_raw = {
.sock_size = sizeof(bus0_sock),
.sock_init = bus0_sock_init_raw,
.sock_fini = bus0_sock_fini,
.sock_open = bus0_sock_open,
.sock_close = bus0_sock_close,
.sock_send = bus0_sock_send,
.sock_recv = bus0_sock_recv,
.sock_options = bus0_sock_options,
.sock_size = sizeof(bus0_sock),
.sock_init = bus0_sock_init_raw,
.sock_fini = bus0_sock_fini,
.sock_open = bus0_sock_open,
.sock_close = bus0_sock_close,
.sock_send = bus0_sock_send,
.sock_recv = bus0_sock_recv,
.sock_send_poll_fd = bus0_sock_get_send_fd,
.sock_recv_poll_fd = bus0_sock_get_recv_fd,
.sock_options = bus0_sock_options,
};

static nni_proto bus0_proto = {
Expand Down
7 changes: 4 additions & 3 deletions src/sp/protocol/bus0/bus_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// found online at https://opensource.org/licenses/MIT.
//

#include "nng/nng.h"
#include <nuts.h>

#include <nng/protocol/bus0/bus.h>
Expand Down Expand Up @@ -244,7 +245,7 @@ test_bus_poll_readable(void)
NUTS_PASS(nng_bus0_open(&s2));
NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_RECVTIMEO, 1000));
NUTS_PASS(nng_socket_set_ms(s2, NNG_OPT_SENDTIMEO, 1000));
NUTS_PASS(nng_socket_get_int(s1, NNG_OPT_RECVFD, &fd));
NUTS_PASS(nng_socket_get_recv_poll_fd(s1, &fd));
NUTS_TRUE(fd >= 0);

// Not readable if not connected!
Expand Down Expand Up @@ -277,10 +278,10 @@ test_bus_poll_writeable(void)
NUTS_PASS(nng_bus0_open(&s1));
NUTS_PASS(nng_bus0_open(&s2));
NUTS_PASS(nng_socket_set_int(s2, NNG_OPT_SENDBUF, 1));
NUTS_PASS(nng_socket_get_int(s2, NNG_OPT_SENDFD, &fd));
NUTS_PASS(nng_socket_get_send_poll_fd(s2, &fd));
NUTS_TRUE(fd >= 0);

// Pub is *always* writeable
// Bus is *always* writeable
NUTS_TRUE(nuts_poll_fd(fd));

// Even after connect (no message yet)
Expand Down
Loading

0 comments on commit 279180c

Please sign in to comment.