diff --git a/.dockerignore b/.dockerignore index cc95fda49e..324520383b 100644 --- a/.dockerignore +++ b/.dockerignore @@ -5,5 +5,6 @@ !**/*.rs !**/*.h !**/*.hpp +!neqo-crypto/min_version.txt !qns !Cargo.lock diff --git a/.github/actions/pr-comment/action.yml b/.github/actions/pr-comment/action.yml index b7f9bb12da..1e84aa5bb4 100644 --- a/.github/actions/pr-comment/action.yml +++ b/.github/actions/pr-comment/action.yml @@ -5,6 +5,9 @@ inputs: name: description: 'Artifact name to import comment data from.' required: true + mode: + description: 'Mode of operation (upsert/recreate/delete).' + default: 'upsert' token: description: 'A Github PAT' required: true @@ -29,5 +32,6 @@ runs: - uses: thollander/actions-comment-pull-request@v2 with: filePath: contents + mode: ${{ inputs.mode }} pr_number: ${{ steps.pr-number.outputs.number }} comment_tag: ${{ inputs.name }}-comment diff --git a/.github/actions/quic-interop-runner/action.yml b/.github/actions/quic-interop-runner/action.yml index 4d88191646..cdc617d275 100644 --- a/.github/actions/quic-interop-runner/action.yml +++ b/.github/actions/quic-interop-runner/action.yml @@ -93,9 +93,13 @@ runs: run: | echo '[**QUIC Interop Runner**](https://github.com/quic-interop/quic-interop-runner)' >> comment echo '' >> comment - # Ignore all, but table, which starts with "|". - grep -E '^\|' quic-interop-runner/summary >> comment + # Ignore all, but table, which starts with "|". Also reformat it to GitHub Markdown. + grep -E '^\|' quic-interop-runner/summary |\ + awk '(!/^\| *:-/ || (d++ && d < 3))' |\ + sed -E -e 's/✓/:white_check_mark:/gi' -e 's/✕/:x:/gi' -e 's/\?/:grey_question:/gi' \ + >> comment echo '' >> comment + echo "EXPORT_COMMENT=1" >> "$GITHUB_ENV" shell: bash - name: Export PR comment data diff --git a/.github/workflows/qns-comment.yml b/.github/workflows/qns-comment.yml index 71cbcc805b..db9f74f7bf 100644 --- a/.github/workflows/qns-comment.yml +++ b/.github/workflows/qns-comment.yml @@ -18,11 +18,11 @@ jobs: pull-requests: write runs-on: ubuntu-latest if: | - github.event.workflow_run.event == 'pull_request' && - github.event.workflow_run.conclusion == 'failure' + github.event.workflow_run.event == 'pull_request' steps: - uses: actions/checkout@v4 - uses: ./.github/actions/pr-comment with: name: qns + mode: ${{ github.event.workflow_run.conclusion == 'success' && 'delete' || 'upsert' }} token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/qns.yml b/.github/workflows/qns.yml index caadb022df..17cd584a26 100644 --- a/.github/workflows/qns.yml +++ b/.github/workflows/qns.yml @@ -71,6 +71,6 @@ jobs: name: 'neqo-latest' image: ${{ steps.docker_build_and_push.outputs.imageID }} url: https://github.com/mozilla/neqo - test: handshake + test: handshake,keyupdate client: neqo-latest,quic-go,ngtcp2,neqo,msquic server: neqo-latest,quic-go,ngtcp2,neqo,msquic diff --git a/.github/workflows/scorecard.yml b/.github/workflows/scorecard.yml index 01e5fe16a8..651a30be01 100644 --- a/.github/workflows/scorecard.yml +++ b/.github/workflows/scorecard.yml @@ -4,15 +4,16 @@ name: Scorecard supply-chain security on: - # For Branch-Protection check. Only the default branch is supported. See - # https://github.com/ossf/scorecard/blob/main/docs/checks.md#branch-protection - branch_protection_rule: - # To guarantee Maintained check is occasionally updated. See - # https://github.com/ossf/scorecard/blob/main/docs/checks.md#maintained - schedule: - - cron: '26 8 * * 6' - push: - branches: [ "main" ] + workflow_dispatch: +# # For Branch-Protection check. Only the default branch is supported. See +# # https://github.com/ossf/scorecard/blob/main/docs/checks.md#branch-protection +# branch_protection_rule: +# # To guarantee Maintained check is occasionally updated. See +# # https://github.com/ossf/scorecard/blob/main/docs/checks.md#maintained +# schedule: +# - cron: '26 8 * * 6' +# push: +# branches: [ "main" ] # Declare default permissions as read only. permissions: read-all @@ -67,6 +68,6 @@ jobs: # Upload the results to GitHub's code scanning dashboard. - name: "Upload to code-scanning" - uses: github/codeql-action/upload-sarif@1b1aada464948af03b950897e5eb522f92603cc2 # v3.24.9 + uses: github/codeql-action/upload-sarif@4355270be187e1b672a7a1c7c7bae5afdc1ab94a # v3.24.10 with: sarif_file: results.sarif diff --git a/Cargo.toml b/Cargo.toml index 469f6cf1f2..27fe0f9e86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ resolver = "2" homepage = "https://github.com/mozilla/neqo/" repository = "https://github.com/mozilla/neqo/" authors = ["The Neqo Authors "] -version = "0.7.3" +version = "0.7.4" # Keep in sync with `.rustfmt.toml` `edition`. edition = "2021" license = "MIT OR Apache-2.0" diff --git a/neqo-bin/benches/main.rs b/neqo-bin/benches/main.rs index 6bb8b3161d..59927ebe0c 100644 --- a/neqo-bin/benches/main.rs +++ b/neqo-bin/benches/main.rs @@ -13,8 +13,6 @@ use tokio::runtime::Runtime; struct Benchmark { name: String, requests: Vec, - /// Download resources in series using separate connections. - download_in_series: bool, sample_size: Option, } @@ -27,25 +25,21 @@ fn transfer(c: &mut Criterion) { for Benchmark { name, requests, - download_in_series, sample_size, } in [ Benchmark { name: "1-conn/1-100mb-resp (aka. Download)".to_string(), requests: vec![100 * 1024 * 1024], - download_in_series: false, sample_size: Some(10), }, Benchmark { - name: "1-conn/10_000-1b-seq-resp (aka. RPS)".to_string(), + name: "1-conn/10_000-parallel-1b-resp (aka. RPS)".to_string(), requests: vec![1; 10_000], - download_in_series: false, sample_size: None, }, Benchmark { - name: "100-seq-conn/1-1b-resp (aka. HPS)".to_string(), - requests: vec![1; 100], - download_in_series: true, + name: "1-conn/1-1b-resp (aka. HPS)".to_string(), + requests: vec![1; 1], sample_size: None, }, ] { @@ -61,7 +55,7 @@ fn transfer(c: &mut Criterion) { } group.bench_function("client", |b| { b.to_async(Runtime::new().unwrap()).iter_batched( - || client::client(client::Args::new(&requests, download_in_series)), + || client::client(client::Args::new(&requests)), |client| async move { client.await.unwrap(); }, diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index 9bdb6dca85..a9ed12b157 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -20,12 +20,12 @@ use std::{ use neqo_common::{event::Provider, qdebug, qinfo, qwarn, Datagram}; use neqo_crypto::{AuthenticationStatus, ResumptionToken}; use neqo_transport::{ - Connection, ConnectionEvent, EmptyConnectionIdGenerator, Error, Output, State, StreamId, - StreamType, + Connection, ConnectionError, ConnectionEvent, EmptyConnectionIdGenerator, Error, Output, State, + StreamId, StreamType, }; use url::Url; -use super::{get_output_file, qlog_new, Args, KeyUpdateState, Res}; +use super::{get_output_file, qlog_new, Args, Res}; pub struct Handler<'a> { streams: HashMap>>, @@ -33,7 +33,7 @@ pub struct Handler<'a> { all_paths: Vec, args: &'a Args, token: Option, - key_update: KeyUpdateState, + needs_key_update: bool, } impl<'a> super::Handler for Handler<'a> { @@ -41,6 +41,18 @@ impl<'a> super::Handler for Handler<'a> { fn handle(&mut self, client: &mut Self::Client) -> Res { while let Some(event) = client.next_event() { + if self.needs_key_update { + match client.initiate_key_update() { + Ok(()) => { + qdebug!("Keys updated"); + self.needs_key_update = false; + self.download_urls(client); + } + Err(neqo_transport::Error::KeyUpdateBlocked) => (), + Err(e) => return Err(e.into()), + } + } + match event { ConnectionEvent::AuthenticationNeeded => { client.authenticated(AuthenticationStatus::Ok, Instant::now()); @@ -66,9 +78,6 @@ impl<'a> super::Handler for Handler<'a> { qdebug!("{event:?}"); self.download_urls(client); } - ConnectionEvent::StateChange(State::Confirmed) => { - self.maybe_key_update(client)?; - } ConnectionEvent::ResumptionToken(token) => { self.token = Some(token); } @@ -86,12 +95,6 @@ impl<'a> super::Handler for Handler<'a> { Ok(false) } - fn maybe_key_update(&mut self, c: &mut Self::Client) -> Res<()> { - self.key_update.maybe_update(|| c.initiate_key_update())?; - self.download_urls(c); - Ok(()) - } - fn take_token(&mut self) -> Option { self.token.take() } @@ -138,8 +141,15 @@ pub(crate) fn create_client( } impl super::Client for Connection { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - self.process(dgram, now) + fn process_output(&mut self, now: Instant) -> Output { + self.process_output(now) + } + + fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) + where + I: IntoIterator, + { + self.process_multiple_input(dgrams, now); } fn close(&mut self, now: Instant, app_error: neqo_transport::AppError, msg: S) @@ -149,8 +159,11 @@ impl super::Client for Connection { self.close(now, app_error, msg); } - fn is_closed(&self) -> bool { - matches!(self.state(), State::Closed(..)) + fn is_closed(&self) -> Option { + if let State::Closed(err) = self.state() { + return Some(err.clone()); + } + None } fn stats(&self) -> neqo_transport::Stats { @@ -159,14 +172,14 @@ impl super::Client for Connection { } impl<'b> Handler<'b> { - pub fn new(url_queue: VecDeque, args: &'b Args, key_update: KeyUpdateState) -> Self { + pub fn new(url_queue: VecDeque, args: &'b Args) -> Self { Self { streams: HashMap::new(), url_queue, all_paths: Vec::new(), args, token: None, - key_update, + needs_key_update: args.key_update, } } @@ -185,7 +198,7 @@ impl<'b> Handler<'b> { } fn download_next(&mut self, client: &mut Connection) -> bool { - if self.key_update.needed() { + if self.needs_key_update { qdebug!("Deferring requests until after first key update"); return false; } diff --git a/neqo-bin/src/client/http3.rs b/neqo-bin/src/client/http3.rs index c88a8448f6..b3f577127e 100644 --- a/neqo-bin/src/client/http3.rs +++ b/neqo-bin/src/client/http3.rs @@ -22,11 +22,12 @@ use neqo_common::{event::Provider, hex, qdebug, qinfo, qwarn, Datagram, Header}; use neqo_crypto::{AuthenticationStatus, ResumptionToken}; use neqo_http3::{Error, Http3Client, Http3ClientEvent, Http3Parameters, Http3State, Priority}; use neqo_transport::{ - AppError, Connection, EmptyConnectionIdGenerator, Error as TransportError, Output, StreamId, + AppError, Connection, ConnectionError, EmptyConnectionIdGenerator, Error as TransportError, + Output, StreamId, }; use url::Url; -use super::{get_output_file, qlog_new, Args, KeyUpdateState, Res}; +use super::{get_output_file, qlog_new, Args, Res}; pub(crate) struct Handler<'a> { #[allow( @@ -35,17 +36,12 @@ pub(crate) struct Handler<'a> { clippy::redundant_field_names )] url_handler: UrlHandler<'a>, - key_update: KeyUpdateState, token: Option, output_read_data: bool, } impl<'a> Handler<'a> { - pub(crate) fn new( - url_queue: VecDeque, - args: &'a Args, - key_update: KeyUpdateState, - ) -> Self { + pub(crate) fn new(url_queue: VecDeque, args: &'a Args) -> Self { let url_handler = UrlHandler { url_queue, stream_handlers: HashMap::new(), @@ -60,7 +56,6 @@ impl<'a> Handler<'a> { Self { url_handler, - key_update, token: None, output_read_data: args.output_read_data, } @@ -111,12 +106,22 @@ pub(crate) fn create_client( } impl super::Client for Http3Client { - fn is_closed(&self) -> bool { - matches!(self.state(), Http3State::Closed(..)) + fn is_closed(&self) -> Option { + if let Http3State::Closed(err) = self.state() { + return Some(err); + } + None } - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - self.process(dgram, now) + fn process_output(&mut self, now: Instant) -> Output { + self.process_output(now) + } + + fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) + where + I: IntoIterator, + { + self.process_multiple_input(dgrams, now); } fn close(&mut self, now: Instant, app_error: AppError, msg: S) @@ -214,12 +219,6 @@ impl<'a> super::Handler for Handler<'a> { Ok(self.url_handler.done()) } - fn maybe_key_update(&mut self, c: &mut Http3Client) -> Res<()> { - self.key_update.maybe_update(|| c.initiate_key_update())?; - self.url_handler.process_urls(c); - Ok(()) - } - fn take_token(&mut self) -> Option { self.token.take() } diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index 81721802e1..61e43c00d1 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -27,7 +27,7 @@ use neqo_crypto::{ init, Cipher, ResumptionToken, }; use neqo_http3::Output; -use neqo_transport::{AppError, ConnectionId, Error as TransportError, Version}; +use neqo_transport::{AppError, ConnectionError, ConnectionId, Error as TransportError, Version}; use qlog::{events::EventImportance, streamer::QlogStreamer}; use tokio::time::Sleep; use url::{Origin, Url}; @@ -46,6 +46,7 @@ pub enum Error { IoError(io::Error), QlogError, TransportError(neqo_transport::Error), + ApplicationError(neqo_transport::AppError), CryptoError(neqo_crypto::Error), } @@ -79,6 +80,15 @@ impl From for Error { } } +impl From for Error { + fn from(err: neqo_transport::ConnectionError) -> Self { + match err { + ConnectionError::Transport(e) => Self::TransportError(e), + ConnectionError::Application(e) => Self::ApplicationError(e), + } + } +} + impl Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "Error: {self:?}")?; @@ -90,39 +100,6 @@ impl std::error::Error for Error {} type Res = Result; -/// Track whether a key update is needed. -#[derive(Debug, PartialEq, Eq)] -struct KeyUpdateState(bool); - -impl KeyUpdateState { - pub fn maybe_update(&mut self, update_fn: F) -> Res<()> - where - F: FnOnce() -> Result<(), E>, - E: Into, - { - if self.0 { - if let Err(e) = update_fn() { - let e = e.into(); - match e { - Error::TransportError(TransportError::KeyUpdateBlocked) - | Error::Http3Error(neqo_http3::Error::TransportError( - TransportError::KeyUpdateBlocked, - )) => (), - _ => return Err(e), - } - } else { - qerror!("Keys updated"); - self.0 = false; - } - } - Ok(()) - } - - fn needed(&self) -> bool { - self.0 - } -} - #[derive(Debug, Parser)] #[command(author, version, about, long_about = None)] #[allow(clippy::struct_excessive_bools)] // Not a good use of that lint. @@ -166,7 +143,7 @@ pub struct Args { /// Use this for 0-RTT: the stack always attempts 0-RTT on resumption. resume: bool, - #[arg(name = "key-update", long)] + #[arg(name = "key-update", long, hide = true)] /// Attempt to initiate a key update immediately after confirming the connection. key_update: bool, @@ -200,7 +177,7 @@ impl Args { #[must_use] #[cfg(feature = "bench")] #[allow(clippy::missing_panics_doc)] - pub fn new(requests: &[u64], download_in_series: bool) -> Self { + pub fn new(requests: &[u64]) -> Self { use std::str::FromStr; Self { verbose: clap_verbosity_flag::Verbosity::::default(), @@ -212,7 +189,7 @@ impl Args { method: "GET".into(), header: vec![], max_concurrent_push_streams: 10, - download_in_series, + download_in_series: false, concurrency: 100, output_read_data: false, output_dir: Some("/dev/null".into()), @@ -245,6 +222,11 @@ impl Args { return; }; + if self.key_update { + qerror!("internal option key_update set by user"); + exit(127) + } + // Only use v1 for most QNS tests. self.shared.quic_parameters.quic_version = vec![Version::Version1]; match testcase.as_str() { @@ -360,18 +342,24 @@ trait Handler { type Client: Client; fn handle(&mut self, client: &mut Self::Client) -> Res; - fn maybe_key_update(&mut self, c: &mut Self::Client) -> Res<()>; fn take_token(&mut self) -> Option; fn has_token(&self) -> bool; } /// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`]. trait Client { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output; + fn process_output(&mut self, now: Instant) -> Output; + fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) + where + I: IntoIterator; fn close(&mut self, now: Instant, app_error: AppError, msg: S) where S: AsRef + Display; - fn is_closed(&self) -> bool; + /// Returns [`Some(_)`] if the connection is closed. + /// + /// Note that connection was closed without error on + /// [`Some(ConnectionError::Transport(TransportError::NoError))`]. + fn is_closed(&self) -> Option; fn stats(&self) -> neqo_transport::Stats; } @@ -390,40 +378,35 @@ impl<'a, H: Handler> Runner<'a, H> { let handler_done = self.handler.handle(&mut self.client)?; match (handler_done, self.args.resume, self.handler.has_token()) { - // Handler isn't done. Continue. - (false, _, _) => {}, - // Handler done. Resumption token needed but not present. Continue. - (true, true, false) => { - qdebug!("Handler done. Waiting for resumption token."); - } - // Handler is done, no resumption token needed. Close. - (true, false, _) | - // Handler is done, resumption token needed and present. Close. - (true, true, true) => { - self.client.close(Instant::now(), 0, "kthxbye!"); - } + // Handler isn't done. Continue. + (false, _, _) => {}, + // Handler done. Resumption token needed but not present. Continue. + (true, true, false) => { + qdebug!("Handler done. Waiting for resumption token."); } + // Handler is done, no resumption token needed. Close. + (true, false, _) | + // Handler is done, resumption token needed and present. Close. + (true, true, true) => { + self.client.close(Instant::now(), 0, "kthxbye!"); + } + } - self.process(None).await?; + self.process_output().await?; - if self.client.is_closed() { + if let Some(reason) = self.client.is_closed() { if self.args.stats { qinfo!("{:?}", self.client.stats()); } - return Ok(self.handler.take_token()); + return match reason { + ConnectionError::Transport(TransportError::NoError) + | ConnectionError::Application(0) => Ok(self.handler.take_token()), + _ => Err(reason.into()), + }; } match ready(self.socket, self.timeout.as_mut()).await? { - Ready::Socket => loop { - let dgrams = self.socket.recv(&self.local_addr)?; - if dgrams.is_empty() { - break; - } - for dgram in &dgrams { - self.process(Some(dgram)).await?; - } - self.handler.maybe_key_update(&mut self.client)?; - }, + Ready::Socket => self.process_multiple_input().await?, Ready::Timeout => { self.timeout = None; } @@ -431,9 +414,9 @@ impl<'a, H: Handler> Runner<'a, H> { } } - async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> { + async fn process_output(&mut self) -> Result<(), io::Error> { loop { - match self.client.process(dgram.take(), Instant::now()) { + match self.client.process_output(Instant::now()) { Output::Datagram(dgram) => { self.socket.writable().await?; self.socket.send(dgram)?; @@ -452,6 +435,20 @@ impl<'a, H: Handler> Runner<'a, H> { Ok(()) } + + async fn process_multiple_input(&mut self) -> Res<()> { + loop { + let dgrams = self.socket.recv(&self.local_addr)?; + if dgrams.is_empty() { + break; + } + self.client + .process_multiple_input(dgrams.iter(), Instant::now()); + self.process_output().await?; + } + + Ok(()) + } } fn qlog_new(args: &Args, hostname: &str, cid: &ConnectionId) -> Res { @@ -551,14 +548,12 @@ pub async fn client(mut args: Args) -> Res<()> { first = false; - let key_update = KeyUpdateState(args.key_update); - token = if args.shared.use_old_http { let client = http09::create_client(&args, real_local, remote_addr, &hostname, token) .expect("failed to create client"); - let handler = http09::Handler::new(to_request, &args, key_update); + let handler = http09::Handler::new(to_request, &args); Runner { args: &args, @@ -574,7 +569,7 @@ pub async fn client(mut args: Args) -> Res<()> { let client = http3::create_client(&args, real_local, remote_addr, &hostname, token) .expect("failed to create client"); - let handler = http3::Handler::new(to_request, &args, key_update); + let handler = http3::Handler::new(to_request, &args); Runner { args: &args, diff --git a/neqo-bin/src/server/mod.rs b/neqo-bin/src/server/mod.rs index e3067ecdf0..3490b3e9b3 100644 --- a/neqo-bin/src/server/mod.rs +++ b/neqo-bin/src/server/mod.rs @@ -10,8 +10,7 @@ use std::{ cmp::min, collections::HashMap, fmt::{self, Display}, - fs::OpenOptions, - io::{self, Read}, + fs, io, net::{SocketAddr, ToSocketAddrs}, path::PathBuf, pin::Pin, @@ -188,28 +187,11 @@ impl Args { } } -fn qns_read_response(filename: &str) -> Option> { - let mut file_path = PathBuf::from("/www"); - file_path.push(filename.trim_matches(|p| p == '/')); - - OpenOptions::new() - .read(true) - .open(&file_path) - .map_err(|_e| qerror!("Could not open {}", file_path.display())) - .ok() - .and_then(|mut f| { - let mut data = Vec::new(); - match f.read_to_end(&mut data) { - Ok(sz) => { - qinfo!("{} bytes read from {}", sz, file_path.display()); - Some(data) - } - Err(e) => { - qerror!("Error reading data: {e:?}"); - None - } - } - }) +fn qns_read_response(filename: &str) -> Result, io::Error> { + let path: PathBuf = ["/www", filename.trim_matches(|p| p == '/')] + .iter() + .collect(); + fs::read(path) } trait HttpServer: Display { @@ -344,27 +326,32 @@ impl HttpServer for SimpleServer { continue; } - let mut response = - if let Some(path) = headers.iter().find(|&h| h.name() == ":path") { - if args.shared.qns_test.is_some() { - if let Some(data) = qns_read_response(path.value()) { - ResponseData::from(data) - } else { - ResponseData::from(Self::MESSAGE) - } - } else if let Ok(count) = - path.value().trim_matches(|p| p == '/').parse::() - { - ResponseData::repeat(Self::MESSAGE, count) - } else { - ResponseData::from(Self::MESSAGE) + let Some(path) = headers.iter().find(|&h| h.name() == ":path") else { + stream + .cancel_fetch(neqo_http3::Error::HttpRequestIncomplete.code()) + .unwrap(); + continue; + }; + + let mut response = if args.shared.qns_test.is_some() { + match qns_read_response(path.value()) { + Ok(data) => ResponseData::from(data), + Err(e) => { + qerror!("Failed to read {}: {e}", path.value()); + stream + .send_headers(&[Header::new(":status", "404")]) + .unwrap(); + stream.stream_close_send().unwrap(); + continue; } - } else { - stream - .cancel_fetch(neqo_http3::Error::HttpRequestIncomplete.code()) - .unwrap(); - continue; - }; + } + } else if let Ok(count) = + path.value().trim_matches(|p| p == '/').parse::() + { + ResponseData::repeat(Self::MESSAGE, count) + } else { + ResponseData::from(Self::MESSAGE) + }; stream .send_headers(&[ diff --git a/neqo-bin/src/server/old_https.rs b/neqo-bin/src/server/old_https.rs index 505a16578f..38f3fdc3a7 100644 --- a/neqo-bin/src/server/old_https.rs +++ b/neqo-bin/src/server/old_https.rs @@ -8,7 +8,7 @@ use std::{ cell::RefCell, collections::HashMap, fmt::Display, path::PathBuf, rc::Rc, time::Instant, }; -use neqo_common::{event::Provider, hex, qdebug, qinfo, qwarn, Datagram}; +use neqo_common::{event::Provider, hex, qdebug, qerror, qinfo, qwarn, Datagram}; use neqo_crypto::{generate_ech_keys, random, AllowZeroRtt, AntiReplay, Cipher}; use neqo_http3::Error; use neqo_transport::{ @@ -142,20 +142,25 @@ impl Http09Server { Regex::new(r"GET +/(\d+)(?:\r)?\n").unwrap() }; let m = re.captures(msg); - let resp = match m.and_then(|m| m.get(1)) { - None => { - self.save_partial(stream_id, buf, conn); - return; - } - Some(path) => { - let path = path.as_str(); - qdebug!("Path = '{path}'"); - if args.shared.qns_test.is_some() { - qns_read_response(path) - } else { - let count = path.parse().unwrap(); - Some(vec![b'a'; count]) + let Some(path) = m.and_then(|m| m.get(1)) else { + self.save_partial(stream_id, buf, conn); + return; + }; + + let resp = { + let path = path.as_str(); + qdebug!("Path = '{path}'"); + if args.shared.qns_test.is_some() { + match qns_read_response(path) { + Ok(data) => Some(data), + Err(e) => { + qerror!("Failed to read {path}: {e}"); + Some(b"404".to_vec()) + } } + } else { + let count = path.parse().unwrap(); + Some(vec![b'a'; count]) } }; self.write(stream_id, resp, conn); diff --git a/neqo-common/Cargo.toml b/neqo-common/Cargo.toml index abc79b2026..6f43ca1dfd 100644 --- a/neqo-common/Cargo.toml +++ b/neqo-common/Cargo.toml @@ -36,3 +36,7 @@ features = ["timeapi"] [lib] # See https://github.com/bheisler/criterion.rs/blob/master/book/src/faq.md#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options bench = false + +[[bench]] +name = "timer" +harness = false diff --git a/neqo-common/benches/timer.rs b/neqo-common/benches/timer.rs new file mode 100644 index 0000000000..5ac8019db4 --- /dev/null +++ b/neqo-common/benches/timer.rs @@ -0,0 +1,39 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::time::{Duration, Instant}; + +use criterion::{criterion_group, criterion_main, Criterion}; +use neqo_common::timer::Timer; +use test_fixture::now; + +fn benchmark_timer(c: &mut Criterion) { + c.bench_function("drain a timer quickly", |b| { + b.iter_batched_ref( + make_timer, + |(_now, timer)| { + while let Some(t) = timer.next_time() { + assert!(timer.take_next(t).is_some()); + } + }, + criterion::BatchSize::SmallInput, + ); + }); +} + +fn make_timer() -> (Instant, Timer<()>) { + const TIMES: &[u64] = &[1, 2, 3, 5, 8, 13, 21, 34]; + + let now = now(); + let mut timer = Timer::new(now, Duration::from_millis(777), 100); + for &t in TIMES { + timer.add(now + Duration::from_secs(t), ()); + } + (now, timer) +} + +criterion_group!(benches, benchmark_timer); +criterion_main!(benches); diff --git a/neqo-common/src/lib.rs b/neqo-common/src/lib.rs index 54ac38cda1..fbaf966eea 100644 --- a/neqo-common/src/lib.rs +++ b/neqo-common/src/lib.rs @@ -16,6 +16,7 @@ pub mod hrtime; mod incrdecoder; pub mod log; pub mod qlog; +pub mod timer; pub mod tos; use std::fmt::Write; diff --git a/neqo-common/src/timer.rs b/neqo-common/src/timer.rs new file mode 100644 index 0000000000..3feddb2226 --- /dev/null +++ b/neqo-common/src/timer.rs @@ -0,0 +1,420 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::{ + collections::VecDeque, + mem, + time::{Duration, Instant}, +}; + +/// Internal structure for a timer item. +struct TimerItem { + time: Instant, + item: T, +} + +impl TimerItem { + fn time(ti: &Self) -> Instant { + ti.time + } +} + +/// A timer queue. +/// This uses a classic timer wheel arrangement, with some characteristics that might be considered +/// peculiar. Each slot in the wheel is sorted (complexity O(N) insertions, but O(logN) to find cut +/// points). Time is relative, the wheel has an origin time and it is unable to represent times that +/// are more than `granularity * capacity` past that time. +pub struct Timer { + items: Vec>>, + now: Instant, + granularity: Duration, + cursor: usize, +} + +impl Timer { + /// Construct a new wheel at the given granularity, starting at the given time. + /// + /// # Panics + /// + /// When `capacity` is too large to fit in `u32` or `granularity` is zero. + pub fn new(now: Instant, granularity: Duration, capacity: usize) -> Self { + assert!(u32::try_from(capacity).is_ok()); + assert!(granularity.as_nanos() > 0); + let mut items = Vec::with_capacity(capacity); + items.resize_with(capacity, Default::default); + Self { + items, + now, + granularity, + cursor: 0, + } + } + + /// Return a reference to the time of the next entry. + #[must_use] + pub fn next_time(&self) -> Option { + let idx = self.bucket(0); + for i in idx..self.items.len() { + if let Some(t) = self.items[i].front() { + return Some(t.time); + } + } + for i in 0..idx { + if let Some(t) = self.items[i].front() { + return Some(t.time); + } + } + None + } + + /// Get the full span of time that this can cover. + /// Two timers cannot be more than this far apart. + /// In practice, this value is less by one amount of the timer granularity. + #[inline] + #[allow(clippy::cast_possible_truncation)] // guarded by assertion + #[must_use] + pub fn span(&self) -> Duration { + self.granularity * (self.items.len() as u32) + } + + /// For the given `time`, get the number of whole buckets in the future that is. + #[inline] + #[allow(clippy::cast_possible_truncation)] // guarded by assertion + fn delta(&self, time: Instant) -> usize { + // This really should use Duration::div_duration_f??(), but it can't yet. + ((time - self.now).as_nanos() / self.granularity.as_nanos()) as usize + } + + #[inline] + fn time_bucket(&self, time: Instant) -> usize { + self.bucket(self.delta(time)) + } + + #[inline] + fn bucket(&self, delta: usize) -> usize { + debug_assert!(delta < self.items.len()); + (self.cursor + delta) % self.items.len() + } + + /// Slide forward in time by `n * self.granularity`. + #[allow(clippy::cast_possible_truncation, clippy::reversed_empty_ranges)] + // cast_possible_truncation is ok because we have an assertion guard. + // reversed_empty_ranges is to avoid different types on the if/else. + fn tick(&mut self, n: usize) { + let new = self.bucket(n); + let iter = if new < self.cursor { + (self.cursor..self.items.len()).chain(0..new) + } else { + (self.cursor..new).chain(0..0) + }; + for i in iter { + assert!(self.items[i].is_empty()); + } + self.now += self.granularity * (n as u32); + self.cursor = new; + } + + /// Asserts if the time given is in the past or too far in the future. + /// + /// # Panics + /// + /// When `time` is in the past relative to previous calls. + pub fn add(&mut self, time: Instant, item: T) { + assert!(time >= self.now); + // Skip forward quickly if there is too large a gap. + let short_span = self.span() - self.granularity; + if time >= (self.now + self.span() + short_span) { + // Assert that there aren't any items. + for i in &self.items { + debug_assert!(i.is_empty()); + } + self.now = time.checked_sub(short_span).unwrap(); + self.cursor = 0; + } + + // Adjust time forward the minimum amount necessary. + let mut d = self.delta(time); + if d >= self.items.len() { + self.tick(1 + d - self.items.len()); + d = self.items.len() - 1; + } + + let bucket = self.bucket(d); + let ins = match self.items[bucket].binary_search_by_key(&time, TimerItem::time) { + Ok(j) | Err(j) => j, + }; + self.items[bucket].insert(ins, TimerItem { time, item }); + } + + /// Given knowledge of the time an item was added, remove it. + /// This requires use of a predicate that identifies matching items. + /// + /// # Panics + /// Impossible, I think. + pub fn remove(&mut self, time: Instant, mut selector: F) -> Option + where + F: FnMut(&T) -> bool, + { + if time < self.now { + return None; + } + if time > self.now + self.span() { + return None; + } + let bucket = self.time_bucket(time); + let Ok(start_index) = self.items[bucket].binary_search_by_key(&time, TimerItem::time) + else { + return None; + }; + // start_index is just one of potentially many items with the same time. + // Search backwards for a match, ... + for i in (0..=start_index).rev() { + if self.items[bucket][i].time != time { + break; + } + if selector(&self.items[bucket][i].item) { + return Some(self.items[bucket].remove(i).unwrap().item); + } + } + // ... then forwards. + for i in (start_index + 1)..self.items[bucket].len() { + if self.items[bucket][i].time != time { + break; + } + if selector(&self.items[bucket][i].item) { + return Some(self.items[bucket].remove(i).unwrap().item); + } + } + None + } + + /// Take the next item, unless there are no items with + /// a timeout in the past relative to `until`. + pub fn take_next(&mut self, until: Instant) -> Option { + fn maybe_take(v: &mut VecDeque>, until: Instant) -> Option { + if !v.is_empty() && v[0].time <= until { + Some(v.pop_front().unwrap().item) + } else { + None + } + } + + let idx = self.bucket(0); + for i in idx..self.items.len() { + let res = maybe_take(&mut self.items[i], until); + if res.is_some() { + return res; + } + } + for i in 0..idx { + let res = maybe_take(&mut self.items[i], until); + if res.is_some() { + return res; + } + } + None + } + + /// Create an iterator that takes all items until the given time. + /// Note: Items might be removed even if the iterator is not fully exhausted. + pub fn take_until(&mut self, until: Instant) -> impl Iterator { + let get_item = move |x: TimerItem| x.item; + if until >= self.now + self.span() { + // Drain everything, so a clean sweep. + let mut empty_items = Vec::with_capacity(self.items.len()); + empty_items.resize_with(self.items.len(), VecDeque::default); + let mut items = mem::replace(&mut self.items, empty_items); + self.now = until; + self.cursor = 0; + + let tail = items.split_off(self.cursor); + return tail.into_iter().chain(items).flatten().map(get_item); + } + + // Only returning a partial span, so do it bucket at a time. + let delta = self.delta(until); + let mut buckets = Vec::with_capacity(delta + 1); + + // First, the whole buckets. + for i in 0..delta { + let idx = self.bucket(i); + buckets.push(mem::take(&mut self.items[idx])); + } + self.tick(delta); + + // Now we need to split the last bucket, because there might be + // some items with `item.time > until`. + let bucket = &mut self.items[self.cursor]; + let last_idx = match bucket.binary_search_by_key(&until, TimerItem::time) { + Ok(mut m) => { + // If there are multiple values, the search will hit any of them. + // Make sure to get them all. + while m < bucket.len() && bucket[m].time == until { + m += 1; + } + m + } + Err(ins) => ins, + }; + let tail = bucket.split_off(last_idx); + buckets.push(mem::replace(bucket, tail)); + // This tomfoolery with the empty vector ensures that + // the returned type here matches the one above precisely + // without having to invoke the `either` crate. + buckets.into_iter().chain(vec![]).flatten().map(get_item) + } +} + +#[cfg(test)] +mod test { + use std::sync::OnceLock; + + use super::{Duration, Instant, Timer}; + + fn now() -> Instant { + static NOW: OnceLock = OnceLock::new(); + *NOW.get_or_init(Instant::now) + } + + const GRANULARITY: Duration = Duration::from_millis(10); + const CAPACITY: usize = 10; + #[test] + fn create() { + let t: Timer<()> = Timer::new(now(), GRANULARITY, CAPACITY); + assert_eq!(t.span(), Duration::from_millis(100)); + assert_eq!(None, t.next_time()); + } + + #[test] + fn immediate_entry() { + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + t.add(now(), 12); + assert_eq!(now(), t.next_time().expect("should have an entry")); + let values: Vec<_> = t.take_until(now()).collect(); + assert_eq!(vec![12], values); + } + + #[test] + fn same_time() { + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + let v1 = 12; + let v2 = 13; + t.add(now(), v1); + t.add(now(), v2); + assert_eq!(now(), t.next_time().expect("should have an entry")); + let values: Vec<_> = t.take_until(now()).collect(); + assert!(values.contains(&v1)); + assert!(values.contains(&v2)); + } + + #[test] + fn add() { + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + let near_future = now() + Duration::from_millis(17); + let v = 9; + t.add(near_future, v); + assert_eq!(near_future, t.next_time().expect("should return a value")); + assert_eq!( + t.take_until(near_future.checked_sub(Duration::from_millis(1)).unwrap()) + .count(), + 0 + ); + assert!(t + .take_until(near_future + Duration::from_millis(1)) + .any(|x| x == v)); + } + + #[test] + fn add_future() { + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + let future = now() + Duration::from_millis(117); + let v = 9; + t.add(future, v); + assert_eq!(future, t.next_time().expect("should return a value")); + assert!(t.take_until(future).any(|x| x == v)); + } + + #[test] + fn add_far_future() { + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + let far_future = now() + Duration::from_millis(892); + let v = 9; + t.add(far_future, v); + assert_eq!(far_future, t.next_time().expect("should return a value")); + assert!(t.take_until(far_future).any(|x| x == v)); + } + + const TIMES: &[Duration] = &[ + Duration::from_millis(40), + Duration::from_millis(91), + Duration::from_millis(6), + Duration::from_millis(3), + Duration::from_millis(22), + Duration::from_millis(40), + ]; + + fn with_times() -> Timer { + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + for (i, time) in TIMES.iter().enumerate() { + t.add(now() + *time, i); + } + assert_eq!( + now() + *TIMES.iter().min().unwrap(), + t.next_time().expect("should have a time") + ); + t + } + + #[test] + #[allow(clippy::needless_collect)] // false positive + fn multiple_values() { + let mut t = with_times(); + let values: Vec<_> = t.take_until(now() + *TIMES.iter().max().unwrap()).collect(); + for i in 0..TIMES.len() { + assert!(values.contains(&i)); + } + } + + #[test] + #[allow(clippy::needless_collect)] // false positive + fn take_far_future() { + let mut t = with_times(); + let values: Vec<_> = t.take_until(now() + Duration::from_secs(100)).collect(); + for i in 0..TIMES.len() { + assert!(values.contains(&i)); + } + } + + #[test] + fn remove_each() { + let mut t = with_times(); + for (i, time) in TIMES.iter().enumerate() { + assert_eq!(Some(i), t.remove(now() + *time, |&x| x == i)); + } + assert_eq!(None, t.next_time()); + } + + #[test] + fn remove_future() { + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + let future = now() + Duration::from_millis(117); + let v = 9; + t.add(future, v); + + assert_eq!(Some(v), t.remove(future, |candidate| *candidate == v)); + } + + #[test] + fn remove_too_far_future() { + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + let future = now() + Duration::from_millis(117); + let too_far_future = now() + t.span() + Duration::from_millis(117); + let v = 9; + t.add(future, v); + + assert_eq!(None, t.remove(too_far_future, |candidate| *candidate == v)); + } +} diff --git a/neqo-crypto/src/time.rs b/neqo-crypto/src/time.rs index 0e59c4f5e2..359436a854 100644 --- a/neqo-crypto/src/time.rs +++ b/neqo-crypto/src/time.rs @@ -258,11 +258,11 @@ mod test { #[test] // We allow replace_consts here because - // std::u64::max_value() isn't available + // std::u64::MAX isn't available // in all of our targets fn overflow_interval() { init(); - let interval = Interval::from(Duration::from_micros(u64::max_value())); + let interval = Interval::from(Duration::from_micros(u64::MAX)); let res: Res = interval.try_into(); assert!(res.is_err()); } diff --git a/neqo-http3/src/connection.rs b/neqo-http3/src/connection.rs index cfa78df787..dd45797baa 100644 --- a/neqo-http3/src/connection.rs +++ b/neqo-http3/src/connection.rs @@ -386,8 +386,8 @@ impl Http3Connection { Ok(()) } - /// Inform a `HttpConnection` that a stream has data to send and that `send` should be called - /// for the stream. + /// Inform an [`Http3Connection`] that a stream has data to send and that + /// [`SendStream::send`] should be called for the stream. pub fn stream_has_pending_data(&mut self, stream_id: StreamId) { self.streams_with_pending_data.insert(stream_id); } diff --git a/neqo-http3/src/connection_client.rs b/neqo-http3/src/connection_client.rs index be20126353..4c8772d14a 100644 --- a/neqo-http3/src/connection_client.rs +++ b/neqo-http3/src/connection_client.rs @@ -880,11 +880,10 @@ impl Http3Client { pub fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) where I: IntoIterator, - I::IntoIter: ExactSizeIterator, { - let dgrams = dgrams.into_iter(); - qtrace!([self], "Process multiple datagrams, len={}", dgrams.len()); - if dgrams.len() == 0 { + let mut dgrams = dgrams.into_iter().peekable(); + qtrace!([self], "Process multiple datagrams"); + if dgrams.peek().is_none() { return; } self.conn.process_multiple_input(dgrams, now); diff --git a/neqo-http3/src/connection_server.rs b/neqo-http3/src/connection_server.rs index dcf759f177..cc887a26fc 100644 --- a/neqo-http3/src/connection_server.rs +++ b/neqo-http3/src/connection_server.rs @@ -64,13 +64,17 @@ impl Http3ServerHandler { data: &[u8], conn: &mut Connection, ) -> Res { - self.base_handler.stream_has_pending_data(stream_id); - self.needs_processing = true; - self.base_handler + let n = self + .base_handler .send_streams .get_mut(&stream_id) .ok_or(Error::InvalidStreamId)? - .send_data(conn, data) + .send_data(conn, data)?; + if n > 0 { + self.base_handler.stream_has_pending_data(stream_id); + } + self.needs_processing = true; + Ok(n) } /// Supply response heeaders for a request. @@ -100,7 +104,6 @@ impl Http3ServerHandler { pub fn stream_close_send(&mut self, stream_id: StreamId, conn: &mut Connection) -> Res<()> { qdebug!([self], "Close sending side stream={}.", stream_id); self.base_handler.stream_close_send(conn, stream_id)?; - self.base_handler.stream_has_pending_data(stream_id); self.needs_processing = true; Ok(()) } diff --git a/neqo-qpack/src/table.rs b/neqo-qpack/src/table.rs index 517e98db09..d5275ec98f 100644 --- a/neqo-qpack/src/table.rs +++ b/neqo-qpack/src/table.rs @@ -94,7 +94,7 @@ impl HeaderTable { capacity: 0, used: 0, base: 0, - acked_inserts_cnt: if encoder { 0 } else { u64::max_value() }, + acked_inserts_cnt: if encoder { 0 } else { u64::MAX }, } } diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index 7f275c8ab1..7fa8416b4e 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -985,10 +985,9 @@ impl Connection { pub fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) where I: IntoIterator, - I::IntoIter: ExactSizeIterator, { - let dgrams = dgrams.into_iter(); - if dgrams.len() == 0 { + let mut dgrams = dgrams.into_iter().peekable(); + if dgrams.peek().is_none() { return; } @@ -1839,7 +1838,7 @@ impl Connection { | State::Connected | State::Confirmed => { if let Some(path) = self.paths.select_path() { - let res = self.output_path(&path, now); + let res = self.output_path(&path, now, &None); self.capture_error(Some(path), now, 0, res) } else { Ok(SendOption::default()) @@ -1848,7 +1847,16 @@ impl Connection { State::Closing { .. } | State::Draining { .. } | State::Closed(_) => { if let Some(details) = self.state_signaling.close_frame() { let path = Rc::clone(details.path()); - let res = self.output_close(&details); + // In some error cases, we will not be able to make a new, permanent path. + // For example, if we run out of connection IDs and the error results from + // a packet on a new path, we avoid sending (and the privacy risk) rather + // than reuse a connection ID. + let res = if path.borrow().is_temporary() { + assert!(!cfg!(test), "attempting to close with a temporary path"); + Err(Error::InternalError) + } else { + self.output_path(&path, now, &Some(details)) + }; self.capture_error(Some(path), now, 0, res) } else { Ok(SendOption::default()) @@ -1925,62 +1933,6 @@ impl Connection { } } - fn output_close(&mut self, close: &ClosingFrame) -> Res { - let mut encoder = Encoder::with_capacity(256); - let grease_quic_bit = self.can_grease_quic_bit(); - let version = self.version(); - for space in PacketNumberSpace::iter() { - let Some((cspace, tx)) = self.crypto.states.select_tx_mut(self.version, *space) else { - continue; - }; - - let path = close.path().borrow(); - // In some error cases, we will not be able to make a new, permanent path. - // For example, if we run out of connection IDs and the error results from - // a packet on a new path, we avoid sending (and the privacy risk) rather - // than reuse a connection ID. - if path.is_temporary() { - assert!(!cfg!(test), "attempting to close with a temporary path"); - return Err(Error::InternalError); - } - let (_, mut builder) = Self::build_packet_header( - &path, - cspace, - encoder, - tx, - &AddressValidationInfo::None, - version, - grease_quic_bit, - ); - _ = Self::add_packet_number( - &mut builder, - tx, - self.loss_recovery.largest_acknowledged_pn(*space), - ); - // The builder will set the limit to 0 if there isn't enough space for the header. - if builder.is_full() { - encoder = builder.abort(); - break; - } - builder.set_limit(min(path.amplification_limit(), path.mtu()) - tx.expansion()); - debug_assert!(builder.limit() <= 2048); - - // ConnectionError::Application is only allowed at 1RTT. - let sanitized = if *space == PacketNumberSpace::ApplicationData { - None - } else { - close.sanitize() - }; - sanitized - .as_ref() - .unwrap_or(close) - .write_frame(&mut builder); - encoder = builder.build(tx)?; - } - - Ok(SendOption::Yes(close.path().borrow().datagram(encoder))) - } - /// Write the frames that are exchanged in the application data space. /// The order of calls here determines the relative priority of frames. fn write_appdata_frames( @@ -2201,7 +2153,12 @@ impl Connection { /// Build a datagram, possibly from multiple packets (for different PN /// spaces) and each containing 1+ frames. #[allow(clippy::too_many_lines)] // Yeah, that's just the way it is. - fn output_path(&mut self, path: &PathRef, now: Instant) -> Res { + fn output_path( + &mut self, + path: &PathRef, + now: Instant, + closing_frame: &Option, + ) -> Res { let mut initial_sent = None; let mut needs_padding = false; let grease_quic_bit = self.can_grease_quic_bit(); @@ -2254,8 +2211,23 @@ impl Connection { // Add frames to the packet. let payload_start = builder.len(); - let (tokens, ack_eliciting, padded) = - self.write_frames(path, *space, &profile, &mut builder, now); + let (mut tokens, mut ack_eliciting, mut padded) = (Vec::new(), false, false); + if let Some(ref close) = closing_frame { + // ConnectionError::Application is only allowed at 1RTT. + let sanitized = if *space == PacketNumberSpace::ApplicationData { + None + } else { + close.sanitize() + }; + sanitized + .as_ref() + .unwrap_or(close) + .write_frame(&mut builder); + self.stats.borrow_mut().frame_tx.connection_close += 1; + } else { + (tokens, ack_eliciting, padded) = + self.write_frames(path, *space, &profile, &mut builder, now); + } if builder.packet_empty() { // Nothing to include in this packet. encoder = builder.abort(); @@ -2336,6 +2308,8 @@ impl Connection { mtu ); initial.size += mtu - packets.len(); + // These zeros aren't padding frames, they are an invalid all-zero coalesced + // packet, which is why we don't increase `frame_tx.padding` count here. packets.resize(mtu, 0); } self.loss_recovery.on_packet_sent(path, initial); diff --git a/neqo-transport/src/crypto.rs b/neqo-transport/src/crypto.rs index 54bfe622cf..60d056f2d2 100644 --- a/neqo-transport/src/crypto.rs +++ b/neqo-transport/src/crypto.rs @@ -758,7 +758,7 @@ impl CryptoDxAppData { } pub fn next(&self) -> Res { - if self.dx.epoch == usize::max_value() { + if self.dx.epoch == usize::MAX { // Guard against too many key updates. return Err(Error::KeysExhausted); } diff --git a/neqo-transport/src/lib.rs b/neqo-transport/src/lib.rs index 54ee2afc7b..ee1e2a9ce3 100644 --- a/neqo-transport/src/lib.rs +++ b/neqo-transport/src/lib.rs @@ -76,8 +76,8 @@ const ERROR_AEAD_LIMIT_REACHED: TransportError = 15; #[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq)] pub enum Error { NoError, - // Each time tihe error is return a different parameter is supply. - // This will be use to distinguish each occurance of this error. + // Each time this error is returned a different parameter is supplied. + // This will be used to distinguish each occurance of this error. InternalError, ConnectionRefused, FlowControlError, diff --git a/neqo-transport/src/packet/mod.rs b/neqo-transport/src/packet/mod.rs index 04b261a9e0..5c39f59789 100644 --- a/neqo-transport/src/packet/mod.rs +++ b/neqo-transport/src/packet/mod.rs @@ -158,7 +158,7 @@ impl PacketBuilder { } Self { encoder, - pn: u64::max_value(), + pn: u64::MAX, header: header_start..header_start, offsets: PacketBuilderOffsets { first_byte_mask: PACKET_HP_MASK_SHORT, @@ -201,7 +201,7 @@ impl PacketBuilder { Self { encoder, - pn: u64::max_value(), + pn: u64::MAX, header: header_start..header_start, offsets: PacketBuilderOffsets { first_byte_mask: PACKET_HP_MASK_LONG, @@ -555,7 +555,10 @@ impl<'a> PublicPacket<'a> { if packet_type == PacketType::Retry { let header_len = decoder.offset(); let expansion = retry::expansion(version); - let token = Self::opt(decoder.decode(decoder.remaining() - expansion))?; + let token = decoder + .remaining() + .checked_sub(expansion) + .map_or(Err(Error::InvalidPacket), |v| Self::opt(decoder.decode(v)))?; if token.is_empty() { return Err(Error::InvalidPacket); } diff --git a/neqo-transport/src/server.rs b/neqo-transport/src/server.rs index 7d3d144a09..96a6244ef1 100644 --- a/neqo-transport/src/server.rs +++ b/neqo-transport/src/server.rs @@ -15,12 +15,12 @@ use std::{ ops::{Deref, DerefMut}, path::PathBuf, rc::{Rc, Weak}, - time::Instant, + time::{Duration, Instant}, }; use neqo_common::{ self as common, event::Provider, hex, qdebug, qerror, qinfo, qlog::NeqoQlog, qtrace, qwarn, - Datagram, Decoder, Role, + timer::Timer, Datagram, Decoder, Role, }; use neqo_crypto::{ encode_ech_config, AntiReplay, Cipher, PrivateKey, PublicKey, ZeroRttCheckResult, @@ -46,6 +46,13 @@ pub enum InitialResult { /// `MIN_INITIAL_PACKET_SIZE` is the smallest packet that can be used to establish /// a new connection across all QUIC versions this server supports. const MIN_INITIAL_PACKET_SIZE: usize = 1200; +/// The size of timer buckets. This is higher than the actual timer granularity +/// as this depends on there being some distribution of events. +const TIMER_GRANULARITY: Duration = Duration::from_millis(4); +/// The number of buckets in the timer. As mentioned in the definition of `Timer`, +/// the granularity and capacity need to multiply to be larger than the largest +/// delay that might be used. That's the idle timeout (currently 30s). +const TIMER_CAPACITY: usize = 16384; type StateRef = Rc>; type ConnectionTableRef = Rc>>; @@ -54,21 +61,7 @@ type ConnectionTableRef = Rc>>; pub struct ServerConnectionState { c: Connection, active_attempt: Option, - wake_at: Option, -} - -impl ServerConnectionState { - fn set_wake_at(&mut self, at: Instant) { - self.wake_at = Some(at); - } - - fn needs_waking(&self, now: Instant) -> bool { - self.wake_at.map_or(false, |t| t <= now) - } - - fn woken(&mut self) { - self.wake_at = None; - } + last_timer: Instant, } impl Deref for ServerConnectionState { @@ -181,8 +174,8 @@ pub struct Server { active: HashSet, /// The set of connections that need immediate processing. waiting: VecDeque, - /// The latest [`Output::Callback`] returned from [`Server::process`]. - wake_at: Option, + /// Outstanding timers for connections. + timers: Timer, /// Address validation logic, which determines whether we send a Retry. address_validation: Rc>, /// Directory to create qlog traces in @@ -226,10 +219,10 @@ impl Server { connections: Rc::default(), active: HashSet::default(), waiting: VecDeque::default(), + timers: Timer::new(now, TIMER_GRANULARITY, TIMER_CAPACITY), address_validation: Rc::new(RefCell::new(validation)), qlog_dir: None, ech_config: None, - wake_at: None, }) } @@ -267,6 +260,11 @@ impl Server { self.ech_config.as_ref().map_or(&[], |cfg| &cfg.encoded) } + fn remove_timer(&mut self, c: &StateRef) { + let last = c.borrow().last_timer; + self.timers.remove(last, |t| Rc::ptr_eq(t, c)); + } + fn process_connection( &mut self, c: &StateRef, @@ -282,12 +280,16 @@ impl Server { } Output::Callback(delay) => { let next = now + delay; - c.borrow_mut().set_wake_at(next); - if self.wake_at.map_or(true, |c| c > next) { - self.wake_at = Some(next); + if next != c.borrow().last_timer { + qtrace!([self], "Change timer to {:?}", next); + self.remove_timer(c); + c.borrow_mut().last_timer = next; + self.timers.add(next, Rc::clone(c)); } } - Output::None => {} + Output::None => { + self.remove_timer(c); + } } if c.borrow().has_events() { qtrace!([self], "Connection active: {:?}", c); @@ -505,7 +507,7 @@ impl Server { self.setup_connection(&mut c, &attempt_key, initial, orig_dcid); let c = Rc::new(RefCell::new(ServerConnectionState { c, - wake_at: None, + last_timer: now, active_attempt: Some(attempt_key.clone()), })); cid_mgr.borrow_mut().set_connection(&c); @@ -644,28 +646,24 @@ impl Server { return Some(d); } } - - qtrace!([self], "No packet to send still, check wake up times"); - loop { - let connection = self - .connections - .borrow() - .values() - .find(|c| c.borrow().needs_waking(now)) - .cloned()?; - let datagram = self.process_connection(&connection, None, now); - connection.borrow_mut().woken(); - if datagram.is_some() { - return datagram; + qtrace!([self], "No packet to send still, run timers"); + while let Some(c) = self.timers.take_next(now) { + if let Some(d) = self.process_connection(&c, None, now) { + return Some(d); } } + None } - pub fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - if self.wake_at.map_or(false, |c| c <= now) { - self.wake_at = None; + fn next_time(&mut self, now: Instant) -> Option { + if self.waiting.is_empty() { + self.timers.next_time().map(|x| x - now) + } else { + Some(Duration::new(0, 0)) } + } + pub fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { dgram .and_then(|d| self.process_input(d, now)) .or_else(|| self.process_next_output(now)) @@ -673,7 +671,12 @@ impl Server { qtrace!([self], "Send packet: {:?}", d); Output::Datagram(d) }) - .or_else(|| self.wake_at.take().map(|c| Output::Callback(c - now))) + .or_else(|| { + self.next_time(now).map(|delay| { + qtrace!([self], "Wait: {:?}", delay); + Output::Callback(delay) + }) + }) .unwrap_or_else(|| { qtrace!([self], "Go dormant"); Output::None diff --git a/qns/Dockerfile b/qns/Dockerfile index eed7d3f986..cdb192f203 100644 --- a/qns/Dockerfile +++ b/qns/Dockerfile @@ -1,7 +1,7 @@ FROM martenseemann/quic-network-simulator-endpoint:latest AS buildimage RUN apt-get update && apt-get install -y --no-install-recommends \ - curl git mercurial \ + curl git mercurial coreutils \ build-essential libclang-dev lld \ gyp ninja-build zlib1g-dev python \ && apt-get autoremove -y && apt-get clean -y \ @@ -30,7 +30,7 @@ ADD . /neqo RUN set -eux; \ cd /neqo; \ - RUSTFLAGS="-g -C link-arg=-fuse-ld=lld" cargo build --release \ + RUSTFLAGS="-C link-arg=-fuse-ld=lld" cargo build --release \ --bin neqo-client --bin neqo-server # Copy only binaries to the final image to keep it small. diff --git a/qns/interop.sh b/qns/interop.sh index 4baa6b7e8f..e216e49866 100755 --- a/qns/interop.sh +++ b/qns/interop.sh @@ -10,30 +10,27 @@ export PATH="${PATH}:/neqo/bin" [ -n "$QLOGDIR" ] case "$ROLE" in - client) - /wait-for-it.sh sim:57832 -s -t 30 - sleep 5 - neqo-client --help | head -n 1 - RUST_LOG=debug RUST_BACKTRACE=1 neqo-client --cc cubic --qns-test "$TESTCASE" \ - --qlog-dir "$QLOGDIR" --output-dir /downloads $REQUESTS - ;; +client) + /wait-for-it.sh sim:57832 -s -t 30 + RUST_LOG=debug RUST_BACKTRACE=1 neqo-client --cc cubic --qns-test "$TESTCASE" \ + --qlog-dir "$QLOGDIR" --output-dir /downloads $REQUESTS 2> >(tee -i -a "/logs/$ROLE.log" >&2) + ;; - server) - DB=/neqo/db - CERT=cert - P12CERT=$(mktemp) - mkdir -p "$DB" - certutil -N -d "sql:$DB" --empty-password - openssl pkcs12 -export -nodes -in /certs/cert.pem -inkey /certs/priv.key \ - -name "$CERT" -passout pass: -out "$P12CERT" - pk12util -d "sql:$DB" -i "$P12CERT" -W '' - certutil -L -d "sql:$DB" -n "$CERT" - neqo-server --help | head -n 1 - RUST_LOG=info RUST_BACKTRACE=1 neqo-server --cc cubic --qns-test "$TESTCASE" \ - --qlog-dir "$QLOGDIR" -d "$DB" -k "$CERT" [::]:443 - ;; +server) + DB=/neqo/db + CERT=cert + P12CERT=$(mktemp) + mkdir -p "$DB" + certutil -N -d "sql:$DB" --empty-password + openssl pkcs12 -export -nodes -in /certs/cert.pem -inkey /certs/priv.key \ + -name "$CERT" -passout pass: -out "$P12CERT" + pk12util -d "sql:$DB" -i "$P12CERT" -W '' + certutil -L -d "sql:$DB" -n "$CERT" + RUST_LOG=info RUST_BACKTRACE=1 neqo-server --cc cubic --qns-test "$TESTCASE" \ + --qlog-dir "$QLOGDIR" -d "$DB" -k "$CERT" '[::]:443' 2> >(tee -i -a "/logs/$ROLE.log" >&2) + ;; - *) - exit 1 - ;; +*) + exit 1 + ;; esac diff --git a/test/test.sh b/test/test.sh new file mode 100755 index 0000000000..dc02b2161c --- /dev/null +++ b/test/test.sh @@ -0,0 +1,40 @@ +#! /usr/bin/env bash + +# This script builds the client and server binaries and runs them in a tmux +# session side-by-side. The client connects to the server and the server +# responds with a simple HTTP response. The client and server are run with +# verbose logging and the qlog output is stored in a temporary directory. The +# script also runs tcpdump to capture the packets exchanged between the client +# and server. The script uses tmux to create a split terminal window to display +# the qlog output and the packet capture. + +set -e +tmp=$(mktemp -d) +trap 'rm -rf "$tmp"' EXIT + +cargo build --bin neqo-client --bin neqo-server + +addr=127.0.0.1 +port=4433 +path=/20000 +flags="--verbose --qlog-dir $tmp --use-old-http --alpn hq-interop --quic-version 1" +if [ "$(uname -s)" != "Linux" ]; then + iface=lo0 +else + iface=lo +fi + +client="./target/debug/neqo-client $flags --output-dir $tmp --stats https://$addr:$port$path" +server="SSLKEYLOGFILE=$tmp/test.tlskey ./target/debug/neqo-server $flags $addr:$port" + +tcpdump -U -i "$iface" -w "$tmp/test.pcap" host $addr and port $port >/dev/null 2>&1 & +tcpdump_pid=$! + +tmux -CC \ + set-option -g default-shell "$(which bash)" \; \ + new-session "$client && kill -USR2 $tcpdump_pid && touch $tmp/done" \; \ + split-window -h "$server" \; \ + split-window -v -f "\ + until [ -e $tmp/done ]; do sleep 1; done && \ + tshark -r $tmp/test.pcap -o tls.keylog_file:$tmp/test.tlskey" \; \ + set remain-on-exit on