Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

quinn_udp: use async_io instead of tokio #1183

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[workspace]
members = ["quinn", "quinn-proto", "interop", "bench", "perf", "fuzz"]
default-members = ["quinn", "quinn-proto", "interop", "bench", "perf"]
members = ["quinn", "quinn-proto", "quinn-udp", "interop", "bench", "perf", "fuzz"]
default-members = ["quinn", "quinn-proto", "quinn-udp", "interop", "bench", "perf"]

[profile.bench]
debug = true
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ This library is at [draft 32][current-draft].

- **quinn:** High-level async API based on tokio, see for usage. This will be used by most developers. (Basic benchmarks are included.)
- **quinn-proto:** Deterministic state machine of the protocol which performs [**no** I/O][sans-io] internally and is suitable for use with custom event loops (and potentially a C or C++ API).
- **quinn-udp:** UDP sockets with ECN information tuned for the protocol.
- **quinn-h3:** Contains an implementation of HTTP-3 and QPACK. It is split internally in a deterministic state machine and a tokio-based high-level async API.
- **bench:** Benchmarks without any framework.
- **interop:** Tooling that helps to run interoperability tests.
Expand Down
2 changes: 1 addition & 1 deletion quinn-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "quinn-proto"
version = "0.7.0"
authors = ["Benjamin Saunders <ben.e.saunders@gmail.com>", "Dirkjan Ochtman <dirkjan@ochtman.nl>"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/djc/quinn"
repository = "https://github.com/quinn-rs/quinn"
description = "State machine for the QUIC transport protocol"
keywords = ["quic"]
categories = [ "network-programming", "asynchronous" ]
Expand Down
34 changes: 34 additions & 0 deletions quinn-udp/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[package]
name = "quinn-udp"
version = "0.7.0"
authors = ["Benjamin Saunders <ben.e.saunders@gmail.com>", "Dirkjan Ochtman <dirkjan@ochtman.nl>", "David Craven <david@craven.ch>"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/quinn-rs/quinn"
description = "UDP sockets with ECN information for the QUIC transport protocol"
keywords = ["quic"]
categories = [ "network-programming", "asynchronous" ]
workspace = ".."
edition = "2018"

[package.metadata.docs.rs]
all-features = true

[badges]
maintenance = { status = "experimental" }

[dependencies]
async-io = "1.3.1"
futures-lite = "1.11.3"
libc = "0.2.69"
proto = { package = "quinn-proto", path = "../quinn-proto", version = "0.7" }
socket2 = "0.4"
tracing = "0.1.10"

[target.'cfg(unix)'.dependencies]
lazy_static = "1"

[dev-dependencies]
anyhow = "1.0.40"
async-global-executor = "2.0.2"
env_logger = "0.8.3"
log = "0.4.14"
66 changes: 66 additions & 0 deletions quinn-udp/examples/simple.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use anyhow::Result;
use proto::{EcnCodepoint, Transmit};
use quinn_udp::{RecvMeta, UdpSocket, BATCH_SIZE};
use std::io::IoSliceMut;
use std::net::Ipv4Addr;
use std::time::Instant;

fn main() -> Result<()> {
env_logger::init();
let mut socket1 = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0).into())?;
let socket2 = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0).into())?;
let addr2 = socket2.local_addr()?;

let mut transmits = Vec::with_capacity(BATCH_SIZE);
for i in 0..BATCH_SIZE {
let contents = (i as u64).to_be_bytes().to_vec();
transmits.push(Transmit {
destination: addr2,
ecn: Some(EcnCodepoint::Ce),
segment_size: Some(1200),
contents,
src_ip: Some(Ipv4Addr::LOCALHOST.into()),
});
}

let task1 = async_global_executor::spawn(async move {
log::debug!("before send");
socket1.send(&transmits).await.unwrap();
log::debug!("after send");
});

let task2 = async_global_executor::spawn(async move {
let mut storage = [[0u8; 1200]; BATCH_SIZE];
let mut buffers = Vec::with_capacity(BATCH_SIZE);
let mut rest = &mut storage[..];
for _ in 0..BATCH_SIZE {
let (b, r) = rest.split_at_mut(1);
rest = r;
buffers.push(IoSliceMut::new(&mut b[0]));
}

let mut meta = [RecvMeta::default(); BATCH_SIZE];
let n = socket2.recv(&mut buffers, &mut meta).await.unwrap();
for i in 0..n {
log::debug!(
"received {} {:?} {:?}",
i,
&buffers[i][..meta[i].len],
&meta[i]
);
}
});

