feat: auto-tune stream receive window #1868

Draft: wants to merge 11 commits into base: main
24 changes: 12 additions & 12 deletions neqo-http3/src/connection_client.rs
@@ -1292,7 +1292,7 @@ mod tests {
use neqo_qpack::{encoder::QPackEncoder, QpackSettings};
use neqo_transport::{
CloseReason, ConnectionEvent, ConnectionParameters, Output, State, StreamId, StreamType,
Version, MIN_INITIAL_PACKET_SIZE, RECV_BUFFER_SIZE, SEND_BUFFER_SIZE,
Version, INITIAL_RECV_BUFFER_SIZE, INITIAL_SEND_BUFFER_SIZE, MIN_INITIAL_PACKET_SIZE,
};
use test_fixture::{
anti_replay, default_server_h3, fixture_init, new_server, now,
@@ -2747,7 +2747,7 @@ mod tests {
if let ConnectionEvent::RecvStreamReadable { stream_id } = e {
if stream_id == request_stream_id {
// Read the DATA frame.
let mut buf = vec![1_u8; RECV_BUFFER_SIZE];
let mut buf = vec![1_u8; INITIAL_RECV_BUFFER_SIZE];
let (amount, fin) = server.conn.stream_recv(stream_id, &mut buf).unwrap();
assert!(fin);
assert_eq!(
@@ -2821,7 +2821,7 @@ mod tests {
assert_eq!(sent, Ok(first_frame.len()));

// The second frame cannot fit.
let sent = client.send_data(request_stream_id, &vec![0_u8; SEND_BUFFER_SIZE]);
let sent = client.send_data(request_stream_id, &vec![0_u8; INITIAL_SEND_BUFFER_SIZE]);
assert_eq!(sent, Ok(expected_second_data_frame.len()));

// Close stream.
@@ -2830,7 +2830,7 @@
let mut out = client.process(None, now());
// We need to loop a bit until all data has been sent. Once for every 1K
// of data.
for _i in 0..SEND_BUFFER_SIZE / 1000 {
for _i in 0..INITIAL_SEND_BUFFER_SIZE / 1000 {
out = server.conn.process(out.as_dgram_ref(), now());
out = client.process(out.as_dgram_ref(), now());
}
@@ -2840,7 +2840,7 @@
if let ConnectionEvent::RecvStreamReadable { stream_id } = e {
if stream_id == request_stream_id {
// Read DATA frames.
let mut buf = vec![1_u8; RECV_BUFFER_SIZE];
let mut buf = vec![1_u8; INITIAL_RECV_BUFFER_SIZE];
let (amount, fin) = server.conn.stream_recv(stream_id, &mut buf).unwrap();
assert!(fin);
assert_eq!(
@@ -2893,7 +2893,7 @@ mod tests {
// After the first frame there are exactly 63+2 bytes left in the send buffer.
#[test]
fn fetch_two_data_frame_second_63bytes() {
let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 88);
let (buf, hdr) = alloc_buffer(INITIAL_SEND_BUFFER_SIZE - 88);
fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x3f], &[0_u8; 63]);
}

@@ -2902,7 +2902,7 @@ mod tests {
// but we can only send 63 bytes.
#[test]
fn fetch_two_data_frame_second_63bytes_place_for_66() {
let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 89);
let (buf, hdr) = alloc_buffer(INITIAL_SEND_BUFFER_SIZE - 89);
fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x3f], &[0_u8; 63]);
}

@@ -2911,15 +2911,15 @@ mod tests {
// but we can only send 64 bytes.
#[test]
fn fetch_two_data_frame_second_64bytes_place_for_67() {
let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 90);
let (buf, hdr) = alloc_buffer(INITIAL_SEND_BUFFER_SIZE - 90);
fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x40, 0x40], &[0_u8; 64]);
}

// Send 2 frames. For the second one we can only send 16383 bytes.
// After the first frame there are exactly 16383+3 bytes left in the send buffer.
#[test]
fn fetch_two_data_frame_second_16383bytes() {
let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16409);
let (buf, hdr) = alloc_buffer(INITIAL_SEND_BUFFER_SIZE - 16409);
fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x7f, 0xff], &[0_u8; 16383]);
}

@@ -2928,7 +2928,7 @@ mod tests {
// send 16383 bytes.
#[test]
fn fetch_two_data_frame_second_16383bytes_place_for_16387() {
let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16410);
let (buf, hdr) = alloc_buffer(INITIAL_SEND_BUFFER_SIZE - 16410);
fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x7f, 0xff], &[0_u8; 16383]);
}

@@ -2937,7 +2937,7 @@ mod tests {
// send 16383 bytes.
#[test]
fn fetch_two_data_frame_second_16383bytes_place_for_16388() {
let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16411);
let (buf, hdr) = alloc_buffer(INITIAL_SEND_BUFFER_SIZE - 16411);
fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x7f, 0xff], &[0_u8; 16383]);
}

@@ -2946,7 +2946,7 @@ mod tests {
// 16384 bytes.
#[test]
fn fetch_two_data_frame_second_16384bytes_place_for_16389() {
let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16412);
let (buf, hdr) = alloc_buffer(INITIAL_SEND_BUFFER_SIZE - 16412);
fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x80, 0x0, 0x40, 0x0], &[0_u8; 16384]);
}

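The expected frame headers in these tests come from QUIC's variable-length integer encoding (RFC 9000, Section 16): a DATA frame is its type byte (0x0) followed by the payload length as a varint, which is why a 63-byte payload fits a two-byte header while 64 and 16384 bytes need longer ones. A minimal sketch of that encoding, for reference only (not part of this diff):

```rust
/// Encode a QUIC variable-length integer (RFC 9000, Section 16).
/// The two high bits of the first byte encode the total length
/// (1, 2, 4, or 8 bytes).
fn encode_varint(v: u64, out: &mut Vec<u8>) {
    match v {
        0..=63 => out.push(v as u8),
        64..=16_383 => out.extend_from_slice(&(v as u16 | 0x4000).to_be_bytes()),
        16_384..=1_073_741_823 => {
            out.extend_from_slice(&(v as u32 | 0x8000_0000).to_be_bytes());
        }
        _ => out.extend_from_slice(&(v | 0xc000_0000_0000_0000).to_be_bytes()),
    }
}

// Length 63 encodes in one byte, so the frame header is [0x0, 0x3f];
// 64 takes two bytes ([0x0, 0x40, 0x40]), 16383 is the two-byte maximum
// ([0x0, 0x7f, 0xff]), and 16384 spills into four bytes
// ([0x0, 0x80, 0x0, 0x40, 0x0]), matching the expected frames above.
```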
22 changes: 17 additions & 5 deletions neqo-transport/src/connection/mod.rs
@@ -1986,7 +1986,13 @@ impl Connection {
&mut self,
builder: &mut PacketBuilder,
tokens: &mut Vec<RecoveryToken>,
now: Instant,
) {
let rtt = self.paths.primary().map_or_else(
|| RttEstimate::default().estimate(),
|p| p.borrow().rtt().estimate(),
);

let stats = &mut self.stats.borrow_mut();
let frame_stats = &mut stats.frame_tx;
if self.role == Role::Server {
@@ -2001,7 +2007,7 @@
TransmissionPriority::Important,
] {
self.streams
.write_frames(prio, builder, tokens, frame_stats);
.write_frames(prio, builder, tokens, frame_stats, now, rtt);
if builder.is_full() {
return;
}
@@ -2020,7 +2026,7 @@

for prio in [TransmissionPriority::High, TransmissionPriority::Normal] {
self.streams
.write_frames(prio, builder, tokens, &mut stats.frame_tx);
.write_frames(prio, builder, tokens, &mut stats.frame_tx, now, rtt);
if builder.is_full() {
return;
}
@@ -2050,8 +2056,14 @@
return;
}

self.streams
.write_frames(TransmissionPriority::Low, builder, tokens, frame_stats);
self.streams.write_frames(
TransmissionPriority::Low,
builder,
tokens,
frame_stats,
now,
rtt,
);

#[cfg(test)]
if let Some(w) = &mut self.test_frame_writer {
@@ -2158,7 +2170,7 @@

if primary {
if space == PacketNumberSpace::ApplicationData {
self.write_appdata_frames(builder, &mut tokens);
self.write_appdata_frames(builder, &mut tokens, now);
} else {
let stats = &mut self.stats.borrow_mut().frame_tx;
self.crypto.write_frame(space, builder, &mut tokens, stats);
10 changes: 6 additions & 4 deletions neqo-transport/src/connection/params.rs
@@ -9,7 +9,7 @@ use std::{cmp::max, time::Duration};
pub use crate::recovery::FAST_PTO_SCALE;
use crate::{
connection::{ConnectionIdManager, Role, LOCAL_ACTIVE_CID_LIMIT},
recv_stream::RECV_BUFFER_SIZE,
recv_stream::INITIAL_RECV_BUFFER_SIZE,
rtt::GRANULARITY,
stream_id::StreamType,
tparams::{self, PreferredAddress, TransportParameter, TransportParametersHandler},
@@ -87,9 +87,11 @@ impl Default for ConnectionParameters {
versions: VersionConfig::default(),
cc_algorithm: CongestionControlAlgorithm::NewReno,
max_data: LOCAL_MAX_DATA,
max_stream_data_bidi_remote: u64::try_from(RECV_BUFFER_SIZE).unwrap(),
max_stream_data_bidi_local: u64::try_from(RECV_BUFFER_SIZE).unwrap(),
max_stream_data_uni: u64::try_from(RECV_BUFFER_SIZE).unwrap(),
// TODO: Why start with 1 MiB? Why not start lower?
max_stream_data_bidi_remote: u64::try_from(INITIAL_RECV_BUFFER_SIZE).unwrap(),
// TODO: Why start with 1 MiB? Why not start lower?
max_stream_data_bidi_local: u64::try_from(INITIAL_RECV_BUFFER_SIZE).unwrap(),
max_stream_data_uni: u64::try_from(INITIAL_RECV_BUFFER_SIZE).unwrap(),
max_streams_bidi: LOCAL_STREAM_LIMIT_BIDI,
max_streams_uni: LOCAL_STREAM_LIMIT_UNI,
ack_ratio: DEFAULT_ACK_RATIO,
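On the "why 1 MiB?" TODOs above: a fixed receive window caps per-stream throughput at roughly window/RTT until MAX_STREAM_DATA updates arrive, so the initial value trades memory per idle stream against ramp-up time. A back-of-the-envelope sketch (the helper is illustrative, not part of this diff):

```rust
/// Upper bound on single-stream throughput imposed by a fixed receive
/// window: at most `window_bytes` can be in flight per round trip.
fn window_throughput_limit(window_bytes: u64, rtt_secs: f64) -> f64 {
    window_bytes as f64 / rtt_secs
}

// A 1 MiB window at a 100 ms RTT allows at most ~10 MiB/s (~84 Mbit/s)
// per stream. Starting lower would save memory on idle streams but give
// the auto-tuning in fc.rs more doublings to climb before reaching link
// capacity.
```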
16 changes: 7 additions & 9 deletions neqo-transport/src/connection/tests/stream.rs
@@ -15,8 +15,8 @@ use super::{
};
use crate::{
events::ConnectionEvent,
recv_stream::RECV_BUFFER_SIZE,
send_stream::{OrderGroup, SendStreamState, SEND_BUFFER_SIZE},
recv_stream::INITIAL_RECV_BUFFER_SIZE,
send_stream::{OrderGroup, SendStreamState, INITIAL_SEND_BUFFER_SIZE},
streams::{SendOrder, StreamOrder},
tparams::{self, TransportParameter},
CloseReason,
@@ -437,19 +437,17 @@ fn max_data() {
client.streams.handle_max_data(100_000_000);
assert_eq!(
client.stream_avail_send_space(stream_id).unwrap(),
SEND_BUFFER_SIZE - SMALL_MAX_DATA
INITIAL_SEND_BUFFER_SIZE - SMALL_MAX_DATA
);

// Increase max stream data. Avail space now limited by tx buffer
// Increase max stream data. Available space is no longer capped by the send buffer.
client
.streams
.get_send_stream_mut(stream_id)
.unwrap()
.set_max_stream_data(100_000_000);
assert_eq!(
client.stream_avail_send_space(stream_id).unwrap(),
SEND_BUFFER_SIZE - SMALL_MAX_DATA + 4096
);
// TODO: Improve assert.
assert!(client.stream_avail_send_space(stream_id).unwrap() > INITIAL_SEND_BUFFER_SIZE);

let evts = client.events().collect::<Vec<_>>();
assert_eq!(evts.len(), 1);
@@ -723,7 +721,7 @@ fn stream_data_blocked_generates_max_stream_data() {
}
written += amount;
}
assert_eq!(written, RECV_BUFFER_SIZE);
assert_eq!(written, INITIAL_RECV_BUFFER_SIZE);
}

/// See <https://github.com/mozilla/neqo/issues/871>
58 changes: 55 additions & 3 deletions neqo-transport/src/fc.rs
@@ -8,11 +8,13 @@
// into flow control frames needing to be sent to the remote.

use std::{
cmp::min,
fmt::Debug,
ops::{Deref, DerefMut, Index, IndexMut},
time::{Duration, Instant},
};

use neqo_common::{qtrace, Role};
use neqo_common::{qdebug, qtrace, Role};

use crate::{
frame::{
@@ -27,6 +29,12 @@ use crate::{
Error, Res,
};

/// Limit for the maximum number of bytes active on a single stream, i.e. a limit
/// for the size of the stream receive window.
//
// TODO: Find reasonable limit.
const STREAM_MAX_ACTIVE_LIMIT: u64 = 10 * 1024 * 1024;

#[derive(Debug)]
pub struct SenderFlowControl<T>
where
@@ -66,6 +74,8 @@

/// Update the maximum. Returns `Some` with the updated available flow
/// control if the change was an increase and `None` otherwise.
//
// TODO: Impose a limit? Otherwise an attacker can set a large maximum, causing the local node to allocate a large send buffer.
pub fn update(&mut self, limit: u64) -> Option<usize> {
debug_assert!(limit < u64::MAX);
if limit > self.limit {
@@ -206,12 +216,15 @@
{
/// The thing that we're counting for.
subject: T,
// TODO: Update. The receive buffer is no longer relevant.
/// The maximum amount of items that can be active (e.g., the size of the receive buffer).
max_active: u64,
/// Last max allowed sent.
max_allowed: u64,
// TODO: Not ideal as it adds an Option for all T, even though only needed for T=StreamId.
max_allowed_sent_at: Option<Instant>,
/// Item received, but not retired yet.
/// This will be used for byte flow control: each stream will remember is largest byte
/// This will be used for byte flow control: each stream will remember its largest byte
/// offset received and session flow control will remember the sum of all bytes consumed
/// by all streams.
consumed: u64,
@@ -230,20 +243,24 @@
subject,
max_active: max,
max_allowed: max,
// TODO: Starting with None loses a round-trip before
// increasing the stream receive window. Better to start with `now`.
max_allowed_sent_at: None,
consumed: 0,
retired: 0,
frame_pending: false,
}
}

/// Retired some items and maybe send flow control
/// Retire some items and maybe send flow control
/// update.
pub fn retire(&mut self, retired: u64) {
if retired <= self.retired {
return;
}

self.retired = retired;
// TODO: Move the `/ 2` logic into a function? It is duplicated.
if self.retired + self.max_active / 2 > self.max_allowed {
self.frame_pending = true;
}
@@ -345,15 +362,48 @@ impl Default for ReceiverFlowControl<()> {
}

impl ReceiverFlowControl<StreamId> {
// TODO: Should this also apply to connection flow control? It currently just uses a huge limit.
// https://github.com/mozilla/neqo/blob/e44c472487b663ea4892bd2ff2786919d20329a2/neqo-transport/src/connection/params.rs#L21
pub fn write_frames(
&mut self,
builder: &mut PacketBuilder,
tokens: &mut Vec<RecoveryToken>,
stats: &mut FrameStats,
now: Instant,
rtt: Duration,
) {
if !self.frame_needed() {
return;
}

// TODO: Remove. Debugging only for now.
let previous_retired = self.max_allowed.saturating_sub(self.max_active);
if let Some(previous) = self.max_allowed_sent_at {
let secs = (now - previous).as_secs_f64();
let bits = (self.retired - previous_retired) as f64 * 8.0;
let mbits = (bits / secs) / 1024.0 / 1024.0;
qdebug!("{mbits} Mibit/s");
}

// Auto-tune max_active.
//
// TODO: Should one also auto-tune down?
//
// TODO: Deduplicate the /2 logic. Used in other places as well.
if self.retired + self.max_active / 2 > self.max_allowed
&& self
.max_allowed_sent_at
.is_some_and(|at| now - at < rtt * 2)
&& self.max_active < STREAM_MAX_ACTIVE_LIMIT
{
let prev_max_active = self.max_active;
self.max_active = min(self.max_active * 2, STREAM_MAX_ACTIVE_LIMIT);
qdebug!(
"Increasing max stream receive window: previous max_active: {} MiB new max_active: {} MiB now: {now:?} rtt: {rtt:?} stream_id: {}",
prev_max_active / 1024 / 1024, self.max_active / 1024 / 1024, self.subject,
);
}

let max_allowed = self.next_limit();
if builder.write_varint_frame(&[
FRAME_TYPE_MAX_STREAM_DATA,
Expand All @@ -367,6 +417,8 @@ impl ReceiverFlowControl<StreamId> {
}));
self.frame_sent(max_allowed);
}
// TODO: Document why this is outside of the `if`.
self.max_allowed_sent_at = Some(now);
}

pub fn add_retired(&mut self, count: u64) {
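Distilled from the `write_frames` change above, the tuning rule is: when a window update is due (the peer has consumed more than half of `max_active`) and the previous MAX_STREAM_DATA went out less than two RTTs ago, the receiver treats the window as the bottleneck and doubles it, capped at `STREAM_MAX_ACTIVE_LIMIT`. A standalone sketch of that rule (simplified; field names follow the diff):

```rust
use std::time::{Duration, Instant};

const STREAM_MAX_ACTIVE_LIMIT: u64 = 10 * 1024 * 1024;

struct ReceiveWindow {
    max_active: u64,                      // current window size
    max_allowed: u64,                     // largest offset advertised so far
    retired: u64,                         // bytes consumed by the application
    max_allowed_sent_at: Option<Instant>, // last MAX_STREAM_DATA send time
}

impl ReceiveWindow {
    fn maybe_grow(&mut self, now: Instant, rtt: Duration) {
        // An update is due once the peer has consumed half the window.
        let update_due = self.retired + self.max_active / 2 > self.max_allowed;
        // Updates spaced closer than two RTTs apart suggest the window,
        // not the application, is limiting the transfer.
        let window_limited = self
            .max_allowed_sent_at
            .is_some_and(|at| now - at < rtt * 2);
        if update_due && window_limited && self.max_active < STREAM_MAX_ACTIVE_LIMIT {
            self.max_active = (self.max_active * 2).min(STREAM_MAX_ACTIVE_LIMIT);
        }
    }
}
```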
4 changes: 2 additions & 2 deletions neqo-transport/src/lib.rs
@@ -63,8 +63,8 @@ pub use self::{
frame::CloseError,
packet::MIN_INITIAL_PACKET_SIZE,
quic_datagrams::DatagramTracking,
recv_stream::{RecvStreamStats, RECV_BUFFER_SIZE},
send_stream::{SendStreamStats, SEND_BUFFER_SIZE},
recv_stream::{RecvStreamStats, INITIAL_RECV_BUFFER_SIZE},
send_stream::{SendStreamStats, INITIAL_SEND_BUFFER_SIZE},
stats::Stats,
stream_id::{StreamId, StreamType},
version::Version,
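For downstream users of the crate, the re-export rename is mechanical, and the new names signal that these constants are only starting points once auto-tuning is in play. A sketch of the migration (assuming a consumer used the old names):

```rust
// Before: use neqo_transport::{RECV_BUFFER_SIZE, SEND_BUFFER_SIZE};
use neqo_transport::{INITIAL_RECV_BUFFER_SIZE, INITIAL_SEND_BUFFER_SIZE};

// With auto-tuning, the effective stream receive window can grow past
// INITIAL_RECV_BUFFER_SIZE at runtime, so these constants should no
// longer be treated as upper bounds when sizing long-lived buffers.
```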