Skip to content

Commit

Permalink
Feature/combined writes with writev (#177)
Browse files Browse the repository at this point in the history
  • Loading branch information
jthomas43 authored Mar 21, 2024
1 parent 75c91e2 commit 54a0cce
Show file tree
Hide file tree
Showing 14 changed files with 354 additions and 233 deletions.
4 changes: 2 additions & 2 deletions examples/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ on_ack_end (udx_stream_write_t *req, int status, int unordered) {
static void
pump_writes () {
while (bytes_sent < PUMP_BYTES) {
udx_stream_write_t *req = malloc(sizeof(udx_stream_write_t));
udx_stream_write_t *req = malloc(udx_stream_write_sizeof(1));
bytes_sent += chunk.len;

if (udx_stream_write(req, &stream, &chunk, 1, on_ack)) continue;
Expand All @@ -62,7 +62,7 @@ pump_writes () {
return;
}

udx_stream_write_t *req = malloc(sizeof(udx_stream_write_t));
udx_stream_write_t *req = malloc(udx_stream_write_sizeof(1));
udx_stream_write_end(req, &stream, &empty, 1, on_ack_end);
}

Expand Down
38 changes: 28 additions & 10 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ typedef struct udx_packet_s udx_packet_t;
typedef struct udx_socket_send_s udx_socket_send_t;
typedef struct udx_stream_send_s udx_stream_send_t;
typedef struct udx_stream_write_s udx_stream_write_t;
typedef struct udx_stream_write_buf_s udx_stream_write_buf_t;

typedef enum {
UDX_LOOKUP_FAMILY_IPV4 = 1,
Expand Down Expand Up @@ -270,54 +271,68 @@ struct udx_packet_s {
int ttl;
int is_retransmit;

udx_stream_t *stream; // pointer to the stream if stream packet

uint8_t transmits;
bool is_mtu_probe;
uint16_t size;
uint64_t time_sent;

void *ctx;
void *ctx; // stream_send_t | socket_send_t | stream_t

struct sockaddr_storage dest;
int dest_len;

uint32_t fifo_gc; // for removing from inflight / retransmit queue
// udx_packet_t *prev; // alternative for inflight / retransmit queues
// udx_packet_t *next; // alternative for inflight / retransmit queues

// just alloc it in place here, easier to manage
char header[UDX_HEADER_SIZE];
unsigned int bufs_len;
uv_buf_t bufs[3];
unsigned short nbufs;

// inefficient - only relevant for stream_t packets
unsigned short nwbufs;
udx_stream_write_buf_t **wbufs;
};

struct udx_socket_send_s {
udx_packet_t pkt;
uv_buf_t bufs[1]; // buf_t[] must be after packet_t
udx_socket_t *socket;

udx_socket_send_cb on_send;

void *data;
};

struct udx_stream_write_s {
// immutable, original write
struct udx_stream_write_buf_s {
// immutable original buf
uv_buf_t buf;

size_t bytes_acked;
// 1. remove from write_queue when bytes_inflight + bytes_acked == buf.len
// 2. free when bytes_acked == buf.len
size_t bytes_inflight;
size_t bytes_acked;

udx_stream_write_t *write;

bool is_write_end;
};

struct udx_stream_write_s {
size_t size;
size_t bytes_acked;
bool is_write_end;

udx_stream_t *stream;
udx_stream_ack_cb on_ack;

void *data;

unsigned int nwbufs;
udx_stream_write_buf_t wbuf[];
};

struct udx_stream_send_s {
udx_packet_t pkt;
uv_buf_t bufs[3]; // buf_t[] must be after packet_t
udx_stream_t *stream;

udx_stream_send_cb on_send;
Expand Down Expand Up @@ -449,6 +464,9 @@ udx_stream_send (udx_stream_send_t *req, udx_stream_t *stream, const uv_buf_t bu
int
udx_stream_write_resume (udx_stream_t *stream, udx_stream_drain_cb drain_cb);

int
udx_stream_write_sizeof (int nwbufs);

int
udx_stream_write (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb);

Expand Down
6 changes: 3 additions & 3 deletions src/io_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ udx__on_writable (udx_socket_t *socket) {
p->msg_hdr.msg_name = &pkt->dest;
p->msg_hdr.msg_namelen = pkt->dest_len;

p->msg_hdr.msg_iov = (struct iovec *) pkt->bufs;
p->msg_hdr.msg_iovlen = pkt->bufs_len;
p->msg_hdr.msg_iov = (struct iovec *) (pkt + 1);
p->msg_hdr.msg_iovlen = pkt->nbufs;

npkts++;
}
Expand Down Expand Up @@ -198,7 +198,7 @@ udx__on_writable (udx_socket_t *socket) {
pkt->dest_len = sizeof(struct sockaddr_in6);
}

ssize_t size = udx__sendmsg(socket, pkt->bufs, pkt->bufs_len, (struct sockaddr *) &(pkt->dest), pkt->dest_len);
ssize_t size = udx__sendmsg(socket, (uv_buf_t *) (pkt + 1), pkt->nbufs, (struct sockaddr *) &(pkt->dest), pkt->dest_len);

if (adjust_ttl) uv_udp_set_ttl((uv_udp_t *) socket, socket->ttl);

Expand Down
2 changes: 1 addition & 1 deletion src/io_win.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ udx__on_writable (udx_socket_t *socket) {
pkt->dest_len = sizeof(struct sockaddr_in6);
}

ssize_t size = udx__sendmsg(socket, pkt->bufs, pkt->bufs_len, (struct sockaddr *) &(pkt->dest), pkt->dest_len);
ssize_t size = udx__sendmsg(socket, (uv_buf_t *) (pkt + 1), pkt->nbufs, (struct sockaddr *) &(pkt->dest), pkt->dest_len);

if (adjust_ttl) uv_udp_set_ttl((uv_udp_t *) socket, socket->ttl);

Expand Down
Loading

0 comments on commit 54a0cce

Please sign in to comment.