Skip to content

Commit

Permalink
refactor(client,server): use quinn_udp for I/O (#1604)
Browse files Browse the repository at this point in the history
* Initial commit

* refactor(server): replace mio with tokio

* Move ready logic into fn

* Extend expect docs

* Restrict tokio features

* Only process datagram once

* Remove superfluous pub

* fmt

* Fix send path

* Fix receive path

* Instantiate socket state once

* Fix busy loop

* Have neqo-client use quinn-udp

* Add TODO

* Await writable

* Unify tx and rx

* Introduce wrapper type Socket

* Move bind to common

* Check if datagram was sent as a whole and avoid allocation

* Make into_data pub(crate)

* Refactor send

* Reference issue

* Remove debugs

* Fix test

* Reduce diff

* Reduce diff

* Pin quinn-udp to rev

* Address clippy lints

* fmt

* fmt

* clippy

* clippy

* Pass None ttl

Not yet supported by quinn-udp.

* Debug race condition on Windows

* Debug windows failure

* Have receiver use random port

* Test with Ect1 instead of Ce

Windows does not allow setting Ce:

> Your application isn't allowed to specify the Congestion Encountered (CE) code point when sending datagrams. The send will return with error WSAEINVAL.

https://learn.microsoft.com/en-us/windows/win32/winsock/winsock-ecn

* Revert "Debug windows failure"

This reverts commit e9ac36c.

* Revert "Debug race condition on Windows"

This reverts commit 6f330d3.

* Fold tos_tx

* Add reason to clippy lint ignore

* fix: include quinn-udp IPv4-mapped IPv6 patch

quinn-rs/quinn#1765

---------

Co-authored-by: Lars Eggert <lars@eggert.org>
  • Loading branch information
mxinden and larseggert authored Feb 26, 2024
1 parent 2ac86b4 commit fa8ce91
Show file tree
Hide file tree
Showing 9 changed files with 233 additions and 146 deletions.
2 changes: 1 addition & 1 deletion neqo-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ clap = { version = "4.4", features = ["derive"] }
futures = "0.3"
hex = "0.4"
log = { version = "0.4", default-features = false }
neqo-common = { path = "./../neqo-common" }
neqo-common = { path = "./../neqo-common", features = ["udp"] }
neqo-crypto = { path = "./../neqo-crypto" }
neqo-http3 = { path = "./../neqo-http3" }
neqo-qpack = { path = "./../neqo-qpack" }
Expand Down
95 changes: 17 additions & 78 deletions neqo-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ use std::{
};

use clap::Parser;
use common::IpTos;
use futures::{
future::{select, Either},
FutureExt, TryFutureExt,
};
use neqo_common::{
self as common, event::Provider, hex, qdebug, qinfo, qlog::NeqoQlog, Datagram, Role,
self as common, event::Provider, hex, qdebug, qinfo, qlog::NeqoQlog, udp, Datagram, Role,
};
use neqo_crypto::{
constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256},
Expand All @@ -42,7 +41,7 @@ use neqo_transport::{
EmptyConnectionIdGenerator, Error as TransportError, StreamId, StreamType, Version,
};
use qlog::{events::EventImportance, streamer::QlogStreamer};
use tokio::{net::UdpSocket, time::Sleep};
use tokio::time::Sleep;
use url::{Origin, Url};

#[derive(Debug)]
Expand Down Expand Up @@ -351,21 +350,6 @@ impl QuicParameters {
}
}

async fn emit_datagram(socket: &UdpSocket, out_dgram: Datagram) -> Result<(), io::Error> {
let sent = match socket.send_to(&out_dgram, &out_dgram.destination()).await {
Ok(res) => res,
Err(ref err) if err.kind() != io::ErrorKind::WouldBlock => {
eprintln!("UDP send error: {err:?}");
0
}
Err(e) => return Err(e),
};
if sent != out_dgram.len() {
eprintln!("Unable to send all {} bytes of datagram", out_dgram.len());
}
Ok(())
}

