Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent sending of ACK-only packets #1130

Merged
merged 1 commit into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 57 additions & 12 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ mod spaces;
pub use spaces::Retransmits;
#[cfg(not(fuzzing))]
use spaces::Retransmits;
use spaces::{PacketSpace, SentPacket, ThinRetransmits};
use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};

mod stats;
pub use stats::ConnectionStats;
Expand Down Expand Up @@ -516,7 +516,8 @@ where
}

// Is there data or a close message to send in this space?
if !self.space_can_send(space_id) && !close {
let can_send = self.space_can_send(space_id);
if can_send.is_empty() && !close {
space_idx += 1;
continue;
}
Expand Down Expand Up @@ -703,6 +704,20 @@ where
}

let sent = self.populate_packet(space_id, &mut buf, buf_capacity - builder.tag_len);

// ACK-only packets should only be sent when explicitly allowed. If we write them due
// to any other reason, there is a bug which leads to one component announcing write
// readiness while not writing any data. This degrades performance. The condition is
// only checked if the full MTU is available, so that lack of space in the datagram isn't
// the reason for just writing ACKs.
debug_assert!(
!(sent.is_ack_only()
&& !can_send.acks
&& can_send.other
&& (buf_capacity - builder.datagram_start) == self.path.mtu as _),
"SendableFrames was {:?}, but only ACKs have been written",
can_send
);
pad_datagram |= sent.requires_padding;

// If we sent any acks, don't immediately resend them. Setting this even if ack_only is
Expand Down Expand Up @@ -757,23 +772,35 @@ where
})
}