async_global_executor::block_on(async move {
let start = Instant::now();
task1.await;
task2.await;
println!(
"sent {} packets in {}ms",
BATCH_SIZE,
start.elapsed().as_millis()
);
});

Ok(())
}
5 changes: 1 addition & 4 deletions quinn/src/platform/cmsg.rs → quinn-udp/src/cmsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ impl<'a> Drop for Encoder<'a> {
/// `cmsg` must refer to a cmsg containing a payload of type `T`
pub unsafe fn decode<T: Copy>(cmsg: &libc::cmsghdr) -> T {
assert!(mem::align_of::<T>() <= mem::align_of::<libc::cmsghdr>());
debug_assert_eq!(
cmsg.cmsg_len as usize,
libc::CMSG_LEN(mem::size_of::<T>() as _) as usize
);
debug_assert!(cmsg.cmsg_len as usize <= libc::CMSG_LEN(mem::size_of::<T>() as _) as usize);
ptr::read(libc::CMSG_DATA(cmsg) as *const T)
}

Expand Down
52 changes: 52 additions & 0 deletions quinn-udp/src/fallback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use crate::RecvMeta;
use proto::Transmit;

use std::io::{IoSliceMut, Result};

pub fn init(_socket: &std::net::UdpSocket) -> Result<()> {
// We do nothing with the given socket.
Ok(())
}

pub fn send(socket: &std::net::UdpSocket, transmits: &[Transmit]) -> Result<usize> {
let mut sent = 0;
for transmit in transmits {
match socket.send_to(&transmit.contents, &transmit.destination) {
Ok(_) => {
sent += 1;
}
Err(_) if sent != 0 => {
// We need to report that some packets were sent in this case, so we rely on
// errors being either harmlessly transient (in the case of WouldBlock) or
// recurring on the next call.
return Ok(sent);
}
Err(e) => {
return Err(e);
}
}
}
Ok(sent)
}

pub fn recv(
socket: &std::net::UdpSocket,
buffers: &mut [IoSliceMut<'_>],
meta: &mut [RecvMeta],
) -> Result<usize> {
let (len, addr) = socket.recv_from(&mut buffers[0])?;
meta[0] = RecvMeta {
addr,
len,
ecn: None,
dst_ip: None,
};
Ok(1)
}

/// Returns the platforms UDP socket capabilities
pub fn max_gso_segments() -> Result<usize> {
Ok(1)
}

pub const BATCH_SIZE: usize = 1;
53 changes: 53 additions & 0 deletions quinn-udp/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//! Uniform interface to send/recv UDP packets with ECN information.
use std::net::{IpAddr, Ipv6Addr, SocketAddr};

use proto::EcnCodepoint;

#[cfg(unix)]
mod cmsg;

mod socket;

#[cfg(unix)]
#[path = "unix.rs"]
mod platform;

// No ECN support
#[cfg(not(unix))]
#[path = "fallback.rs"]
mod platform;

pub use socket::UdpSocket;

/// Number of UDP packets to send/receive at a time when using sendmmsg/recvmmsg.
pub const BATCH_SIZE: usize = platform::BATCH_SIZE;

/// The capabilities a UDP socket suppports on a certain platform
#[derive(Debug, Clone, Copy)]
pub struct UdpCapabilities {
/// The maximum amount of segments which can be transmitted if a platform
/// supports Generic Send Offload (GSO).
/// This is 1 if the platform doesn't support GSO.
pub max_gso_segments: usize,
}

#[derive(Debug, Copy, Clone)]
pub struct RecvMeta {
pub addr: SocketAddr,
pub len: usize,
pub ecn: Option<EcnCodepoint>,
/// The destination IP address which was encoded in this datagram
pub dst_ip: Option<IpAddr>,
}

impl Default for RecvMeta {
/// Constructs a value with arbitrary fields, intended to be overwritten
fn default() -> Self {
Self {
addr: SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0),
len: 0,
ecn: None,
dst_ip: None,
}
}
}
130 changes: 130 additions & 0 deletions quinn-udp/src/socket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use crate::{RecvMeta, UdpCapabilities};
use async_io::Async;
use futures_lite::future::poll_fn;
use proto::Transmit;
use std::io::{self, IoSliceMut, Result};
use std::net::SocketAddr;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tracing::warn;

use crate::platform;

/// Async-io-compatible UDP socket with some useful specializations.
///
/// Unlike a standard UDP socket, this allows ECN bits to be read
/// and written on some platforms.
#[derive(Debug)]
pub struct UdpSocket {
inner: Async<std::net::UdpSocket>,
last_send_error: Instant,
}

impl UdpSocket {
/// Returns the platforms UDP socket capabilities
pub fn capabilities() -> Result<UdpCapabilities> {
Ok(UdpCapabilities {
max_gso_segments: platform::max_gso_segments()?,
})
}

pub fn from_std(socket: std::net::UdpSocket) -> Result<Self> {
platform::init(&socket)?;
let now = Instant::now();
Ok(Self {
inner: Async::new(socket)?,
last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now),
})
}

pub fn bind(addr: SocketAddr) -> Result<Self> {
let socket = std::net::UdpSocket::bind(addr)?;
Self::from_std(socket)
}

pub fn local_addr(&self) -> Result<SocketAddr> {
self.inner.get_ref().local_addr()
}

pub fn ttl(&self) -> Result<u8> {
let ttl = self.inner.get_ref().ttl()?;
Ok(ttl as u8)
}

pub fn set_ttl(&self, ttl: u8) -> Result<()> {
self.inner.get_ref().set_ttl(ttl as u32)
}

pub fn poll_send(&mut self, cx: &mut Context, transmits: &[Transmit]) -> Poll<Result<usize>> {
match self.inner.poll_writable(cx) {
Poll::Ready(Ok(())) => {}
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
}
let socket = self.inner.get_ref();
match platform::send(socket, transmits) {
Ok(len) => Poll::Ready(Ok(len)),
Err(err) => {
match err.kind() {
io::ErrorKind::Interrupted | io::ErrorKind::WouldBlock => {}
_ => {
log_sendmsg_error(&mut self.last_send_error, &err, &transmits[0]);
}
}
Poll::Ready(Err(err))
}
}
}

pub fn poll_recv(
&self,
cx: &mut Context,
buffers: &mut [IoSliceMut<'_>],
meta: &mut [RecvMeta],
) -> Poll<Result<usize>> {
match self.inner.poll_readable(cx) {
Poll::Ready(Ok(())) => {}
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
}
let socket = self.inner.get_ref();
Poll::Ready(platform::recv(socket, buffers, meta))
}

pub async fn send(&mut self, transmits: &[Transmit]) -> Result<usize> {
let mut i = 0;
while i < transmits.len() {
i += poll_fn(|cx| self.poll_send(cx, &transmits[i..])).await?;
}
Ok(i)
}

pub async fn recv(
&self,
buffers: &mut [IoSliceMut<'_>],
meta: &mut [RecvMeta],
) -> Result<usize> {
poll_fn(|cx| self.poll_recv(cx, buffers, meta)).await
}
}

/// Log at most 1 IO error per minute
const IO_ERROR_LOG_INTERVAL: Duration = std::time::Duration::from_secs(60);

/// Logs a warning message when sendmsg fails
///
/// Logging will only be performed if at least [`IO_ERROR_LOG_INTERVAL`]
/// has elapsed since the last error was logged.
fn log_sendmsg_error(
last_send_error: &mut Instant,
err: &dyn core::fmt::Debug,
transmit: &Transmit,
) {
let now = Instant::now();
if now.saturating_duration_since(*last_send_error) > IO_ERROR_LOG_INTERVAL {
*last_send_error = now;
warn!(
"sendmsg error: {:?}, Transmit: {{ destination: {:?}, src_ip: {:?}, enc: {:?}, len: {:?}, segment_size: {:?} }}",
err, transmit.destination, transmit.src_ip, transmit.ecn, transmit.contents.len(), transmit.segment_size);
}
}
Loading