fn get_output_file(
url: &Url,
output_dir: &Option<PathBuf>,
Expand Down Expand Up @@ -415,7 +399,7 @@ enum Ready {

// Wait for the socket to be readable or the timeout to fire.
async fn ready(
socket: &UdpSocket,
socket: &udp::Socket,
mut timeout: Option<&mut Pin<Box<Sleep>>>,
) -> Result<Ready, io::Error> {
let socket_ready = Box::pin(socket.readable()).map_ok(|()| Ready::Socket);
Expand All @@ -426,43 +410,6 @@ async fn ready(
select(socket_ready, timeout_ready).await.factor_first().0
}

fn read_dgram(
socket: &UdpSocket,
local_address: &SocketAddr,
) -> Result<Option<Datagram>, io::Error> {
let buf = &mut [0u8; 2048];
let (sz, remote_addr) = match socket.try_recv_from(&mut buf[..]) {
Err(ref err)
if err.kind() == io::ErrorKind::WouldBlock
|| err.kind() == io::ErrorKind::Interrupted =>
{
return Ok(None)
}
Err(err) => {
eprintln!("UDP recv error: {err:?}");
return Err(err);
}
Ok(res) => res,
};

if sz == buf.len() {
eprintln!("Might have received more than {} bytes", buf.len());
}

if sz == 0 {
eprintln!("zero length datagram received?");
Ok(None)
} else {
Ok(Some(Datagram::new(
remote_addr,
*local_address,
IpTos::default(),
None,
&buf[..sz],
)))
}
}

trait StreamHandler {
fn process_header_ready(&mut self, stream_id: StreamId, fin: bool, headers: Vec<Header>);
fn process_data_readable(
Expand Down Expand Up @@ -817,7 +764,7 @@ fn to_headers(values: &[impl AsRef<str>]) -> Vec<Header> {

struct ClientRunner<'a> {
local_addr: SocketAddr,
socket: &'a UdpSocket,
socket: &'a udp::Socket,
client: Http3Client,
handler: Handler<'a>,
timeout: Option<Pin<Box<Sleep>>>,
Expand All @@ -827,7 +774,7 @@ struct ClientRunner<'a> {
impl<'a> ClientRunner<'a> {
fn new(
args: &'a mut Args,
socket: &'a UdpSocket,
socket: &'a udp::Socket,
local_addr: SocketAddr,
remote_addr: SocketAddr,
hostname: &str,
Expand Down Expand Up @@ -880,7 +827,7 @@ impl<'a> ClientRunner<'a> {

match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => loop {
let dgram = read_dgram(self.socket, &self.local_addr)?;
let dgram = self.socket.recv(&self.local_addr)?;
if dgram.is_none() {
break;
}
Expand Down Expand Up @@ -915,7 +862,8 @@ impl<'a> ClientRunner<'a> {
loop {
match self.client.process(dgram.take(), Instant::now()) {
Output::Datagram(dgram) => {
emit_datagram(self.socket, dgram).await?;
self.socket.writable().await?;
self.socket.send(dgram)?;
}
Output::Callback(new_timeout) => {
qinfo!("Setting timeout of {:?}", new_timeout);
Expand Down Expand Up @@ -1051,16 +999,7 @@ async fn main() -> Res<()> {
SocketAddr::V6(..) => SocketAddr::new(IpAddr::V6(Ipv6Addr::from([0; 16])), 0),
};

let socket = match std::net::UdpSocket::bind(local_addr) {
Err(e) => {
eprintln!("Unable to bind UDP socket: {e}");
exit(1)
}
Ok(s) => s,
};
socket.set_nonblocking(true)?;
let socket = UdpSocket::from_std(socket)?;

let socket = udp::Socket::bind(local_addr)?;
let real_local = socket.local_addr().unwrap();
println!(
"{} Client connecting: {:?} -> {:?}",
Expand Down Expand Up @@ -1125,17 +1064,16 @@ mod old {
time::Instant,
};

use neqo_common::{event::Provider, qdebug, qinfo, Datagram};
use neqo_common::{event::Provider, qdebug, qinfo, udp, Datagram};
use neqo_crypto::{AuthenticationStatus, ResumptionToken};
use neqo_transport::{
Connection, ConnectionEvent, EmptyConnectionIdGenerator, Error, Output, State, StreamId,
StreamType,
};
use tokio::{net::UdpSocket, time::Sleep};
use tokio::time::Sleep;
use url::Url;

use super::{get_output_file, qlog_new, read_dgram, ready, Args, KeyUpdateState, Ready, Res};
use crate::emit_datagram;
use super::{get_output_file, qlog_new, ready, Args, KeyUpdateState, Ready, Res};

struct HandlerOld<'b> {
streams: HashMap<StreamId, Option<File>>,
Expand Down Expand Up @@ -1321,7 +1259,7 @@ mod old {

pub struct ClientRunner<'a> {
local_addr: SocketAddr,
socket: &'a UdpSocket,
socket: &'a udp::Socket,
client: Connection,
handler: HandlerOld<'a>,
timeout: Option<Pin<Box<Sleep>>>,
Expand All @@ -1331,7 +1269,7 @@ mod old {
impl<'a> ClientRunner<'a> {
pub fn new(
args: &'a Args,
socket: &'a UdpSocket,
socket: &'a udp::Socket,
local_addr: SocketAddr,
remote_addr: SocketAddr,
origin: &str,
Expand Down Expand Up @@ -1394,7 +1332,7 @@ mod old {

match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => loop {
let dgram = read_dgram(self.socket, &self.local_addr)?;
let dgram = self.socket.recv(&self.local_addr)?;
if dgram.is_none() {
break;
}
Expand Down Expand Up @@ -1430,7 +1368,8 @@ mod old {
loop {
match self.client.process(dgram.take(), Instant::now()) {
Output::Datagram(dgram) => {
emit_datagram(self.socket, dgram).await?;
self.socket.writable().await?;
self.socket.send(dgram)?;
}
Output::Callback(new_timeout) => {
qinfo!("Setting timeout of {:?}", new_timeout);
Expand Down
3 changes: 3 additions & 0 deletions neqo-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@ enum-map = "2.7"
env_logger = { version = "0.10", default-features = false }
log = { version = "0.4", default-features = false }
qlog = "0.12"
quinn-udp = { git = "https://github.com/quinn-rs/quinn/", rev = "a947962131aba8a6521253d03cc948b20098a2d6", optional = true }
time = { version = "0.3", features = ["formatting"] }
tokio = { version = "1", features = ["net", "time", "macros", "rt", "rt-multi-thread"], optional = true }

[dev-dependencies]
test-fixture = { path = "../test-fixture" }

[features]
ci = []
udp = ["dep:quinn-udp", "dep:tokio"]

[target."cfg(windows)".dependencies.winapi]
version = "0.3"
Expand Down
5 changes: 5 additions & 0 deletions neqo-common/src/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ impl Datagram {
pub fn ttl(&self) -> Option<u8> {
self.ttl
}

#[must_use]
pub(crate) fn into_data(self) -> Vec<u8> {
self.d
}
}

impl Deref for Datagram {
Expand Down
2 changes: 2 additions & 0 deletions neqo-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub mod log;
pub mod qlog;
pub mod timer;
pub mod tos;
#[cfg(feature = "udp")]
pub mod udp;

use std::fmt::Write;

Expand Down
35 changes: 35 additions & 0 deletions neqo-common/src/tos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ impl From<u8> for IpTosEcn {
}
}

impl From<IpTos> for IpTosEcn {
fn from(value: IpTos) -> Self {
IpTosEcn::from(value.0 & 0x3)
}
}

/// Diffserv Codepoints, mapped to the upper six bits of the TOS field.
/// <https://www.iana.org/assignments/dscp-registry/dscp-registry.xhtml>
#[derive(Copy, Clone, PartialEq, Eq, Enum, Default, Debug)]
Expand Down Expand Up @@ -159,6 +165,12 @@ impl From<u8> for IpTosDscp {
}
}

impl From<IpTos> for IpTosDscp {
fn from(value: IpTos) -> Self {
IpTosDscp::from(value.0 & 0xfc)
}
}

/// The type-of-service field in an IP packet.
#[allow(clippy::module_name_repetitions)]
#[derive(Copy, Clone, PartialEq, Eq)]
Expand All @@ -169,22 +181,37 @@ impl From<IpTosEcn> for IpTos {
Self(u8::from(v))
}
}

impl From<IpTosDscp> for IpTos {
fn from(v: IpTosDscp) -> Self {
Self(u8::from(v))
}
}

impl From<(IpTosDscp, IpTosEcn)> for IpTos {
fn from(v: (IpTosDscp, IpTosEcn)) -> Self {
Self(u8::from(v.0) | u8::from(v.1))
}
}

impl From<(IpTosEcn, IpTosDscp)> for IpTos {
fn from(v: (IpTosEcn, IpTosDscp)) -> Self {
Self(u8::from(v.0) | u8::from(v.1))
}
}

impl From<IpTos> for u8 {
fn from(v: IpTos) -> Self {
v.0
}
}

impl From<u8> for IpTos {
fn from(v: u8) -> Self {
Self(v)
}
}

impl Debug for IpTos {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("IpTos")
Expand Down Expand Up @@ -287,4 +314,12 @@ mod tests {
let iptos_dscp: IpTos = dscp.into();
assert_eq!(u8::from(iptos_dscp), dscp as u8);
}

#[test]
fn u8_to_iptos() {
let tos = 0x8b;
let iptos: IpTos = (IpTosEcn::Ce, IpTosDscp::Af41).into();
assert_eq!(tos, u8::from(iptos));
assert_eq!(IpTos::from(tos), iptos);
}
}
Loading

0 comments on commit fa8ce91

Please sign in to comment.