Skip to content

Commit

Permalink
Test destroying old streams on changing remote (#145)
Browse files Browse the repository at this point in the history
  • Loading branch information
kasperisager authored Sep 13, 2023
1 parent ecf1330 commit 1d79877
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 14 deletions.
1 change: 1 addition & 0 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ struct udx_stream_s {
udx_t *udx;
udx_socket_t *socket;

bool relayed;
udx_stream_t *relay_to;
udx_cirbuf_t relaying_streams;

Expand Down
6 changes: 4 additions & 2 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -1662,6 +1662,7 @@ udx_stream_init (udx_t *udx, udx_stream_t *handle, uint32_t local_id, udx_stream
handle->out_of_order = 0;
handle->recovery = 0;
handle->socket = NULL;
handle->relayed = false;
handle->relay_to = NULL;
handle->udx = udx;

Expand Down Expand Up @@ -1992,8 +1993,9 @@ udx_stream_connect (udx_stream_t *handle, udx_socket_t *socket, uint32_t remote_

int
udx_stream_relay_to (udx_stream_t *handle, udx_stream_t *destination) {
if (handle->relay_to != NULL) return UV_EINVAL;
if (handle->relayed) return UV_EINVAL;

handle->relayed = true;
handle->relay_to = destination;

udx__cirbuf_set(&(destination->relaying_streams), (udx_cirbuf_val_t *) handle);
Expand Down Expand Up @@ -2087,7 +2089,7 @@ udx_stream_destroy (udx_stream_t *handle) {
// todo: can we delete this line now that the queue in front of the destroy packet is shorter?
clear_outgoing_packets(handle);

if (handle->relay_to != NULL) {
if (handle->relayed) {
handle->status |= UDX_STREAM_DESTROYED;
close_maybe(handle, 0);
return 0;
Expand Down
36 changes: 24 additions & 12 deletions test/stream-change-remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#include "../include/udx.h"

#define NBYTES_TO_SEND 100000

uv_loop_t loop;
udx_t udx;

Expand All @@ -28,38 +30,47 @@ udx_stream_write_t req;

bool ack_called = false;
bool read_called = false;
int remote_changed_called = 0;

#define NBYTES_TO_SEND 100000
size_t nbytes_read;

void
on_ack (udx_stream_write_t *r, int status, int unordered) {
uv_stop(&loop);

ack_called = true;
}

bool on_remote_changed_called = false;

void
on_remote_change (udx_stream_t *s) {
printf("on_remote_change!\n");
on_remote_changed_called = true;
}
remote_changed_called++;

if (remote_changed_called == 2) {
int e;

bool changed = false;
e = udx_stream_destroy(&bstream);
assert(e == 0);

e = udx_stream_destroy(&cstream);
assert(e == 0);
}
}

void
on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) {
int e;

static bool changed = false;

nbytes_read += read_len;

// swap to relay 1/3 of the way into the stream

if (nbytes_read > (NBYTES_TO_SEND / 3) && !changed) {
printf("switching remotes\n");
e = udx_stream_change_remote(&astream, 4, (struct sockaddr *) &daddr, on_remote_change);
assert(e == 0 && "reconnect");

// change to talk directly to a
int e = udx_stream_change_remote(&dstream, 1, (struct sockaddr *) &aaddr, on_remote_change);
e = udx_stream_change_remote(&dstream, 1, (struct sockaddr *) &aaddr, on_remote_change);
assert(e == 0 && "reconnect");

changed = true;
Expand Down Expand Up @@ -141,11 +152,12 @@ main () {
assert(e == 0);

uv_buf_t buf = uv_buf_init(malloc(NBYTES_TO_SEND), NBYTES_TO_SEND);
udx_stream_write(&req, &dstream, &buf, 1, on_ack);
e = udx_stream_write(&req, &dstream, &buf, 1, on_ack);
assert(e && "drained");

uv_run(&loop, UV_RUN_DEFAULT);

assert(ack_called && read_called && on_remote_changed_called && nbytes_read == NBYTES_TO_SEND);
assert(ack_called && read_called && remote_changed_called && nbytes_read == NBYTES_TO_SEND);

return 0;
}

0 comments on commit 1d79877

Please sign in to comment.