Skip to content

Commit

Permalink
fix(qns): fallback if server is not sending NEW_TOKEN (#1837)
Browse files Browse the repository at this point in the history
* feat(qns): add resumption testcase

Test failure reported in #1786 (comment).

Signed-off-by: Max Inden <mail@max-inden.de>

* fix: fallback for servers not sending NEW_TOKEN frame

See https://github.com/mozilla/neqo/blob/main/neqo-transport/src/connection/mod.rs#L665-L676 for details.

Fixes regression introduced in #1676.

* Trigger CI

* Still wait if there is none

* Revert "Still wait if there is none"

This reverts commit 710c500.

* Refactor resumption logic

---------

Signed-off-by: Max Inden <mail@max-inden.de>
  • Loading branch information
mxinden authored Apr 23, 2024
1 parent e53d60f commit da9766e
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 53 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/qns.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ jobs:
name: 'neqo-latest'
image: ${{ steps.docker_build_and_push.outputs.imageID }}
url: https://github.com/mozilla/neqo
test: handshake,ecn,keyupdate
test: handshake,ecn,keyupdate,resumption
client: neqo-latest,quic-go,ngtcp2,neqo,msquic
server: neqo-latest,quic-go,ngtcp2,neqo,msquic
30 changes: 18 additions & 12 deletions neqo-bin/src/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,23 @@ impl<'a> super::Handler for Handler<'a> {
}
}

if self.streams.is_empty() && self.url_queue.is_empty() {
// Handler is done.
return Ok(true);
if !self.streams.is_empty() || !self.url_queue.is_empty() {
return Ok(false);
}

Ok(false)
if self.args.resume && self.token.is_none() {
let Some(token) = client.take_resumption_token(Instant::now()) else {
return Ok(false);
};
self.token = Some(token);
}

Ok(true)
}

fn take_token(&mut self) -> Option<ResumptionToken> {
self.token.take()
}

fn has_token(&self) -> bool {
self.token.is_some()
}
}

pub(crate) fn create_client(
Expand Down Expand Up @@ -161,11 +163,15 @@ impl super::Client for Connection {
}
}

fn is_closed(&self) -> Option<ConnectionError> {
if let State::Closed(err) = self.state() {
return Some(err.clone());
fn is_closed(&self) -> Result<bool, ConnectionError> {
match self.state() {
State::Closed(
ConnectionError::Transport(neqo_transport::Error::NoError)
| ConnectionError::Application(0),
) => Ok(true),
State::Closed(err) => Err(err.clone()),
_ => Ok(false),
}
None
}

fn stats(&self) -> neqo_transport::Stats {
Expand Down
16 changes: 8 additions & 8 deletions neqo-bin/src/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,15 @@ pub(crate) fn create_client(
}

impl super::Client for Http3Client {
fn is_closed(&self) -> Option<ConnectionError> {
if let Http3State::Closed(err) = self.state() {
return Some(err);
fn is_closed(&self) -> Result<bool, ConnectionError> {
match self.state() {
Http3State::Closed(
ConnectionError::Transport(neqo_transport::Error::NoError)
| ConnectionError::Application(0),
) => Ok(true),
Http3State::Closed(err) => Err(err.clone()),
_ => Ok(false),
}
None
}

fn process_output(&mut self, now: Instant) -> Output {
Expand Down Expand Up @@ -226,10 +230,6 @@ impl<'a> super::Handler for Handler<'a> {
fn take_token(&mut self) -> Option<ResumptionToken> {
self.token.take()
}

fn has_token(&self) -> bool {
self.token.is_some()
}
}

trait StreamHandler {
Expand Down
58 changes: 26 additions & 32 deletions neqo-bin/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use neqo_crypto::{
init, Cipher, ResumptionToken,
};
use neqo_http3::Output;
use neqo_transport::{AppError, ConnectionError, ConnectionId, Error as TransportError, Version};
use neqo_transport::{AppError, ConnectionError, ConnectionId, Version};
use qlog::{events::EventImportance, streamer::QlogStreamer};
use tokio::time::Sleep;
use url::{Origin, Url};
Expand Down Expand Up @@ -137,7 +137,7 @@ pub struct Args {
/// Save contents of fetched URLs to a directory
output_dir: Option<PathBuf>,

#[arg(short = 'r', long)]
#[arg(short = 'r', long, hide = true)]
/// Client attempts to resume by making multiple connections to servers.
/// Requires that 2 or more URLs are listed for each server.
/// Use this for 0-RTT: the stack always attempts 0-RTT on resumption.
Expand Down Expand Up @@ -227,6 +227,11 @@ impl Args {
exit(127)
}

if self.resume {
qerror!("internal option resume set by user");
exit(127)
}

// Only use v1 for most QNS tests.
self.shared.quic_parameters.quic_version = vec![Version::Version1];
match testcase.as_str() {
Expand Down Expand Up @@ -342,7 +347,6 @@ trait Handler {

fn handle(&mut self, client: &mut Self::Client) -> Res<bool>;
fn take_token(&mut self) -> Option<ResumptionToken>;
fn has_token(&self) -> bool;
}

/// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`].
Expand All @@ -359,7 +363,7 @@ trait Client {
///
/// Note that connection was closed without error on
/// [`Some(ConnectionError::Transport(TransportError::NoError))`].
fn is_closed(&self) -> Option<ConnectionError>;
fn is_closed(&self) -> Result<bool, ConnectionError>;
fn stats(&self) -> neqo_transport::Stats;
}

Expand All @@ -376,46 +380,36 @@ impl<'a, H: Handler> Runner<'a, H> {
async fn run(mut self) -> Res<Option<ResumptionToken>> {
loop {
let handler_done = self.handler.handle(&mut self.client)?;

match (handler_done, self.args.resume, self.handler.has_token()) {
// Handler isn't done. Continue.
(false, _, _) => {},
// Handler done. Resumption token needed but not present. Continue.
(true, true, false) => {
qdebug!("Handler done. Waiting for resumption token.");
}
// Handler is done, no resumption token needed. Close.
(true, false, _) |
// Handler is done, resumption token needed and present. Close.
(true, true, true) => {
self.client.close(Instant::now(), 0, "kthxbye!");
}
}

self.process_output().await?;

if let Some(reason) = self.client.is_closed() {
if self.args.stats {
qinfo!("{:?}", self.client.stats());
}
return match reason {
ConnectionError::Transport(TransportError::NoError)
| ConnectionError::Application(0) => Ok(self.handler.take_token()),
_ => Err(reason.into()),
};
}

if self.client.has_events() {
continue;
}

match (handler_done, self.client.is_closed()?) {
// more work
(false, _) => {}
// no more work, closing connection
(true, false) => {
self.client.close(Instant::now(), 0, "kthxbye!");
continue;
}
// no more work, connection closed, terminating
(true, true) => break,
}

match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => self.process_multiple_input().await?,
Ready::Timeout => {
self.timeout = None;
}
}
}

if self.args.stats {
qinfo!("{:?}", self.client.stats());
}

Ok(self.handler.take_token())
}

async fn process_output(&mut self) -> Result<(), io::Error> {
Expand Down

0 comments on commit da9766e

Please sign in to comment.