diff --git a/src/udx.c b/src/udx.c index 1957557..b0c9614 100644 --- a/src/udx.c +++ b/src/udx.c @@ -215,7 +215,6 @@ socket_write_wanted (udx_socket_t *socket) { udx_stream_t *stream; udx__link_foreach(socket->streams, stream) { - assert(stream->socket == socket); if (stream_write_wanted(stream)) { return true; } @@ -1883,8 +1882,20 @@ send_packets (udx_socket_t *socket) { udx_stream_t *stream; udx__link_foreach(socket->streams, stream) { + // in case of re-entry, the stream might be closed, ie both us and next one in line was destroyed + // if so no just ignore + if (stream->status & UDX_STREAM_CLOSED) continue; + assert(stream->socket == socket); + if (!send_stream_packets(socket, stream)) return; + + // the above could have triggered a re-entry moving this stream to another socket (change_remote) + // if so just restart, unlikely + if (stream->socket != socket) { + stream = socket->streams; + if (!stream || !send_stream_packets(socket, stream)) return; + } } } @@ -2464,6 +2475,8 @@ set_stream_socket (udx_stream_t *stream, udx_socket_t *socket) { udx_socket_t *prev = stream->socket; + // technically its unsafe to remove and add it to another queue + // if iterating the queue we removed from. if (prev == NULL) { udx_t *udx = stream->udx; udx__link_remove(udx->streams, stream); @@ -2477,11 +2490,14 @@ set_stream_socket (udx_stream_t *stream, udx_socket_t *socket) { int udx_stream_change_remote (udx_stream_t *stream, udx_socket_t *socket, uint32_t remote_id, const struct sockaddr *remote_addr, udx_stream_remote_changed_cb on_remote_changed) { - assert(stream->status & UDX_STREAM_CONNECTED); - // the since the udx_t object stores streams_by_id, we cannot migrate streams across udx objects // the local id's of different udx streams may collide. assert(socket->udx == stream->socket->udx); + + if (stream->status & UDX_STREAM_DEAD || stream->udx->teardown) { + return UV_EINVAL; + } + if (!(stream->status & UDX_STREAM_CONNECTED)) { return UV_EINVAL; } @@ -2531,6 +2547,10 @@ udx_stream_change_remote (udx_stream_t *stream, udx_socket_t *socket, uint32_t r int udx_stream_connect (udx_stream_t *stream, udx_socket_t *socket, uint32_t remote_id, const struct sockaddr *remote_addr) { + if (stream->status & UDX_STREAM_DEAD || stream->udx->teardown) { + return UV_EINVAL; + } + if (stream->status & UDX_STREAM_CONNECTED) { return UV_EISCONN; }