From a9f5af9641e459342a7e2c9a95e8da5aa104492b Mon Sep 17 00:00:00 2001 From: jthomas43 <129407891+jthomas43@users.noreply.github.com> Date: Tue, 3 Dec 2024 08:43:17 -0500 Subject: [PATCH] defer adding padding to probe packets until send_packet() to simplify code (#227) --- include/udx.h | 4 --- src/udx.c | 86 +++++++++++++++++++++++---------------------------- 2 files changed, 39 insertions(+), 51 deletions(-) diff --git a/include/udx.h b/include/udx.h index 7b52293..fac5e28 100644 --- a/include/udx.h +++ b/include/udx.h @@ -315,10 +315,6 @@ struct udx_packet_s { // just alloc it in place here, easier to manage char header[UDX_HEADER_SIZE]; unsigned short nbufs; - - // inefficient - only relevant for stream_t packets - unsigned short nwbufs; - udx_stream_write_buf_t **wbufs; }; struct udx_socket_send_s { diff --git a/src/udx.c b/src/udx.c index 08538f7..5bfc456 100644 --- a/src/udx.c +++ b/src/udx.c @@ -418,6 +418,11 @@ on_bytes_acked (udx_stream_write_buf_t *wbuf, size_t bytes, bool cancelled) { } } +static udx_stream_write_buf_t ** +wbufs_offset (udx_packet_t *pkt) { + return (udx_stream_write_buf_t **) (((uv_buf_t *) (pkt + 1)) + pkt->nbufs); +} + static void clear_outgoing_packets (udx_stream_t *stream) { // todo: skip the math, and just @@ -433,14 +438,12 @@ clear_outgoing_packets (udx_stream_t *stream) { assert(pkt->nbufs >= 2); - int diff = pkt->nbufs - pkt->nwbufs; - assert(diff == 1 || diff == 2); // either header buf, or header + padding buff - uv_buf_t *bufs = (uv_buf_t *) (pkt + 1); + udx_stream_write_buf_t **wbufs = wbufs_offset(pkt); - for (int i = 0; i < pkt->nwbufs; i++) { - size_t pkt_len = bufs[i + diff].len; - udx_stream_write_buf_t *wbuf = pkt->wbufs[i]; + for (int i = 1; i < pkt->nbufs; i++) { + size_t pkt_len = bufs[i].len; + udx_stream_write_buf_t *wbuf = wbufs[i - 1]; on_bytes_acked(wbuf, pkt_len, true); // todo: move into on_bytes_acked itself @@ -517,10 +520,6 @@ init_stream_packet (udx_packet_t *pkt, int type, udx_stream_t *stream, const uv_ pkt->nbufs = 1 + nuserbufs; bufs[0] = uv_buf_init((char *) &(pkt->header), UDX_HEADER_SIZE); - // for now, set when stream writes data - pkt->wbufs = NULL; - pkt->nwbufs = 0; - for (int i = 0; i < nuserbufs; i++) { bufs[i + 1] = userbufs[i]; pkt->size += userbufs[i].len; @@ -541,16 +540,6 @@ mtu_probeify_packet (udx_packet_t *pkt, int wanted_size) { return 0; } // debug_printf("mtu: probeify rid=%u seq=%u size=%u wanted=%d padding=%d\n", udx__swap_uint32_if_be(((unsigned int *) pkt->header)[1]), pkt->seq, pkt->size + header_size, wanted_size, padding_size); - static char probe_data[256] = {0}; - - uv_buf_t *bufs = (uv_buf_t *) (pkt + 1); - for (int i = pkt->nbufs; i > 1; i--) { - bufs[i] = bufs[i - 1]; - } - pkt->nbufs++; - - bufs[1].len = padding_size; - bufs[1].base = probe_data; pkt->header[3] = padding_size; pkt->is_mtu_probe = true; @@ -563,17 +552,6 @@ mtu_unprobeify_packet (udx_packet_t *pkt, udx_stream_t *stream) { assert(pkt->is_mtu_probe); pkt->header[3] = 0; - - uv_buf_t *bufs = (uv_buf_t *) (pkt + 1); - - // [header][padding][2][3] 4 = nbufs - - for (int i = 2; i < pkt->nbufs; i++) { - bufs[i - 1] = bufs[i]; - } - - pkt->nbufs--; - pkt->is_mtu_probe = false; debug_printf("mtu: probe failed rid=%u %d/%d", stream->remote_id, stream->mtu_probe_count, UDX_MTU_MAX_PROBES); @@ -1063,13 +1041,12 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack) { stream->reordering_seen = true; } - int diff = pkt->nbufs - pkt->nwbufs; - uv_buf_t *bufs = (uv_buf_t *) (pkt + 1); + udx_stream_write_buf_t **wbufs = wbufs_offset(pkt); - for (int i = 0; i < pkt->nwbufs; i++) { - size_t pkt_len = bufs[i + diff].len; - udx_stream_write_buf_t *wbuf = pkt->wbufs[i]; + for (int i = 1; i < pkt->nbufs; i++) { + size_t pkt_len = bufs[i].len; + udx_stream_write_buf_t *wbuf = wbufs[i - 1]; on_bytes_acked(wbuf, pkt_len, false); @@ -1484,7 +1461,25 @@ send_packet (udx_socket_t *socket, udx_packet_t *pkt) { pkt->dest_len = sizeof(struct sockaddr_in6); } - ssize_t rc = udx__sendmsg(socket, (uv_buf_t *) (pkt + 1), pkt->nbufs, (struct sockaddr *) &(pkt->dest), pkt->dest_len); + uv_buf_t *bufs = (uv_buf_t *) (pkt + 1); + int nbufs = pkt->nbufs; + uv_buf_t _bufs[UDX_MAX_COMBINED_WRITES + 2]; + + if (pkt->is_mtu_probe) { + int padding_size = pkt->header[3]; + static char probe_data[256] = {0}; + _bufs[0] = bufs[0]; + _bufs[1].base = probe_data; + _bufs[1].len = padding_size; + + for (int i = 1; i < pkt->nbufs; i++) { + _bufs[1 + i] = bufs[i]; + } + bufs = _bufs; + nbufs = nbufs + 1; + } + + ssize_t rc = udx__sendmsg(socket, bufs, nbufs, (struct sockaddr *) &(pkt->dest), pkt->dest_len); if (adjust_ttl) uv_udp_set_ttl((uv_udp_t *) socket, socket->ttl); @@ -1754,17 +1749,12 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) { } assert(header_flag & UDX_HEADER_DATA_OR_END); - int nbufs = 2 + nwbufs; // extra for 1.header 2.padding + int nbufs = 1 + nwbufs; // extra buf for header udx_packet_t *pkt = malloc(sizeof(udx_packet_t) + sizeof(uv_buf_t) * nbufs + sizeof(void *) * nwbufs); init_stream_packet(pkt, header_flag, stream, bufs, nwbufs); - pkt->wbufs = (udx_stream_write_buf_t **) (((uv_buf_t *) (pkt + 1)) + nbufs); - pkt->nwbufs = nwbufs; - - for (int i = 0; i < nwbufs; i++) { - pkt->wbufs[i] = wbufs[i]; - } + memcpy(wbufs_offset(pkt), wbufs, sizeof(wbufs[0]) * nwbufs); pkt->ttl = 0; @@ -1773,13 +1763,14 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) { ssize_t rc = send_packet(socket, pkt); if (rc == UV_EAGAIN) { - for (int i = 0; i < pkt->nwbufs; i++) { - udx_stream_write_buf_t *wbuf = pkt->wbufs[i]; + int i = nwbufs; + while (nwbufs--) { + udx_stream_write_buf_t *wbuf = wbufs[i]; if (wbuf->bytes_acked + wbuf->bytes_inflight == wbuf->buf.len) { udx__queue_head(&stream->write_queue, &wbuf->queue); } - wbuf->bytes_inflight -= bufs[(pkt->is_mtu_probe ? 2 : 1) + i].len; + wbuf->bytes_inflight -= bufs[i + 1].len; } free(pkt); @@ -2132,6 +2123,7 @@ udx_socket_send_ttl (udx_socket_send_t *req, udx_socket_t *socket, const uv_buf_ memcpy(&(pkt->dest), dest, pkt->dest_len); pkt->lost = false; pkt->retransmitted = false; + pkt->is_mtu_probe = false; pkt->transmits = 0; pkt->rto_timeouts = 0; pkt->nbufs = 1;