Skip to content

Commit

Permalink
Prevent sending of ACK-only packets
Browse files Browse the repository at this point in the history
The priority-based send state implementation had an issue where if returned
`self.can_send() == true` even if no data was available for sending.
This condition lead the `poll_transmit` method to initiate a transmission which
passed the ACK-only check since it assumed there was stream data to send.
However when trying to write stream frames no actual stream data could be
written.

The reason for this was that the `can_send()` condition only checked for the
length of the binary heap. However the length of this one was only reduced
in `write_stream_frames()` when the level was inspected the next time, and
not after data of one level was actually written.

This change fixes that, and drops the unused levels immediately after data
was written. There is on exception however: If only one level is left, it is
kept around and reused for future transmits to avoid deallocating and
reallocating the transmit queue continuously. However the `can_send` check
was updated to account for the empty level.

I also added some additional debug asserts which cross-checks
if an operation which announced to write more than just ACKs
still ended up only writing ACKs. That should make it easier to
determine similar issues in the future.

**Ack only transmissions in benchmark before change:** 1462

**Ack only transmissions in benchmark after change:** 45
  • Loading branch information
Matthias247 committed May 24, 2021
1 parent 8c89bfb commit d21032b
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 12 deletions.
58 changes: 50 additions & 8 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,8 @@ where
}

// Is there data or a close message to send in this space?
if !self.space_can_send(space_id) && !close {
let can_send = self.space_can_send(space_id);
if can_send == CanSendResult::NoDataAvailable && !close {
space_idx += 1;
continue;
}
Expand Down Expand Up @@ -703,6 +704,16 @@ where
}

let sent = self.populate_packet(space_id, &mut buf, buf_capacity - builder.tag_len);

// ACK-only packets should only be sent when explicitely allowed
// If we write them due to another reason there is a bug somewhere
// which leads to one component announcing write readiness and not writing any data.
// This degrades performance.
debug_assert!(
!sent.is_ack_only() || can_send == CanSendResult::AckDataAvailable,
"Send reason was {:?}, but only ACKs have been written",
can_send
);
pad_datagram |= sent.requires_padding;

// If we sent any acks, don't immediately resend them. Setting this even if ack_only is
Expand Down Expand Up @@ -757,23 +768,28 @@ where
})
}

