Skip to content

Commit

Permalink
fix(s2n-quic-transport): decrement handshake count for non-accepted f…
Browse files Browse the repository at this point in the history
…inalized connections (#1911)
  • Loading branch information
WesleyRosenblum authored Aug 14, 2023
1 parent b1b5470 commit c7e3f51
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 41 deletions.
49 changes: 29 additions & 20 deletions quic/s2n-quic-transport/src/connection/connection_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,13 @@ impl<C: connection::Trait, L: connection::Lock<C>> InterestLists<C, L> {
}
}

if node.inner.read(|conn| !conn.is_accepted())? {
// Decrement the inflight handshakes because the connection has
// been finalized before it was handed back to the application
// and thus this count was not decremented previously
self.handshake_connections -= 1;
}

insert_interest!(done_connections, push_back);
} else {
unreachable!("Done connections should never report not done later");
Expand Down Expand Up @@ -867,35 +874,24 @@ impl<C: connection::Trait, L: connection::Lock<C>> ConnectionContainer<C, L> {
self.interest_lists.handshake_connections = self.count_handshaking_connections();
}

self.ensure_counter_consistency();
self.finalize_done_connections();
self.ensure_counter_consistency();

Some((result, interests))
}

/// Removes all Connections in the `done` state from the `ConnectionContainer`.
pub fn finalize_done_connections(&mut self) {
fn finalize_done_connections(&mut self) {
debug_assert_eq!(
self.interest_lists.handshake_connections + self.count_done_handshaking_connections(),
self.count_handshaking_connections()
);

for connection in self.interest_lists.done_connections.take() {
self.remove_node(&connection);

// If the connection is still handshaking then it must have timed out.
let result = connection.inner.read(|conn| conn.is_handshaking());
match result {
Ok(true) => {
self.interest_lists.handshake_connections -= 1;
self.ensure_counter_consistency();
}
Ok(false) => {
// nothing to do
}
Err(_) => {
// The connection panicked so we need to recompute all of the handshaking
// connections
self.interest_lists.handshake_connections =
self.count_handshaking_connections();
}
}
}

debug_assert_eq!(0, self.count_done_handshaking_connections());
}

fn count_handshaking_connections(&self) -> usize {
Expand All @@ -910,6 +906,19 @@ impl<C: connection::Trait, L: connection::Lock<C>> ConnectionContainer<C, L> {
.count()
}

fn count_done_handshaking_connections(&self) -> usize {
self.interest_lists
.done_connections
.iter()
.filter(|conn| {
conn.inner
.read(|conn| conn.is_handshaking())
.ok()
.unwrap_or(false)
})
.count()
}

fn ensure_counter_consistency(&self) {
if cfg!(debug_assertions) {
let expected = self.count_handshaking_connections();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
use super::*;
use crate::{
connection::{
self, connection_interests::ConnectionInterests,
self, connection_impl::AcceptState, connection_interests::ConnectionInterests,
internal_connection_id::InternalConnectionId, InternalConnectionIdGenerator,
ProcessingError,
ProcessingError, Trait,
},
endpoint, path, stream,
};
Expand Down Expand Up @@ -36,8 +36,7 @@ use s2n_quic_core::{
use std::sync::Mutex;

struct TestConnection {
is_handshaking: bool,
has_been_accepted: bool,
accept_state: AcceptState,
is_closed: bool,
interests: ConnectionInterests,
close_timer: Timer,
Expand All @@ -46,8 +45,7 @@ struct TestConnection {
impl Default for TestConnection {
fn default() -> Self {
Self {
is_handshaking: true,
has_been_accepted: false,
accept_state: AcceptState::Handshaking,
is_closed: false,
interests: ConnectionInterests {
transmission: true,
Expand All @@ -70,7 +68,11 @@ impl connection::Trait for TestConnection {
}

fn is_handshaking(&self) -> bool {
self.is_handshaking
self.accept_state == AcceptState::Handshaking
}

fn is_accepted(&self) -> bool {
self.accept_state == AcceptState::Active
}

fn close(
Expand All @@ -88,8 +90,8 @@ impl connection::Trait for TestConnection {
}

fn mark_as_accepted(&mut self) {
assert!(!self.has_been_accepted);
self.has_been_accepted = true;
assert!(!self.is_accepted());
self.accept_state = AcceptState::Active;
self.interests.accept = false;
}

Expand Down Expand Up @@ -391,6 +393,10 @@ enum Operation {
timeout: Option<u16>,
},
CloseApp,
HandshakeCompleted {
index: usize,
closed: bool,
},
Receive,
Timeout(u16),
Transmit(u16),
Expand Down Expand Up @@ -421,12 +427,6 @@ fn container_test() {
let connection = TestConnection::default();
container.insert_connection(connection, id);
connections.push(id);

let mut was_called = false;
container.with_connection(id, |_conn| {
was_called = true;
});
assert!(was_called);
}
Operation::UpdateInterests {
index,
Expand Down Expand Up @@ -459,12 +459,9 @@ fn container_test() {
}

i.closing = *closing;
if !conn.has_been_accepted {
if conn.accept_state == AcceptState::HandshakeCompleted {
i.accept = *accept;
}
if *accept {
conn.is_handshaking = false;
}
i.transmission = *transmission;
i.new_connection_id = *new_connection_id;
i.timeout = timeout.map(|ms| now + Duration::from_millis(ms as _));
Expand All @@ -484,6 +481,34 @@ fn container_test() {
Operation::CloseApp => {
handle = None;
}
Operation::HandshakeCompleted { index, closed } => {
if connections.is_empty() {
continue;
}
let index = index % connections.len();
let id = connections[index];

let mut was_called = false;
container.with_connection(id, |conn| {
if conn.is_handshaking() {
conn.accept_state = AcceptState::HandshakeCompleted;
if *closed {
// The connection was closed immediately following the
// handshake being completed and before it could be accepted
conn.is_closed = true;
conn.interests = ConnectionInterests {
finalization: true,
..Default::default()
};
connections.remove(index);
} else {
conn.interests.accept = true;
}
}
was_called = true;
});
assert!(was_called);
}
Operation::Receive => {
if let Some(handle) = handle.as_mut() {
while let Poll::Ready(Some(_accepted)) = handle
Expand Down Expand Up @@ -566,7 +591,7 @@ fn container_test() {
let mut cursor = container.connection_map.front();

while let Some(conn) = cursor.get() {
assert_eq!(conn.internal_connection_id, connections.next().unwrap());
assert_eq!(Some(conn.internal_connection_id), connections.next());
cursor.move_next();
}

Expand Down
6 changes: 5 additions & 1 deletion quic/s2n-quic-transport/src/connection/connection_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use s2n_quic_core::{
/// Possible states for handing over a connection from the endpoint to the
/// application.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum AcceptState {
pub(crate) enum AcceptState {
/// The connection is handshaking on the server side and not yet visible
/// to the application.
Handshaking,
Expand Down Expand Up @@ -555,6 +555,10 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
self.accept_state == AcceptState::Handshaking
}

fn is_accepted(&self) -> bool {
self.accept_state == AcceptState::Active
}

/// Creates a new `Connection` instance with the given configuration
fn new(parameters: ConnectionParameters<Self::Config>) -> Result<Self, connection::Error> {
let mut event_context = EventContext {
Expand Down
4 changes: 4 additions & 0 deletions quic/s2n-quic-transport/src/connection/connection_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ pub trait ConnectionTrait: 'static + Send + Sized {
/// Returns whether the connection is in the handshake state
fn is_handshaking(&self) -> bool;

/// Returns true if the handshake has completed and the connection
/// has been handed off to the application
fn is_accepted(&self) -> bool;

/// Initiates closing the connection as described in
/// https://www.rfc-editor.org/rfc/rfc9000#section-10
fn close(
Expand Down

0 comments on commit c7e3f51

Please sign in to comment.