From 17508915b43cd8e6ca3e563c8f8c6b4b4a716b09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adolfo=20Ochagav=C3=ADa?= Date: Tue, 14 Mar 2023 08:29:12 +0100 Subject: [PATCH] Implement PLPMTUD Sponsored by Stormshield --- quinn-proto/src/config.rs | 97 ++- quinn-proto/src/connection/datagrams.rs | 3 +- quinn-proto/src/connection/mod.rs | 150 +++-- quinn-proto/src/connection/pacing.rs | 14 +- quinn-proto/src/connection/packet_builder.rs | 3 - quinn-proto/src/connection/paths.rs | 636 ++++++++++++++++++- quinn-proto/src/connection/stats.rs | 5 + quinn-proto/src/lib.rs | 4 +- quinn-proto/src/tests/mod.rs | 281 ++++++++ quinn-proto/src/tests/util.rs | 57 +- 10 files changed, 1185 insertions(+), 65 deletions(-) diff --git a/quinn-proto/src/config.rs b/quinn-proto/src/config.rs index da54a925ef..dd0a575560 100644 --- a/quinn-proto/src/config.rs +++ b/quinn-proto/src/config.rs @@ -10,6 +10,7 @@ use crate::{ congestion, crypto::{self, HandshakeTokenKey, HmacKey}, VarInt, VarIntBoundsExceeded, DEFAULT_SUPPORTED_VERSIONS, INITIAL_MAX_UDP_PAYLOAD_SIZE, + MAX_UDP_PAYLOAD, }; /// Parameters governing the core QUIC state machine @@ -37,6 +38,7 @@ pub struct TransportConfig { pub(crate) time_threshold: f32, pub(crate) initial_rtt: Duration, pub(crate) initial_max_udp_payload_size: u16, + pub(crate) plpmtud_config: Option, pub(crate) persistent_congestion_threshold: u32, pub(crate) keep_alive_interval: Option, @@ -154,23 +156,42 @@ impl TransportConfig { self } - /// UDP payload size that the network must be capable of carrying - /// - /// Effective max UDP payload size may change over the life of the connection in the future due - /// to path MTU detection, but will never fall below this value. + /// The initial value to be used as the maximum UDP payload size before running PLPMTUD (see + /// [`TransportConfig::plpmtud_config`]). /// /// Must be at least 1200, which is the default, and known to be safe for typical internet /// applications. 
Larger values are more efficient, but increase the risk of unpredictable - /// catastrophic packet loss due to exceeding the network path's IP MTU. + /// catastrophic packet loss due to exceeding the network path's IP MTU. If the provided value + /// is higher than what the network path actually supports, packet loss will eventually trigger + /// black hole detection and bring it down to 1200. /// /// Real-world MTUs can vary according to ISP, VPN, and properties of intermediate network links - /// outside of either endpoint's control. Extreme caution should be used when raising this value - /// for connections outside of private networks where these factors are fully controlled. + /// outside of either endpoint's control. Caution should be used when raising this value for + /// connections outside of private networks where these factors are fully controlled. pub fn initial_max_udp_payload_size(&mut self, value: u16) -> &mut Self { self.initial_max_udp_payload_size = value.max(INITIAL_MAX_UDP_PAYLOAD_SIZE); self } + /// Specifies the PLPMTUD config (see [`PlpmtudConfig`] for details). Defaults to `None`, which + /// disables PLPMTUD altogether. + /// + /// # Important + /// + /// PLPMTUD support is platform-specific. It should only be enabled if the IP DF bit of outgoing + /// UDP packets is being set. Failure to do so might result in the incorrect discovery of too + /// high PMTUs, leading to increased packet loss (e.g. a high PMTU may be incorrectly assumed + /// if a too-big probe was split in transit, yet was properly reassembled at the remote + /// endpoint). + /// + /// If you are using the `quinn` crate, you can safely enable PLPMTUD on Windows and Linux. If + /// you are using `quinn-proto`, you will need to ensure your IO layer properly sets the IP DF + /// bit. 
+ pub fn plpmtud_config(&mut self, value: Option) -> &mut Self { + self.plpmtud_config = value; + self + } + /// Number of consecutive PTOs after which network is considered to be experiencing persistent congestion. pub fn persistent_congestion_threshold(&mut self, value: u32) -> &mut Self { self.persistent_congestion_threshold = value; @@ -267,6 +288,7 @@ impl Default for TransportConfig { time_threshold: 9.0 / 8.0, initial_rtt: Duration::from_millis(333), // per spec, intentionally distinct from EXPECTED_RTT initial_max_udp_payload_size: INITIAL_MAX_UDP_PAYLOAD_SIZE, + plpmtud_config: None, persistent_congestion_threshold: 3, keep_alive_interval: None, @@ -316,6 +338,67 @@ impl fmt::Debug for TransportConfig { } } +/// Parameters governing PLPMTUD. +/// +/// PLPMTUD performs a binary search, trying to find the highest packet size that is still +/// supported by the current network path. The lower bound of the search is equal to +/// [`TransportConfig::initial_max_udp_payload_size`] in the initial PLPMTUD round, and is equal +/// to the currently discovered MTU in subsequent runs. The upper bound is determined by +/// [`PlpmtudConfig::max_udp_payload_size_upper_bound`]. +/// +/// If, at some point, the network path no longer accepts packets of the detected size, packet +/// loss will eventually trigger black hole detection and reset the detected MTU to 1200. In +/// that case, PLPMTUD will be triggered after [`PlpmtudConfig::black_hole_cooldown`] (ignoring the +/// timer that was set based on [`PlpmtudConfig::interval`]). +#[derive(Clone, Debug)] +pub struct PlpmtudConfig { + pub(crate) interval: Duration, + pub(crate) max_udp_payload_size_upper_bound: u16, + pub(crate) black_hole_cooldown: Duration, +} + +impl PlpmtudConfig { + /// Specifies the time to wait after completing PLPMTUD before starting a new MTU discovery + /// round. Defaults to 600 seconds, as recommended by + /// [RFC 8899](https://www.rfc-editor.org/rfc/rfc8899). 
+ pub fn interval(&mut self, value: Duration) -> &mut Self { + self.interval = value; + self + } + + /// Specifies the upper bound to the max UDP payload size that PLPMTUD will search for. + /// + /// Defaults to 1452, to stay within Ethernet's MTU when using IPv4 and IPv6. The highest + /// allowed value is 65527, which corresponds to the maximum permitted UDP payload. + /// + /// It is safe to use an arbitrarily high upper bound, regardless of the network path's MTU. The + /// only drawback is that PLPMTUD might take more time to finish. + pub fn max_udp_payload_size_upper_bound(&mut self, value: u16) -> &mut Self { + self.max_udp_payload_size_upper_bound = value.min(MAX_UDP_PAYLOAD); + self + } + + /// Specifies the amount of time that PLPMTUD should wait after a black hole was detected before + /// running again. Defaults to one minute. + /// + /// Black hole detection can be spuriously triggered in case of congestion, so it makes sense to + /// try PLPMTUD again after a short period of time. + pub fn black_hole_cooldown(&mut self, value: Duration) -> &mut Self { + self.black_hole_cooldown = value; + self + } +} + +impl Default for PlpmtudConfig { + fn default() -> Self { + PlpmtudConfig { + interval: Duration::from_secs(600), + max_udp_payload_size_upper_bound: 1452, + black_hole_cooldown: Duration::from_secs(60), + } + } +} + /// Global configuration for the endpoint, affecting all connections /// /// Default values should be suitable for most internet applications. diff --git a/quinn-proto/src/connection/datagrams.rs b/quinn-proto/src/connection/datagrams.rs index 1277e86489..7d1c243e44 100644 --- a/quinn-proto/src/connection/datagrams.rs +++ b/quinn-proto/src/connection/datagrams.rs @@ -55,8 +55,7 @@ impl<'a> Datagrams<'a> { /// /// Not necessarily the maximum size of received datagrams. pub fn max_size(&self) -> Option { - // This is usually 1162 bytes, but we shouldn't document that without a doctest. 
- let max_size = self.conn.path.max_udp_payload_size as usize + let max_size = self.conn.path.current_mtu() as usize - 1 // flags byte - self.conn.rem_cids.active().len() - 4 // worst-case packet number size diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 299728a459..6b74621314 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -263,6 +263,8 @@ impl Connection { config.initial_rtt, config.congestion_controller_factory.build(now), config.initial_max_udp_payload_size, + None, + config.plpmtud_config.clone(), now, path_validated, ), @@ -428,8 +430,8 @@ impl Connection { SpaceId::Data, "PATH_CHALLENGE queued without 1-RTT keys" ); - let mut buf = Vec::with_capacity(self.path.max_udp_payload_size as usize); - let buf_capacity = self.path.max_udp_payload_size as usize; + let mut buf = Vec::with_capacity(self.path.current_mtu() as usize); + let buf_capacity = self.path.current_mtu() as usize; let mut builder = PacketBuilder::new( now, @@ -542,7 +544,7 @@ impl Connection { // We need to send 1 more datagram and extend the buffer for that. // Is 1 more datagram allowed? 
- if buf_capacity >= self.path.max_udp_payload_size as usize * max_datagrams { + if buf_capacity >= self.path.current_mtu() as usize * max_datagrams { // No more datagrams allowed break; } @@ -554,7 +556,7 @@ impl Connection { // budget left, we always allow a full MTU to be sent // (see https://github.com/quinn-rs/quinn/issues/1082) if self.path.anti_amplification_blocked( - self.path.max_udp_payload_size as u64 * num_datagrams as u64 + 1, + self.path.current_mtu() as u64 * num_datagrams as u64 + 1, ) { trace!("blocked by anti-amplification"); break; @@ -569,9 +571,9 @@ impl Connection { } else { 0 } as u64; - debug_assert!(untracked_bytes <= self.path.max_udp_payload_size as u64); + debug_assert!(untracked_bytes <= self.path.current_mtu() as u64); - let bytes_to_send = u64::from(self.path.max_udp_payload_size) + untracked_bytes; + let bytes_to_send = u64::from(self.path.current_mtu()) + untracked_bytes; if self.in_flight.bytes + bytes_to_send >= self.path.congestion.window() { space_idx += 1; congestion_blocked = true; @@ -585,7 +587,7 @@ impl Connection { if let Some(delay) = self.path.pacing.delay( smoothed_rtt, bytes_to_send, - self.path.max_udp_payload_size, + self.path.current_mtu(), self.path.congestion.window(), now, ) { @@ -601,7 +603,7 @@ impl Connection { if let Some(mut builder) = builder.take() { // Pad the packet to make it suitable for sending with GSO // which will always send the maximum PDU. - builder.pad_to(self.path.max_udp_payload_size); + builder.pad_to(self.path.current_mtu()); builder.finish_and_track(now, self, sent_frames.take(), &mut buf); @@ -609,7 +611,7 @@ impl Connection { } // Allocate space for another datagram - buf_capacity += self.path.max_udp_payload_size as usize; + buf_capacity += self.path.current_mtu() as usize; if buf.capacity() < buf_capacity { // We reserve the maximum space for sending `max_datagrams` upfront // to avoid any reallocations if more datagrams have to be appended later on. 
@@ -619,9 +621,7 @@ impl Connection { // (e.g. purely containing ACKs), modern memory allocators // (e.g. mimalloc and jemalloc) will pool certain allocation sizes // and therefore this is still rather efficient. - buf.reserve( - max_datagrams * self.path.max_udp_payload_size as usize - buf.capacity(), - ); + buf.reserve(max_datagrams * self.path.current_mtu() as usize - buf.capacity()); } num_datagrams += 1; coalesce = true; @@ -666,7 +666,7 @@ impl Connection { space_id, &mut buf, buf_capacity, - (num_datagrams - 1) * (self.path.max_udp_payload_size as usize), + (num_datagrams - 1) * (self.path.current_mtu() as usize), ack_eliciting, self, self.version, @@ -742,8 +742,7 @@ impl Connection { !(sent.is_ack_only(&self.streams) && !can_send.acks && can_send.other - && (buf_capacity - builder.datagram_start) - == self.path.max_udp_payload_size as usize), + && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize), "SendableFrames was {can_send:?}, but only ACKs have been written" ); pad_datagram |= sent.requires_padding; @@ -773,6 +772,48 @@ impl Connection { self.app_limited = buf.is_empty() && !congestion_blocked; + // Send MTU probe if necessary + if buf.is_empty() && self.state.is_established() { + let space_id = SpaceId::Data; + let probe_size = match self + .path + .mtud + .poll_transmit(now, self.spaces[space_id].next_packet_number) + { + Some(next_probe_size) => next_probe_size, + None => return None, + }; + + let buf_capacity = probe_size as usize; + buf.reserve(buf_capacity); + + let mut builder = PacketBuilder::new( + now, + SpaceId::Data, + &mut buf, + buf_capacity, + 0, + true, + self, + self.version, + )?; + + // We implement MTU probes as ping packets padded up to the probe size + buf.write(frame::Type::PING); + builder.pad_to(probe_size); + let sent_frames = SentFrames { + non_retransmits: true, + ..Default::default() + }; + builder.finish_and_track(now, self, Some(sent_frames), &mut buf); + + self.stats.frame_tx.ping += 1; + 
self.stats.path.sent_plpmtud_probes += 1; + num_datagrams = 1; + + trace!(?probe_size, "writing MTUD probe"); + } + if buf.is_empty() { return None; } @@ -794,7 +835,7 @@ impl Connection { }, segment_size: match num_datagrams { 1 => None, - _ => Some(self.path.max_udp_payload_size as usize), + _ => Some(self.path.current_mtu() as usize), }, src_ip: self.local_ip, }) @@ -1175,6 +1216,7 @@ impl Connection { self.spaces[space].pending_acks.subtract_below(acked); } ack_eliciting_acked |= info.ack_eliciting; + self.path.mtud.on_acked(space, packet, info.size); self.on_packet_acked(now, space, info); } } @@ -1342,6 +1384,8 @@ impl Connection { fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) { let mut lost_packets = Vec::::new(); + let mut lost_mtu_probe = None; + let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe(); let rtt = self.path.rtt.conservative(); let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY); @@ -1371,24 +1415,30 @@ impl Connection { if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold { - lost_packets.push(packet); - size_of_lost_packets += info.size as u64; - if info.ack_eliciting && due_to_ack { - match persistent_congestion_start { - // Two ACK-eliciting packets lost more than congestion_period apart, with no - // ACKed packets in between - Some(start) if info.time_sent - start > congestion_period => { - in_persistent_congestion = true; - } - // Persistent congestion must start after the first RTT sample - None if self - .path - .first_packet_after_rtt_sample - .map_or(false, |x| x < (pn_space, packet)) => - { - persistent_congestion_start = Some(info.time_sent); + if Some(packet) == in_flight_mtu_probe { + // Lost MTU probes are not included in `lost_packets`, because they should not + // trigger a congestion control response + lost_mtu_probe = in_flight_mtu_probe; + } else { + lost_packets.push(packet); + size_of_lost_packets += 
info.size as u64; + if info.ack_eliciting && due_to_ack { + match persistent_congestion_start { + // Two ACK-eliciting packets lost more than congestion_period apart, with no + // ACKed packets in between + Some(start) if info.time_sent - start > congestion_period => { + in_persistent_congestion = true; + } + // Persistent congestion must start after the first RTT sample + None if self + .path + .first_packet_after_rtt_sample + .map_or(false, |x| x < (pn_space, packet)) => + { + persistent_congestion_start = Some(info.time_sent); + } + _ => {} } - _ => {} } } } else { @@ -1423,6 +1473,9 @@ impl Connection { self.streams.retransmit(frame); } self.spaces[pn_space].pending |= info.retransmits; + self.path + .mtud + .on_non_probe_packet_lost(now, *packet, info.size); } // Don't apply congestion penalty for lost ack-only packets let lost_ack_eliciting = old_bytes_in_flight != self.in_flight.bytes; @@ -1437,6 +1490,17 @@ impl Connection { ); } } + + // Handle a lost MTU probe + if let Some(packet) = lost_mtu_probe { + let info = self.spaces[SpaceId::Data] + .sent_packets + .remove(&packet) + .unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and therefore must not have been removed yet + self.remove_in_flight(SpaceId::Data, &info); + self.path.mtud.on_probe_packet_lost(); + self.stats.path.lost_plpmtud_probes += 1; + } } fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> { @@ -2171,7 +2235,7 @@ impl Connection { } if self.side.is_client() { - // Client-only beceause server params were set from the client's Initial + // Client-only because server params were set from the client's Initial let params = self.crypto .transport_parameters()? 
@@ -2650,14 +2714,16 @@ impl Connection { let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() { PathData::from_previous(remote, &self.path, now) } else { + let peer_max_udp_payload_size = + u16::try_from(self.peer_params.max_udp_payload_size.into_inner()) + .unwrap_or(u16::MAX); PathData::new( remote, self.config.initial_rtt, self.config.congestion_controller_factory.build(now), - self.config.initial_max_udp_payload_size.min( - u16::try_from(self.peer_params.max_udp_payload_size.into_inner()) - .unwrap_or(u16::MAX), - ), + self.config.initial_max_udp_payload_size, + Some(peer_max_udp_payload_size), + self.config.plpmtud_config.clone(), now, false, ) @@ -2962,7 +3028,7 @@ impl Connection { }).expect("preferred address CID is the first received, and hence is guaranteed to be legal"); } self.peer_params = params; - self.path.max_udp_payload_size = self.path.max_udp_payload_size.min( + self.path.mtud.on_peer_max_udp_payload_size_received( u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX), ); } @@ -3136,6 +3202,12 @@ impl Connection { self.rem_cids.active_seq() } + /// Returns the detected maximum udp payload size for the current path + #[cfg(test)] + pub(crate) fn path_mtu(&self) -> u16 { + self.path.current_mtu() + } + fn max_ack_delay(&self) -> Duration { Duration::from_micros(self.peer_params.max_ack_delay.0 * 1000) } diff --git a/quinn-proto/src/connection/pacing.rs b/quinn-proto/src/connection/pacing.rs index 973282473b..31dabf8dc1 100644 --- a/quinn-proto/src/connection/pacing.rs +++ b/quinn-proto/src/connection/pacing.rs @@ -15,6 +15,7 @@ use tracing::warn; pub struct Pacer { capacity: u64, last_window: u64, + last_mtu: u16, tokens: u64, prev: Instant, } @@ -31,6 +32,7 @@ impl Pacer { Self { capacity, last_window: window, + last_mtu: max_udp_payload_size, tokens: capacity, prev: now, } @@ -61,12 +63,13 @@ impl Pacer { "zero-sized congestion control window is nonsense" ); - if window != 
self.last_window { + if window != self.last_window || mtu != self.last_mtu { self.capacity = optimal_capacity(smoothed_rtt, window, mtu); // Clamp the tokens self.tokens = self.capacity.min(self.tokens); self.last_window = window; + self.last_mtu = mtu; } // if we can already send a packet, there is no need for delay @@ -227,6 +230,15 @@ mod tests { (window as u128 / 2 * BURST_INTERVAL_NANOS / rtt.as_nanos()) as u64 ); assert_eq!(pacer.tokens, initial_tokens / 2); + + pacer.delay(rtt, mtu as u64, mtu * 2, window, now); + assert_eq!( + pacer.capacity, + (window as u128 * BURST_INTERVAL_NANOS / rtt.as_nanos()) as u64 + ); + + pacer.delay(rtt, mtu as u64, 20_000, window, now); + assert_eq!(pacer.capacity, 20_000_u64 * MIN_BURST_SIZE); } #[test] diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index 8c1d81faa8..0423763429 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -238,9 +238,6 @@ impl PacketBuilder { ); buffer.resize(buffer.len() + packet_crypto.tag_len(), 0); - debug_assert!( - buffer.len() <= self.datagram_start + conn.path.max_udp_payload_size as usize - ); let encode_start = self.partial_encode.start; let packet_buf = &mut buffer[encode_start..]; self.partial_encode.finish( diff --git a/quinn-proto/src/connection/paths.rs b/quinn-proto/src/connection/paths.rs index c5f6f88fd4..b3e88d53ee 100644 --- a/quinn-proto/src/connection/paths.rs +++ b/quinn-proto/src/connection/paths.rs @@ -1,7 +1,9 @@ use std::{cmp, net::SocketAddr, time::Duration, time::Instant}; +use tracing::trace; use super::pacing::Pacer; -use crate::{congestion, packet::SpaceId, TIMER_GRANULARITY}; +use crate::config::PlpmtudConfig; +use crate::{congestion, packet::SpaceId, MAX_UDP_PAYLOAD, TIMER_GRANULARITY}; /// Description of a particular network path pub struct PathData { @@ -24,7 +26,8 @@ pub struct PathData { pub total_sent: u64, /// Total size of all UDP datagrams 
received on this path pub total_recvd: u64, - pub max_udp_payload_size: u16, + /// The state of the MTU discovery process + pub mtud: MtuDiscovery, /// Packet number of the first packet sent after an RTT sample was collected on this path /// /// Used in persistent congestion determination. @@ -37,6 +40,8 @@ impl PathData { initial_rtt: Duration, congestion: Box, initial_max_udp_payload_size: u16, + peer_max_udp_payload_size: Option, + plpmtud_config: Option, now: Instant, validated: bool, ) -> Self { @@ -56,7 +61,16 @@ impl PathData { validated, total_sent: 0, total_recvd: 0, - max_udp_payload_size: initial_max_udp_payload_size, + mtud: plpmtud_config.map_or( + MtuDiscovery::disabled(initial_max_udp_payload_size), + |config| { + MtuDiscovery::new( + initial_max_udp_payload_size, + peer_max_udp_payload_size, + config, + ) + }, + ), first_packet_after_rtt_sample: None, } } @@ -67,12 +81,7 @@ impl PathData { PathData { remote, rtt: prev.rtt, - pacing: Pacer::new( - smoothed_rtt, - congestion.window(), - prev.max_udp_payload_size, - now, - ), + pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now), sending_ecn: true, congestion, challenge: None, @@ -80,7 +89,7 @@ impl PathData { validated: false, total_sent: 0, total_recvd: 0, - max_udp_payload_size: prev.max_udp_payload_size, + mtud: prev.mtud.clone(), first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample, } } @@ -90,6 +99,11 @@ impl PathData { pub fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool { !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send } + + /// Returns the current MTU (maximum UDP payload size) as discovered through PLPMTUD + pub fn current_mtu(&self) -> u16 { + self.mtud.current_mtu + } } /// RTT estimation for a particular network path @@ -163,3 +177,605 @@ impl RttEstimator { } } } + +/// Implements Datagram Packetization Layer Path Maximum Transmission Unit Discovery +/// +/// See [RFC 8899](https://www.rfc-editor.org/rfc/rfc8899) for details +#[derive(Clone)] +pub struct 
MtuDiscovery { + /// Detected MTU for the path + current_mtu: u16, + /// The state of the MTU discovery, if enabled + state: Option, + /// A count of the number of packets with a size > BASE_UDP_PAYLOAD_SIZE lost since + /// the last time a packet with size equal to the current MTU was acknowledged + black_hole_counter: u8, + /// The largest acked packet of size `current_mtu` + largest_acked_mtu_sized_packet: Option, +} + +impl MtuDiscovery { + fn new( + initial_plpmtu: u16, + peer_max_udp_payload_size: Option, + config: PlpmtudConfig, + ) -> Self { + debug_assert!( + initial_plpmtu >= BASE_UDP_PAYLOAD_SIZE, + "initial_max_udp_payload_size must be at least {BASE_UDP_PAYLOAD_SIZE}" + ); + + let mut mtud = + MtuDiscovery::with_state(initial_plpmtu, Some(EnabledMtuDiscovery::new(config))); + + // We might be migrating an existing connection to a new path, in which case the transport + // parameters have already been transmitted, and we already know the value of + // `peer_max_udp_payload_size` + if let Some(peer_max_udp_payload_size) = peer_max_udp_payload_size { + mtud.on_peer_max_udp_payload_size_received(peer_max_udp_payload_size); + } + + mtud + } + + /// MTU discovery will be disabled and the current MTU will be fixed to the provided value + fn disabled(plpmtu: u16) -> Self { + MtuDiscovery::with_state(plpmtu, None) + } + + fn with_state(current_mtu: u16, state: Option) -> Self { + MtuDiscovery { + current_mtu, + state, + black_hole_counter: 0, + largest_acked_mtu_sized_packet: None, + } + } + + /// Returns the amount of bytes that should be sent as an MTU probe, if any + pub fn poll_transmit(&mut self, now: Instant, next_packet_number: u64) -> Option { + self.state + .as_mut() + .and_then(|state| state.poll_transmit(now, self.current_mtu, next_packet_number)) + } + + /// Notifies the [`MtuDiscovery`] that the peer's `max_udp_payload_size` transport parameter has + /// been received + pub fn on_peer_max_udp_payload_size_received(&mut self, 
peer_max_udp_payload_size: u16) { + self.current_mtu = self.current_mtu.min(peer_max_udp_payload_size); + + if let Some(state) = self.state.as_mut() { + // MTUD is only active after the connection has been fully established, so it is + // guaranteed we will receive the peer's transport parameters before we start probing + debug_assert!(matches!(state.phase, Phase::Initial)); + state.peer_max_udp_payload_size = peer_max_udp_payload_size; + } + } + + /// Notifies the [`MtuDiscovery`] that a packet has been ACKed + pub fn on_acked(&mut self, space: SpaceId, packet_number: u64, packet_bytes: u16) { + // MTU probes are only sent in application data space + if space != SpaceId::Data { + return; + } + + // Update the state of the MTU search + if let Some(new_mtu) = self + .state + .as_mut() + .and_then(|state| state.on_probe_acked(packet_number)) + { + self.current_mtu = new_mtu; + trace!(max_udp_payload_size = self.current_mtu, "new MTU detected"); + + // We know for sure the path supports the current MTU + self.black_hole_counter = 0; + } + + // Reset the black hole counter if a packet the size of the current MTU or larger + // has been acknowledged + if packet_bytes >= self.current_mtu + && self + .largest_acked_mtu_sized_packet + .map_or(true, |pn| packet_number > pn) + { + self.black_hole_counter = 0; + self.largest_acked_mtu_sized_packet = Some(packet_number); + } + } + + /// Returns the packet number of the in-flight MTU probe, if any + pub fn in_flight_mtu_probe(&self) -> Option { + match &self.state { + Some(EnabledMtuDiscovery { + phase: Phase::Searching(search_state), + .. 
+ }) => search_state.in_flight_probe, + _ => None, + } + } + + /// Notifies the [`MtuDiscovery`] that the in-flight MTU probe was lost + pub fn on_probe_packet_lost(&mut self) { + if let Some(state) = &mut self.state { + state.on_probe_packet_lost(); + } + } + + /// Notifies the [`MtuDiscovery`] that a non-probe packet was lost + pub fn on_non_probe_packet_lost(&mut self, now: Instant, packet_number: u64, packet_size: u16) { + // Keep track of lost packets higher than the base size, and reset the current MTU + // if they reach the threshold + if packet_size > BASE_UDP_PAYLOAD_SIZE + && self + .largest_acked_mtu_sized_packet + .map_or(true, |pn| packet_number > pn) + { + self.black_hole_counter += 1; + if self.black_hole_counter > BLACK_HOLE_THRESHOLD { + self.black_hole_counter = 0; + self.largest_acked_mtu_sized_packet = None; + self.current_mtu = BASE_UDP_PAYLOAD_SIZE; + + if let Some(state) = &mut self.state { + state.on_black_hole_detected(now); + } + } + } + } +} + +/// Additional state for enabled MTU discovery +#[derive(Debug, Clone)] +struct EnabledMtuDiscovery { + phase: Phase, + peer_max_udp_payload_size: u16, + config: PlpmtudConfig, +} + +impl EnabledMtuDiscovery { + fn new(config: PlpmtudConfig) -> Self { + EnabledMtuDiscovery { + phase: Phase::Initial, + peer_max_udp_payload_size: MAX_UDP_PAYLOAD, + config, + } + } + + /// Returns the amount of bytes that should be sent as an MTU probe, if any + fn poll_transmit( + &mut self, + now: Instant, + current_mtu: u16, + next_packet_number: u64, + ) -> Option { + if let Phase::Initial = &self.phase { + // Start the first search + self.phase = Phase::Searching(SearchState::new( + current_mtu, + self.peer_max_udp_payload_size, + &self.config, + )); + } else if let Phase::Complete(next_mtud_activation) = &self.phase { + if now < *next_mtud_activation { + return None; + } else { + // Start a new search (we have reached the next activation time) + self.phase = Phase::Searching(SearchState::new( + current_mtu, + 
self.peer_max_udp_payload_size, + &self.config, + )); + } + } + + if let Phase::Searching(state) = &mut self.phase { + // Nothing to do while there is a probe in flight + if state.in_flight_probe.is_some() { + return None; + } + + // Retransmit lost probes, if any + if 0 < state.lost_probe_count && state.lost_probe_count < MAX_PROBE_RETRANSMITS { + state.in_flight_probe = Some(next_packet_number); + return Some(state.last_probed_udp_payload_size); + } + + let last_probe_succeeded = state.lost_probe_count == 0; + + // The probe is definitely lost (we reached the MAX_PROBE_RETRANSMITS threshold) + if !last_probe_succeeded { + state.lost_probe_count = 0; + state.in_flight_probe = None; + } + + if let Some(probe_udp_payload_size) = state.next_mtu_to_probe(last_probe_succeeded) { + state.in_flight_probe = Some(next_packet_number); + state.last_probed_udp_payload_size = probe_udp_payload_size; + return Some(probe_udp_payload_size); + } else { + let next_mtud_activation = now + self.config.interval; + self.phase = Phase::Complete(next_mtud_activation); + return None; + } + } + + None + } + + /// Called when a packet is acknowledged in [`SpaceId::Data`] + /// + /// Returns the new `current_mtu` if the packet number corresponds to the in-flight MTU probe + fn on_probe_acked(&mut self, packet_number: u64) -> Option { + match &mut self.phase { + Phase::Searching(state) if state.in_flight_probe == Some(packet_number) => { + state.in_flight_probe = None; + state.lost_probe_count = 0; + Some(state.last_probed_udp_payload_size) + } + _ => None, + } + } + + /// Called when the in-flight MTU probe was lost + fn on_probe_packet_lost(&mut self) { + // We might no longer be searching, e.g. 
if a black hole was detected + if let Phase::Searching(state) = &mut self.phase { + state.in_flight_probe = None; + state.lost_probe_count += 1; + } + } + + /// Called when a black hole is detected + fn on_black_hole_detected(&mut self, now: Instant) { + // Stop searching, if applicable, and reset the timer + let next_mtud_activation = now + self.config.black_hole_cooldown; + self.phase = Phase::Complete(next_mtud_activation); + } +} + +#[derive(Debug, Clone)] +enum Phase { + /// We haven't started polling yet + Initial, + /// We are currently searching for a higher PMTU + Searching(SearchState), + /// Searching has completed and will be triggered again at the provided instant + Complete(Instant), +} + +#[derive(Debug, Clone)] +struct SearchState { + /// The lower bound for the current binary search + search_lower_bound: u16, + /// The upper bound for the current binary search + search_upper_bound: u16, + /// The UDP payload size we last sent a probe for + last_probed_udp_payload_size: u16, + /// Packet number of an in-flight probe (if any) + in_flight_probe: Option, + /// Lost probes at the current probe size + lost_probe_count: usize, +} + +impl SearchState { + /// Creates a new search state, with the specified lower bound (the upper bound is derived from + /// the config and the peer's `max_udp_payload_size` transport parameter) + fn new(lower_bound: u16, peer_max_udp_payload_size: u16, config: &PlpmtudConfig) -> Self { + let search_upper_bound = config + .max_udp_payload_size_upper_bound + .min(peer_max_udp_payload_size) + .max(lower_bound); + + SearchState { + in_flight_probe: None, + lost_probe_count: 0, + search_lower_bound: lower_bound, + search_upper_bound, + // During initialization, we consider the lower bound to have already been + // successfully probed + last_probed_udp_payload_size: lower_bound, + } + } + + /// Determines the next MTU to probe using binary search + fn next_mtu_to_probe(&mut self, last_probe_succeeded: bool) -> Option { + 
debug_assert_eq!(self.in_flight_probe, None); + + if last_probe_succeeded { + self.search_lower_bound = self.last_probed_udp_payload_size; + } else { + self.search_upper_bound = self.last_probed_udp_payload_size - 1; + } + + let next_mtu = (self.search_lower_bound as i32 + self.search_upper_bound as i32) / 2; + + // Binary search stopping condition + if ((next_mtu - self.last_probed_udp_payload_size as i32).unsigned_abs() as u16) + < BINARY_SEARCH_MINIMUM_CHANGE + { + // Special case: if the upper bound is far enough, we want to probe it as a last + // step (otherwise we will never achieve the upper bound) + if self + .search_upper_bound + .saturating_sub(self.last_probed_udp_payload_size) + >= BINARY_SEARCH_MINIMUM_CHANGE + { + return Some(self.search_upper_bound); + } + + return None; + } + + Some(next_mtu as u16) + } +} + +// Corresponds to the RFC's `MAX_PROBES` constant (see +// https://www.rfc-editor.org/rfc/rfc8899#section-5.1.2) +const MAX_PROBE_RETRANSMITS: usize = 3; +const BASE_UDP_PAYLOAD_SIZE: u16 = 1_200; +const BLACK_HOLE_THRESHOLD: u8 = 3; +const BINARY_SEARCH_MINIMUM_CHANGE: u16 = 20; + +#[cfg(test)] +mod tests { + use super::*; + use crate::MAX_UDP_PAYLOAD; + use assert_matches::assert_matches; + + fn default_mtud() -> MtuDiscovery { + let config = PlpmtudConfig::default(); + MtuDiscovery::new(1_200, None, config) + } + + fn completed(mtud: &MtuDiscovery) -> bool { + matches!(mtud.state.as_ref().unwrap().phase, Phase::Complete(_)) + } + + /// Drives mtud until it reaches `Phase::Complete` + fn drive_to_completion( + mtud: &mut MtuDiscovery, + now: Instant, + link_payload_size_limit: u16, + ) -> Vec { + let mut probed_sizes = Vec::new(); + for probe_packet_number in 1..100 { + let result = mtud.poll_transmit(now, probe_packet_number); + + if completed(mtud) { + break; + } + + // "Send" next probe + assert!(result.is_some()); + let probe_size = result.unwrap(); + probed_sizes.push(probe_size); + + if probe_size <= link_payload_size_limit { + 
mtud.on_acked(SpaceId::Data, probe_packet_number, probe_size); + } else { + mtud.on_probe_packet_lost(); + } + } + probed_sizes + } + + #[test] + fn mtu_discovery_disabled_does_nothing() { + let mut mtud = MtuDiscovery::disabled(1_200); + let probe_size = mtud.poll_transmit(Instant::now(), 0); + assert_eq!(probe_size, None); + } + + #[test] + fn mtu_discovery_disabled_lost_four_packets_triggers_black_hole_detection() { + let mut mtud = MtuDiscovery::disabled(1_400); + let now = Instant::now(); + + for _ in 0..4 { + mtud.on_non_probe_packet_lost(now, 0, 1300); + } + + assert_eq!(mtud.current_mtu, 1200); + assert_matches!(mtud.state, None); + } + + #[test] + fn mtu_discovery_lost_four_packets_triggers_black_hole_detection_and_resets_timer() { + let mut mtud = default_mtud(); + let now = Instant::now(); + + for _ in 0..4 { + mtud.on_non_probe_packet_lost(now, 0, 1300); + } + + assert_eq!(mtud.current_mtu, 1200); + if let Phase::Complete(next_mtud_activation) = mtud.state.unwrap().phase { + assert_eq!(next_mtud_activation, now + Duration::from_secs(60)); + } else { + panic!("Unexpected MTUD phase!"); + } + } + + #[test] + fn mtu_discovery_after_complete_reactivates_when_interval_elapsed() { + let mut config = PlpmtudConfig::default(); + config.max_udp_payload_size_upper_bound(9_000); + let mut mtud = MtuDiscovery::new(1_200, None, config); + let now = Instant::now(); + drive_to_completion(&mut mtud, now, 1_500); + + // Polling right after completion does not cause new packets to be sent + assert_eq!(mtud.poll_transmit(now, 42), None); + assert!(completed(&mtud)); + assert_eq!(mtud.current_mtu, 1_471); + + // Polling after the interval has passed does (taking the current mtu as lower bound) + assert_eq!( + mtud.poll_transmit(now + Duration::from_secs(600), 43), + Some(5235) + ); + + match mtud.state.unwrap().phase { + Phase::Searching(state) => { + assert_eq!(state.search_lower_bound, 1_471); + assert_eq!(state.search_upper_bound, 9_000); + } + _ => { + 
panic!("Unexpected MTUD phase!") + } + } + } + + #[test] + fn mtu_discovery_lost_three_probes_lowers_probe_size() { + let mut mtud = default_mtud(); + + let mut probe_sizes = (0..4).map(|i| { + let probe_size = mtud.poll_transmit(Instant::now(), i); + assert!(probe_size.is_some(), "no probe returned for packet {i}"); + + mtud.on_probe_packet_lost(); + probe_size.unwrap() + }); + + // After the first probe is lost, it gets retransmitted twice + let first_probe_size = probe_sizes.next().unwrap(); + for _ in 0..2 { + assert_eq!(probe_sizes.next().unwrap(), first_probe_size) + } + + // After the third probe is lost, we decrement our probe size + let fourth_probe_size = probe_sizes.next().unwrap(); + assert!(fourth_probe_size < first_probe_size); + assert_eq!( + fourth_probe_size, + first_probe_size - (first_probe_size - BASE_UDP_PAYLOAD_SIZE) / 2 - 1 + ); + } + + #[test] + fn mtu_discovery_with_peer_max_udp_payload_size_clamps_upper_bound() { + let mut mtud = default_mtud(); + + mtud.on_peer_max_udp_payload_size_received(1300); + let probed_sizes = drive_to_completion(&mut mtud, Instant::now(), 1500); + + assert_eq!(mtud.state.as_ref().unwrap().peer_max_udp_payload_size, 1300); + assert_eq!(mtud.current_mtu, 1300); + let expected_probed_sizes = &[1250, 1275, 1300]; + assert_eq!(probed_sizes, expected_probed_sizes); + assert!(completed(&mtud)); + } + + #[test] + fn mtu_discovery_with_previous_peer_max_udp_payload_size_clamps_upper_bound() { + let mut mtud = MtuDiscovery::new(1500, Some(1400), PlpmtudConfig::default()); + + assert_eq!(mtud.current_mtu, 1400); + assert_eq!(mtud.state.as_ref().unwrap().peer_max_udp_payload_size, 1400); + + let probed_sizes = drive_to_completion(&mut mtud, Instant::now(), 1500); + + assert_eq!(mtud.current_mtu, 1400); + assert!(probed_sizes.is_empty()); + assert!(completed(&mtud)); + } + + #[test] + #[should_panic] + fn mtu_discovery_with_peer_max_udp_payload_size_after_search_panics() { + let mut mtud = default_mtud(); + 
drive_to_completion(&mut mtud, Instant::now(), 1500); + mtud.on_peer_max_udp_payload_size_received(1300); + } + + #[test] + fn mtu_discovery_with_1500_limit() { + let mut mtud = default_mtud(); + + let probed_sizes = drive_to_completion(&mut mtud, Instant::now(), 1500); + + let expected_probed_sizes = &[1326, 1389, 1420, 1452]; + assert_eq!(probed_sizes, expected_probed_sizes); + assert_eq!(mtud.current_mtu, 1452); + assert!(completed(&mtud)); + } + + #[test] + fn mtu_discovery_with_1500_limit_and_10000_upper_bound() { + let mut config = PlpmtudConfig::default(); + config.max_udp_payload_size_upper_bound(10_000); + let mut mtud = MtuDiscovery::new(1_200, None, config); + + let probed_sizes = drive_to_completion(&mut mtud, Instant::now(), 1500); + + let expected_probed_sizes = &[ + 5600, 5600, 5600, 3399, 3399, 3399, 2299, 2299, 2299, 1749, 1749, 1749, 1474, 1611, + 1611, 1611, 1542, 1542, 1542, 1507, 1507, 1507, + ]; + assert_eq!(probed_sizes, expected_probed_sizes); + assert_eq!(mtud.current_mtu, 1474); + assert!(completed(&mtud)); + } + + #[test] + fn mtu_discovery_no_lost_probes_finds_maximum_udp_payload() { + let mut config = PlpmtudConfig::default(); + config.max_udp_payload_size_upper_bound(MAX_UDP_PAYLOAD); + let mut mtud = MtuDiscovery::new(1200, None, config); + + drive_to_completion(&mut mtud, Instant::now(), u16::MAX); + + assert_eq!(mtud.current_mtu, 65527); + assert!(completed(&mtud)); + } + + #[test] + fn mtu_discovery_lost_half_of_probes_finds_maximum_udp_payload() { + let mut config = PlpmtudConfig::default(); + config.max_udp_payload_size_upper_bound(MAX_UDP_PAYLOAD); + let mut mtud = MtuDiscovery::new(1200, None, config); + + let now = Instant::now(); + let mut iterations = 0; + for i in 1..100 { + iterations += 1; + + let probe_packet_number = i * 2 - 1; + let other_packet_number = i * 2; + + let result = mtud.poll_transmit(Instant::now(), probe_packet_number); + + if completed(&mtud) { + break; + } + + // "Send" next probe + 
assert!(result.is_some()); + assert!(mtud.in_flight_mtu_probe().is_some()); + + // Nothing else to send while the probe is in-flight + assert_matches!(mtud.poll_transmit(now, other_packet_number), None); + + if i % 2 == 0 { + // ACK probe and ensure it results in an increase of current_mtu + let previous_max_size = mtud.current_mtu; + mtud.on_acked(SpaceId::Data, probe_packet_number, result.unwrap()); + println!( + "ACK packet {}. Previous MTU = {previous_max_size}. New MTU = {}", + result.unwrap(), + mtud.current_mtu + ); + // assert!(mtud.current_mtu > previous_max_size); + } else { + mtud.on_probe_packet_lost(); + } + } + + assert_eq!(iterations, 25); + assert_eq!(mtud.current_mtu, 65527); + assert!(completed(&mtud)); + } +} diff --git a/quinn-proto/src/connection/stats.rs b/quinn-proto/src/connection/stats.rs index dfae4e751e..2224383798 100644 --- a/quinn-proto/src/connection/stats.rs +++ b/quinn-proto/src/connection/stats.rs @@ -132,6 +132,11 @@ pub struct PathStats { pub lost_bytes: u64, /// The amount of packets sent on this path pub sent_packets: u64, + /// The amount of PLPMTUD probe packets sent on this path (also counted by `sent_packets`) + pub sent_plpmtud_probes: u64, + /// The amount of PLPMTUD probe packets lost on this path (ignored by `lost_packets` and + /// `lost_bytes`) + pub lost_plpmtud_probes: u64, } /// Connection statistics diff --git a/quinn-proto/src/lib.rs b/quinn-proto/src/lib.rs index 48b3fae510..e174a5c2dc 100644 --- a/quinn-proto/src/lib.rs +++ b/quinn-proto/src/lib.rs @@ -47,7 +47,8 @@ pub use crate::connection::{ mod config; pub use config::{ - ClientConfig, ConfigError, EndpointConfig, IdleTimeout, ServerConfig, TransportConfig, + ClientConfig, ConfigError, EndpointConfig, IdleTimeout, PlpmtudConfig, ServerConfig, + TransportConfig, }; pub mod crypto; @@ -294,6 +295,7 @@ const MAX_CID_SIZE: usize = 20; const MIN_INITIAL_SIZE: u16 = 1200; /// const INITIAL_MAX_UDP_PAYLOAD_SIZE: u16 = 1200; +const MAX_UDP_PAYLOAD: u16 = 65527; 
const TIMER_GRANULARITY: Duration = Duration::from_millis(1); /// Maximum number of streams that can be uniquely identified by a stream ID const MAX_STREAM_COUNT: u64 = 1 << 60; diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index 2d18a85f42..0cdc6ed451 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -1911,3 +1911,284 @@ fn malformed_token_len() { hex!("8900 0000 0101 0000 1b1b 841b 0000 0000 3f00")[..].into(), ); } + +#[test] +/// This is mostly a sanity check to ensure our testing code is correctly dropping packets above the +/// pmtu +fn connect_too_low_mtu() { + let _guard = subscribe(); + let mut pair = Pair::default(); + + // The maximum payload size is lower than 1200, so no packets will get through! + pair.max_udp_payload_size = 1000; + + pair.begin_connect(client_config()); + pair.drive(); + pair.server.assert_no_accept() +} + +#[test] +fn connect_lost_mtu_probes_do_not_trigger_congestion_control() { + let _guard = subscribe(); + let mut pair = Pair::default(); + + let (client_ch, server_ch) = pair.connect(); + pair.drive(); + + let client_stats = pair.client_conn_mut(client_ch).stats(); + let server_stats = pair.server_conn_mut(server_ch).stats(); + + // Sanity check (all MTU probes should have been lost) + assert_eq!(client_stats.path.sent_plpmtud_probes, 9); + assert_eq!(client_stats.path.lost_plpmtud_probes, 9); + assert_eq!(server_stats.path.sent_plpmtud_probes, 9); + assert_eq!(server_stats.path.lost_plpmtud_probes, 9); + + // No congestion events + assert_eq!(client_stats.path.congestion_events, 0); + assert_eq!(server_stats.path.congestion_events, 0); +} + +#[test] +fn connect_detects_mtu() { + let _guard = subscribe(); + let max_udp_payload_and_expected_mtu = &[(1200, 1200), (1400, 1389), (1500, 1452)]; + + for &(pair_max_udp, expected_mtu) in max_udp_payload_and_expected_mtu { + println!("Trying {pair_max_udp}"); + let mut pair = Pair::default(); + pair.max_udp_payload_size = 
pair_max_udp; + let (client_ch, server_ch) = pair.connect(); + pair.drive(); + + assert_eq!(pair.client_conn_mut(client_ch).path_mtu(), expected_mtu); + assert_eq!(pair.server_conn_mut(server_ch).path_mtu(), expected_mtu); + } +} + +#[test] +fn migrate_detects_new_mtu_and_respects_original_peer_max_udp_payload_size() { + let _guard = subscribe(); + + let client_max_udp_payload_size: u16 = 1400; + + // Set up a client with a max payload size of 1400 (and use the defaults for the server) + let server_endpoint_config = EndpointConfig::default(); + let server = Endpoint::new( + Arc::new(server_endpoint_config), + Some(Arc::new(server_config())), + ); + let client_endpoint_config = EndpointConfig { + max_udp_payload_size: VarInt::from(client_max_udp_payload_size), + ..EndpointConfig::default() + }; + let client = Endpoint::new(Arc::new(client_endpoint_config), None); + let mut pair = Pair::new_from_endpoint(client, server); + pair.max_udp_payload_size = 1300; + + // Connect + let (client_ch, server_ch) = pair.connect(); + pair.drive(); + + // Sanity check: MTUD ran to completion (the numbers differ because binary search stops when + // changes are smaller than 20, otherwise both endpoints would converge at the same MTU of 1300) + assert_eq!(pair.client_conn_mut(client_ch).path_mtu(), 1293); + assert_eq!(pair.server_conn_mut(server_ch).path_mtu(), 1300); + + // Migrate client to a different port (and simulate a higher path MTU) + pair.max_udp_payload_size = 1500; + pair.client.addr = SocketAddr::new( + Ipv4Addr::new(127, 0, 0, 1).into(), + CLIENT_PORTS.lock().unwrap().next().unwrap(), + ); + pair.client_conn_mut(client_ch).ping(); + pair.drive(); + + // Sanity check: the server saw that the client address was updated + assert_eq!( + pair.server_conn_mut(server_ch).remote_address(), + pair.client.addr + ); + + // MTU detection has successfully run after migrating + assert_eq!( + pair.server_conn_mut(server_ch).path_mtu(), + client_max_udp_payload_size + ); + + // Sanity 
check: the client keeps the old MTU, because migration is triggered by incoming + // packets from a different address + assert_eq!(pair.client_conn_mut(client_ch).path_mtu(), 1293); +} + +#[test] +fn connect_runs_mtud_again_after_600_seconds() { + let _guard = subscribe(); + let mut server_config = server_config(); + let mut client_config = client_config(); + + // Note: we use an infinite idle timeout to ensure we can wait 600 seconds without the + // connection closing + Arc::get_mut(&mut server_config.transport) + .unwrap() + .max_idle_timeout(None); + Arc::get_mut(&mut client_config.transport) + .unwrap() + .max_idle_timeout(None); + + let mut pair = Pair::new(Default::default(), server_config); + pair.max_udp_payload_size = 1400; + let (client_ch, server_ch) = pair.connect_with(client_config); + pair.drive(); + + // Sanity check: the mtu has been discovered + let client_conn = pair.client_conn_mut(client_ch); + assert_eq!(client_conn.path_mtu(), 1389); + assert_eq!(client_conn.stats().path.sent_plpmtud_probes, 5); + assert_eq!(client_conn.stats().path.lost_plpmtud_probes, 3); + let server_conn = pair.server_conn_mut(server_ch); + assert_eq!(server_conn.path_mtu(), 1389); + assert_eq!(server_conn.stats().path.sent_plpmtud_probes, 5); + assert_eq!(server_conn.stats().path.lost_plpmtud_probes, 3); + + // Sanity check: the mtu does not change after the fact, even though the link now supports a + // higher udp payload size + pair.max_udp_payload_size = 1500; + pair.drive(); + assert_eq!(pair.client_conn_mut(client_ch).path_mtu(), 1389); + assert_eq!(pair.server_conn_mut(server_ch).path_mtu(), 1389); + + // The MTU changes after 600 seconds, because now MTUD runs for the second time + pair.time += Duration::from_secs(600); + pair.drive(); + assert!(!pair.client_conn_mut(client_ch).is_closed()); + assert!(!pair.server_conn_mut(server_ch).is_closed()); + assert_eq!(pair.client_conn_mut(client_ch).path_mtu(), 1452); + 
assert_eq!(pair.server_conn_mut(server_ch).path_mtu(), 1452); +} + +#[test] +fn packet_loss_and_retry_too_low_mtu() { + let _guard = subscribe(); + let mut pair = Pair::default(); + let (client_ch, server_ch) = pair.connect(); + + let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap(); + + pair.client_send(client_ch, s).write(b"hello").unwrap(); + pair.drive(); + + // Nothing will get past this mtu + pair.max_udp_payload_size = 10; + pair.client_send(client_ch, s).write(b" world").unwrap(); + pair.drive_client(); + + // The packet was dropped + assert!(pair.client.outbound.is_empty()); + assert!(pair.server.inbound.is_empty()); + + // Restore the default mtu, so future packets are properly transmitted + pair.max_udp_payload_size = DEFAULT_MAX_UDP_PAYLOAD_SIZE; + + // The lost packet is resent + pair.drive(); + assert!(pair.client.outbound.is_empty()); + + let recv = pair.server_recv(server_ch, s); + let buf = stream_chunks(recv); + + assert_eq!(buf, b"hello world".as_slice()); +} + +#[test] +fn blackhole_after_mtu_change_repairs_itself() { + let _guard = subscribe(); + let mut pair = Pair::default(); + pair.max_udp_payload_size = 1500; + let (client_ch, server_ch) = pair.connect(); + pair.drive(); + + // Sanity check + assert_eq!(pair.client_conn_mut(client_ch).path_mtu(), 1452); + assert_eq!(pair.server_conn_mut(server_ch).path_mtu(), 1452); + + // Back to the base MTU + pair.max_udp_payload_size = 1200; + + // The payload will be sent in a single packet, because the detected MTU was 1452, but it will + // be dropped because the link no longer supports that packet size! 
+ let payload = vec![42; 1300]; + let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap(); + pair.client_send(client_ch, s).write(&payload).unwrap(); + let out_of_bounds = pair.drive_bounded(); + + if out_of_bounds { + panic!("Connections never reached an idle state"); + } + + let recv = pair.server_recv(server_ch, s); + let buf = stream_chunks(recv); + + // The whole packet arrived in the end + assert_eq!(buf.len(), 1300); + + // Sanity checks (black hole detected after 3 lost packets) + let client_stats = pair.client_conn_mut(client_ch).stats(); + assert!(client_stats.path.lost_packets >= 3); + assert!(client_stats.path.congestion_events >= 3); +} + +#[test] +fn packet_splitting_with_default_mtu() { + let _guard = subscribe(); + + // The payload needs to be split in 2 in order to be sent, because it is higher than the max MTU + let payload = vec![42; 1300]; + + let mut pair = Pair::default(); + let (client_ch, _) = pair.connect(); + pair.drive(); + + let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap(); + + pair.client_send(client_ch, s).write(&payload).unwrap(); + pair.client.drive(pair.time, pair.server.addr); + assert_eq!(pair.client.outbound.len(), 2); + + pair.drive_client(); + assert_eq!(pair.server.inbound.len(), 2); +} + +#[test] +fn packet_splitting_not_necessary_after_higher_mtu_discovered() { + let _guard = subscribe(); + let payload = vec![42; 1300]; + + let mut pair = Pair::default(); + pair.max_udp_payload_size = 1500; + + let (client_ch, _) = pair.connect(); + pair.drive(); + + let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap(); + + pair.client_send(client_ch, s).write(&payload).unwrap(); + pair.client.drive(pair.time, pair.server.addr); + assert_eq!(pair.client.outbound.len(), 1); + + pair.drive_client(); + assert_eq!(pair.server.inbound.len(), 1); +} + +fn stream_chunks(mut recv: RecvStream) -> Vec { + let mut buf = Vec::new(); + + let mut chunks = recv.read(true).unwrap(); + while let Ok(Some(chunk)) = 
chunks.next(usize::MAX) { + buf.extend(chunk.bytes); + } + + let _ = chunks.finalize(); + + buf +} diff --git a/quinn-proto/src/tests/util.rs b/quinn-proto/src/tests/util.rs index eeb4b3859f..ab90c1dbba 100644 --- a/quinn-proto/src/tests/util.rs +++ b/quinn-proto/src/tests/util.rs @@ -18,10 +18,14 @@ use tracing::{info_span, trace}; use super::*; +pub const DEFAULT_MAX_UDP_PAYLOAD_SIZE: usize = 1200; + pub struct Pair { pub server: TestEndpoint, pub client: TestEndpoint, pub time: Instant, + /// Simulates the maximum size allowed for UDP payloads by the link (packets exceeding this size will be dropped) + pub max_udp_payload_size: usize, // One-way pub latency: Duration, /// Number of spin bit flips @@ -50,6 +54,7 @@ impl Pair { server: TestEndpoint::new(server, server_addr), client: TestEndpoint::new(client, client_addr), time: Instant::now(), + max_udp_payload_size: DEFAULT_MAX_UDP_PAYLOAD_SIZE, latency: Duration::new(0, 0), spins: 0, last_spin: false, @@ -91,11 +96,32 @@ impl Pair { while self.step() {} } + /// Advance time until both connections are idle, or after 100 steps have been executed + /// + /// Returns true if the amount of steps exceeds the bounds, because the connections never became + /// idle + pub fn drive_bounded(&mut self) -> bool { + for _ in 0..100 { + if !self.step() { + return false; + } + } + + true + } + pub fn drive_client(&mut self) { let span = info_span!("client"); let _guard = span.enter(); self.client.drive(self.time, self.server.addr); for x in self.client.outbound.drain(..) { + if packet_size(&x) > self.max_udp_payload_size { + info!( + packet_size = packet_size(&x), + "dropping packet (max size exceeded)" + ); + continue; + } if x.contents[0] & packet::LONG_HEADER_FORM == 0 { let spin = x.contents[0] & packet::SPIN_BIT != 0; self.spins += (spin == self.last_spin) as u64; @@ -117,6 +143,13 @@ impl Pair { let _guard = span.enter(); self.server.drive(self.time, self.client.addr); for x in self.server.outbound.drain(..) 
{ + if packet_size(&x) > self.max_udp_payload_size { + info!( + packet_size = packet_size(&x), + "dropping packet (max size exceeded)" + ); + continue; + } if let Some(ref socket) = self.server.socket { socket.send_to(&x.contents, x.destination).unwrap(); } @@ -351,6 +384,10 @@ impl TestEndpoint { pub fn assert_accept(&mut self) -> ConnectionHandle { self.accepted.take().expect("server didn't connect") } + + pub fn assert_no_accept(&self) { + assert!(self.accepted.is_none(), "server did unexpectedly connect") + } } impl ::std::ops::Deref for TestEndpoint { @@ -390,7 +427,11 @@ impl Write for TestWriter { } pub fn server_config() -> ServerConfig { - ServerConfig::with_crypto(Arc::new(server_crypto())) + let mut config = ServerConfig::with_crypto(Arc::new(server_crypto())); + Arc::get_mut(&mut config.transport) + .unwrap() + .plpmtud_config(Some(PlpmtudConfig::default())); + config } pub fn server_config_with_cert(cert: Certificate, key: PrivateKey) -> ServerConfig { @@ -408,7 +449,11 @@ pub fn server_crypto_with_cert(cert: Certificate, key: PrivateKey) -> rustls::Se } pub fn client_config() -> ClientConfig { - ClientConfig::new(Arc::new(client_crypto())) + let mut config = ClientConfig::new(Arc::new(client_crypto())); + Arc::get_mut(&mut config.transport) + .unwrap() + .plpmtud_config(Some(PlpmtudConfig::default())); + config } pub fn client_config_with_certs(certs: Vec) -> ClientConfig { @@ -468,6 +513,14 @@ fn split_transmit(transmit: Transmit) -> Vec { transmits } +fn packet_size(transmit: &Transmit) -> usize { + if transmit.segment_size.is_some() { + panic!("This transmit is meant to be split into multiple packets!"); + } + + transmit.contents.len() +} + lazy_static! { pub static ref SERVER_PORTS: Mutex> = Mutex::new(4433..); pub static ref CLIENT_PORTS: Mutex> = Mutex::new(44433..);