/// Returns `true` if a space has outgoing data to send
fn space_can_send(&self, space_id: SpaceId) -> bool {
if self.spaces[space_id].crypto.is_some() && self.spaces[space_id].can_send() {
return true;
/// Indicate what types of frames are ready to send for the given space
fn space_can_send(&self, space_id: SpaceId) -> SendableFrames {
if self.spaces[space_id].crypto.is_some() {
let can_send = self.spaces[space_id].can_send();
if !can_send.is_empty() {
return can_send;
}
}

if space_id != SpaceId::Data {
return false;
return SendableFrames::empty();
}

if self.spaces[space_id].crypto.is_some() && self.can_send_1rtt() {
return true;
return SendableFrames {
other: true,
acks: false,
};
}

self.zero_rtt_crypto.is_some()
&& self.side.is_client()
&& (self.spaces[space_id].can_send() || self.can_send_1rtt())
if self.zero_rtt_crypto.is_some() && self.side.is_client() {
let mut can_send = self.spaces[space_id].can_send();
can_send.other |= self.can_send_1rtt();
if !can_send.is_empty() {
return can_send;
}
}

SendableFrames::empty()
}

/// Process `ConnectionEvent`s generated by the associated `Endpoint`
Expand Down Expand Up @@ -2562,6 +2589,7 @@ where
if mem::replace(&mut space.ping_pending, false) {
trace!("PING");
buf.write(frame::Type::PING);
sent.non_retransmits = true;
self.stats.frame_tx.ping += 1;
}

Expand All @@ -2586,6 +2614,7 @@ where
if let Some(token) = self.path.challenge {
// But only send a packet solely for that purpose at most once
self.path.challenge_pending = false;
sent.non_retransmits = true;
sent.requires_padding = true;
trace!("PATH_CHALLENGE {:08x}", token);
buf.write(frame::Type::PATH_CHALLENGE);
Expand All @@ -2597,6 +2626,7 @@ where
// PATH_RESPONSE
if buf.len() + 9 < max_size && space_id == SpaceId::Data {
if let Some(response) = self.path_response.take() {
sent.non_retransmits = true;
sent.requires_padding = true;
trace!("PATH_RESPONSE {:08x}", response.token);
buf.write(frame::Type::PATH_RESPONSE);
Expand Down Expand Up @@ -2695,7 +2725,10 @@ where
// DATAGRAM
while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
match self.datagrams.write(buf, max_size) {
true => self.stats.frame_tx.datagram += 1,
true => {
sent.non_retransmits = true;
self.stats.frame_tx.datagram += 1;
}
false => break,
}
}
Expand Down Expand Up @@ -3184,5 +3217,17 @@ struct SentFrames {
retransmits: ThinRetransmits,
acks: ArrayRangeSet,
stream_frames: StreamMetaVec,
/// Whether the packet contains non-retransmittable frames (like datagrams)
non_retransmits: bool,
requires_padding: bool,
}

impl SentFrames {
/// Returns whether the packet contains only ACKs
pub fn is_ack_only(&self) -> bool {
!self.acks.is_empty()
&& !self.non_retransmits
&& self.stream_frames.is_empty()
&& self.retransmits.is_empty()
}
}
31 changes: 27 additions & 4 deletions quinn-proto/src/connection/spaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,11 @@ where
x
}

pub(crate) fn can_send(&self) -> bool {
!self.pending.is_empty()
|| (self.permit_ack_only && !self.pending_acks.is_empty())
|| self.ping_pending
pub(crate) fn can_send(&self) -> SendableFrames {
let acks = self.permit_ack_only && !self.pending_acks.is_empty();
let other = !self.pending.is_empty() || self.ping_pending;

SendableFrames { acks, other }
}

/// Verifies sanity of an ECN block and returns whether congestion was encountered.
Expand Down Expand Up @@ -413,6 +414,28 @@ impl Dedup {
}
}

/// Indicates which data is available for sending
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) struct SendableFrames {
pub acks: bool,
pub other: bool,
}

impl SendableFrames {
/// Returns that no data is available for sending
pub fn empty() -> Self {
Self {
acks: false,
other: false,
}
}

/// Whether no data is sendable
pub fn is_empty(&self) -> bool {
!self.acks && !self.other
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
15 changes: 15 additions & 0 deletions quinn-proto/src/connection/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,21 @@ fn push_pending(pending: &mut BinaryHeap<PendingLevel>, id: StreamId, priority:
return;
}
}

// If there is only a single level and it's empty, repurpose it for the
// required priority
if pending.len() == 1 {
if let Some(mut first) = pending.peek_mut() {
let mut queue = first.queue.borrow_mut();
if queue.is_empty() {
queue.push_back(id);
drop(queue);
first.priority = priority;
return;
}
}
}

let mut queue = VecDeque::new();
queue.push_back(id);
pending.push(PendingLevel {
Expand Down
97 changes: 93 additions & 4 deletions quinn-proto/src/connection/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,9 @@ impl StreamsState {
}

pub fn can_send(&self) -> bool {
!self.pending.is_empty()
self.pending
.peek()
.map_or(false, |head| !head.queue.borrow().is_empty())
}

pub fn write_control_frames(
Expand Down Expand Up @@ -435,6 +437,7 @@ impl StreamsState {
break;
}

let num_levels = self.pending.len();
let mut level = match self.pending.peek_mut() {
Some(x) => x,
None => break,
Expand All @@ -447,8 +450,11 @@ impl StreamsState {
let id = match level.queue.get_mut().pop_front() {
Some(x) => x,
None => {
PeekMut::pop(level);
continue;
debug_assert!(
num_levels == 1,
"An empty queue is only allowed for a single level"
);
break;
}
};
let stream = match self.send.get_mut(&id) {
Expand All @@ -473,13 +479,26 @@ impl StreamsState {
if fin {
stream.fin_pending = false;
}

if stream.is_pending() {
if level.priority == stream.priority {
// Enqueue for the same level
level.queue.get_mut().push_back(id);
} else {
drop(level);
// Enqueue for a different level. If the current level is empty, drop it
if level.queue.borrow().is_empty() && num_levels != 1 {
// We keep the last level around even in empty form so that
// the next insert doesn't have to reallocate the queue
PeekMut::pop(level);
Comment on lines +488 to +492
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we really should find a way to avoid the three duplicated code sections.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the priorities again or don't care about performance :-)

While I think there might be ways to make this prettier, I'm not super inclined to investigate further in this. This is a rather performance-impacting bug that is open for 30 days, and I would rather like to see it fixed ASAP and spend energy into more impactful things than moving code around.

It's also not exactly duplicated.

} else {
drop(level);
}
push_pending(&mut self.pending, id, stream.priority);
}
} else if level.queue.borrow().is_empty() && num_levels != 1 {
// We keep the last level around even in empty form so that
// the next insert doesn't have to reallocate the queue
PeekMut::pop(level);
}

let meta = frame::StreamMeta { id, offsets, fin };
Expand Down Expand Up @@ -1157,6 +1176,76 @@ mod tests {
assert_eq!(meta[0].id, id_high);
assert_eq!(meta[1].id, id_mid);
assert_eq!(meta[2].id, id_low);

assert!(!server.can_send());
assert_eq!(server.pending.len(), 1);
}

#[test]
fn requeue_stream_priority() {
let mut server = make(Side::Server);
server.set_params(&TransportParameters {
initial_max_streams_bidi: 3u32.into(),
initial_max_data: 1000u32.into(),
initial_max_stream_data_bidi_remote: 1000u32.into(),
..Default::default()
});

let (mut pending, state) = (Retransmits::default(), ConnState::Established);
let mut streams = Streams {
state: &mut server,
conn_state: &state,
};

let id_high = streams.open(Dir::Bi).unwrap();
let id_mid = streams.open(Dir::Bi).unwrap();

let mut mid = SendStream {
id: id_mid,
state: &mut server,
pending: &mut pending,
conn_state: &state,
};
assert_eq!(mid.write(b"mid").unwrap(), 3);
assert_eq!(server.pending.len(), 1);

let mut high = SendStream {
id: id_high,
state: &mut server,
pending: &mut pending,
conn_state: &state,
};
high.set_priority(1).unwrap();
assert_eq!(high.write(&[0; 200]).unwrap(), 200);
assert_eq!(server.pending.len(), 2);

// Requeue the high priority stream to lowest priority. The initial send
// still uses high priority since it's queued that way. After that it will
// switch to low priority
let mut high = SendStream {
id: id_high,
state: &mut server,
pending: &mut pending,
conn_state: &state,
};
high.set_priority(-1).unwrap();

let mut buf = Vec::with_capacity(1000);
let meta = server.write_stream_frames(&mut buf, 40);
assert_eq!(meta.len(), 1);
assert_eq!(meta[0].id, id_high);

// After requeuing we should end up with 2 priorities - not 3
assert_eq!(server.pending.len(), 2);

// Send the remaining data. The initial mid priority one should go first now
let meta = server.write_stream_frames(&mut buf, 1000);
assert_eq!(meta.len(), 2);
assert_eq!(meta[0].id, id_mid);
assert_eq!(meta[1].id, id_high);

assert!(!server.can_send());
assert_eq!(server.pending.len(), 1);
}

#[test]
Expand Down