Skip to content

Commit

Permalink
Feature/change remote add socket (#147)
Browse files Browse the repository at this point in the history
* copied perf test from sendmmsg branch

* trailing whitespace

* change socket on change_remote call, crash fix.

* always set pkt->fifo_gc and pkt->send_queue when queueing a packet

* win fix

---------

Co-authored-by: Kasper Isager Dalsgarð <kasperisager@hey.com>
  • Loading branch information
jthomas43 and kasperisager authored Sep 21, 2023
1 parent 6fe2cac commit 1c06689
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 22 deletions.
5 changes: 3 additions & 2 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ struct udx_packet_s {
int ttl;
int is_retransmit;

uint32_t fifo_gc;
udx_fifo_t *send_queue; // pointer to socket->send_queue
uint32_t fifo_gc; // index into socket->send_queue

uint8_t transmits;
uint16_t size;
Expand Down Expand Up @@ -406,7 +407,7 @@ int
udx_stream_connect (udx_stream_t *handle, udx_socket_t *socket, uint32_t remote_id, const struct sockaddr *remote_addr);

int
udx_stream_change_remote (udx_stream_t *stream, uint32_t remote_id, const struct sockaddr *remote_addr, udx_stream_remote_changed_cb remote_changed_cb);
udx_stream_change_remote (udx_stream_t *stream, udx_socket_t *socket, uint32_t remote_id, const struct sockaddr *remote_addr, udx_stream_remote_changed_cb remote_changed_cb);

int
udx_stream_relay_to (udx_stream_t *handle, udx_stream_t *destination);
Expand Down
2 changes: 1 addition & 1 deletion src/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ addr_to_v6 (struct sockaddr_in *addr) {
}

void
udx__trigger_send_callback (udx_socket_t *socket, udx_packet_t *packet);
udx__trigger_send_callback (udx_packet_t *packet);
void
udx__close_handles (udx_socket_t *socket);

Expand Down
4 changes: 2 additions & 2 deletions src/io_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ udx__on_writable (udx_socket_t *socket) {
int type = pkt->type;

if (type & (UDX_PACKET_STREAM_SEND | UDX_PACKET_STREAM_DESTROY | UDX_PACKET_SEND)) {
udx__trigger_send_callback(socket, pkt);
udx__trigger_send_callback(pkt);
// TODO: watch for re-entry here!
}

Expand Down Expand Up @@ -223,7 +223,7 @@ udx__on_writable (udx_socket_t *socket) {
int type = pkt->type;

if (type & UDX_PACKET_CALLBACK) {
udx__trigger_send_callback(socket, pkt);
udx__trigger_send_callback(pkt);
// TODO: watch for re-entry here!
}

Expand Down
2 changes: 1 addition & 1 deletion src/io_win.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ udx__on_writable (udx_socket_t *socket) {
int type = pkt->type;

if (type & UDX_PACKET_CALLBACK) {
udx__trigger_send_callback(socket, pkt);
udx__trigger_send_callback(pkt);
// TODO: watch for re-entry here!
}

Expand Down
43 changes: 29 additions & 14 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ unqueue_first_transmits (udx_stream_t *stream) {
stream->inflight -= pkt->size;
stream->seq_flushed--;

udx__fifo_remove(&(stream->socket->send_queue), pkt, pkt->fifo_gc);
udx__fifo_remove(pkt->send_queue, pkt, pkt->fifo_gc);
}
}

Expand All @@ -394,7 +394,6 @@ clear_incoming_packets (udx_stream_t *stream) {

static void
clear_outgoing_packets (udx_stream_t *stream) {
udx_fifo_t *q = &(stream->socket->send_queue);

// We should make sure all existing packets do not send, and notify the user that they failed
for (uint32_t seq = stream->remote_acked; seq != stream->seq; seq++) {
Expand All @@ -404,7 +403,7 @@ clear_outgoing_packets (udx_stream_t *stream) {

// Make sure to remove it from the fifo, if it was added
if (pkt->status == UDX_PACKET_SENDING) {
udx__fifo_remove(q, pkt, pkt->fifo_gc);
udx__fifo_remove(pkt->send_queue, pkt, pkt->fifo_gc);
}

udx_stream_write_t *w = (udx_stream_write_t *) pkt->ctx;
Expand Down Expand Up @@ -438,7 +437,7 @@ clear_outgoing_packets (udx_stream_t *stream) {
udx_packet_t *pkt = udx__fifo_shift(u);
if (pkt == NULL) continue;

udx__fifo_remove(q, pkt, pkt->fifo_gc);
udx__fifo_remove(pkt->send_queue, pkt, pkt->fifo_gc);

if (pkt->type == UDX_PACKET_STREAM_SEND) {
udx_stream_send_t *req = pkt->ctx;
Expand Down Expand Up @@ -481,6 +480,7 @@ init_stream_packet (udx_packet_t *pkt, int type, udx_stream_t *stream, const uv_
pkt->size = (uint16_t) (UDX_HEADER_SIZE + buf->len);
pkt->dest = stream->remote_addr;
pkt->dest_len = stream->remote_addr_len;
pkt->send_queue = NULL;

pkt->bufs_len = 2;

Expand Down Expand Up @@ -574,7 +574,8 @@ send_state_packet (udx_stream_t *stream) {
pkt->type = UDX_PACKET_STREAM_STATE;
pkt->ttl = 0;

udx__fifo_push(&(stream->socket->send_queue), pkt);
pkt->send_queue = &stream->socket->send_queue;
pkt->fifo_gc = udx__fifo_push(&stream->socket->send_queue, pkt);
return update_poll(stream->socket);
}

Expand Down Expand Up @@ -605,6 +606,7 @@ send_data_packet (udx_stream_t *stream, udx_packet_t *pkt) {
stream->seq_flushed = pkt->seq + 1;
}

pkt->send_queue = &stream->socket->send_queue;
pkt->fifo_gc = udx__fifo_push(&(stream->socket->send_queue), pkt);

int err = update_poll(stream->socket);
Expand Down Expand Up @@ -718,6 +720,7 @@ fill_window (udx_stream_t *stream) {
assert(seq_compare(stream->seq_flushed, pkt->seq) <= 0);
stream->seq_flushed = pkt->seq + 1;

pkt->send_queue = &stream->socket->send_queue;
pkt->fifo_gc = udx__fifo_push(&stream->socket->send_queue, pkt);

if (buf->len == 0) {
Expand Down Expand Up @@ -1032,7 +1035,8 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack) {

// If this packet was queued for sending we need to remove it from the queue.
if (pkt->status == UDX_PACKET_SENDING) {
udx__fifo_remove(&(stream->socket->send_queue), pkt, pkt->fifo_gc);
debug_printf("removing packet from send_queue=%p pkt=%p, fifo_gc=%d\n", pkt->send_queue, pkt, pkt->fifo_gc);
udx__fifo_remove(pkt->send_queue, pkt, pkt->fifo_gc);
}

udx_stream_write_t *w = (udx_stream_write_t *) pkt->ctx;
Expand Down Expand Up @@ -1145,8 +1149,8 @@ relay_packet (udx_stream_t *stream, char *buf, ssize_t buf_len, int type, uint32
pkt->type = UDX_PACKET_STREAM_RELAY;
pkt->seq = seq;

udx__fifo_push(&(relay->socket->send_queue), pkt);

pkt->send_queue = &relay->socket->send_queue;
pkt->fifo_gc = udx__fifo_push(&relay->socket->send_queue, pkt);
update_poll(relay->socket);
}
}
Expand Down Expand Up @@ -1327,7 +1331,7 @@ remove_next (udx_fifo_t *f) {
}

void
udx__trigger_send_callback (udx_socket_t *socket, udx_packet_t *pkt) {
udx__trigger_send_callback (udx_packet_t *pkt) {
if (pkt->type == UDX_PACKET_SEND) {
udx_socket_send_t *req = pkt->ctx;

Expand Down Expand Up @@ -1589,8 +1593,8 @@ udx_socket_send_ttl (udx_socket_send_t *req, udx_socket_t *handle, const uv_buf_

pkt->bufs[0] = bufs[0];

pkt->send_queue = &handle->send_queue;
pkt->fifo_gc = udx__fifo_push(&(handle->send_queue), pkt);

return update_poll(handle);
}

Expand Down Expand Up @@ -1641,7 +1645,9 @@ udx_socket_close (udx_socket_t *handle, udx_socket_close_cb cb) {

// stream packet, allow them to flush, by requeueing them
// flips the order but these are all state packets so whatevs
udx__fifo_push(&(handle->send_queue), pkt);

pkt->send_queue = &handle->send_queue;
pkt->fifo_gc = udx__fifo_push(&handle->send_queue, pkt);
}

if (handle->send_queue.len == 0) {
Expand Down Expand Up @@ -1680,6 +1686,7 @@ udx_stream_init (udx_t *udx, udx_stream_t *handle, uint32_t local_id, udx_stream
handle->mtu_max = UDX_MTU_MAX; // revised in connect()

uv_timer_init(udx->loop, &handle->mtu_raise_timer);
handle->mtu_raise_timer.data = handle;

handle->seq = 0;
handle->ack = 0;
Expand Down Expand Up @@ -1900,8 +1907,12 @@ udx_stream_check_timeouts (udx_stream_t *handle) {
}

int
udx_stream_change_remote (udx_stream_t *stream, uint32_t remote_id, const struct sockaddr *remote_addr, udx_stream_remote_changed_cb on_remote_changed) {
udx_stream_change_remote (udx_stream_t *stream, udx_socket_t *socket, uint32_t remote_id, const struct sockaddr *remote_addr, udx_stream_remote_changed_cb on_remote_changed) {
assert(stream->status & UDX_STREAM_CONNECTED);

// the since the udx_t object stores streams_by_id, we cannot migrate streams across udx objects
// the local id's of different udx streams may collide.
assert(socket->udx == stream->socket->udx);
if (!(stream->status & UDX_STREAM_CONNECTED)) {
return UV_EINVAL;
}
Expand Down Expand Up @@ -1929,6 +1940,8 @@ udx_stream_change_remote (udx_stream_t *stream, uint32_t remote_id, const struct

stream->remote_id = remote_id;

stream->socket = socket;

if (stream->seq != stream->remote_acked) {
stream->remote_changing = true;
stream->seq_on_remote_changed = stream->seq;
Expand Down Expand Up @@ -2022,7 +2035,8 @@ udx_stream_send (udx_stream_send_t *req, udx_stream_t *handle, const uv_buf_t bu
pkt->is_retransmit = 0;
pkt->transmits = 0;

pkt->fifo_gc = udx__fifo_push(&(socket->send_queue), pkt);
pkt->send_queue = &socket->send_queue;
pkt->fifo_gc = udx__fifo_push(&socket->send_queue, pkt);
udx__fifo_push(&(handle->unordered), pkt);

return update_poll(socket);
Expand Down Expand Up @@ -2108,7 +2122,8 @@ udx_stream_destroy (udx_stream_t *handle) {

handle->seq++;

udx__fifo_push(&(handle->socket->send_queue), pkt);
pkt->send_queue = &handle->socket->send_queue;
pkt->fifo_gc = udx__fifo_push(&(handle->socket->send_queue), pkt);
udx__fifo_push(&(handle->unordered), pkt);

int err = update_poll(handle->socket);
Expand Down
4 changes: 2 additions & 2 deletions test/stream-change-remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) {
// swap to relay 1/3 of the way into the stream

if (nbytes_read > (NBYTES_TO_SEND / 3) && !changed) {
e = udx_stream_change_remote(&astream, 4, (struct sockaddr *) &daddr, on_remote_change);
e = udx_stream_change_remote(&astream, &bsock, 4, (struct sockaddr *) &daddr, on_remote_change);
assert(e == 0 && "reconnect");

e = udx_stream_change_remote(&dstream, 1, (struct sockaddr *) &aaddr, on_remote_change);
e = udx_stream_change_remote(&dstream, &csock, 1, (struct sockaddr *) &aaddr, on_remote_change);
assert(e == 0 && "reconnect");

changed = true;
Expand Down

0 comments on commit 1c06689

Please sign in to comment.