From e38e3cfa0c688725e88f5cdd7fe850833c910f0a Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 8 Apr 2024 07:01:23 +0200 Subject: [PATCH 01/19] fix(bin/bench): benchmark handshakes per second (HPS) without zero RTT (#1795) The "100-seq-conn/1-1b-resp (aka. HPS)" benchmark supposedly measures the time for 100 uniform handshakes. Though `neqo-client` by default uses a resumption token for zero RTT if available. Thus the first is a normal and all following 99 handshakes are a zero RTT handshake. To simplify the benchmark and to measure the same handshake type, establish a single connection within the benchmark function and have criterion do all the iterating. --- neqo-bin/benches/main.rs | 12 +++--------- neqo-bin/src/client/mod.rs | 4 ++-- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/neqo-bin/benches/main.rs b/neqo-bin/benches/main.rs index 6bb8b3161d..3afcc5d127 100644 --- a/neqo-bin/benches/main.rs +++ b/neqo-bin/benches/main.rs @@ -13,8 +13,6 @@ use tokio::runtime::Runtime; struct Benchmark { name: String, requests: Vec, - /// Download resources in series using separate connections. - download_in_series: bool, sample_size: Option, } @@ -27,25 +25,21 @@ fn transfer(c: &mut Criterion) { for Benchmark { name, requests, - download_in_series, sample_size, } in [ Benchmark { name: "1-conn/1-100mb-resp (aka. Download)".to_string(), requests: vec![100 * 1024 * 1024], - download_in_series: false, sample_size: Some(10), }, Benchmark { name: "1-conn/10_000-1b-seq-resp (aka. RPS)".to_string(), requests: vec![1; 10_000], - download_in_series: false, sample_size: None, }, Benchmark { - name: "100-seq-conn/1-1b-resp (aka. HPS)".to_string(), - requests: vec![1; 100], - download_in_series: true, + name: "1-conn/1-1b-resp (aka. HPS)".to_string(), + requests: vec![1; 1], sample_size: None, }, ] { @@ -61,7 +55,7 @@ fn transfer(c: &mut Criterion) { } group.bench_function("client", |b| { b.to_async(Runtime::new().unwrap()).iter_batched( - || client::client(client::Args::new(&requests, download_in_series)), + || client::client(client::Args::new(&requests)), |client| async move { client.await.unwrap(); }, diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index 81721802e1..ad6e34e15b 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -200,7 +200,7 @@ impl Args { #[must_use] #[cfg(feature = "bench")] #[allow(clippy::missing_panics_doc)] - pub fn new(requests: &[u64], download_in_series: bool) -> Self { + pub fn new(requests: &[u64]) -> Self { use std::str::FromStr; Self { verbose: clap_verbosity_flag::Verbosity::::default(), @@ -212,7 +212,7 @@ impl Args { method: "GET".into(), header: vec![], max_concurrent_push_streams: 10, - download_in_series, + download_in_series: false, concurrency: 100, output_read_data: false, output_dir: Some("/dev/null".into()), From aca1352b6bf93d8fe2cf0f627dc2fdf83a0bf95d Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 8 Apr 2024 07:01:58 +0200 Subject: [PATCH 02/19] fix(bin/bench): rename 1-conn/10_000-1b-seq-resp to parallel (#1796) The benchmark is using the default concurrency factor. By default `neqo-client` runs up to `100` requests in parallel. https://github.com/mozilla/neqo/blob/5dfe106669ccb695187511305c21b8e8a8775e91/neqo-bin/src/client/mod.rs#L151-L153 Thus the benchmark name is wrong, i.e. the requests are run in parallel and not sequentially. --- neqo-bin/benches/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neqo-bin/benches/main.rs b/neqo-bin/benches/main.rs index 3afcc5d127..59927ebe0c 100644 --- a/neqo-bin/benches/main.rs +++ b/neqo-bin/benches/main.rs @@ -33,7 +33,7 @@ fn transfer(c: &mut Criterion) { sample_size: Some(10), }, Benchmark { - name: "1-conn/10_000-1b-seq-resp (aka. RPS)".to_string(), + name: "1-conn/10_000-parallel-1b-resp (aka. RPS)".to_string(), requests: vec![1; 10_000], sample_size: None, }, From 992d588ea09b896b7877692e95e2c561d22aa907 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 8 Apr 2024 13:05:32 +0300 Subject: [PATCH 03/19] build(deps): bump github/codeql-action from 3.24.9 to 3.24.10 (#1797) Bumps [github/codeql-action](https://github.com/github/codeql-action) from 3.24.9 to 3.24.10. - [Release notes](https://github.com/github/codeql-action/releases) - [Changelog](https://github.com/github/codeql-action/blob/main/CHANGELOG.md) - [Commits](https://github.com/github/codeql-action/compare/1b1aada464948af03b950897e5eb522f92603cc2...4355270be187e1b672a7a1c7c7bae5afdc1ab94a) --- updated-dependencies: - dependency-name: github/codeql-action dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/scorecard.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/scorecard.yml b/.github/workflows/scorecard.yml index 01e5fe16a8..2c0b04d09e 100644 --- a/.github/workflows/scorecard.yml +++ b/.github/workflows/scorecard.yml @@ -67,6 +67,6 @@ jobs: # Upload the results to GitHub's code scanning dashboard. - name: "Upload to code-scanning" - uses: github/codeql-action/upload-sarif@1b1aada464948af03b950897e5eb522f92603cc2 # v3.24.9 + uses: github/codeql-action/upload-sarif@4355270be187e1b672a7a1c7c7bae5afdc1ab94a # v3.24.10 with: sarif_file: results.sarif From a65d945344a2dee7ddb2e81f0875bdb24c00ebec Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Mon, 8 Apr 2024 14:31:53 +0300 Subject: [PATCH 04/19] chore: Fix clippy `usage of a legacy numeric method` (#1798) `u64::max_value()` -> `u64::MAX` --- neqo-crypto/src/time.rs | 4 ++-- neqo-qpack/src/table.rs | 2 +- neqo-transport/src/crypto.rs | 2 +- neqo-transport/src/packet/mod.rs | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/neqo-crypto/src/time.rs b/neqo-crypto/src/time.rs index 0e59c4f5e2..359436a854 100644 --- a/neqo-crypto/src/time.rs +++ b/neqo-crypto/src/time.rs @@ -258,11 +258,11 @@ mod test { #[test] // We allow replace_consts here because - // std::u64::max_value() isn't available + // std::u64::MAX isn't available // in all of our targets fn overflow_interval() { init(); - let interval = Interval::from(Duration::from_micros(u64::max_value())); + let interval = Interval::from(Duration::from_micros(u64::MAX)); let res: Res = interval.try_into(); assert!(res.is_err()); } diff --git a/neqo-qpack/src/table.rs b/neqo-qpack/src/table.rs index 517e98db09..d5275ec98f 100644 --- a/neqo-qpack/src/table.rs +++ b/neqo-qpack/src/table.rs @@ -94,7 +94,7 @@ impl HeaderTable { capacity: 0, used: 0, base: 0, - acked_inserts_cnt: if encoder { 0 } else { u64::max_value() }, + acked_inserts_cnt: if encoder { 0 } else { u64::MAX }, } } diff --git a/neqo-transport/src/crypto.rs b/neqo-transport/src/crypto.rs index 54bfe622cf..60d056f2d2 100644 --- a/neqo-transport/src/crypto.rs +++ b/neqo-transport/src/crypto.rs @@ -758,7 +758,7 @@ impl CryptoDxAppData { } pub fn next(&self) -> Res { - if self.dx.epoch == usize::max_value() { + if self.dx.epoch == usize::MAX { // Guard against too many key updates. return Err(Error::KeysExhausted); } diff --git a/neqo-transport/src/packet/mod.rs b/neqo-transport/src/packet/mod.rs index d435ac0dd8..f5e8320ccb 100644 --- a/neqo-transport/src/packet/mod.rs +++ b/neqo-transport/src/packet/mod.rs @@ -158,7 +158,7 @@ impl PacketBuilder { } Self { encoder, - pn: u64::max_value(), + pn: u64::MAX, header: header_start..header_start, offsets: PacketBuilderOffsets { first_byte_mask: PACKET_HP_MASK_SHORT, @@ -201,7 +201,7 @@ impl PacketBuilder { Self { encoder, - pn: u64::max_value(), + pn: u64::MAX, header: header_start..header_start, offsets: PacketBuilderOffsets { first_byte_mask: PACKET_HP_MASK_LONG, From 32a2a59e4ea0c09bbb5ddb0c0a42fc6f0658dc70 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 8 Apr 2024 15:49:03 +0200 Subject: [PATCH 05/19] fix(client): exit with non-zero on error (#1786) * fix(client): exit with non-zero on error When a connection closes with an error, surface the error to the user and exit with non-zero. * Trigger CI --- neqo-bin/src/client/http09.rs | 11 +++++++---- neqo-bin/src/client/http3.rs | 10 +++++++--- neqo-bin/src/client/mod.rs | 26 ++++++++++++++++++++++---- neqo-transport/src/lib.rs | 4 ++-- 4 files changed, 38 insertions(+), 13 deletions(-) diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index 9bdb6dca85..b157a6a13f 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -20,8 +20,8 @@ use std::{ use neqo_common::{event::Provider, qdebug, qinfo, qwarn, Datagram}; use neqo_crypto::{AuthenticationStatus, ResumptionToken}; use neqo_transport::{ - Connection, ConnectionEvent, EmptyConnectionIdGenerator, Error, Output, State, StreamId, - StreamType, + Connection, ConnectionError, ConnectionEvent, EmptyConnectionIdGenerator, Error, Output, State, + StreamId, StreamType, }; use url::Url; @@ -149,8 +149,11 @@ impl super::Client for Connection { self.close(now, app_error, msg); } - fn is_closed(&self) -> bool { - matches!(self.state(), State::Closed(..)) + fn is_closed(&self) -> Option { + if let State::Closed(err) = self.state() { + return Some(err.clone()); + } + None } fn stats(&self) -> neqo_transport::Stats { diff --git a/neqo-bin/src/client/http3.rs b/neqo-bin/src/client/http3.rs index c88a8448f6..d56af5eda9 100644 --- a/neqo-bin/src/client/http3.rs +++ b/neqo-bin/src/client/http3.rs @@ -22,7 +22,8 @@ use neqo_common::{event::Provider, hex, qdebug, qinfo, qwarn, Datagram, Header}; use neqo_crypto::{AuthenticationStatus, ResumptionToken}; use neqo_http3::{Error, Http3Client, Http3ClientEvent, Http3Parameters, Http3State, Priority}; use neqo_transport::{ - AppError, Connection, EmptyConnectionIdGenerator, Error as TransportError, Output, StreamId, + AppError, Connection, ConnectionError, EmptyConnectionIdGenerator, Error as TransportError, + Output, StreamId, }; use url::Url; @@ -111,8 +112,11 @@ pub(crate) fn create_client( } impl super::Client for Http3Client { - fn is_closed(&self) -> bool { - matches!(self.state(), Http3State::Closed(..)) + fn is_closed(&self) -> Option { + if let Http3State::Closed(err) = self.state() { + return Some(err); + } + None } fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index ad6e34e15b..49c116aa95 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -27,7 +27,7 @@ use neqo_crypto::{ init, Cipher, ResumptionToken, }; use neqo_http3::Output; -use neqo_transport::{AppError, ConnectionId, Error as TransportError, Version}; +use neqo_transport::{AppError, ConnectionError, ConnectionId, Error as TransportError, Version}; use qlog::{events::EventImportance, streamer::QlogStreamer}; use tokio::time::Sleep; use url::{Origin, Url}; @@ -46,6 +46,7 @@ pub enum Error { IoError(io::Error), QlogError, TransportError(neqo_transport::Error), + ApplicationError(neqo_transport::AppError), CryptoError(neqo_crypto::Error), } @@ -79,6 +80,15 @@ impl From for Error { } } +impl From for Error { + fn from(err: neqo_transport::ConnectionError) -> Self { + match err { + ConnectionError::Transport(e) => Self::TransportError(e), + ConnectionError::Application(e) => Self::ApplicationError(e), + } + } +} + impl Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "Error: {self:?}")?; @@ -371,7 +381,11 @@ trait Client { fn close(&mut self, now: Instant, app_error: AppError, msg: S) where S: AsRef + Display; - fn is_closed(&self) -> bool; + /// Returns [`Some(_)`] if the connection is closed. + /// + /// Note that connection was closed without error on + /// [`Some(ConnectionError::Transport(TransportError::NoError))`]. + fn is_closed(&self) -> Option; fn stats(&self) -> neqo_transport::Stats; } @@ -406,11 +420,15 @@ impl<'a, H: Handler> Runner<'a, H> { self.process(None).await?; - if self.client.is_closed() { + if let Some(reason) = self.client.is_closed() { if self.args.stats { qinfo!("{:?}", self.client.stats()); } - return Ok(self.handler.take_token()); + return match reason { + ConnectionError::Transport(TransportError::NoError) + | ConnectionError::Application(0) => Ok(self.handler.take_token()), + _ => Err(reason.into()), + }; } match ready(self.socket, self.timeout.as_mut()).await? { diff --git a/neqo-transport/src/lib.rs b/neqo-transport/src/lib.rs index 8fabbeb9a3..5488472b58 100644 --- a/neqo-transport/src/lib.rs +++ b/neqo-transport/src/lib.rs @@ -70,8 +70,8 @@ const ERROR_AEAD_LIMIT_REACHED: TransportError = 15; #[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq)] pub enum Error { NoError, - // Each time tihe error is return a different parameter is supply. - // This will be use to distinguish each occurance of this error. + // Each time this error is returned a different parameter is supplied. + // This will be used to distinguish each occurance of this error. InternalError, ConnectionRefused, FlowControlError, From 6daede078b19c99a53ea5f4405c124683723b08e Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Tue, 9 Apr 2024 08:49:51 +0300 Subject: [PATCH 06/19] chore: Fix various QNS issues (#1799) * Record client and server logs during run * Fix result comment table formatting * Fix docker image build (on macOS at least) --- .dockerignore | 1 + .../actions/quic-interop-runner/action.yml | 11 ++++- qns/Dockerfile | 4 +- qns/interop.sh | 45 +++++++++---------- 4 files changed, 33 insertions(+), 28 deletions(-) diff --git a/.dockerignore b/.dockerignore index cc95fda49e..324520383b 100644 --- a/.dockerignore +++ b/.dockerignore @@ -5,5 +5,6 @@ !**/*.rs !**/*.h !**/*.hpp +!neqo-crypto/min_version.txt !qns !Cargo.lock diff --git a/.github/actions/quic-interop-runner/action.yml b/.github/actions/quic-interop-runner/action.yml index 4d88191646..7fd055f184 100644 --- a/.github/actions/quic-interop-runner/action.yml +++ b/.github/actions/quic-interop-runner/action.yml @@ -91,15 +91,22 @@ runs: - name: Format GitHub comment if: always() run: | + if [ -s quic-interop-runner/summary ]; then + exit 0 + fi echo '[**QUIC Interop Runner**](https://github.com/quic-interop/quic-interop-runner)' >> comment echo '' >> comment # Ignore all, but table, which starts with "|". - grep -E '^\|' quic-interop-runner/summary >> comment + grep -E '^\|' quic-interop-runner/summary |\ + awk '(!/^\| *:-/ || (d++ && d < 3))' |\ + sed -E -e 's/✓/:white_check_mark:/gi' -e 's/✕/:x:/gi' -e 's/\?/:grey_question:/gi' \ + >> comment echo '' >> comment + echo "EXPORT_COMMENT=1" >> "$GITHUB_ENV" shell: bash - name: Export PR comment data - if: always() + if: env.EXPORT_COMMENT == '1' uses: ./.github/actions/pr-comment-data-export with: name: qns diff --git a/qns/Dockerfile b/qns/Dockerfile index eed7d3f986..cdb192f203 100644 --- a/qns/Dockerfile +++ b/qns/Dockerfile @@ -1,7 +1,7 @@ FROM martenseemann/quic-network-simulator-endpoint:latest AS buildimage RUN apt-get update && apt-get install -y --no-install-recommends \ - curl git mercurial \ + curl git mercurial coreutils \ build-essential libclang-dev lld \ gyp ninja-build zlib1g-dev python \ && apt-get autoremove -y && apt-get clean -y \ @@ -30,7 +30,7 @@ ADD . /neqo RUN set -eux; \ cd /neqo; \ - RUSTFLAGS="-g -C link-arg=-fuse-ld=lld" cargo build --release \ + RUSTFLAGS="-C link-arg=-fuse-ld=lld" cargo build --release \ --bin neqo-client --bin neqo-server # Copy only binaries to the final image to keep it small. diff --git a/qns/interop.sh b/qns/interop.sh index 4baa6b7e8f..e216e49866 100755 --- a/qns/interop.sh +++ b/qns/interop.sh @@ -10,30 +10,27 @@ export PATH="${PATH}:/neqo/bin" [ -n "$QLOGDIR" ] case "$ROLE" in - client) - /wait-for-it.sh sim:57832 -s -t 30 - sleep 5 - neqo-client --help | head -n 1 - RUST_LOG=debug RUST_BACKTRACE=1 neqo-client --cc cubic --qns-test "$TESTCASE" \ - --qlog-dir "$QLOGDIR" --output-dir /downloads $REQUESTS - ;; +client) + /wait-for-it.sh sim:57832 -s -t 30 + RUST_LOG=debug RUST_BACKTRACE=1 neqo-client --cc cubic --qns-test "$TESTCASE" \ + --qlog-dir "$QLOGDIR" --output-dir /downloads $REQUESTS 2> >(tee -i -a "/logs/$ROLE.log" >&2) + ;; - server) - DB=/neqo/db - CERT=cert - P12CERT=$(mktemp) - mkdir -p "$DB" - certutil -N -d "sql:$DB" --empty-password - openssl pkcs12 -export -nodes -in /certs/cert.pem -inkey /certs/priv.key \ - -name "$CERT" -passout pass: -out "$P12CERT" - pk12util -d "sql:$DB" -i "$P12CERT" -W '' - certutil -L -d "sql:$DB" -n "$CERT" - neqo-server --help | head -n 1 - RUST_LOG=info RUST_BACKTRACE=1 neqo-server --cc cubic --qns-test "$TESTCASE" \ - --qlog-dir "$QLOGDIR" -d "$DB" -k "$CERT" [::]:443 - ;; +server) + DB=/neqo/db + CERT=cert + P12CERT=$(mktemp) + mkdir -p "$DB" + certutil -N -d "sql:$DB" --empty-password + openssl pkcs12 -export -nodes -in /certs/cert.pem -inkey /certs/priv.key \ + -name "$CERT" -passout pass: -out "$P12CERT" + pk12util -d "sql:$DB" -i "$P12CERT" -W '' + certutil -L -d "sql:$DB" -n "$CERT" + RUST_LOG=info RUST_BACKTRACE=1 neqo-server --cc cubic --qns-test "$TESTCASE" \ + --qlog-dir "$QLOGDIR" -d "$DB" -k "$CERT" '[::]:443' 2> >(tee -i -a "/logs/$ROLE.log" >&2) + ;; - *) - exit 1 - ;; +*) + exit 1 + ;; esac From 342e4e785e9edbe0ec4d4c6c6f51716a293e27a6 Mon Sep 17 00:00:00 2001 From: Kershaw Date: Tue, 9 Apr 2024 07:46:47 +0200 Subject: [PATCH 07/19] Revert "perf(transport): remove Server::timers (#1784)" (#1800) This reverts commit 61fcd282c420da03a566cf3324b4dc86b04415e4. --- neqo-common/Cargo.toml | 4 + neqo-common/benches/timer.rs | 39 ++++ neqo-common/src/lib.rs | 1 + neqo-common/src/timer.rs | 420 +++++++++++++++++++++++++++++++++++ neqo-transport/src/server.rs | 87 ++++---- 5 files changed, 509 insertions(+), 42 deletions(-) create mode 100644 neqo-common/benches/timer.rs create mode 100644 neqo-common/src/timer.rs diff --git a/neqo-common/Cargo.toml b/neqo-common/Cargo.toml index 0cb4bcbf4f..069d67b834 100644 --- a/neqo-common/Cargo.toml +++ b/neqo-common/Cargo.toml @@ -34,3 +34,7 @@ features = ["timeapi"] [lib] # See https://github.com/bheisler/criterion.rs/blob/master/book/src/faq.md#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options bench = false + +[[bench]] +name = "timer" +harness = false diff --git a/neqo-common/benches/timer.rs b/neqo-common/benches/timer.rs new file mode 100644 index 0000000000..5ac8019db4 --- /dev/null +++ b/neqo-common/benches/timer.rs @@ -0,0 +1,39 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::time::{Duration, Instant}; + +use criterion::{criterion_group, criterion_main, Criterion}; +use neqo_common::timer::Timer; +use test_fixture::now; + +fn benchmark_timer(c: &mut Criterion) { + c.bench_function("drain a timer quickly", |b| { + b.iter_batched_ref( + make_timer, + |(_now, timer)| { + while let Some(t) = timer.next_time() { + assert!(timer.take_next(t).is_some()); + } + }, + criterion::BatchSize::SmallInput, + ); + }); +} + +fn make_timer() -> (Instant, Timer<()>) { + const TIMES: &[u64] = &[1, 2, 3, 5, 8, 13, 21, 34]; + + let now = now(); + let mut timer = Timer::new(now, Duration::from_millis(777), 100); + for &t in TIMES { + timer.add(now + Duration::from_secs(t), ()); + } + (now, timer) +} + +criterion_group!(benches, benchmark_timer); +criterion_main!(benches); diff --git a/neqo-common/src/lib.rs b/neqo-common/src/lib.rs index f3e8e63023..e988c6071d 100644 --- a/neqo-common/src/lib.rs +++ b/neqo-common/src/lib.rs @@ -14,6 +14,7 @@ pub mod hrtime; mod incrdecoder; pub mod log; pub mod qlog; +pub mod timer; pub mod tos; use std::fmt::Write; diff --git a/neqo-common/src/timer.rs b/neqo-common/src/timer.rs new file mode 100644 index 0000000000..3feddb2226 --- /dev/null +++ b/neqo-common/src/timer.rs @@ -0,0 +1,420 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::{ + collections::VecDeque, + mem, + time::{Duration, Instant}, +}; + +/// Internal structure for a timer item. +struct TimerItem { + time: Instant, + item: T, +} + +impl TimerItem { + fn time(ti: &Self) -> Instant { + ti.time + } +} + +/// A timer queue. +/// This uses a classic timer wheel arrangement, with some characteristics that might be considered +/// peculiar. Each slot in the wheel is sorted (complexity O(N) insertions, but O(logN) to find cut +/// points). Time is relative, the wheel has an origin time and it is unable to represent times that +/// are more than `granularity * capacity` past that time. +pub struct Timer { + items: Vec>>, + now: Instant, + granularity: Duration, + cursor: usize, +} + +impl Timer { + /// Construct a new wheel at the given granularity, starting at the given time. + /// + /// # Panics + /// + /// When `capacity` is too large to fit in `u32` or `granularity` is zero. + pub fn new(now: Instant, granularity: Duration, capacity: usize) -> Self { + assert!(u32::try_from(capacity).is_ok()); + assert!(granularity.as_nanos() > 0); + let mut items = Vec::with_capacity(capacity); + items.resize_with(capacity, Default::default); + Self { + items, + now, + granularity, + cursor: 0, + } + } + + /// Return a reference to the time of the next entry. + #[must_use] + pub fn next_time(&self) -> Option { + let idx = self.bucket(0); + for i in idx..self.items.len() { + if let Some(t) = self.items[i].front() { + return Some(t.time); + } + } + for i in 0..idx { + if let Some(t) = self.items[i].front() { + return Some(t.time); + } + } + None + } + + /// Get the full span of time that this can cover. + /// Two timers cannot be more than this far apart. + /// In practice, this value is less by one amount of the timer granularity. + #[inline] + #[allow(clippy::cast_possible_truncation)] // guarded by assertion + #[must_use] + pub fn span(&self) -> Duration { + self.granularity * (self.items.len() as u32) + } + + /// For the given `time`, get the number of whole buckets in the future that is. + #[inline] + #[allow(clippy::cast_possible_truncation)] // guarded by assertion + fn delta(&self, time: Instant) -> usize { + // This really should use Duration::div_duration_f??(), but it can't yet. + ((time - self.now).as_nanos() / self.granularity.as_nanos()) as usize + } + + #[inline] + fn time_bucket(&self, time: Instant) -> usize { + self.bucket(self.delta(time)) + } + + #[inline] + fn bucket(&self, delta: usize) -> usize { + debug_assert!(delta < self.items.len()); + (self.cursor + delta) % self.items.len() + } + + /// Slide forward in time by `n * self.granularity`. + #[allow(clippy::cast_possible_truncation, clippy::reversed_empty_ranges)] + // cast_possible_truncation is ok because we have an assertion guard. + // reversed_empty_ranges is to avoid different types on the if/else. + fn tick(&mut self, n: usize) { + let new = self.bucket(n); + let iter = if new < self.cursor { + (self.cursor..self.items.len()).chain(0..new) + } else { + (self.cursor..new).chain(0..0) + }; + for i in iter { + assert!(self.items[i].is_empty()); + } + self.now += self.granularity * (n as u32); + self.cursor = new; + } + + /// Asserts if the time given is in the past or too far in the future. + /// + /// # Panics + /// + /// When `time` is in the past relative to previous calls. + pub fn add(&mut self, time: Instant, item: T) { + assert!(time >= self.now); + // Skip forward quickly if there is too large a gap. + let short_span = self.span() - self.granularity; + if time >= (self.now + self.span() + short_span) { + // Assert that there aren't any items. + for i in &self.items { + debug_assert!(i.is_empty()); + } + self.now = time.checked_sub(short_span).unwrap(); + self.cursor = 0; + } + + // Adjust time forward the minimum amount necessary. + let mut d = self.delta(time); + if d >= self.items.len() { + self.tick(1 + d - self.items.len()); + d = self.items.len() - 1; + } + + let bucket = self.bucket(d); + let ins = match self.items[bucket].binary_search_by_key(&time, TimerItem::time) { + Ok(j) | Err(j) => j, + }; + self.items[bucket].insert(ins, TimerItem { time, item }); + } + + /// Given knowledge of the time an item was added, remove it. + /// This requires use of a predicate that identifies matching items. + /// + /// # Panics + /// Impossible, I think. + pub fn remove(&mut self, time: Instant, mut selector: F) -> Option + where + F: FnMut(&T) -> bool, + { + if time < self.now { + return None; + } + if time > self.now + self.span() { + return None; + } + let bucket = self.time_bucket(time); + let Ok(start_index) = self.items[bucket].binary_search_by_key(&time, TimerItem::time) + else { + return None; + }; + // start_index is just one of potentially many items with the same time. + // Search backwards for a match, ... + for i in (0..=start_index).rev() { + if self.items[bucket][i].time != time { + break; + } + if selector(&self.items[bucket][i].item) { + return Some(self.items[bucket].remove(i).unwrap().item); + } + } + // ... then forwards. + for i in (start_index + 1)..self.items[bucket].len() { + if self.items[bucket][i].time != time { + break; + } + if selector(&self.items[bucket][i].item) { + return Some(self.items[bucket].remove(i).unwrap().item); + } + } + None + } + + /// Take the next item, unless there are no items with + /// a timeout in the past relative to `until`. + pub fn take_next(&mut self, until: Instant) -> Option { + fn maybe_take(v: &mut VecDeque>, until: Instant) -> Option { + if !v.is_empty() && v[0].time <= until { + Some(v.pop_front().unwrap().item) + } else { + None + } + } + + let idx = self.bucket(0); + for i in idx..self.items.len() { + let res = maybe_take(&mut self.items[i], until); + if res.is_some() { + return res; + } + } + for i in 0..idx { + let res = maybe_take(&mut self.items[i], until); + if res.is_some() { + return res; + } + } + None + } + + /// Create an iterator that takes all items until the given time. + /// Note: Items might be removed even if the iterator is not fully exhausted. + pub fn take_until(&mut self, until: Instant) -> impl Iterator { + let get_item = move |x: TimerItem| x.item; + if until >= self.now + self.span() { + // Drain everything, so a clean sweep. + let mut empty_items = Vec::with_capacity(self.items.len()); + empty_items.resize_with(self.items.len(), VecDeque::default); + let mut items = mem::replace(&mut self.items, empty_items); + self.now = until; + self.cursor = 0; + + let tail = items.split_off(self.cursor); + return tail.into_iter().chain(items).flatten().map(get_item); + } + + // Only returning a partial span, so do it bucket at a time. + let delta = self.delta(until); + let mut buckets = Vec::with_capacity(delta + 1); + + // First, the whole buckets. + for i in 0..delta { + let idx = self.bucket(i); + buckets.push(mem::take(&mut self.items[idx])); + } + self.tick(delta); + + // Now we need to split the last bucket, because there might be + // some items with `item.time > until`. + let bucket = &mut self.items[self.cursor]; + let last_idx = match bucket.binary_search_by_key(&until, TimerItem::time) { + Ok(mut m) => { + // If there are multiple values, the search will hit any of them. + // Make sure to get them all. + while m < bucket.len() && bucket[m].time == until { + m += 1; + } + m + } + Err(ins) => ins, + }; + let tail = bucket.split_off(last_idx); + buckets.push(mem::replace(bucket, tail)); + // This tomfoolery with the empty vector ensures that + // the returned type here matches the one above precisely + // without having to invoke the `either` crate. + buckets.into_iter().chain(vec![]).flatten().map(get_item) + } +} + +#[cfg(test)] +mod test { + use std::sync::OnceLock; + + use super::{Duration, Instant, Timer}; + + fn now() -> Instant { + static NOW: OnceLock = OnceLock::new(); + *NOW.get_or_init(Instant::now) + } + + const GRANULARITY: Duration = Duration::from_millis(10); + const CAPACITY: usize = 10; + #[test] + fn create() { + let t: Timer<()> = Timer::new(now(), GRANULARITY, CAPACITY); + assert_eq!(t.span(), Duration::from_millis(100)); + assert_eq!(None, t.next_time()); + } + + #[test] + fn immediate_entry() { + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + t.add(now(), 12); + assert_eq!(now(), t.next_time().expect("should have an entry")); + let values: Vec<_> = t.take_until(now()).collect(); + assert_eq!(vec![12], values); + } + + #[test] + fn same_time() { + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + let v1 = 12; + let v2 = 13; + t.add(now(), v1); + t.add(now(), v2); + assert_eq!(now(), t.next_time().expect("should have an entry")); + let values: Vec<_> = t.take_until(now()).collect(); + assert!(values.contains(&v1)); + assert!(values.contains(&v2)); + } + + #[test] + fn add() { + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + let near_future = now() + Duration::from_millis(17); + let v = 9; + t.add(near_future, v); + assert_eq!(near_future, t.next_time().expect("should return a value")); + assert_eq!( + t.take_until(near_future.checked_sub(Duration::from_millis(1)).unwrap()) + .count(), + 0 + ); + assert!(t + .take_until(near_future + Duration::from_millis(1)) + .any(|x| x == v)); + } + + #[test] + fn add_future() { + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + let future = now() + Duration::from_millis(117); + let v = 9; + t.add(future, v); + assert_eq!(future, t.next_time().expect("should return a value")); + assert!(t.take_until(future).any(|x| x == v)); + } + + #[test] + fn add_far_future() { + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + let far_future = now() + Duration::from_millis(892); + let v = 9; + t.add(far_future, v); + assert_eq!(far_future, t.next_time().expect("should return a value")); + assert!(t.take_until(far_future).any(|x| x == v)); + } + + const TIMES: &[Duration] = &[ + Duration::from_millis(40), + Duration::from_millis(91), + Duration::from_millis(6), + Duration::from_millis(3), + Duration::from_millis(22), + Duration::from_millis(40), + ]; + + fn with_times() -> Timer { + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + for (i, time) in TIMES.iter().enumerate() { + t.add(now() + *time, i); + } + assert_eq!( + now() + *TIMES.iter().min().unwrap(), + t.next_time().expect("should have a time") + ); + t + } + + #[test] + #[allow(clippy::needless_collect)] // false positive + fn multiple_values() { + let mut t = with_times(); + let values: Vec<_> = t.take_until(now() + *TIMES.iter().max().unwrap()).collect(); + for i in 0..TIMES.len() { + assert!(values.contains(&i)); + } + } + + #[test] + #[allow(clippy::needless_collect)] // false positive + fn take_far_future() { + let mut t = with_times(); + let values: Vec<_> = t.take_until(now() + Duration::from_secs(100)).collect(); + for i in 0..TIMES.len() { + assert!(values.contains(&i)); + } + } + + #[test] + fn remove_each() { + let mut t = with_times(); + for (i, time) in TIMES.iter().enumerate() { + assert_eq!(Some(i), t.remove(now() + *time, |&x| x == i)); + } + assert_eq!(None, t.next_time()); + } + + #[test] + fn remove_future() { + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + let future = now() + Duration::from_millis(117); + let v = 9; + t.add(future, v); + + assert_eq!(Some(v), t.remove(future, |candidate| *candidate == v)); + } + + #[test] + fn remove_too_far_future() { + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + let future = now() + Duration::from_millis(117); + let too_far_future = now() + t.span() + Duration::from_millis(117); + let v = 9; + t.add(future, v); + + assert_eq!(None, t.remove(too_far_future, |candidate| *candidate == v)); + } +} diff --git a/neqo-transport/src/server.rs b/neqo-transport/src/server.rs index 7d3d144a09..96a6244ef1 100644 --- a/neqo-transport/src/server.rs +++ b/neqo-transport/src/server.rs @@ -15,12 +15,12 @@ use std::{ ops::{Deref, DerefMut}, path::PathBuf, rc::{Rc, Weak}, - time::Instant, + time::{Duration, Instant}, }; use neqo_common::{ self as common, event::Provider, hex, qdebug, qerror, qinfo, qlog::NeqoQlog, qtrace, qwarn, - Datagram, Decoder, Role, + timer::Timer, Datagram, Decoder, Role, }; use neqo_crypto::{ encode_ech_config, AntiReplay, Cipher, PrivateKey, PublicKey, ZeroRttCheckResult, @@ -46,6 +46,13 @@ pub enum InitialResult { /// `MIN_INITIAL_PACKET_SIZE` is the smallest packet that can be used to establish /// a new connection across all QUIC versions this server supports. const MIN_INITIAL_PACKET_SIZE: usize = 1200; +/// The size of timer buckets. This is higher than the actual timer granularity +/// as this depends on there being some distribution of events. +const TIMER_GRANULARITY: Duration = Duration::from_millis(4); +/// The number of buckets in the timer. As mentioned in the definition of `Timer`, +/// the granularity and capacity need to multiply to be larger than the largest +/// delay that might be used. That's the idle timeout (currently 30s). +const TIMER_CAPACITY: usize = 16384; type StateRef = Rc>; type ConnectionTableRef = Rc>>; @@ -54,21 +61,7 @@ type ConnectionTableRef = Rc>>; pub struct ServerConnectionState { c: Connection, active_attempt: Option, - wake_at: Option, -} - -impl ServerConnectionState { - fn set_wake_at(&mut self, at: Instant) { - self.wake_at = Some(at); - } - - fn needs_waking(&self, now: Instant) -> bool { - self.wake_at.map_or(false, |t| t <= now) - } - - fn woken(&mut self) { - self.wake_at = None; - } + last_timer: Instant, } impl Deref for ServerConnectionState { @@ -181,8 +174,8 @@ pub struct Server { active: HashSet, /// The set of connections that need immediate processing. waiting: VecDeque, - /// The latest [`Output::Callback`] returned from [`Server::process`]. - wake_at: Option, + /// Outstanding timers for connections. + timers: Timer, /// Address validation logic, which determines whether we send a Retry. address_validation: Rc>, /// Directory to create qlog traces in @@ -226,10 +219,10 @@ impl Server { connections: Rc::default(), active: HashSet::default(), waiting: VecDeque::default(), + timers: Timer::new(now, TIMER_GRANULARITY, TIMER_CAPACITY), address_validation: Rc::new(RefCell::new(validation)), qlog_dir: None, ech_config: None, - wake_at: None, }) } @@ -267,6 +260,11 @@ impl Server { self.ech_config.as_ref().map_or(&[], |cfg| &cfg.encoded) } + fn remove_timer(&mut self, c: &StateRef) { + let last = c.borrow().last_timer; + self.timers.remove(last, |t| Rc::ptr_eq(t, c)); + } + fn process_connection( &mut self, c: &StateRef, @@ -282,12 +280,16 @@ impl Server { } Output::Callback(delay) => { let next = now + delay; - c.borrow_mut().set_wake_at(next); - if self.wake_at.map_or(true, |c| c > next) { - self.wake_at = Some(next); + if next != c.borrow().last_timer { + qtrace!([self], "Change timer to {:?}", next); + self.remove_timer(c); + c.borrow_mut().last_timer = next; + self.timers.add(next, Rc::clone(c)); } } - Output::None => {} + Output::None => { + self.remove_timer(c); + } } if c.borrow().has_events() { qtrace!([self], "Connection active: {:?}", c); @@ -505,7 +507,7 @@ impl Server { self.setup_connection(&mut c, &attempt_key, initial, orig_dcid); let c = Rc::new(RefCell::new(ServerConnectionState { c, - wake_at: None, + last_timer: now, active_attempt: Some(attempt_key.clone()), })); cid_mgr.borrow_mut().set_connection(&c); @@ -644,28 +646,24 @@ impl Server { return Some(d); } } - - qtrace!([self], "No packet to send still, check wake up times"); - loop { - let connection = self - .connections - .borrow() - .values() - .find(|c| c.borrow().needs_waking(now)) - .cloned()?; - let datagram = self.process_connection(&connection, None, now); - connection.borrow_mut().woken(); - if datagram.is_some() { - return datagram; + qtrace!([self], "No packet to send still, run timers"); + while let Some(c) = self.timers.take_next(now) { + if let Some(d) = self.process_connection(&c, None, now) { + return Some(d); } } + None } - pub fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - if self.wake_at.map_or(false, |c| c <= now) { - self.wake_at = None; + fn next_time(&mut self, now: Instant) -> Option { + if self.waiting.is_empty() { + self.timers.next_time().map(|x| x - now) + } else { + Some(Duration::new(0, 0)) } + } + pub fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { dgram .and_then(|d| self.process_input(d, now)) .or_else(|| self.process_next_output(now)) @@ -673,7 +671,12 @@ impl Server { qtrace!([self], "Send packet: {:?}", d); Output::Datagram(d) }) - .or_else(|| self.wake_at.take().map(|c| Output::Callback(c - now))) + .or_else(|| { + self.next_time(now).map(|delay| { + qtrace!([self], "Wait: {:?}", delay); + Output::Callback(delay) + }) + }) .unwrap_or_else(|| { qtrace!([self], "Go dormant"); Output::None From 5f9c3e7096db0e33c20ab92bea25d2715cbbbb86 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 9 Apr 2024 07:47:49 +0200 Subject: [PATCH 08/19] refactor(client): use process_output and process_multiple_input (#1794) * refactor(client): use process_output and process_multiple_input `neqo_transport::Connection` offers 4 process methods: - `process` - `process_output` - `process_input` - `process_multiple_input` Where `process` is a wrapper around `process_input` and `process_output` calling both in sequence. https://github.com/mozilla/neqo/blob/5dfe106669ccb695187511305c21b8e8a8775e91/neqo-transport/src/connection/mod.rs#L1099-L1107 Where `process_input` delegates to `process_multiple_input`. https://github.com/mozilla/neqo/blob/5dfe106669ccb695187511305c21b8e8a8775e91/neqo-transport/src/connection/mod.rs#L979-L1000 Previously `neqo-client` would use `process` only. Thus continuously interleaving output and input. Say `neqo-client` would have multiple datagrams buffered through a GRO read, it could potentially have to do a write in between each `process` calls, as each call to `process` with an input datagram might return an output datagram to be written. With this commit `neqo-client` uses `process_output` and `process_multiple_input` directly, thus reducing interleaving on batch reads (GRO and in the future recvmmsg) and in the future batch writes (GSO and sendmmsg). By using `process_multiple_input` instead of `process` or `process_input`, auxiliarry logic, like `self.cleanup_closed_streams` only has to run per input datagram batch, and not for each input datagram. Extracted from https://github.com/mozilla/neqo/pull/1741. * process_output before handle * process_ouput after each input batch --- neqo-bin/src/client/http09.rs | 11 ++++- neqo-bin/src/client/http3.rs | 11 ++++- neqo-bin/src/client/mod.rs | 61 ++++++++++++++++------------ neqo-http3/src/connection_client.rs | 7 ++-- neqo-transport/src/connection/mod.rs | 5 +-- 5 files changed, 58 insertions(+), 37 deletions(-) diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index b157a6a13f..e0b254f67b 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -138,8 +138,15 @@ pub(crate) fn create_client( } impl super::Client for Connection { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - self.process(dgram, now) + fn process_output(&mut self, now: Instant) -> Output { + self.process_output(now) + } + + fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) + where + I: IntoIterator, + { + self.process_multiple_input(dgrams, now); } fn close(&mut self, now: Instant, app_error: neqo_transport::AppError, msg: S) diff --git a/neqo-bin/src/client/http3.rs b/neqo-bin/src/client/http3.rs index d56af5eda9..09a30461bf 100644 --- a/neqo-bin/src/client/http3.rs +++ b/neqo-bin/src/client/http3.rs @@ -119,8 +119,15 @@ impl super::Client for Http3Client { None } - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - self.process(dgram, now) + fn process_output(&mut self, now: Instant) -> Output { + self.process_output(now) + } + + fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) + where + I: IntoIterator, + { + self.process_multiple_input(dgrams, now); } fn close(&mut self, now: Instant, app_error: AppError, msg: S) diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index 49c116aa95..791e2a6366 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -377,7 +377,10 @@ trait Handler { /// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`]. trait Client { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output; + fn process_output(&mut self, now: Instant) -> Output; + fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) + where + I: IntoIterator; fn close(&mut self, now: Instant, app_error: AppError, msg: S) where S: AsRef + Display; @@ -404,21 +407,21 @@ impl<'a, H: Handler> Runner<'a, H> { let handler_done = self.handler.handle(&mut self.client)?; match (handler_done, self.args.resume, self.handler.has_token()) { - // Handler isn't done. Continue. - (false, _, _) => {}, - // Handler done. Resumption token needed but not present. Continue. - (true, true, false) => { - qdebug!("Handler done. Waiting for resumption token."); - } - // Handler is done, no resumption token needed. Close. - (true, false, _) | - // Handler is done, resumption token needed and present. Close. - (true, true, true) => { - self.client.close(Instant::now(), 0, "kthxbye!"); - } + // Handler isn't done. Continue. + (false, _, _) => {}, + // Handler done. Resumption token needed but not present. Continue. + (true, true, false) => { + qdebug!("Handler done. Waiting for resumption token."); + } + // Handler is done, no resumption token needed. Close. + (true, false, _) | + // Handler is done, resumption token needed and present. Close. + (true, true, true) => { + self.client.close(Instant::now(), 0, "kthxbye!"); } + } - self.process(None).await?; + self.process_output().await?; if let Some(reason) = self.client.is_closed() { if self.args.stats { @@ -432,16 +435,7 @@ impl<'a, H: Handler> Runner<'a, H> { } match ready(self.socket, self.timeout.as_mut()).await? { - Ready::Socket => loop { - let dgrams = self.socket.recv(&self.local_addr)?; - if dgrams.is_empty() { - break; - } - for dgram in &dgrams { - self.process(Some(dgram)).await?; - } - self.handler.maybe_key_update(&mut self.client)?; - }, + Ready::Socket => self.process_multiple_input().await?, Ready::Timeout => { self.timeout = None; } @@ -449,9 +443,9 @@ impl<'a, H: Handler> Runner<'a, H> { } } - async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> { + async fn process_output(&mut self) -> Result<(), io::Error> { loop { - match self.client.process(dgram.take(), Instant::now()) { + match self.client.process_output(Instant::now()) { Output::Datagram(dgram) => { self.socket.writable().await?; self.socket.send(dgram)?; @@ -470,6 +464,21 @@ impl<'a, H: Handler> Runner<'a, H> { Ok(()) } + + async fn process_multiple_input(&mut self) -> Res<()> { + loop { + let dgrams = self.socket.recv(&self.local_addr)?; + if dgrams.is_empty() { + break; + } + self.client + .process_multiple_input(dgrams.iter(), Instant::now()); + self.process_output().await?; + self.handler.maybe_key_update(&mut self.client)?; + } + + Ok(()) + } } fn qlog_new(args: &Args, hostname: &str, cid: &ConnectionId) -> Res { diff --git a/neqo-http3/src/connection_client.rs b/neqo-http3/src/connection_client.rs index be20126353..4c8772d14a 100644 --- a/neqo-http3/src/connection_client.rs +++ b/neqo-http3/src/connection_client.rs @@ -880,11 +880,10 @@ impl Http3Client { pub fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) where I: IntoIterator, - I::IntoIter: ExactSizeIterator, { - let dgrams = dgrams.into_iter(); - qtrace!([self], "Process multiple datagrams, len={}", dgrams.len()); - if dgrams.len() == 0 { + let mut dgrams = dgrams.into_iter().peekable(); + qtrace!([self], "Process multiple datagrams"); + if dgrams.peek().is_none() { return; } self.conn.process_multiple_input(dgrams, now); diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index 9cddcdac28..e471c29c25 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -985,10 +985,9 @@ impl Connection { pub fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) where I: IntoIterator, - I::IntoIter: ExactSizeIterator, { - let dgrams = dgrams.into_iter(); - if dgrams.len() == 0 { + let mut dgrams = dgrams.into_iter().peekable(); + if dgrams.peek().is_none() { return; } From 7feb7cb19a95c83ec6531bcd7721679076ddfb17 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 9 Apr 2024 07:49:35 +0200 Subject: [PATCH 09/19] fix(http3): only add to stream_has_pending_data on has_data_to_send (#1793) * fix(http3): only add to stream_has_pending_data on has_data_to_send `::send_data` attempts to send a slice of data down into the QUIC layer, more specifically `neqo_transport::Connection::stream_send_atomic`. While it attempts to send any existing buffered data at the http3 layer first, it does not itself fill the http3 layer buffer, but instead only sends data, if the lower QUIC layer has capacity, i.e. only if it can send the data down to the QUIC layer right away. https://github.com/mozilla/neqo/blob/5dfe106669ccb695187511305c21b8e8a8775e91/neqo-http3/src/send_message.rs#L168-L221 `::send_data` is called via `Http3ServerHandler::send_data`. The wrapper first marks the stream as `stream_has_pending_data`, marks itself as `needs_processing` and then calls down into `::send_data`. https://github.com/mozilla/neqo/blob/5dfe106669ccb695187511305c21b8e8a8775e91/neqo-http3/src/connection_server.rs#L51-L74 Thus the latter always marks the former as `stream_has_pending_data` even though the former never writes into the buffer and thus might actually not have pending data. Why is this problematic? 1. Say that the buffer of the `BufferedStream` of `SendMessage` is empty. 2. Say that the user attempts to write data via `Http3ServerHandler::send_data`. Despite not filling the http3 layer buffer, the stream is marked as `stream_has_pending_data`. 3. Say that next the user calls `Http3Server::process`, which will call `Http3Server::process_http3`, which will call `Http3ServerHandler::process_http3`, which will call `Http3Connection::process_sending`, which will call `Http3Connection::send_non_control_streams`. `Http3Connection::send_non_control_streams` will attempt to flush all http3 layer buffers of streams marked via `stream_has_pending_data`, e.g. the stream from step (2). Thus it will call `::send` (note `send` and not the previous `send_data`). This function will attempt the stream's http3 layer buffer. In the case where the http3 layer stream buffer is empty, it will enqueue a `DataWritable` event for the user. Given that the buffer of our stream is empty (see (1)) such `DataWritable` event is always emitted. https://github.com/mozilla/neqo/blob/5dfe106669ccb695187511305c21b8e8a8775e91/neqo-http3/src/send_message.rs#L236-L264 The user, on receiving the `DataWritable` event will attempt to write to it via `Http3ServerHandler::send_data`, back to step (2), thus closing the busy loop. How to break the loop? This commit adds an additional check to the `stream_has_pending_data` function to ensure it indeed does have pending data. This breaks the above busy loop. In addition, it renames the function to `stream_might_have_pending_data`. * Address review * Revert comment but keep links --- neqo-http3/src/connection.rs | 4 ++-- neqo-http3/src/connection_server.rs | 13 ++++++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/neqo-http3/src/connection.rs b/neqo-http3/src/connection.rs index cfa78df787..dd45797baa 100644 --- a/neqo-http3/src/connection.rs +++ b/neqo-http3/src/connection.rs @@ -386,8 +386,8 @@ impl Http3Connection { Ok(()) } - /// Inform a `HttpConnection` that a stream has data to send and that `send` should be called - /// for the stream. + /// Inform an [`Http3Connection`] that a stream has data to send and that + /// [`SendStream::send`] should be called for the stream. pub fn stream_has_pending_data(&mut self, stream_id: StreamId) { self.streams_with_pending_data.insert(stream_id); } diff --git a/neqo-http3/src/connection_server.rs b/neqo-http3/src/connection_server.rs index dcf759f177..cc887a26fc 100644 --- a/neqo-http3/src/connection_server.rs +++ b/neqo-http3/src/connection_server.rs @@ -64,13 +64,17 @@ impl Http3ServerHandler { data: &[u8], conn: &mut Connection, ) -> Res { - self.base_handler.stream_has_pending_data(stream_id); - self.needs_processing = true; - self.base_handler + let n = self + .base_handler .send_streams .get_mut(&stream_id) .ok_or(Error::InvalidStreamId)? - .send_data(conn, data) + .send_data(conn, data)?; + if n > 0 { + self.base_handler.stream_has_pending_data(stream_id); + } + self.needs_processing = true; + Ok(n) } /// Supply response heeaders for a request. @@ -100,7 +104,6 @@ impl Http3ServerHandler { pub fn stream_close_send(&mut self, stream_id: StreamId, conn: &mut Connection) -> Res<()> { qdebug!([self], "Close sending side stream={}.", stream_id); self.base_handler.stream_close_send(conn, stream_id)?; - self.base_handler.stream_has_pending_data(stream_id); self.needs_processing = true; Ok(()) } From 3f3c5f6f66d1530c3d7e26aa3bc1b5f0618ef016 Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Tue, 9 Apr 2024 11:26:51 +0300 Subject: [PATCH 10/19] test: Add my `test.sh` script (#1802) Because I keep blowing it away when I `git clean`... --- test/test.sh | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100755 test/test.sh diff --git a/test/test.sh b/test/test.sh new file mode 100755 index 0000000000..8d4cccd3d8 --- /dev/null +++ b/test/test.sh @@ -0,0 +1,41 @@ +#! /usr/bin/env bash + +# This script builds the client and server binaries and runs them in a tmux +# session side-by-side. The client connects to the server and the server +# responds with a simple HTTP response. The client and server are run with +# verbose logging and the qlog output is stored in a temporary directory. The +# script also runs tcpdump to capture the packets exchanged between the client +# and server. The script uses tmux to create a split terminal window to display +# the qlog output and the packet capture. + +set -e + +cargo build --bin neqo-client --bin neqo-server + +tmp=$(mktemp -d) +addr=127.0.0.1 +port=4433 +path=/20000 +flags="--verbose --qlog-dir $tmp --use-old-http --alpn hq-interop --quic-version 1" +if [ "$(uname -s)" != "Linux" ]; then + iface=lo0 +else + iface=lo +fi + +client="./target/debug/neqo-client $flags --output-dir $tmp --stats https://$addr:$port$path" +server="SSLKEYLOGFILE=$tmp/test.tlskey ./target/debug/neqo-server $flags $addr:$port" + +tcpdump -U -i "$iface" -w "$tmp/test.pcap" host $addr and port $port >/dev/null 2>&1 & +tcpdump_pid=$! + +tmux -CC \ + set-option -g default-shell "$(which bash)" \; \ + new-session "$client && kill -USR2 $tcpdump_pid && touch $tmp/done" \; \ + split-window -h "$server" \; \ + split-window -v -f "\ + until [ -e $tmp/done ]; do sleep 1; done && \ + tshark -r $tmp/test.pcap -o tls.keylog_file:$tmp/test.tlskey" \; \ + set remain-on-exit on + +# rm -rf "$tmp" From 166b84c5a3307d678f38d9994af9b56b68c6b695 Mon Sep 17 00:00:00 2001 From: Kershaw Date: Tue, 9 Apr 2024 10:49:40 +0200 Subject: [PATCH 11/19] neqo v0.7.4 (#1804) --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 48f3b7d74b..1a7fc1eeb2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ resolver = "2" homepage = "https://github.com/mozilla/neqo/" repository = "https://github.com/mozilla/neqo/" authors = ["The Neqo Authors "] -version = "0.7.3" +version = "0.7.4" # Keep in sync with `.rustfmt.toml` `edition`. edition = "2021" license = "MIT OR Apache-2.0" From 7ff76c7c875daf4fb1a0df25a3499d12f55da5b8 Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Tue, 9 Apr 2024 14:03:47 +0300 Subject: [PATCH 12/19] fix: Correctly track sent `CONNECTION_CLOSE` frames (#1805) * fix: Correctly track sent `CONNECTION_CLOSE` frames And as a side effect, remove `output_close()`. * Add comment * Address code review * Move check --- neqo-transport/src/connection/mod.rs | 97 +++++++++++----------------- 1 file changed, 36 insertions(+), 61 deletions(-) diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index e471c29c25..8522507a69 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -1826,7 +1826,7 @@ impl Connection { | State::Connected | State::Confirmed => { if let Some(path) = self.paths.select_path() { - let res = self.output_path(&path, now); + let res = self.output_path(&path, now, &None); self.capture_error(Some(path), now, 0, res) } else { Ok(SendOption::default()) @@ -1835,7 +1835,16 @@ impl Connection { State::Closing { .. } | State::Draining { .. } | State::Closed(_) => { if let Some(details) = self.state_signaling.close_frame() { let path = Rc::clone(details.path()); - let res = self.output_close(&details); + // In some error cases, we will not be able to make a new, permanent path. + // For example, if we run out of connection IDs and the error results from + // a packet on a new path, we avoid sending (and the privacy risk) rather + // than reuse a connection ID. + let res = if path.borrow().is_temporary() { + assert!(!cfg!(test), "attempting to close with a temporary path"); + Err(Error::InternalError) + } else { + self.output_path(&path, now, &Some(details)) + }; self.capture_error(Some(path), now, 0, res) } else { Ok(SendOption::default()) @@ -1912,62 +1921,6 @@ impl Connection { } } - fn output_close(&mut self, close: &ClosingFrame) -> Res { - let mut encoder = Encoder::with_capacity(256); - let grease_quic_bit = self.can_grease_quic_bit(); - let version = self.version(); - for space in PacketNumberSpace::iter() { - let Some((cspace, tx)) = self.crypto.states.select_tx_mut(self.version, *space) else { - continue; - }; - - let path = close.path().borrow(); - // In some error cases, we will not be able to make a new, permanent path. - // For example, if we run out of connection IDs and the error results from - // a packet on a new path, we avoid sending (and the privacy risk) rather - // than reuse a connection ID. - if path.is_temporary() { - assert!(!cfg!(test), "attempting to close with a temporary path"); - return Err(Error::InternalError); - } - let (_, mut builder) = Self::build_packet_header( - &path, - cspace, - encoder, - tx, - &AddressValidationInfo::None, - version, - grease_quic_bit, - ); - _ = Self::add_packet_number( - &mut builder, - tx, - self.loss_recovery.largest_acknowledged_pn(*space), - ); - // The builder will set the limit to 0 if there isn't enough space for the header. - if builder.is_full() { - encoder = builder.abort(); - break; - } - builder.set_limit(min(path.amplification_limit(), path.mtu()) - tx.expansion()); - debug_assert!(builder.limit() <= 2048); - - // ConnectionError::Application is only allowed at 1RTT. - let sanitized = if *space == PacketNumberSpace::ApplicationData { - None - } else { - close.sanitize() - }; - sanitized - .as_ref() - .unwrap_or(close) - .write_frame(&mut builder); - encoder = builder.build(tx)?; - } - - Ok(SendOption::Yes(close.path().borrow().datagram(encoder))) - } - /// Write the frames that are exchanged in the application data space. /// The order of calls here determines the relative priority of frames. fn write_appdata_frames( @@ -2188,7 +2141,12 @@ impl Connection { /// Build a datagram, possibly from multiple packets (for different PN /// spaces) and each containing 1+ frames. #[allow(clippy::too_many_lines)] // Yeah, that's just the way it is. - fn output_path(&mut self, path: &PathRef, now: Instant) -> Res { + fn output_path( + &mut self, + path: &PathRef, + now: Instant, + closing_frame: &Option, + ) -> Res { let mut initial_sent = None; let mut needs_padding = false; let grease_quic_bit = self.can_grease_quic_bit(); @@ -2241,8 +2199,23 @@ impl Connection { // Add frames to the packet. let payload_start = builder.len(); - let (tokens, ack_eliciting, padded) = - self.write_frames(path, *space, &profile, &mut builder, now); + let (mut tokens, mut ack_eliciting, mut padded) = (Vec::new(), false, false); + if let Some(ref close) = closing_frame { + // ConnectionError::Application is only allowed at 1RTT. + let sanitized = if *space == PacketNumberSpace::ApplicationData { + None + } else { + close.sanitize() + }; + sanitized + .as_ref() + .unwrap_or(close) + .write_frame(&mut builder); + self.stats.borrow_mut().frame_tx.connection_close += 1; + } else { + (tokens, ack_eliciting, padded) = + self.write_frames(path, *space, &profile, &mut builder, now); + } if builder.packet_empty() { // Nothing to include in this packet. encoder = builder.abort(); @@ -2323,6 +2296,8 @@ impl Connection { mtu ); initial.size += mtu - packets.len(); + // These zeros aren't padding frames, they are an invalid all-zero coalesced + // packet, which is why we don't increase `frame_tx.padding` count here. packets.resize(mtu, 0); } self.loss_recovery.on_packet_sent(path, initial); From c6c60ba5cc3f1f536fee6ceed647396bea1da7cc Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Tue, 9 Apr 2024 16:34:25 +0300 Subject: [PATCH 13/19] ci: Delete QNS PR comment on successful run (#1807) * ci: Always add or update the QNS PR comment Because we only add one when there is a failure, and when things then later get fixed we don't update it again (because we skip the update on success). So the outdated, failed state keeps being shown. * Delete comment on success * Update .github/actions/pr-comment/action.yml Co-authored-by: Max Inden Signed-off-by: Lars Eggert * Address code review --------- Signed-off-by: Lars Eggert Co-authored-by: Max Inden --- .github/actions/pr-comment/action.yml | 4 ++++ .github/actions/quic-interop-runner/action.yml | 7 ++----- .github/workflows/qns-comment.yml | 4 ++-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/.github/actions/pr-comment/action.yml b/.github/actions/pr-comment/action.yml index b7f9bb12da..1e84aa5bb4 100644 --- a/.github/actions/pr-comment/action.yml +++ b/.github/actions/pr-comment/action.yml @@ -5,6 +5,9 @@ inputs: name: description: 'Artifact name to import comment data from.' required: true + mode: + description: 'Mode of operation (upsert/recreate/delete).' + default: 'upsert' token: description: 'A Github PAT' required: true @@ -29,5 +32,6 @@ runs: - uses: thollander/actions-comment-pull-request@v2 with: filePath: contents + mode: ${{ inputs.mode }} pr_number: ${{ steps.pr-number.outputs.number }} comment_tag: ${{ inputs.name }}-comment diff --git a/.github/actions/quic-interop-runner/action.yml b/.github/actions/quic-interop-runner/action.yml index 7fd055f184..cdc617d275 100644 --- a/.github/actions/quic-interop-runner/action.yml +++ b/.github/actions/quic-interop-runner/action.yml @@ -91,12 +91,9 @@ runs: - name: Format GitHub comment if: always() run: | - if [ -s quic-interop-runner/summary ]; then - exit 0 - fi echo '[**QUIC Interop Runner**](https://github.com/quic-interop/quic-interop-runner)' >> comment echo '' >> comment - # Ignore all, but table, which starts with "|". + # Ignore all, but table, which starts with "|". Also reformat it to GitHub Markdown. grep -E '^\|' quic-interop-runner/summary |\ awk '(!/^\| *:-/ || (d++ && d < 3))' |\ sed -E -e 's/✓/:white_check_mark:/gi' -e 's/✕/:x:/gi' -e 's/\?/:grey_question:/gi' \ @@ -106,7 +103,7 @@ runs: shell: bash - name: Export PR comment data - if: env.EXPORT_COMMENT == '1' + if: always() uses: ./.github/actions/pr-comment-data-export with: name: qns diff --git a/.github/workflows/qns-comment.yml b/.github/workflows/qns-comment.yml index 71cbcc805b..db9f74f7bf 100644 --- a/.github/workflows/qns-comment.yml +++ b/.github/workflows/qns-comment.yml @@ -18,11 +18,11 @@ jobs: pull-requests: write runs-on: ubuntu-latest if: | - github.event.workflow_run.event == 'pull_request' && - github.event.workflow_run.conclusion == 'failure' + github.event.workflow_run.event == 'pull_request' steps: - uses: actions/checkout@v4 - uses: ./.github/actions/pr-comment with: name: qns + mode: ${{ github.event.workflow_run.conclusion == 'success' && 'delete' || 'upsert' }} token: ${{ secrets.GITHUB_TOKEN }} From 98f7b26c658a9af2eec5a90aae25fa62237a7e3b Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Tue, 9 Apr 2024 16:37:11 +0300 Subject: [PATCH 14/19] ci: Disable `ossf/scorecard-action` until it's allowlisted (#1809) --- .github/workflows/scorecard.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/scorecard.yml b/.github/workflows/scorecard.yml index 2c0b04d09e..34bad4d875 100644 --- a/.github/workflows/scorecard.yml +++ b/.github/workflows/scorecard.yml @@ -6,13 +6,13 @@ name: Scorecard supply-chain security on: # For Branch-Protection check. Only the default branch is supported. See # https://github.com/ossf/scorecard/blob/main/docs/checks.md#branch-protection - branch_protection_rule: + # branch_protection_rule: # To guarantee Maintained check is occasionally updated. See # https://github.com/ossf/scorecard/blob/main/docs/checks.md#maintained - schedule: - - cron: '26 8 * * 6' - push: - branches: [ "main" ] + # schedule: + # - cron: '26 8 * * 6' + # push: + # branches: [ "main" ] # Declare default permissions as read only. permissions: read-all From 087e095c6d2fb5e6f52bae4bc4dd9c91beff3655 Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Tue, 9 Apr 2024 16:46:20 +0300 Subject: [PATCH 15/19] `on:` can't be empty Signed-off-by: Lars Eggert --- .github/workflows/scorecard.yml | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/.github/workflows/scorecard.yml b/.github/workflows/scorecard.yml index 34bad4d875..651a30be01 100644 --- a/.github/workflows/scorecard.yml +++ b/.github/workflows/scorecard.yml @@ -4,15 +4,16 @@ name: Scorecard supply-chain security on: - # For Branch-Protection check. Only the default branch is supported. See - # https://github.com/ossf/scorecard/blob/main/docs/checks.md#branch-protection - # branch_protection_rule: - # To guarantee Maintained check is occasionally updated. See - # https://github.com/ossf/scorecard/blob/main/docs/checks.md#maintained - # schedule: - # - cron: '26 8 * * 6' - # push: - # branches: [ "main" ] + workflow_dispatch: +# # For Branch-Protection check. Only the default branch is supported. See +# # https://github.com/ossf/scorecard/blob/main/docs/checks.md#branch-protection +# branch_protection_rule: +# # To guarantee Maintained check is occasionally updated. See +# # https://github.com/ossf/scorecard/blob/main/docs/checks.md#maintained +# schedule: +# - cron: '26 8 * * 6' +# push: +# branches: [ "main" ] # Declare default permissions as read only. permissions: read-all From 1cc44f0920d53357aa528a7da6ed1ae1aae281f4 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 9 Apr 2024 15:33:01 +0200 Subject: [PATCH 16/19] fix(server): log error when failing to read file (#1788) * fix(server): log error when failing to read file As part of the QUIC Interop testcases, a server needs to be able to respond to a request with the content of a local file. Previously, when failing to read a file, the server would simply reply with the predefined `SELF::MESSAGE`. Now the server logs the error and moves on. * Send 404 --- neqo-bin/src/server/mod.rs | 75 +++++++++++++------------------- neqo-bin/src/server/old_https.rs | 33 ++++++++------ 2 files changed, 50 insertions(+), 58 deletions(-) diff --git a/neqo-bin/src/server/mod.rs b/neqo-bin/src/server/mod.rs index e3067ecdf0..3490b3e9b3 100644 --- a/neqo-bin/src/server/mod.rs +++ b/neqo-bin/src/server/mod.rs @@ -10,8 +10,7 @@ use std::{ cmp::min, collections::HashMap, fmt::{self, Display}, - fs::OpenOptions, - io::{self, Read}, + fs, io, net::{SocketAddr, ToSocketAddrs}, path::PathBuf, pin::Pin, @@ -188,28 +187,11 @@ impl Args { } } -fn qns_read_response(filename: &str) -> Option> { - let mut file_path = PathBuf::from("/www"); - file_path.push(filename.trim_matches(|p| p == '/')); - - OpenOptions::new() - .read(true) - .open(&file_path) - .map_err(|_e| qerror!("Could not open {}", file_path.display())) - .ok() - .and_then(|mut f| { - let mut data = Vec::new(); - match f.read_to_end(&mut data) { - Ok(sz) => { - qinfo!("{} bytes read from {}", sz, file_path.display()); - Some(data) - } - Err(e) => { - qerror!("Error reading data: {e:?}"); - None - } - } - }) +fn qns_read_response(filename: &str) -> Result, io::Error> { + let path: PathBuf = ["/www", filename.trim_matches(|p| p == '/')] + .iter() + .collect(); + fs::read(path) } trait HttpServer: Display { @@ -344,27 +326,32 @@ impl HttpServer for SimpleServer { continue; } - let mut response = - if let Some(path) = headers.iter().find(|&h| h.name() == ":path") { - if args.shared.qns_test.is_some() { - if let Some(data) = qns_read_response(path.value()) { - ResponseData::from(data) - } else { - ResponseData::from(Self::MESSAGE) - } - } else if let Ok(count) = - path.value().trim_matches(|p| p == '/').parse::() - { - ResponseData::repeat(Self::MESSAGE, count) - } else { - ResponseData::from(Self::MESSAGE) + let Some(path) = headers.iter().find(|&h| h.name() == ":path") else { + stream + .cancel_fetch(neqo_http3::Error::HttpRequestIncomplete.code()) + .unwrap(); + continue; + }; + + let mut response = if args.shared.qns_test.is_some() { + match qns_read_response(path.value()) { + Ok(data) => ResponseData::from(data), + Err(e) => { + qerror!("Failed to read {}: {e}", path.value()); + stream + .send_headers(&[Header::new(":status", "404")]) + .unwrap(); + stream.stream_close_send().unwrap(); + continue; } - } else { - stream - .cancel_fetch(neqo_http3::Error::HttpRequestIncomplete.code()) - .unwrap(); - continue; - }; + } + } else if let Ok(count) = + path.value().trim_matches(|p| p == '/').parse::() + { + ResponseData::repeat(Self::MESSAGE, count) + } else { + ResponseData::from(Self::MESSAGE) + }; stream .send_headers(&[ diff --git a/neqo-bin/src/server/old_https.rs b/neqo-bin/src/server/old_https.rs index 505a16578f..38f3fdc3a7 100644 --- a/neqo-bin/src/server/old_https.rs +++ b/neqo-bin/src/server/old_https.rs @@ -8,7 +8,7 @@ use std::{ cell::RefCell, collections::HashMap, fmt::Display, path::PathBuf, rc::Rc, time::Instant, }; -use neqo_common::{event::Provider, hex, qdebug, qinfo, qwarn, Datagram}; +use neqo_common::{event::Provider, hex, qdebug, qerror, qinfo, qwarn, Datagram}; use neqo_crypto::{generate_ech_keys, random, AllowZeroRtt, AntiReplay, Cipher}; use neqo_http3::Error; use neqo_transport::{ @@ -142,20 +142,25 @@ impl Http09Server { Regex::new(r"GET +/(\d+)(?:\r)?\n").unwrap() }; let m = re.captures(msg); - let resp = match m.and_then(|m| m.get(1)) { - None => { - self.save_partial(stream_id, buf, conn); - return; - } - Some(path) => { - let path = path.as_str(); - qdebug!("Path = '{path}'"); - if args.shared.qns_test.is_some() { - qns_read_response(path) - } else { - let count = path.parse().unwrap(); - Some(vec![b'a'; count]) + let Some(path) = m.and_then(|m| m.get(1)) else { + self.save_partial(stream_id, buf, conn); + return; + }; + + let resp = { + let path = path.as_str(); + qdebug!("Path = '{path}'"); + if args.shared.qns_test.is_some() { + match qns_read_response(path) { + Ok(data) => Some(data), + Err(e) => { + qerror!("Failed to read {path}: {e}"); + Some(b"404".to_vec()) + } } + } else { + let count = path.parse().unwrap(); + Some(vec![b'a'; count]) } }; self.write(stream_id, resp, conn); From 329af2fdc07a153d1335e08f748b1560ce25dc19 Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Tue, 9 Apr 2024 17:08:06 +0300 Subject: [PATCH 17/19] test: Clean up tmp directory on script exit (#1810) --- test/test.sh | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/test.sh b/test/test.sh index 8d4cccd3d8..dc02b2161c 100755 --- a/test/test.sh +++ b/test/test.sh @@ -9,10 +9,11 @@ # the qlog output and the packet capture. set -e +tmp=$(mktemp -d) +trap 'rm -rf "$tmp"' EXIT cargo build --bin neqo-client --bin neqo-server -tmp=$(mktemp -d) addr=127.0.0.1 port=4433 path=/20000 @@ -37,5 +38,3 @@ tmux -CC \ until [ -e $tmp/done ]; do sleep 1; done && \ tshark -r $tmp/test.pcap -o tls.keylog_file:$tmp/test.tlskey" \; \ set remain-on-exit on - -# rm -rf "$tmp" From c44e53647d922359ef859d18d86dd9eb7fe4891c Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 10 Apr 2024 07:46:42 +0200 Subject: [PATCH 18/19] refactor(client): simplify keyupdate testcase implementation (#1808) The QUIC Interop Runner `keyupdate` testcase has a client establish a connection to the server and then trigger a key update. https://github.com/quic-interop/quic-interop-runner/blob/2a2534a1284d50d99ff92884d4f1ecf98fb41e4c/testcases.py#L889 This testcase always uses the `http09` client and server implementation. This commit simplifies the testcase implementation: - Given that it is only used with `http09`, move it to `http09.rs`. - Reduce the `KeyUpdateState` `struct` to a single `bool`. - Mark the `--key-update` command line argument as hidden, given that it is only set indirectly through the `-t keyupdate` flag. - Try to run `client.initiate_key_update` on events only, not on ever new received datagram. In addition it enables the `keyupdate` test on the Neqo `qns.yml` CI workflow. --- .github/workflows/qns.yml | 2 +- neqo-bin/src/client/http09.rs | 31 ++++++++++++---------- neqo-bin/src/client/http3.rs | 16 ++---------- neqo-bin/src/client/mod.rs | 48 ++++++----------------------------- 4 files changed, 28 insertions(+), 69 deletions(-) diff --git a/.github/workflows/qns.yml b/.github/workflows/qns.yml index caadb022df..17cd584a26 100644 --- a/.github/workflows/qns.yml +++ b/.github/workflows/qns.yml @@ -71,6 +71,6 @@ jobs: name: 'neqo-latest' image: ${{ steps.docker_build_and_push.outputs.imageID }} url: https://github.com/mozilla/neqo - test: handshake + test: handshake,keyupdate client: neqo-latest,quic-go,ngtcp2,neqo,msquic server: neqo-latest,quic-go,ngtcp2,neqo,msquic diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index e0b254f67b..a9ed12b157 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -25,7 +25,7 @@ use neqo_transport::{ }; use url::Url; -use super::{get_output_file, qlog_new, Args, KeyUpdateState, Res}; +use super::{get_output_file, qlog_new, Args, Res}; pub struct Handler<'a> { streams: HashMap>>, @@ -33,7 +33,7 @@ pub struct Handler<'a> { all_paths: Vec, args: &'a Args, token: Option, - key_update: KeyUpdateState, + needs_key_update: bool, } impl<'a> super::Handler for Handler<'a> { @@ -41,6 +41,18 @@ impl<'a> super::Handler for Handler<'a> { fn handle(&mut self, client: &mut Self::Client) -> Res { while let Some(event) = client.next_event() { + if self.needs_key_update { + match client.initiate_key_update() { + Ok(()) => { + qdebug!("Keys updated"); + self.needs_key_update = false; + self.download_urls(client); + } + Err(neqo_transport::Error::KeyUpdateBlocked) => (), + Err(e) => return Err(e.into()), + } + } + match event { ConnectionEvent::AuthenticationNeeded => { client.authenticated(AuthenticationStatus::Ok, Instant::now()); @@ -66,9 +78,6 @@ impl<'a> super::Handler for Handler<'a> { qdebug!("{event:?}"); self.download_urls(client); } - ConnectionEvent::StateChange(State::Confirmed) => { - self.maybe_key_update(client)?; - } ConnectionEvent::ResumptionToken(token) => { self.token = Some(token); } @@ -86,12 +95,6 @@ impl<'a> super::Handler for Handler<'a> { Ok(false) } - fn maybe_key_update(&mut self, c: &mut Self::Client) -> Res<()> { - self.key_update.maybe_update(|| c.initiate_key_update())?; - self.download_urls(c); - Ok(()) - } - fn take_token(&mut self) -> Option { self.token.take() } @@ -169,14 +172,14 @@ impl super::Client for Connection { } impl<'b> Handler<'b> { - pub fn new(url_queue: VecDeque, args: &'b Args, key_update: KeyUpdateState) -> Self { + pub fn new(url_queue: VecDeque, args: &'b Args) -> Self { Self { streams: HashMap::new(), url_queue, all_paths: Vec::new(), args, token: None, - key_update, + needs_key_update: args.key_update, } } @@ -195,7 +198,7 @@ impl<'b> Handler<'b> { } fn download_next(&mut self, client: &mut Connection) -> bool { - if self.key_update.needed() { + if self.needs_key_update { qdebug!("Deferring requests until after first key update"); return false; } diff --git a/neqo-bin/src/client/http3.rs b/neqo-bin/src/client/http3.rs index 09a30461bf..b3f577127e 100644 --- a/neqo-bin/src/client/http3.rs +++ b/neqo-bin/src/client/http3.rs @@ -27,7 +27,7 @@ use neqo_transport::{ }; use url::Url; -use super::{get_output_file, qlog_new, Args, KeyUpdateState, Res}; +use super::{get_output_file, qlog_new, Args, Res}; pub(crate) struct Handler<'a> { #[allow( @@ -36,17 +36,12 @@ pub(crate) struct Handler<'a> { clippy::redundant_field_names )] url_handler: UrlHandler<'a>, - key_update: KeyUpdateState, token: Option, output_read_data: bool, } impl<'a> Handler<'a> { - pub(crate) fn new( - url_queue: VecDeque, - args: &'a Args, - key_update: KeyUpdateState, - ) -> Self { + pub(crate) fn new(url_queue: VecDeque, args: &'a Args) -> Self { let url_handler = UrlHandler { url_queue, stream_handlers: HashMap::new(), @@ -61,7 +56,6 @@ impl<'a> Handler<'a> { Self { url_handler, - key_update, token: None, output_read_data: args.output_read_data, } @@ -225,12 +219,6 @@ impl<'a> super::Handler for Handler<'a> { Ok(self.url_handler.done()) } - fn maybe_key_update(&mut self, c: &mut Http3Client) -> Res<()> { - self.key_update.maybe_update(|| c.initiate_key_update())?; - self.url_handler.process_urls(c); - Ok(()) - } - fn take_token(&mut self) -> Option { self.token.take() } diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index 791e2a6366..61e43c00d1 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -100,39 +100,6 @@ impl std::error::Error for Error {} type Res = Result; -/// Track whether a key update is needed. -#[derive(Debug, PartialEq, Eq)] -struct KeyUpdateState(bool); - -impl KeyUpdateState { - pub fn maybe_update(&mut self, update_fn: F) -> Res<()> - where - F: FnOnce() -> Result<(), E>, - E: Into, - { - if self.0 { - if let Err(e) = update_fn() { - let e = e.into(); - match e { - Error::TransportError(TransportError::KeyUpdateBlocked) - | Error::Http3Error(neqo_http3::Error::TransportError( - TransportError::KeyUpdateBlocked, - )) => (), - _ => return Err(e), - } - } else { - qerror!("Keys updated"); - self.0 = false; - } - } - Ok(()) - } - - fn needed(&self) -> bool { - self.0 - } -} - #[derive(Debug, Parser)] #[command(author, version, about, long_about = None)] #[allow(clippy::struct_excessive_bools)] // Not a good use of that lint. @@ -176,7 +143,7 @@ pub struct Args { /// Use this for 0-RTT: the stack always attempts 0-RTT on resumption. resume: bool, - #[arg(name = "key-update", long)] + #[arg(name = "key-update", long, hide = true)] /// Attempt to initiate a key update immediately after confirming the connection. key_update: bool, @@ -255,6 +222,11 @@ impl Args { return; }; + if self.key_update { + qerror!("internal option key_update set by user"); + exit(127) + } + // Only use v1 for most QNS tests. self.shared.quic_parameters.quic_version = vec![Version::Version1]; match testcase.as_str() { @@ -370,7 +342,6 @@ trait Handler { type Client: Client; fn handle(&mut self, client: &mut Self::Client) -> Res; - fn maybe_key_update(&mut self, c: &mut Self::Client) -> Res<()>; fn take_token(&mut self) -> Option; fn has_token(&self) -> bool; } @@ -474,7 +445,6 @@ impl<'a, H: Handler> Runner<'a, H> { self.client .process_multiple_input(dgrams.iter(), Instant::now()); self.process_output().await?; - self.handler.maybe_key_update(&mut self.client)?; } Ok(()) @@ -578,14 +548,12 @@ pub async fn client(mut args: Args) -> Res<()> { first = false; - let key_update = KeyUpdateState(args.key_update); - token = if args.shared.use_old_http { let client = http09::create_client(&args, real_local, remote_addr, &hostname, token) .expect("failed to create client"); - let handler = http09::Handler::new(to_request, &args, key_update); + let handler = http09::Handler::new(to_request, &args); Runner { args: &args, @@ -601,7 +569,7 @@ pub async fn client(mut args: Args) -> Res<()> { let client = http3::create_client(&args, real_local, remote_addr, &hostname, token) .expect("failed to create client"); - let handler = http3::Handler::new(to_request, &args, key_update); + let handler = http3::Handler::new(to_request, &args); Runner { args: &args, From b3cf65f39c179cbbfe4de6cf6d2e4cbad2558c7a Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Thu, 11 Apr 2024 08:12:48 +0300 Subject: [PATCH 19/19] Merge pull request from GHSA-hvhj-4r52-8568 --- neqo-transport/src/packet/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/neqo-transport/src/packet/mod.rs b/neqo-transport/src/packet/mod.rs index f5e8320ccb..f1112e90fa 100644 --- a/neqo-transport/src/packet/mod.rs +++ b/neqo-transport/src/packet/mod.rs @@ -555,7 +555,10 @@ impl<'a> PublicPacket<'a> { if packet_type == PacketType::Retry { let header_len = decoder.offset(); let expansion = retry::expansion(version); - let token = Self::opt(decoder.decode(decoder.remaining() - expansion))?; + let token = decoder + .remaining() + .checked_sub(expansion) + .map_or(Err(Error::InvalidPacket), |v| Self::opt(decoder.decode(v)))?; if token.is_empty() { return Err(Error::InvalidPacket); }