/// Returns `true` if a space has outgoing data to send
fn space_can_send(&self, space_id: SpaceId) -> bool {
/// Returns `if a space has outgoing data to send and what the expected type of data is
fn space_can_send(&self, space_id: SpaceId) -> CanSendResult {
if self.spaces[space_id].crypto.is_some() && self.spaces[space_id].can_send() {
return true;
return CanSendResult::AckDataAvailable;
}

if space_id != SpaceId::Data {
return false;
return CanSendResult::NoDataAvailable;
}

if self.spaces[space_id].crypto.is_some() && self.can_send_1rtt() {
return true;
return CanSendResult::OneRttAvailable;
}

self.zero_rtt_crypto.is_some()
if self.zero_rtt_crypto.is_some()
&& self.side.is_client()
&& (self.spaces[space_id].can_send() || self.can_send_1rtt())
{
return CanSendResult::ZeroRttAvailable;
}

CanSendResult::NoDataAvailable
}

/// Process `ConnectionEvent`s generated by the associated `Endpoint`
Expand Down Expand Up @@ -2564,6 +2580,7 @@ where
if mem::replace(&mut space.ping_pending, false) {
trace!("PING");
buf.write(frame::Type::PING);
sent.non_retransmits = true;
self.stats.frame_tx.ping += 1;
}

Expand All @@ -2588,6 +2605,7 @@ where
if let Some(token) = self.path.challenge {
// But only send a packet solely for that purpose at most once
self.path.challenge_pending = false;
sent.non_retransmits = true;
sent.requires_padding = true;
trace!("PATH_CHALLENGE {:08x}", token);
buf.write(frame::Type::PATH_CHALLENGE);
Expand All @@ -2599,6 +2617,7 @@ where
// PATH_RESPONSE
if buf.len() + 9 < max_size && space_id == SpaceId::Data {
if let Some(response) = self.path_response.take() {
sent.non_retransmits = true;
sent.requires_padding = true;
trace!("PATH_RESPONSE {:08x}", response.token);
buf.write(frame::Type::PATH_RESPONSE);
Expand Down Expand Up @@ -2697,7 +2716,10 @@ where
// DATAGRAM
while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
match self.datagrams.write(buf, max_size) {
true => self.stats.frame_tx.datagram += 1,
true => {
sent.non_retransmits = true;
self.stats.frame_tx.datagram += 1;
}
false => break,
}
}
Expand Down Expand Up @@ -3180,5 +3202,25 @@ struct SentFrames {
retransmits: ThinRetransmits,
acks: ArrayRangeSet,
stream_frames: StreamMetaVec,
/// Whether the packet contains non-retransmittable frames (like datagrams)
non_retransmits: bool,
requires_padding: bool,
}

impl SentFrames {
/// Returns whether the packet contains only ACKs
pub fn is_ack_only(&self) -> bool {
!self.acks.is_empty()
&& !self.non_retransmits
&& self.stream_frames.is_empty()
&& self.retransmits.is_empty()
}
}

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum CanSendResult {
OneRttAvailable,
AckDataAvailable,
ZeroRttAvailable,
NoDataAvailable,
}
15 changes: 15 additions & 0 deletions quinn-proto/src/connection/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,20 @@ fn push_pending(pending: &mut BinaryHeap<PendingLevel>, id: StreamId, priority:
return;
}
}

// If there is only a single level and it's empty, repurpose it for the
// required priority
if pending.len() == 1 {
let mut first = pending.peek_mut().unwrap();
let mut queue = first.queue.borrow_mut();
if queue.is_empty() {
queue.push_back(id);
drop(queue);
first.priority = priority;
return;
}
}

let mut queue = VecDeque::new();
queue.push_back(id);
pending.push(PendingLevel {
Expand All @@ -318,6 +332,7 @@ fn push_pending(pending: &mut BinaryHeap<PendingLevel>, id: StreamId, priority:
});
}

#[derive(Debug)]
struct PendingLevel {
// RefCell is needed because BinaryHeap doesn't have an iter_mut()
queue: RefCell<VecDeque<StreamId>>,
Expand Down
100 changes: 96 additions & 4 deletions quinn-proto/src/connection/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,10 @@ impl StreamsState {
}

pub fn can_send(&self) -> bool {
!self.pending.is_empty()
match self.pending.peek() {
Some(head) => head.queue.borrow().len() != 0,
None => false,
}
}

pub fn write_control_frames(
Expand Down Expand Up @@ -433,6 +436,7 @@ impl StreamsState {
break;
}

let num_levels = self.pending.len();
let mut level = match self.pending.peek_mut() {
Some(x) => x,
None => break,
Expand All @@ -445,8 +449,11 @@ impl StreamsState {
let id = match level.queue.get_mut().pop_front() {
Some(x) => x,
None => {
PeekMut::pop(level);
continue;
debug_assert!(
num_levels == 1,
"An empty queue is only allowed for a single level"
);
break;
}
};
let stream = match self.send.get_mut(&id) {
Expand All @@ -471,13 +478,28 @@ impl StreamsState {
if fin {
stream.fin_pending = false;
}

if stream.is_pending() {
if level.priority == stream.priority {
// Enqueue for the same level
level.queue.get_mut().push_back(id);
} else {
drop(level);
// Enqueue for a different level. If the current level is empty, drop it
if level.queue.borrow().is_empty() && num_levels != 1 {
// We keep the last level around even in empty form so that
// the next insert doesn't have to reallocate the queue
PeekMut::pop(level);
} else {
drop(level);
}
push_pending(&mut self.pending, id, stream.priority);
}
} else {
if level.queue.borrow().is_empty() && num_levels != 1 {
// We keep the last level around even in empty form so that
// the next insert doesn't have to reallocate the queue
PeekMut::pop(level);
}
}

let meta = frame::StreamMeta { id, offsets, fin };
Expand Down Expand Up @@ -1145,6 +1167,76 @@ mod tests {
assert_eq!(meta[0].id, id_high);
assert_eq!(meta[1].id, id_mid);
assert_eq!(meta[2].id, id_low);

assert!(!server.can_send());
assert_eq!(server.pending.len(), 1);
}

#[test]
fn requeue_stream_priority() {
let mut server = make(Side::Server);
server.set_params(&TransportParameters {
initial_max_streams_bidi: 3u32.into(),
initial_max_data: 1000u32.into(),
initial_max_stream_data_bidi_remote: 1000u32.into(),
..Default::default()
});

let (mut pending, state) = (Retransmits::default(), ConnState::Established);
let mut streams = Streams {
state: &mut server,
conn_state: &state,
};

let id_high = streams.open(Dir::Bi).unwrap();
let id_mid = streams.open(Dir::Bi).unwrap();

let mut mid = SendStream {
id: id_mid,
state: &mut server,
pending: &mut pending,
conn_state: &state,
};
assert_eq!(mid.write(b"mid").unwrap(), 3);
assert_eq!(server.pending.len(), 1);

let mut high = SendStream {
id: id_high,
state: &mut server,
pending: &mut pending,
conn_state: &state,
};
high.set_priority(1).unwrap();
assert_eq!(high.write(&[0; 200]).unwrap(), 200);
assert_eq!(server.pending.len(), 2);

// Requeue the high priority stream to lowest priority. The initial send
// still uses high priority since it's queued that way. After that it will
// switch to low priority
let mut high = SendStream {
id: id_high,
state: &mut server,
pending: &mut pending,
conn_state: &state,
};
high.set_priority(-1).unwrap();

let mut buf = Vec::with_capacity(1000);
let meta = server.write_stream_frames(&mut buf, 40);
assert_eq!(meta.len(), 1);
assert_eq!(meta[0].id, id_high);

// After requeuing we should end up with 2 priorities - not 3
assert_eq!(server.pending.len(), 2);

// Send the remaining data. The initial mid priority one should go first now
let meta = server.write_stream_frames(&mut buf, 1000);
assert_eq!(meta.len(), 2);
assert_eq!(meta[0].id, id_mid);
assert_eq!(meta[1].id, id_high);

assert!(!server.can_send());
assert_eq!(server.pending.len(), 1);
}

#[test]
Expand Down

0 comments on commit d21032b

Please sign in to comment.