From e6a283d4f1b101e85419fc692a2518fbf6437145 Mon Sep 17 00:00:00 2001 From: Roman Proskuryakov Date: Thu, 19 Aug 2021 15:19:43 +0300 Subject: [PATCH 01/11] Mv quinn/src/platform/ -> quinn-udp --- {quinn/src/platform => quinn-udp/src}/cmsg.rs | 0 {quinn/src/platform => quinn-udp/src}/fallback.rs | 0 quinn/src/platform/mod.rs => quinn-udp/src/lib.rs | 0 {quinn/src/platform => quinn-udp/src}/unix.rs | 0 4 files changed, 0 insertions(+), 0 deletions(-) rename {quinn/src/platform => quinn-udp/src}/cmsg.rs (100%) rename {quinn/src/platform => quinn-udp/src}/fallback.rs (100%) rename quinn/src/platform/mod.rs => quinn-udp/src/lib.rs (100%) rename {quinn/src/platform => quinn-udp/src}/unix.rs (100%) diff --git a/quinn/src/platform/cmsg.rs b/quinn-udp/src/cmsg.rs similarity index 100% rename from quinn/src/platform/cmsg.rs rename to quinn-udp/src/cmsg.rs diff --git a/quinn/src/platform/fallback.rs b/quinn-udp/src/fallback.rs similarity index 100% rename from quinn/src/platform/fallback.rs rename to quinn-udp/src/fallback.rs diff --git a/quinn/src/platform/mod.rs b/quinn-udp/src/lib.rs similarity index 100% rename from quinn/src/platform/mod.rs rename to quinn-udp/src/lib.rs diff --git a/quinn/src/platform/unix.rs b/quinn-udp/src/unix.rs similarity index 100% rename from quinn/src/platform/unix.rs rename to quinn-udp/src/unix.rs From 9046b213aede67af9e3fdca1616b89ecd5b8802f Mon Sep 17 00:00:00 2001 From: Roman Proskuryakov Date: Thu, 19 Aug 2021 15:20:34 +0300 Subject: [PATCH 02/11] Use quinn-udp instead of quinn::platform --- Cargo.toml | 4 ++-- quinn-udp/Cargo.toml | 29 +++++++++++++++++++++++++++++ quinn/Cargo.toml | 9 ++------- quinn/src/builders.rs | 6 ++---- quinn/src/connection.rs | 2 +- quinn/src/endpoint.rs | 2 +- quinn/src/lib.rs | 1 - 7 files changed, 37 insertions(+), 16 deletions(-) create mode 100644 quinn-udp/Cargo.toml diff --git a/Cargo.toml b/Cargo.toml index 5eed32b54..183541e35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] -members = ["quinn", "quinn-proto", "interop", "bench", "perf", "fuzz"] -default-members = ["quinn", "quinn-proto", "interop", "bench", "perf"] +members = ["quinn", "quinn-proto", "quinn-udp", "interop", "bench", "perf", "fuzz"] +default-members = ["quinn", "quinn-proto", "quinn-udp", "interop", "bench", "perf"] [profile.bench] debug = true diff --git a/quinn-udp/Cargo.toml b/quinn-udp/Cargo.toml new file mode 100644 index 000000000..5d5cb6443 --- /dev/null +++ b/quinn-udp/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "quinn-udp" +version = "0.7.0" +authors = ["Benjamin Saunders ", "Dirkjan Ochtman "] +license = "MIT OR Apache-2.0" +repository = "https://github.com/djc/quinn" +description = "UDP sockets with ECN information for the QUIC transport protocol" +keywords = ["quic"] +categories = [ "network-programming", "asynchronous" ] +workspace = ".." +edition = "2018" + +[package.metadata.docs.rs] +all-features = true + +[badges] +maintenance = { status = "experimental" } + +[dependencies] +futures-util = { version = "0.3.11", features = ["io"] } +libc = "0.2.69" +mio = { version = "0.7.7", features = ["net"] } +proto = { package = "quinn-proto", path = "../quinn-proto", version = "0.7" } +socket2 = "0.4" +tracing = "0.1.10" +tokio = { version = "1.0.1", features = ["net"] } + +[target.'cfg(unix)'.dependencies] +lazy_static = "1" diff --git a/quinn/Cargo.toml b/quinn/Cargo.toml index 0739028bb..26a508b7d 100644 --- a/quinn/Cargo.toml +++ b/quinn/Cargo.toml @@ -33,20 +33,15 @@ bytes = "1" futures-util = { version = "0.3.11", features = ["io"] } futures-channel = "0.3.11" fxhash = "0.2.1" -libc = "0.2.69" -mio = { version = "0.7.7", features = ["net"] } once_cell = "1.7.2" proto = { package = "quinn-proto", path = "../quinn-proto", version = "0.7" } rustls = { version = "0.19", features = ["quic"], optional = true } -socket2 = "0.4" thiserror = "1.0.21" tracing = "0.1.10" -tokio = { version = "1.0.1", features = ["net", "rt", "time"] } +tokio = { version = "1.0.1", features = ["rt", "time"] } +udp = { package = "quinn-udp", path = "../quinn-udp", version = "0.7" } webpki = { version = "0.21", optional = true } -[target.'cfg(unix)'.dependencies] -lazy_static = "1" - [dev-dependencies] anyhow = "1.0.22" crc = "1.8.1" diff --git a/quinn/src/builders.rs b/quinn/src/builders.rs index 1b6251756..393532bab 100644 --- a/quinn/src/builders.rs +++ b/quinn/src/builders.rs @@ -7,11 +7,9 @@ use proto::{ }; use thiserror::Error; use tracing::error; +use udp::UdpSocket; -use crate::{ - endpoint::{Endpoint, EndpointDriver, EndpointRef, Incoming}, - platform::UdpSocket, -}; +use crate::endpoint::{Endpoint, EndpointDriver, EndpointRef, Incoming}; #[cfg(feature = "rustls")] use crate::{Certificate, CertificateChain, PrivateKey}; diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index 9ceeea496..8b1825e94 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -17,11 +17,11 @@ use proto::{ConnectionError, ConnectionHandle, ConnectionStats, Dir, StreamEvent use thiserror::Error; use tokio::time::{sleep_until, Instant as TokioInstant, Sleep}; use tracing::info_span; +use udp::caps; use crate::{ broadcast::{self, Broadcast}, mutex::Mutex, - platform::caps, recv_stream::RecvStream, send_stream::{SendStream, WriteError}, ConnectionEvent, EndpointEvent, VarInt, diff --git a/quinn/src/endpoint.rs b/quinn/src/endpoint.rs index 9831ddb42..f1f019952 100644 --- a/quinn/src/endpoint.rs +++ b/quinn/src/endpoint.rs @@ -18,12 +18,12 @@ use futures_util::StreamExt; use fxhash::FxHashMap; use once_cell::sync::OnceCell; use proto::{self as proto, generic::ClientConfig, ConnectError, ConnectionHandle, DatagramEvent}; +use udp::{RecvMeta, UdpSocket, BATCH_SIZE}; use crate::{ broadcast::{self, Broadcast}, builders::EndpointBuilder, connection::Connecting, - platform::{RecvMeta, UdpSocket, BATCH_SIZE}, work_limiter::WorkLimiter, ConnectionEvent, EndpointEvent, VarInt, IO_LOOP_BOUND, RECV_TIME_BOUND, }; diff --git a/quinn/src/lib.rs b/quinn/src/lib.rs index f87567df8..14922f6a7 100644 --- a/quinn/src/lib.rs +++ b/quinn/src/lib.rs @@ -57,7 +57,6 @@ mod builders; mod connection; mod endpoint; mod mutex; -mod platform; mod recv_stream; mod send_stream; mod work_limiter; From 204dea14820c8c145675650b62e6ddbaa214d7a8 Mon Sep 17 00:00:00 2001 From: Roman Proskuryakov Date: Thu, 19 Aug 2021 16:39:27 +0300 Subject: [PATCH 03/11] Fix repo urls for quinn* crates --- quinn-proto/Cargo.toml | 2 +- quinn-udp/Cargo.toml | 2 +- quinn/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/quinn-proto/Cargo.toml b/quinn-proto/Cargo.toml index f3b48c177..707f90c30 100644 --- a/quinn-proto/Cargo.toml +++ b/quinn-proto/Cargo.toml @@ -3,7 +3,7 @@ name = "quinn-proto" version = "0.7.0" authors = ["Benjamin Saunders ", "Dirkjan Ochtman "] license = "MIT OR Apache-2.0" -repository = "https://github.com/djc/quinn" +repository = "https://github.com/quinn-rs/quinn" description = "State machine for the QUIC transport protocol" keywords = ["quic"] categories = [ "network-programming", "asynchronous" ] diff --git a/quinn-udp/Cargo.toml b/quinn-udp/Cargo.toml index 5d5cb6443..1f4ac3ea2 100644 --- a/quinn-udp/Cargo.toml +++ b/quinn-udp/Cargo.toml @@ -3,7 +3,7 @@ name = "quinn-udp" version = "0.7.0" authors = ["Benjamin Saunders ", "Dirkjan Ochtman "] license = "MIT OR Apache-2.0" -repository = "https://github.com/djc/quinn" +repository = "https://github.com/quinn-rs/quinn" description = "UDP sockets with ECN information for the QUIC transport protocol" keywords = ["quic"] categories = [ "network-programming", "asynchronous" ] diff --git a/quinn/Cargo.toml b/quinn/Cargo.toml index 26a508b7d..baf23d44a 100644 --- a/quinn/Cargo.toml +++ b/quinn/Cargo.toml @@ -3,7 +3,7 @@ name = "quinn" version = "0.7.0" authors = ["Benjamin Saunders ", "Dirkjan Ochtman "] license = "MIT OR Apache-2.0" -repository = "https://github.com/djc/quinn" +repository = "https://github.com/quinn-rs/quinn" description = "QUIC transport protocol implementation for Tokio" readme = "../README.md" keywords = ["quic"] From 4953932375f55a59dab52605f059a6b6f3bbe515 Mon Sep 17 00:00:00 2001 From: Roman Proskuryakov Date: Thu, 19 Aug 2021 16:49:45 +0300 Subject: [PATCH 04/11] Add quinn-udp to the list of crates into the main README --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 0fa8f187b..618db2e4b 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ This library is at [draft 32][current-draft]. - **quinn:** High-level async API based on tokio, see for usage. This will be used by most developers. (Basic benchmarks are included.) - **quinn-proto:** Deterministic state machine of the protocol which performs [**no** I/O][sans-io] internally and is suitable for use with custom event loops (and potentially a C or C++ API). +- **quinn-udp:** UDP sockets with ECN information tuned for the protocol. - **quinn-h3:** Contains an implementation of HTTP-3 and QPACK. It is split internally in a deterministic state machine and a tokio-based high-level async API. - **bench:** Benchmarks without any framework. - **interop:** Tooling that helps to run interoperability tests. From 765a9ceec51bca6b5033fa45bb053d1d6baca178 Mon Sep 17 00:00:00 2001 From: David Craven Date: Mon, 23 Aug 2021 13:56:06 +0300 Subject: [PATCH 05/11] Move quinn-udp from tokio to async_io --- quinn-udp/Cargo.toml | 13 +- quinn-udp/src/fallback.rs | 121 ++++-------- quinn-udp/src/lib.rs | 29 ++- quinn-udp/src/socket.rs | 102 ++++++++++ quinn-udp/src/unix.rs | 380 ++++++++++++++------------------------ quinn/src/connection.rs | 6 +- 6 files changed, 310 insertions(+), 341 deletions(-) create mode 100644 quinn-udp/src/socket.rs diff --git a/quinn-udp/Cargo.toml b/quinn-udp/Cargo.toml index 1f4ac3ea2..92450026e 100644 --- a/quinn-udp/Cargo.toml +++ b/quinn-udp/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "quinn-udp" version = "0.7.0" -authors = ["Benjamin Saunders ", "Dirkjan Ochtman "] +authors = ["Benjamin Saunders ", "Dirkjan Ochtman ", "David Craven "] license = "MIT OR Apache-2.0" repository = "https://github.com/quinn-rs/quinn" description = "UDP sockets with ECN information for the QUIC transport protocol" @@ -17,13 +17,18 @@ all-features = true maintenance = { status = "experimental" } [dependencies] -futures-util = { version = "0.3.11", features = ["io"] } +async-io = "1.3.1" +futures-lite = "1.11.3" libc = "0.2.69" -mio = { version = "0.7.7", features = ["net"] } proto = { package = "quinn-proto", path = "../quinn-proto", version = "0.7" } socket2 = "0.4" tracing = "0.1.10" -tokio = { version = "1.0.1", features = ["net"] } [target.'cfg(unix)'.dependencies] lazy_static = "1" + +[dev-dependencies] +anyhow = "1.0.40" +async-global-executor = "2.0.2" +env_logger = "0.8.3" +log = "0.4.14" diff --git a/quinn-udp/src/fallback.rs b/quinn-udp/src/fallback.rs index 3bcee1ae2..58a436f4f 100644 --- a/quinn-udp/src/fallback.rs +++ b/quinn-udp/src/fallback.rs @@ -1,102 +1,55 @@ -use std::{ - io::{self, IoSliceMut}, - net::SocketAddr, - task::{Context, Poll}, - time::Instant, -}; - -use futures_util::ready; +use crate::{RecvMeta, SocketType}; use proto::Transmit; -use tokio::io::ReadBuf; -use tracing::warn; -use super::{log_sendmsg_error, RecvMeta, IO_ERROR_LOG_INTERVAL}; +use std::io::{IoSliceMut, Result}; -/// Tokio-compatible UDP socket with some useful specializations. -/// -/// Unlike a standard tokio UDP socket, this allows ECN bits to be read and written on some -/// platforms. -#[derive(Debug)] -pub struct UdpSocket { - io: tokio::net::UdpSocket, - last_send_error: Instant, +pub fn init(socket: &std::net::UdpSocket) -> Result { + Ok(if socket.local_addr()?.is_ipv4() { + SocketType::Ipv4 + } else { + SocketType::Ipv6Only + }) } -impl UdpSocket { - pub fn from_std(socket: std::net::UdpSocket) -> io::Result { - socket.set_nonblocking(true)?; - let now = Instant::now(); - Ok(UdpSocket { - io: tokio::net::UdpSocket::from_std(socket)?, - last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now), - }) - } - - pub fn poll_send( - &mut self, - cx: &mut Context, - transmits: &[Transmit], - ) -> Poll> { - let mut sent = 0; - for transmit in transmits { - match self - .io - .poll_send_to(cx, &transmit.contents, transmit.destination) - { - Poll::Ready(Ok(_)) => { - sent += 1; - } +pub fn send(socket: &std::net::UdpSocket, transmits: &[Transmit]) -> Result { + let mut sent = 0; + for transmit in transmits { + match socket.send_to(&transmit.contents, &transmit.destination) { + Ok(_) => { + sent += 1; + } + Err(_) if sent != 0 => { // We need to report that some packets were sent in this case, so we rely on // errors being either harmlessly transient (in the case of WouldBlock) or // recurring on the next call. - Poll::Ready(Err(_)) | Poll::Pending if sent != 0 => return Poll::Ready(Ok(sent)), - Poll::Ready(Err(e)) => { - // WouldBlock is expected to be returned as `Poll::Pending` - debug_assert!(e.kind() != io::ErrorKind::WouldBlock); - - // Errors are ignored, since they will ususally be handled - // by higher level retransmits and timeouts. - // - PermissionDenied errors have been observed due to iptable rules. - // Those are not fatal errors, since the - // configuration can be dynamically changed. - // - Destination unreachable errors have been observed for other - log_sendmsg_error(&mut self.last_send_error, e, transmit); - sent += 1; - } - Poll::Pending => return Poll::Pending, + return Ok(sent); + } + Err(e) => { + return Err(e); } } - Poll::Ready(Ok(sent)) - } - - pub fn poll_recv( - &self, - cx: &mut Context, - bufs: &mut [IoSliceMut<'_>], - meta: &mut [RecvMeta], - ) -> Poll> { - debug_assert!(!bufs.is_empty()); - let mut buf = ReadBuf::new(&mut bufs[0]); - let addr = ready!(self.io.poll_recv_from(cx, &mut buf))?; - meta[0] = RecvMeta { - len: buf.filled().len(), - addr, - ecn: None, - dst_ip: None, - }; - Poll::Ready(Ok(1)) } + Ok(sent) +} - pub fn local_addr(&self) -> io::Result { - self.io.local_addr() - } +pub fn recv( + socket: &std::net::UdpSocket, + buffers: &mut [IoSliceMut<'_>], + meta: &mut [RecvMeta], +) -> Result { + let (len, addr) = socket.recv_from(&mut buffers[0])?; + meta[0] = RecvMeta { + addr, + len, + ecn: None, + dst_ip: None, + }; + Ok(1) } /// Returns the platforms UDP socket capabilities -pub fn caps() -> super::UdpCapabilities { - super::UdpCapabilities { - max_gso_segments: 1, - } +pub fn max_gso_segments() -> Result { + Ok(1) } pub const BATCH_SIZE: usize = 1; diff --git a/quinn-udp/src/lib.rs b/quinn-udp/src/lib.rs index 05d8ee78e..e5550da3b 100644 --- a/quinn-udp/src/lib.rs +++ b/quinn-udp/src/lib.rs @@ -9,24 +9,22 @@ use tracing::warn; #[cfg(unix)] mod cmsg; + +mod socket; + #[cfg(unix)] #[path = "unix.rs"] -mod imp; +mod platform; // No ECN support #[cfg(not(unix))] #[path = "fallback.rs"] -mod imp; - -pub use imp::UdpSocket; +mod platform; -/// Returns the platforms UDP socket capabilities -pub fn caps() -> UdpCapabilities { - imp::caps() -} +pub use socket::UdpSocket; -/// Number of UDP packets to send/receive at a time -pub const BATCH_SIZE: usize = imp::BATCH_SIZE; +/// Number of UDP packets to send/receive at a time when using sendmmsg/recvmmsg. +pub const BATCH_SIZE: usize = platform::BATCH_SIZE; /// The capabilities a UDP socket suppports on a certain platform #[derive(Debug, Clone, Copy)] @@ -58,6 +56,17 @@ impl Default for RecvMeta { } } +/// Socket type. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum SocketType { + /// Socket is bound to an ip4 address. + Ipv4, + /// Socket is bound to an ip6 address and is not dual stack. + Ipv6Only, + /// Socket is bound to an ip6 address and supports ip4 packets. + Ipv6, +} + /// Log at most 1 IO error per minute const IO_ERROR_LOG_INTERVAL: Duration = std::time::Duration::from_secs(60); diff --git a/quinn-udp/src/socket.rs b/quinn-udp/src/socket.rs new file mode 100644 index 000000000..95ae9440e --- /dev/null +++ b/quinn-udp/src/socket.rs @@ -0,0 +1,102 @@ +use proto::{Transmit}; +use crate::{RecvMeta, SocketType, UdpCapabilities}; +use async_io::Async; +use futures_lite::future::poll_fn; +use std::io::{IoSliceMut, Result}; +use std::net::SocketAddr; +use std::task::{Context, Poll}; + +use crate::platform as platform; + +/// Async-io-compatible UDP socket with some useful specializations. +/// +/// Unlike a standard UDP socket, this allows ECN bits to be read +/// and written on some platforms. +#[derive(Debug)] +pub struct UdpSocket { + inner: Async, + ty: SocketType, +} + +impl UdpSocket { + /// Returns the platforms UDP socket capabilities + pub fn capabilities() -> Result { + Ok(UdpCapabilities { + max_gso_segments: platform::max_gso_segments()?, + }) + } + + pub fn from_std(socket: std::net::UdpSocket) -> Result { + let ty = platform::init(&socket)?; + Ok(Self { + inner: Async::new(socket)?, + ty, + }) + } + + pub fn bind(addr: SocketAddr) -> Result { + let socket = std::net::UdpSocket::bind(addr)?; + Self::from_std(socket) + } + + pub fn socket_type(&self) -> SocketType { + self.ty + } + + pub fn local_addr(&self) -> Result { + self.inner.get_ref().local_addr() + } + + pub fn ttl(&self) -> Result { + let ttl = self.inner.get_ref().ttl()?; + Ok(ttl as u8) + } + + pub fn set_ttl(&self, ttl: u8) -> Result<()> { + self.inner.get_ref().set_ttl(ttl as u32) + } + + pub fn poll_send(&self, cx: &mut Context, transmits: &[Transmit]) -> Poll> { + match self.inner.poll_writable(cx) { + Poll::Ready(Ok(())) => {} + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + } + let socket = self.inner.get_ref(); + match platform::send(socket, transmits) { + Ok(len) => Poll::Ready(Ok(len)), + Err(err) => Poll::Ready(Err(err)), + } + } + + pub fn poll_recv( + &self, + cx: &mut Context, + buffers: &mut [IoSliceMut<'_>], + meta: &mut [RecvMeta], + ) -> Poll> { + match self.inner.poll_readable(cx) { + Poll::Ready(Ok(())) => {} + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + } + let socket = self.inner.get_ref(); + Poll::Ready(platform::recv(socket, buffers, meta)) + } + + pub async fn send(&self, transmits: &[Transmit]) -> Result { + let mut i = 0; + while i < transmits.len() { + i += poll_fn(|cx| self.poll_send(cx, &transmits[i..])).await?; + } + Ok(i) + } + + pub async fn recv( + &self, + buffers: &mut [IoSliceMut<'_>], + meta: &mut [RecvMeta], + ) -> Result { + poll_fn(|cx| self.poll_recv(cx, buffers, meta)).await + } +} diff --git a/quinn-udp/src/unix.rs b/quinn-udp/src/unix.rs index 0565dc1f3..6c1a84510 100644 --- a/quinn-udp/src/unix.rs +++ b/quinn-udp/src/unix.rs @@ -2,82 +2,20 @@ use std::{ io, io::IoSliceMut, mem::{self, MaybeUninit}, - net::{IpAddr, SocketAddr}, + net::{IpAddr, SocketAddr, UdpSocket}, os::unix::io::AsRawFd, ptr, - task::{Context, Poll}, - time::Instant, }; -use futures_util::ready; -use lazy_static::lazy_static; use proto::{EcnCodepoint, Transmit}; -use tokio::io::unix::AsyncFd; - -use super::{cmsg, log_sendmsg_error, RecvMeta, UdpCapabilities, IO_ERROR_LOG_INTERVAL}; +use crate::{cmsg, RecvMeta, SocketType}; #[cfg(target_os = "freebsd")] type IpTosTy = libc::c_uchar; #[cfg(not(target_os = "freebsd"))] type IpTosTy = libc::c_int; -/// Tokio-compatible UDP socket with some useful specializations. -/// -/// Unlike a standard tokio UDP socket, this allows ECN bits to be read and written on some -/// platforms. -#[derive(Debug)] -pub struct UdpSocket { - io: AsyncFd, - last_send_error: Instant, -} - -impl UdpSocket { - pub fn from_std(socket: std::net::UdpSocket) -> io::Result { - socket.set_nonblocking(true)?; - let io = mio::net::UdpSocket::from_std(socket); - init(&io)?; - let now = Instant::now(); - Ok(UdpSocket { - io: AsyncFd::new(io)?, - last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now), - }) - } - - pub fn poll_send( - &mut self, - cx: &mut Context, - transmits: &[Transmit], - ) -> Poll> { - loop { - let last_send_error = &mut self.last_send_error; - let mut guard = ready!(self.io.poll_write_ready(cx))?; - if let Ok(res) = guard.try_io(|io| send(io.get_ref(), last_send_error, transmits)) { - return Poll::Ready(res); - } - } - } - - pub fn poll_recv( - &self, - cx: &mut Context, - bufs: &mut [IoSliceMut<'_>], - meta: &mut [RecvMeta], - ) -> Poll> { - debug_assert!(!bufs.is_empty()); - loop { - let mut guard = ready!(self.io.poll_read_ready(cx))?; - if let Ok(res) = guard.try_io(|io| recv(io.get_ref(), bufs, meta)) { - return Poll::Ready(res); - } - } - } - - pub fn local_addr(&self) -> io::Result { - self.io.get_ref().local_addr() - } -} - -fn init(io: &mio::net::UdpSocket) -> io::Result<()> { +pub fn init(io: &UdpSocket) -> io::Result { let mut cmsg_platform_space = 0; if cfg!(target_os = "linux") { cmsg_platform_space += @@ -95,45 +33,34 @@ fn init(io: &mio::net::UdpSocket) -> io::Result<()> { ); let addr = io.local_addr()?; - - // macos and ios do not support IP_RECVTOS on dual-stack sockets :( - if addr.is_ipv4() || ((!cfg!(any(target_os = "macos", target_os = "ios"))) && !io.only_v6()?) { - let on: libc::c_int = 1; - let rc = unsafe { - libc::setsockopt( + let only_v6 = if addr.is_ipv6() { + unsafe { + let mut only_v6: libc::c_int = 0; + let rc = libc::getsockopt( io.as_raw_fd(), - libc::IPPROTO_IP, - libc::IP_RECVTOS, - &on as *const _ as _, - mem::size_of_val(&on) as _, - ) - }; - if rc == -1 { - return Err(io::Error::last_os_error()); - } - } - #[cfg(target_os = "linux")] - { - if addr.is_ipv4() { - let rc = unsafe { - libc::setsockopt( - io.as_raw_fd(), - libc::IPPROTO_IP, - libc::IP_MTU_DISCOVER, - &libc::IP_PMTUDISC_PROBE as *const _ as _, - mem::size_of_val(&libc::IP_PMTUDISC_PROBE) as _, - ) - }; + libc::IPPROTO_IPV6, + libc::IPV6_V6ONLY, + &mut only_v6 as *mut _ as _, + mem::size_of_val(&only_v6) as _, + ); if rc == -1 { return Err(io::Error::last_os_error()); } + only_v6 != 0 + } + } else { + false + }; + if addr.is_ipv4() || !only_v6 { + // macos and ios do not support IP_RECVTOS on dual-stack sockets :( + if !cfg!(any(target_os = "macos", target_os = "ios")) { let on: libc::c_int = 1; let rc = unsafe { libc::setsockopt( io.as_raw_fd(), libc::IPPROTO_IP, - libc::IP_PKTINFO, + libc::IP_RECVTOS, &on as *const _ as _, mem::size_of_val(&on) as _, ) @@ -141,12 +68,15 @@ fn init(io: &mio::net::UdpSocket) -> io::Result<()> { if rc == -1 { return Err(io::Error::last_os_error()); } - } else if addr.is_ipv6() { + } + + #[cfg(any(target_os = "linux", target_os = "android"))] + { let rc = unsafe { libc::setsockopt( io.as_raw_fd(), - libc::IPPROTO_IPV6, - libc::IPV6_MTU_DISCOVER, + libc::IPPROTO_IP, + libc::IP_MTU_DISCOVER, &libc::IP_PMTUDISC_PROBE as *const _ as _, mem::size_of_val(&libc::IP_PMTUDISC_PROBE) as _, ) @@ -159,8 +89,8 @@ fn init(io: &mio::net::UdpSocket) -> io::Result<()> { let rc = unsafe { libc::setsockopt( io.as_raw_fd(), - libc::IPPROTO_IPV6, - libc::IPV6_RECVPKTINFO, + libc::IPPROTO_IP, + libc::IP_PKTINFO, &on as *const _ as _, mem::size_of_val(&on) as _, ) @@ -184,16 +114,49 @@ fn init(io: &mio::net::UdpSocket) -> io::Result<()> { if rc == -1 { return Err(io::Error::last_os_error()); } + + #[cfg(any(target_os = "linux", target_os = "android"))] + { + let rc = unsafe { + libc::setsockopt( + io.as_raw_fd(), + libc::IPPROTO_IPV6, + libc::IPV6_MTU_DISCOVER, + &libc::IP_PMTUDISC_PROBE as *const _ as _, + mem::size_of_val(&libc::IP_PMTUDISC_PROBE) as _, + ) + }; + if rc == -1 { + return Err(io::Error::last_os_error()); + } + + let on: libc::c_int = 1; + let rc = unsafe { + libc::setsockopt( + io.as_raw_fd(), + libc::IPPROTO_IPV6, + libc::IPV6_RECVPKTINFO, + &on as *const _ as _, + mem::size_of_val(&on) as _, + ) + }; + if rc == -1 { + return Err(io::Error::last_os_error()); + } + } } - Ok(()) + + Ok(if addr.is_ipv4() { + SocketType::Ipv4 + } else if only_v6 { + SocketType::Ipv6Only + } else { + SocketType::Ipv6 + }) } #[cfg(not(any(target_os = "macos", target_os = "ios")))] -fn send( - io: &mio::net::UdpSocket, - last_send_error: &mut Instant, - transmits: &[Transmit], -) -> io::Result { +pub fn send(io: &UdpSocket, transmits: &[Transmit]) -> io::Result { let mut msgs: [libc::mmsghdr; BATCH_SIZE] = unsafe { mem::zeroed() }; let mut iovecs: [libc::iovec; BATCH_SIZE] = unsafe { mem::zeroed() }; let mut cmsgs = [cmsg::Aligned([0u8; CMSG_LEN]); BATCH_SIZE]; @@ -221,46 +184,28 @@ fn send( &mut cmsgs[i], ); } - let num_transmits = transmits.len().min(BATCH_SIZE); - loop { - let n = - unsafe { libc::sendmmsg(io.as_raw_fd(), msgs.as_mut_ptr(), num_transmits as u32, 0) }; + let n = unsafe { + libc::sendmmsg( + io.as_raw_fd(), + msgs.as_mut_ptr(), + transmits.len().min(BATCH_SIZE) as _, + 0, + ) + }; if n == -1 { let e = io::Error::last_os_error(); - match e.kind() { - io::ErrorKind::Interrupted => { - // Retry the transmission - continue; - } - io::ErrorKind::WouldBlock => return Err(e), - _ => { - // Other errors are ignored, since they will ususally be handled - // by higher level retransmits and timeouts. - // - PermissionDenied errors have been observed due to iptable rules. - // Those are not fatal errors, since the - // configuration can be dynamically changed. - // - Destination unreachable errors have been observed for other - log_sendmsg_error(last_send_error, e, &transmits[0]); - - // The ERRORS section in https://man7.org/linux/man-pages/man2/sendmmsg.2.html - // describes that errors will only be returned if no message could be transmitted - // at all. Therefore drop the first (problematic) message, - // and retry the remaining ones. - return Ok(num_transmits.min(1)); - } + if e.kind() == io::ErrorKind::Interrupted { + continue; } + return Err(e); } return Ok(n as usize); } } #[cfg(any(target_os = "macos", target_os = "ios"))] -fn send( - io: &mio::net::UdpSocket, - last_send_error: &mut Instant, - transmits: &[Transmit], -) -> io::Result { +pub fn send(io: &UdpSocket, transmits: &[Transmit]) -> io::Result { let mut hdr: libc::msghdr = unsafe { mem::zeroed() }; let mut iov: libc::iovec = unsafe { mem::zeroed() }; let mut ctrl = cmsg::Aligned([0u8; CMSG_LEN]); @@ -271,23 +216,16 @@ fn send( let n = unsafe { libc::sendmsg(io.as_raw_fd(), &hdr, 0) }; if n == -1 { let e = io::Error::last_os_error(); - match e.kind() { - io::ErrorKind::Interrupted => { - // Retry the transmission - } - io::ErrorKind::WouldBlock if sent != 0 => return Ok(sent), - io::ErrorKind::WouldBlock => return Err(e), - _ => { - // Other errors are ignored, since they will ususally be handled - // by higher level retransmits and timeouts. - // - PermissionDenied errors have been observed due to iptable rules. - // Those are not fatal errors, since the - // configuration can be dynamically changed. - // - Destination unreachable errors have been observed for other - log_sendmsg_error(last_send_error, e, &transmits[sent]); - sent += 1; - } + if e.kind() == io::ErrorKind::Interrupted { + continue; } + if sent != 0 { + // We need to report that some packets were sent in this case, so we rely on + // errors being either harmlessly transient (in the case of WouldBlock) or + // recurring on the next call. + return Ok(sent); + } + return Err(e); } else { sent += 1; } @@ -296,8 +234,8 @@ fn send( } #[cfg(not(any(target_os = "macos", target_os = "ios")))] -fn recv( - io: &mio::net::UdpSocket, +pub fn recv( + io: &UdpSocket, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta], ) -> io::Result { @@ -339,8 +277,8 @@ fn recv( } #[cfg(any(target_os = "macos", target_os = "ios"))] -fn recv( - io: &mio::net::UdpSocket, +pub fn recv( + io: &UdpSocket, bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta], ) -> io::Result { @@ -366,11 +304,6 @@ fn recv( Ok(1) } -/// Returns the platforms UDP socket capabilities -pub fn caps() -> UdpCapabilities { - *CAPABILITIES -} - const CMSG_LEN: usize = 88; fn prepare_msg( @@ -405,37 +338,32 @@ fn prepare_msg( encoder.push(libc::IPPROTO_IPV6, libc::IPV6_TCLASS, ecn); } + #[cfg(target_os = "linux")] if let Some(segment_size) = transmit.segment_size { - debug_assert!( - caps().max_gso_segments > 1, - "Platform must support GSO for setting segment size" - ); - - gso::set_segment_size(&mut encoder, segment_size as u16); + encoder.push(libc::SOL_UDP, libc::UDP_SEGMENT, segment_size as u16); } + #[cfg(any(target_os = "linux", target_os = "android"))] if let Some(ip) = &transmit.src_ip { - if cfg!(target_os = "linux") { - match ip { - IpAddr::V4(v4) => { - let pktinfo = libc::in_pktinfo { - ipi_ifindex: 0, - ipi_spec_dst: libc::in_addr { - s_addr: u32::from_ne_bytes(v4.octets()), - }, - ipi_addr: libc::in_addr { s_addr: 0 }, - }; - encoder.push(libc::IPPROTO_IP, libc::IP_PKTINFO, pktinfo); - } - IpAddr::V6(v6) => { - let pktinfo = libc::in6_pktinfo { - ipi6_ifindex: 0, - ipi6_addr: libc::in6_addr { - s6_addr: v6.octets(), - }, - }; - encoder.push(libc::IPPROTO_IPV6, libc::IPV6_PKTINFO, pktinfo); - } + match ip { + IpAddr::V4(v4) => { + let pktinfo = libc::in_pktinfo { + ipi_ifindex: 0, + ipi_spec_dst: libc::in_addr { + s_addr: u32::from_ne_bytes(v4.octets()), + }, + ipi_addr: libc::in_addr { s_addr: 0 }, + }; + encoder.push(libc::IPPROTO_IP, libc::IP_PKTINFO, pktinfo); + } + IpAddr::V6(v6) => { + let pktinfo = libc::in6_pktinfo { + ipi6_ifindex: 0, + ipi6_addr: libc::in6_addr { + s6_addr: v6.octets(), + }, + }; + encoder.push(libc::IPPROTO_IPV6, libc::IPV6_PKTINFO, pktinfo); } } } @@ -504,8 +432,8 @@ fn decode_recv( }; RecvMeta { - len, addr, + len, ecn: EcnCodepoint::from_bits(ecn_bits), dst_ip, } @@ -519,60 +447,30 @@ pub const BATCH_SIZE: usize = 32; pub const BATCH_SIZE: usize = 1; #[cfg(target_os = "linux")] -mod gso { - use super::*; - - /// Checks whether GSO support is available by setting the UDP_SEGMENT - /// option on a socket - pub fn max_gso_segments() -> usize { - const GSO_SIZE: libc::c_int = 1500; - - let socket = match std::net::UdpSocket::bind("[::]:0") { - Ok(socket) => socket, - Err(_) => return 1, - }; - - let rc = unsafe { - libc::setsockopt( - socket.as_raw_fd(), - libc::SOL_UDP, - libc::UDP_SEGMENT, - &GSO_SIZE as *const _ as _, - mem::size_of_val(&GSO_SIZE) as _, - ) - }; - - if rc != -1 { - // As defined in linux/udp.h - // #define UDP_MAX_SEGMENTS (1 << 6UL) - 64 - } else { - 1 - } - } - - pub fn set_segment_size(encoder: &mut cmsg::Encoder, segment_size: u16) { - encoder.push(libc::SOL_UDP, libc::UDP_SEGMENT, segment_size); - } -} - -#[cfg(not(target_os = "linux"))] -mod gso { - use super::*; - - pub fn max_gso_segments() -> usize { +pub fn max_gso_segments() -> io::Result { + // Checks whether GSO support is availably by setting the UDP_SEGMENT + // option on a socket. + const GSO_SIZE: libc::c_int = 1500; + let socket = UdpSocket::bind("[::]:0")?; + let res = unsafe { + libc::setsockopt( + socket.as_raw_fd(), + libc::SOL_UDP, + libc::UDP_SEGMENT, + &GSO_SIZE as *const _ as _, + mem::size_of_val(&GSO_SIZE) as _, + ) + }; + Ok(if res != -1 { + // As defined in linux/udp.h + // #define UDP_MAX_SEGMENTS ........(1 << 6UL) + 64 + } else { 1 - } - - pub fn set_segment_size(_encoder: &mut cmsg::Encoder, _segment_size: u16) { - panic!("Setting a segment size is not supported on current platform"); - } + }) } -lazy_static! { - static ref CAPABILITIES: UdpCapabilities = { - UdpCapabilities { - max_gso_segments: gso::max_gso_segments(), - } - }; +#[cfg(not(target_os = "linux"))] +pub fn max_gso_segments() -> io::Result { + Ok(1) } diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index 8b1825e94..1a98193b2 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -17,7 +17,7 @@ use proto::{ConnectionError, ConnectionHandle, ConnectionStats, Dir, StreamEvent use thiserror::Error; use tokio::time::{sleep_until, Instant as TokioInstant, Sleep}; use tracing::info_span; -use udp::caps; +use udp::UdpSocket; use crate::{ broadcast::{self, Broadcast}, @@ -806,7 +806,9 @@ where let now = Instant::now(); let mut transmits = 0; - let max_datagrams = caps().max_gso_segments; + let max_datagrams = UdpSocket::capabilities() + .expect("could not get capabilities") + .max_gso_segments; while let Some(t) = self.inner.poll_transmit(now, max_datagrams) { transmits += match t.segment_size { From 07e59f8c542485db3e7bdfe39cb43d58d1140f99 Mon Sep 17 00:00:00 2001 From: Roman Proskuryakov Date: Mon, 23 Aug 2021 13:57:34 +0300 Subject: [PATCH 06/11] Fix tests --- quinn-udp/src/cmsg.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/quinn-udp/src/cmsg.rs b/quinn-udp/src/cmsg.rs index 1d0a50838..a3bf0c3fe 100644 --- a/quinn-udp/src/cmsg.rs +++ b/quinn-udp/src/cmsg.rs @@ -71,8 +71,8 @@ impl<'a> Drop for Encoder<'a> { /// `cmsg` must refer to a cmsg containing a payload of type `T` pub unsafe fn decode(cmsg: &libc::cmsghdr) -> T { assert!(mem::align_of::() <= mem::align_of::()); - debug_assert_eq!( - cmsg.cmsg_len as usize, + debug_assert!( + cmsg.cmsg_len as usize <= libc::CMSG_LEN(mem::size_of::() as _) as usize ); ptr::read(libc::CMSG_DATA(cmsg) as *const T) From 0022dae5ba77ede8ec755451ca7c92878261714e Mon Sep 17 00:00:00 2001 From: David Craven Date: Mon, 23 Aug 2021 13:58:27 +0300 Subject: [PATCH 07/11] Add examples --- quinn-udp/examples/simple.rs | 66 ++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 quinn-udp/examples/simple.rs diff --git a/quinn-udp/examples/simple.rs b/quinn-udp/examples/simple.rs new file mode 100644 index 000000000..7c5139601 --- /dev/null +++ b/quinn-udp/examples/simple.rs @@ -0,0 +1,66 @@ +use anyhow::Result; +use std::io::IoSliceMut; +use std::net::Ipv4Addr; +use std::time::Instant; +use proto::{EcnCodepoint, Transmit}; +use quinn_udp::{RecvMeta, UdpSocket, BATCH_SIZE}; + +fn main() -> Result<()> { + env_logger::init(); + let socket1 = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0).into())?; + let socket2 = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0).into())?; + let addr2 = socket2.local_addr()?; + + let mut transmits = Vec::with_capacity(BATCH_SIZE); + for i in 0..BATCH_SIZE { + let contents = (i as u64).to_be_bytes().to_vec(); + transmits.push(Transmit { + destination: addr2, + ecn: Some(EcnCodepoint::Ce), + segment_size: Some(1200), + contents, + src_ip: Some(Ipv4Addr::LOCALHOST.into()), + }); + } + + let task1 = async_global_executor::spawn(async move { + log::debug!("before send"); + socket1.send(&transmits).await.unwrap(); + log::debug!("after send"); + }); + + let task2 = async_global_executor::spawn(async move { + let mut storage = [[0u8; 1200]; BATCH_SIZE]; + let mut buffers = Vec::with_capacity(BATCH_SIZE); + let mut rest = &mut storage[..]; + for _ in 0..BATCH_SIZE { + let (b, r) = rest.split_at_mut(1); + rest = r; + buffers.push(IoSliceMut::new(&mut b[0])); + } + + let mut meta = [RecvMeta::default(); BATCH_SIZE]; + let n = socket2.recv(&mut buffers, &mut meta).await.unwrap(); + for i in 0..n { + log::debug!( + "received {} {:?} {:?}", + i, + &buffers[i][..meta[i].len], + &meta[i] + ); + } + }); + + async_global_executor::block_on(async move { + let start = Instant::now(); + task1.await; + task2.await; + println!( + "sent {} packets in {}ms", + BATCH_SIZE, + start.elapsed().as_millis() + ); + }); + + Ok(()) +} From f4fddacda691f4d12424a1ff8dab2a1e8de9bdbb Mon Sep 17 00:00:00 2001 From: Roman Proskuryakov Date: Mon, 23 Aug 2021 13:59:22 +0300 Subject: [PATCH 08/11] Fix only_v6 --- quinn-udp/src/unix.rs | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/quinn-udp/src/unix.rs b/quinn-udp/src/unix.rs index 6c1a84510..26078cf37 100644 --- a/quinn-udp/src/unix.rs +++ b/quinn-udp/src/unix.rs @@ -34,20 +34,8 @@ pub fn init(io: &UdpSocket) -> io::Result { let addr = io.local_addr()?; let only_v6 = if addr.is_ipv6() { - unsafe { - let mut only_v6: libc::c_int = 0; - let rc = libc::getsockopt( - io.as_raw_fd(), - libc::IPPROTO_IPV6, - libc::IPV6_V6ONLY, - &mut only_v6 as *mut _ as _, - mem::size_of_val(&only_v6) as _, - ); - if rc == -1 { - return Err(io::Error::last_os_error()); - } - only_v6 != 0 - } + let socket = socket2::SockRef::from(io); + socket.only_v6()? } else { false }; From 78d43b52aafce70cd690c94159fc9b22897c6a92 Mon Sep 17 00:00:00 2001 From: Roman Proskuryakov Date: Mon, 23 Aug 2021 13:59:49 +0300 Subject: [PATCH 09/11] Remove SocketType (it was a write-only field) --- quinn-udp/src/fallback.rs | 11 ++++------- quinn-udp/src/lib.rs | 11 ----------- quinn-udp/src/socket.rs | 10 ++-------- quinn-udp/src/unix.rs | 12 +++--------- 4 files changed, 9 insertions(+), 35 deletions(-) diff --git a/quinn-udp/src/fallback.rs b/quinn-udp/src/fallback.rs index 58a436f4f..77d9ba451 100644 --- a/quinn-udp/src/fallback.rs +++ b/quinn-udp/src/fallback.rs @@ -1,14 +1,11 @@ -use crate::{RecvMeta, SocketType}; +use crate::RecvMeta; use proto::Transmit; use std::io::{IoSliceMut, Result}; -pub fn init(socket: &std::net::UdpSocket) -> Result { - Ok(if socket.local_addr()?.is_ipv4() { - SocketType::Ipv4 - } else { - SocketType::Ipv6Only - }) +pub fn init(_socket: &std::net::UdpSocket) -> Result<()> { + // We do nothing with the given socket. + Ok(()) } pub fn send(socket: &std::net::UdpSocket, transmits: &[Transmit]) -> Result { diff --git a/quinn-udp/src/lib.rs b/quinn-udp/src/lib.rs index e5550da3b..a58ed38c3 100644 --- a/quinn-udp/src/lib.rs +++ b/quinn-udp/src/lib.rs @@ -56,17 +56,6 @@ impl Default for RecvMeta { } } -/// Socket type. -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum SocketType { - /// Socket is bound to an ip4 address. - Ipv4, - /// Socket is bound to an ip6 address and is not dual stack. - Ipv6Only, - /// Socket is bound to an ip6 address and supports ip4 packets. - Ipv6, -} - /// Log at most 1 IO error per minute const IO_ERROR_LOG_INTERVAL: Duration = std::time::Duration::from_secs(60); diff --git a/quinn-udp/src/socket.rs b/quinn-udp/src/socket.rs index 95ae9440e..7141b0863 100644 --- a/quinn-udp/src/socket.rs +++ b/quinn-udp/src/socket.rs @@ -1,5 +1,5 @@ use proto::{Transmit}; -use crate::{RecvMeta, SocketType, UdpCapabilities}; +use crate::{RecvMeta, UdpCapabilities}; use async_io::Async; use futures_lite::future::poll_fn; use std::io::{IoSliceMut, Result}; @@ -15,7 +15,6 @@ use crate::platform as platform; #[derive(Debug)] pub struct UdpSocket { inner: Async, - ty: SocketType, } impl UdpSocket { @@ -27,10 +26,9 @@ impl UdpSocket { } pub fn from_std(socket: std::net::UdpSocket) -> Result { - let ty = platform::init(&socket)?; + platform::init(&socket)?; Ok(Self { inner: Async::new(socket)?, - ty, }) } @@ -39,10 +37,6 @@ impl UdpSocket { Self::from_std(socket) } - pub fn socket_type(&self) -> SocketType { - self.ty - } - pub fn local_addr(&self) -> Result { self.inner.get_ref().local_addr() } diff --git a/quinn-udp/src/unix.rs b/quinn-udp/src/unix.rs index 26078cf37..443b6f0b8 100644 --- a/quinn-udp/src/unix.rs +++ b/quinn-udp/src/unix.rs @@ -8,14 +8,14 @@ use std::{ }; use proto::{EcnCodepoint, Transmit}; -use crate::{cmsg, RecvMeta, SocketType}; +use crate::{cmsg, RecvMeta}; #[cfg(target_os = "freebsd")] type IpTosTy = libc::c_uchar; #[cfg(not(target_os = "freebsd"))] type IpTosTy = libc::c_int; -pub fn init(io: &UdpSocket) -> io::Result { +pub fn init(io: &UdpSocket) -> io::Result<()> { let mut cmsg_platform_space = 0; if cfg!(target_os = "linux") { cmsg_platform_space += @@ -134,13 +134,7 @@ pub fn init(io: &UdpSocket) -> io::Result { } } - Ok(if addr.is_ipv4() { - SocketType::Ipv4 - } else if only_v6 { - SocketType::Ipv6Only - } else { - SocketType::Ipv6 - }) + Ok(()) } #[cfg(not(any(target_os = "macos", target_os = "ios")))] From 8106e68240b72ba8c33d84dd282fd6448dba9487 Mon Sep 17 00:00:00 2001 From: Roman Proskuryakov Date: Mon, 23 Aug 2021 14:23:51 +0300 Subject: [PATCH 10/11] Restore logging code --- quinn-udp/examples/simple.rs | 2 +- quinn-udp/src/lib.rs | 29 ++---------------------- quinn-udp/src/socket.rs | 43 ++++++++++++++++++++++++++++++++---- 3 files changed, 42 insertions(+), 32 deletions(-) diff --git a/quinn-udp/examples/simple.rs b/quinn-udp/examples/simple.rs index 7c5139601..640267e43 100644 --- a/quinn-udp/examples/simple.rs +++ b/quinn-udp/examples/simple.rs @@ -7,7 +7,7 @@ use quinn_udp::{RecvMeta, UdpSocket, BATCH_SIZE}; fn main() -> Result<()> { env_logger::init(); - let socket1 = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0).into())?; + let mut socket1 = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0).into())?; let socket2 = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0).into())?; let addr2 = socket2.local_addr()?; diff --git a/quinn-udp/src/lib.rs b/quinn-udp/src/lib.rs index a58ed38c3..550fb5f53 100644 --- a/quinn-udp/src/lib.rs +++ b/quinn-udp/src/lib.rs @@ -1,11 +1,7 @@ //! Uniform interface to send/recv UDP packets with ECN information. -use std::{ - net::{IpAddr, Ipv6Addr, SocketAddr}, - time::{Duration, Instant}, -}; +use std::net::{IpAddr, Ipv6Addr, SocketAddr}; -use proto::{EcnCodepoint, Transmit}; -use tracing::warn; +use proto::EcnCodepoint; #[cfg(unix)] mod cmsg; @@ -55,24 +51,3 @@ impl Default for RecvMeta { } } } - -/// Log at most 1 IO error per minute -const IO_ERROR_LOG_INTERVAL: Duration = std::time::Duration::from_secs(60); - -/// Logs a warning message when sendmsg fails -/// -/// Logging will only be performed if at least [`IO_ERROR_LOG_INTERVAL`] -/// has elapsed since the last error was logged. -fn log_sendmsg_error( - last_send_error: &mut Instant, - err: impl core::fmt::Debug, - transmit: &Transmit, -) { - let now = Instant::now(); - if now.saturating_duration_since(*last_send_error) > IO_ERROR_LOG_INTERVAL { - *last_send_error = now; - warn!( - "sendmsg error: {:?}, Transmit: {{ destination: {:?}, src_ip: {:?}, enc: {:?}, len: {:?}, segment_size: {:?} }}", - err, transmit.destination, transmit.src_ip, transmit.ecn, transmit.contents.len(), transmit.segment_size); - } -} diff --git a/quinn-udp/src/socket.rs b/quinn-udp/src/socket.rs index 7141b0863..d66147e03 100644 --- a/quinn-udp/src/socket.rs +++ b/quinn-udp/src/socket.rs @@ -2,9 +2,11 @@ use proto::{Transmit}; use crate::{RecvMeta, UdpCapabilities}; use async_io::Async; use futures_lite::future::poll_fn; -use std::io::{IoSliceMut, Result}; +use std::io::{IoSliceMut, Result, self}; use std::net::SocketAddr; use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; +use tracing::warn; use crate::platform as platform; @@ -15,6 +17,7 @@ use crate::platform as platform; #[derive(Debug)] pub struct UdpSocket { inner: Async, + last_send_error: Instant, } impl UdpSocket { @@ -27,8 +30,10 @@ impl UdpSocket { pub fn from_std(socket: std::net::UdpSocket) -> Result { platform::init(&socket)?; + let now = Instant::now(); Ok(Self { inner: Async::new(socket)?, + last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now), }) } @@ -50,7 +55,7 @@ impl UdpSocket { self.inner.get_ref().set_ttl(ttl as u32) } - pub fn poll_send(&self, cx: &mut Context, transmits: &[Transmit]) -> Poll> { + pub fn poll_send(&mut self, cx: &mut Context, transmits: &[Transmit]) -> Poll> { match self.inner.poll_writable(cx) { Poll::Ready(Ok(())) => {} Poll::Pending => return Poll::Pending, @@ -59,7 +64,16 @@ impl UdpSocket { let socket = self.inner.get_ref(); match platform::send(socket, transmits) { Ok(len) => Poll::Ready(Ok(len)), - Err(err) => Poll::Ready(Err(err)), + Err(err) => { + match err.kind() { + | io::ErrorKind::Interrupted + | io::ErrorKind::WouldBlock => {}, + _ => { + log_sendmsg_error(&mut self.last_send_error, &err, &transmits[0]); + } + } + Poll::Ready(Err(err)) + }, } } @@ -78,7 +92,7 @@ impl UdpSocket { Poll::Ready(platform::recv(socket, buffers, meta)) } - pub async fn send(&self, transmits: &[Transmit]) -> Result { + pub async fn send(&mut self, transmits: &[Transmit]) -> Result { let mut i = 0; while i < transmits.len() { i += poll_fn(|cx| self.poll_send(cx, &transmits[i..])).await?; @@ -94,3 +108,24 @@ impl UdpSocket { poll_fn(|cx| self.poll_recv(cx, buffers, meta)).await } } + +/// Log at most 1 IO error per minute +const IO_ERROR_LOG_INTERVAL: Duration = std::time::Duration::from_secs(60); + +/// Logs a warning message when sendmsg fails +/// +/// Logging will only be performed if at least [`IO_ERROR_LOG_INTERVAL`] +/// has elapsed since the last error was logged. +fn log_sendmsg_error( + last_send_error: &mut Instant, + err: &dyn core::fmt::Debug, + transmit: &Transmit, +) { + let now = Instant::now(); + if now.saturating_duration_since(*last_send_error) > IO_ERROR_LOG_INTERVAL { + *last_send_error = now; + warn!( + "sendmsg error: {:?}, Transmit: {{ destination: {:?}, src_ip: {:?}, enc: {:?}, len: {:?}, segment_size: {:?} }}", + err, transmit.destination, transmit.src_ip, transmit.ecn, transmit.contents.len(), transmit.segment_size); + } +} From 7e07c3666610220b04cc11517859f6f962377768 Mon Sep 17 00:00:00 2001 From: Roman Proskuryakov Date: Tue, 24 Aug 2021 17:07:11 +0300 Subject: [PATCH 11/11] Cargo fmt code --- quinn-udp/examples/simple.rs | 4 ++-- quinn-udp/src/cmsg.rs | 5 +---- quinn-udp/src/socket.rs | 11 +++++------ quinn-udp/src/unix.rs | 2 +- 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/quinn-udp/examples/simple.rs b/quinn-udp/examples/simple.rs index 640267e43..21337a3e3 100644 --- a/quinn-udp/examples/simple.rs +++ b/quinn-udp/examples/simple.rs @@ -1,9 +1,9 @@ use anyhow::Result; +use proto::{EcnCodepoint, Transmit}; +use quinn_udp::{RecvMeta, UdpSocket, BATCH_SIZE}; use std::io::IoSliceMut; use std::net::Ipv4Addr; use std::time::Instant; -use proto::{EcnCodepoint, Transmit}; -use quinn_udp::{RecvMeta, UdpSocket, BATCH_SIZE}; fn main() -> Result<()> { env_logger::init(); diff --git a/quinn-udp/src/cmsg.rs b/quinn-udp/src/cmsg.rs index a3bf0c3fe..964e2b85d 100644 --- a/quinn-udp/src/cmsg.rs +++ b/quinn-udp/src/cmsg.rs @@ -71,10 +71,7 @@ impl<'a> Drop for Encoder<'a> { /// `cmsg` must refer to a cmsg containing a payload of type `T` pub unsafe fn decode(cmsg: &libc::cmsghdr) -> T { assert!(mem::align_of::() <= mem::align_of::()); - debug_assert!( - cmsg.cmsg_len as usize <= - libc::CMSG_LEN(mem::size_of::() as _) as usize - ); + debug_assert!(cmsg.cmsg_len as usize <= libc::CMSG_LEN(mem::size_of::() as _) as usize); ptr::read(libc::CMSG_DATA(cmsg) as *const T) } diff --git a/quinn-udp/src/socket.rs b/quinn-udp/src/socket.rs index d66147e03..8ab2a188f 100644 --- a/quinn-udp/src/socket.rs +++ b/quinn-udp/src/socket.rs @@ -1,14 +1,14 @@ -use proto::{Transmit}; use crate::{RecvMeta, UdpCapabilities}; use async_io::Async; use futures_lite::future::poll_fn; -use std::io::{IoSliceMut, Result, self}; +use proto::Transmit; +use std::io::{self, IoSliceMut, Result}; use std::net::SocketAddr; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use tracing::warn; -use crate::platform as platform; +use crate::platform; /// Async-io-compatible UDP socket with some useful specializations. /// @@ -66,14 +66,13 @@ impl UdpSocket { Ok(len) => Poll::Ready(Ok(len)), Err(err) => { match err.kind() { - | io::ErrorKind::Interrupted - | io::ErrorKind::WouldBlock => {}, + io::ErrorKind::Interrupted | io::ErrorKind::WouldBlock => {} _ => { log_sendmsg_error(&mut self.last_send_error, &err, &transmits[0]); } } Poll::Ready(Err(err)) - }, + } } } diff --git a/quinn-udp/src/unix.rs b/quinn-udp/src/unix.rs index 443b6f0b8..6048d7139 100644 --- a/quinn-udp/src/unix.rs +++ b/quinn-udp/src/unix.rs @@ -7,8 +7,8 @@ use std::{ ptr, }; -use proto::{EcnCodepoint, Transmit}; use crate::{cmsg, RecvMeta}; +use proto::{EcnCodepoint, Transmit}; #[cfg(target_os = "freebsd")] type IpTosTy = libc::c_uchar;