From e306d0aa2572abf4355eeac19388102220c0eafe Mon Sep 17 00:00:00 2001 From: dgarus Date: Tue, 20 Aug 2024 16:29:02 +0300 Subject: [PATCH 1/7] WebTransport above QUIC transport --- Cargo.lock | 73 ++++-- libp2p/src/builder/phase/quic.rs | 2 +- transports/quic/Cargo.toml | 7 + transports/quic/src/config.rs | 36 ++- transports/quic/src/connection/connecting.rs | 140 +++++++++-- transports/quic/src/lib.rs | 1 + transports/quic/src/transport.rs | 221 ++++++++++++++---- transports/quic/src/webtransport.rs | 94 ++++++++ .../quic/src/webtransport/certificates.rs | 166 +++++++++++++ .../quic/src/webtransport/connection.rs | 172 ++++++++++++++ transports/tls/Cargo.toml | 1 + transports/tls/src/certificate.rs | 41 ++++ transports/tls/src/lib.rs | 25 ++ 13 files changed, 898 insertions(+), 81 deletions(-) create mode 100644 transports/quic/src/webtransport.rs create mode 100644 transports/quic/src/webtransport/certificates.rs create mode 100644 transports/quic/src/webtransport/connection.rs diff --git a/Cargo.lock b/Cargo.lock index 3fda6b57aa8..6a7170e9fbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -566,7 +566,7 @@ dependencies = [ "axum-core 0.4.3", "bytes", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "http-body-util", "hyper 1.1.0", @@ -616,7 +616,7 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "http-body-util", "mime", @@ -1917,7 +1917,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http 1.0.0", + "http 1.1.0", "indexmap 2.2.1", "slab", "tokio", @@ -1925,6 +1925,46 @@ dependencies = [ "tracing", ] +[[package]] +name = "h3" +version = "0.0.6" +source = "git+https://github.com/hyperium/h3#1af2235a23a9f9343c50e52e46ea2eaa9b694f6d" +dependencies = [ + "bytes", + "fastrand 2.0.1", + "futures-util", + "http 1.1.0", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "h3-quinn" +version = "0.0.7" +source = "git+https://github.com/hyperium/h3#1af2235a23a9f9343c50e52e46ea2eaa9b694f6d" +dependencies = [ + "bytes", + "futures", + "h3", + "quinn", + "tokio", + "tokio-util", +] + +[[package]] +name = "h3-webtransport" +version = "0.1.0" +source = "git+https://github.com/hyperium/h3#1af2235a23a9f9343c50e52e46ea2eaa9b694f6d" +dependencies = [ + "bytes", + "futures-util", + "h3", + "http 1.1.0", + "pin-project-lite", + "tokio", + "tracing", +] + [[package]] name = "half" version = "1.8.2" @@ -2108,9 +2148,9 @@ dependencies = [ [[package]] name = "http" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ "bytes", "fnv", @@ -2135,7 +2175,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" dependencies = [ "bytes", - "http 1.0.0", + "http 1.1.0", ] [[package]] @@ -2146,7 +2186,7 @@ checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" dependencies = [ "bytes", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "pin-project-lite", ] @@ -2209,7 +2249,7 @@ dependencies = [ "futures-channel", "futures-util", "h2 0.4.4", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "httparse", "httpdate", @@ -2226,7 +2266,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" dependencies = [ "futures-util", - "http 1.0.0", + "http 1.1.0", "hyper 1.1.0", "hyper-util", "rustls 0.22.4", @@ -2273,7 +2313,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "hyper 1.1.0", "pin-project-lite", @@ -3146,6 +3186,10 @@ dependencies = [ "bytes", "futures", "futures-timer", + "h3", + "h3-quinn", + "h3-webtransport", + "http 1.1.0", "if-watch", "libp2p-core", "libp2p-identity", @@ -3160,8 +3204,10 @@ dependencies = [ "rand 0.8.5", "ring 0.17.8", "rustls 0.23.11", + "sha2 0.10.8", "socket2 0.5.7", "thiserror", + "time", "tokio", "tracing", "tracing-subscriber", @@ -3388,6 +3434,7 @@ dependencies = [ "rustls 0.23.11", "rustls-webpki 0.101.7", "thiserror", + "time", "tokio", "x509-parser 0.16.0", "yasna", @@ -4886,7 +4933,7 @@ dependencies = [ "futures-core", "futures-util", "h2 0.4.4", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "http-body-util", "hyper 1.1.0", @@ -5867,7 +5914,7 @@ dependencies = [ "async-trait", "base64 0.22.1", "futures", - "http 1.0.0", + "http 1.1.0", "indexmap 2.2.1", "parking_lot", "paste", @@ -6158,7 +6205,7 @@ dependencies = [ "bitflags 2.4.1", "bytes", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "http-body-util", "http-range-header", diff --git a/libp2p/src/builder/phase/quic.rs b/libp2p/src/builder/phase/quic.rs index e030e9493bb..3163dffb65b 100644 --- a/libp2p/src/builder/phase/quic.rs +++ b/libp2p/src/builder/phase/quic.rs @@ -41,7 +41,7 @@ macro_rules! impl_quic_builder { .transport .or_transport( libp2p_quic::$quic::Transport::new(constructor( - libp2p_quic::Config::new(&self.keypair), + libp2p_quic::Config::new(&self.keypair, None), )) .map(|(peer_id, muxer), _| { (peer_id, libp2p_core::muxing::StreamMuxerBox::new(muxer)) diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml index c7e031b1cfd..66b95ada2b9 100644 --- a/transports/quic/Cargo.toml +++ b/transports/quic/Cargo.toml @@ -17,6 +17,7 @@ if-watch = "3.2.0" libp2p-core = { workspace = true } libp2p-tls = { workspace = true } libp2p-identity = { workspace = true } +libp2p-noise = { workspace = true } parking_lot = "0.12.3" quinn = { version = "0.11.2", default-features = false, features = ["rustls", "futures-io"] } rand = "0.8.5" @@ -26,6 +27,12 @@ tokio = { workspace = true, default-features = false, features = ["net", "rt", " tracing = { workspace = true } socket2 = "0.5.7" ring = { workspace = true } +sha2 = "0.10.8" +time = "0.3" +http = "1.1.0" +h3 = { git = "https://github.com/hyperium/h3" } +h3-quinn = { git = "https://github.com/hyperium/h3" } +h3-webtransport = { git = "https://github.com/hyperium/h3" } [features] tokio = ["dep:tokio", "if-watch/tokio", "quinn/runtime-tokio"] diff --git a/transports/quic/src/config.rs b/transports/quic/src/config.rs index 2456ed3e36f..d20817d2a49 100644 --- a/transports/quic/src/config.rs +++ b/transports/quic/src/config.rs @@ -18,11 +18,17 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use std::{sync::Arc, time::Duration}; + use quinn::{ crypto::rustls::{QuicClientConfig, QuicServerConfig}, MtuDiscoveryConfig, VarInt, }; -use std::{sync::Arc, time::Duration}; + +use libp2p_core::multihash::Multihash; + +use crate::webtransport; +use crate::webtransport::Certificate; /// Config for the transport. #[derive(Clone)] @@ -69,18 +75,36 @@ pub struct Config { /// Parameters governing MTU discovery. See [`MtuDiscoveryConfig`] for details. mtu_discovery_config: Option, + + webtransport_certhashes: Vec>, } impl Config { /// Creates a new configuration object with default values. - pub fn new(keypair: &libp2p_identity::Keypair) -> Self { + pub fn new(keypair: &libp2p_identity::Keypair, webtransport_cert: Option) -> Self { let client_tls_config = Arc::new( QuicClientConfig::try_from(libp2p_tls::make_client_config(keypair, None).unwrap()) .unwrap(), ); + let server_config = match &webtransport_cert { + None => libp2p_tls::make_server_config(keypair).unwrap(), + Some(c) => { + libp2p_tls::make_webtransport_server_config( + &c.cert, + &c.private_key, + webtransport::alpn_protocols() + ).unwrap() + }, + }; let server_tls_config = Arc::new( - QuicServerConfig::try_from(libp2p_tls::make_server_config(keypair).unwrap()).unwrap(), + QuicServerConfig::try_from(server_config).unwrap(), ); + + let webtransport_certhashes: Vec> = match webtransport_cert { + Some(c) => vec!(c.cert_hash()), + None => vec!() + }; + Self { client_tls_config, server_tls_config, @@ -95,6 +119,7 @@ impl Config { max_stream_data: 10_000_000, keypair: keypair.clone(), mtu_discovery_config: Some(Default::default()), + webtransport_certhashes, } } @@ -119,6 +144,8 @@ pub(crate) struct QuinnConfig { pub(crate) client_config: quinn::ClientConfig, pub(crate) server_config: quinn::ServerConfig, pub(crate) endpoint_config: quinn::EndpointConfig, + pub(crate) keypair: libp2p_identity::Keypair, + pub(crate) webtransport_certhashes: Vec>, } impl From for QuinnConfig { @@ -135,6 +162,7 @@ impl From for QuinnConfig { handshake_timeout: _, keypair, mtu_discovery_config, + webtransport_certhashes, } = config; let mut transport = quinn::TransportConfig::default(); // Disable uni-directional streams. @@ -176,6 +204,8 @@ impl From for QuinnConfig { client_config, server_config, endpoint_config, + keypair, + webtransport_certhashes } } } diff --git a/transports/quic/src/connection/connecting.rs b/transports/quic/src/connection/connecting.rs index f6e397b4d1e..c2ddfc2bdff 100644 --- a/transports/quic/src/connection/connecting.rs +++ b/transports/quic/src/connection/connecting.rs @@ -20,31 +20,42 @@ //! Future that drives a QUIC connection until is has performed its TLS handshake. -use crate::{Connection, ConnectionError, Error}; +use std::{pin::Pin, task::{Context, Poll}, time::Duration}; +use std::collections::HashSet; +use std::sync::Arc; use futures::{ - future::{select, Either, FutureExt, Select}, + future::{Either, FutureExt, select, Select}, prelude::*, }; +use futures::future::BoxFuture; use futures_timer::Delay; -use libp2p_identity::PeerId; +use h3::error::ErrorLevel; +use h3::ext::Protocol; +use h3_webtransport::server::WebTransportSession; +use http::Method; use quinn::rustls::pki_types::CertificateDer; -use std::{ - pin::Pin, - task::{Context, Poll}, - time::Duration, -}; + +use libp2p_core::multihash::Multihash; +use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::upgrade::InboundConnectionUpgrade; +use libp2p_identity::PeerId; + +use crate::{Connection, ConnectionError, Error, webtransport}; +use crate::transport::ConnectingMode; +use crate::webtransport::WebtransportConnectingError; /// A QUIC connection currently being negotiated. -#[derive(Debug)] pub struct Connecting { - connecting: Select, + connecting: Select< + BoxFuture<'static, Result<(PeerId, StreamMuxerBox), Error>>, Delay + >, } impl Connecting { - pub(crate) fn new(connection: quinn::Connecting, timeout: Duration) -> Self { + pub(crate) fn new(connection: quinn::Connecting, mode: ConnectingMode, timeout: Duration) -> Self { Connecting { - connecting: select(connection, Delay::new(timeout)), + connecting: select(handshake(connection, mode).boxed(), Delay::new(timeout)), } } } @@ -67,17 +78,108 @@ impl Connecting { } } +async fn handshake(connecting: quinn::Connecting, mode: ConnectingMode) -> Result<(PeerId, StreamMuxerBox), Error> { + match connecting.await { + Ok(connection) => { + let peer_id = Connecting::remote_peer_id(&connection); + match mode { + ConnectingMode::QUIC => { + let muxer = Connection::new(connection); + return Ok((peer_id, StreamMuxerBox::new(muxer))); + } + ConnectingMode::WebTransport(certhashes, noise_config) => { + let (peer_id, muxer) = webtransport_connecting(peer_id, connection.clone(), certhashes, noise_config) + .await?; + Ok((peer_id, StreamMuxerBox::new(muxer))) + } + ConnectingMode::Mixed(certhashes, noise_config) => { + let res = webtransport_connecting(peer_id, connection.clone(), certhashes, noise_config).await; + match res { + Ok((peer_id, muxer)) => Ok((peer_id, StreamMuxerBox::new(muxer))), + Err(WebtransportConnectingError::UnexpectedProtocol(_)) => { + let muxer = Connection::new(connection); + Ok((peer_id, StreamMuxerBox::new(muxer))) + } + Err(e) => Err(e.into()), + } + } + } + } + Err(e) => { + return Err(Error::Connection(ConnectionError(e))); + } + } +} + +async fn webtransport_connecting(peer_id: PeerId, + connection: quinn::Connection, + certhashes: Vec>, + noise_config: libp2p_noise::Config, +) -> Result<(PeerId, StreamMuxerBox), WebtransportConnectingError> { + loop { + let mut h3_conn = h3::server::builder() + .enable_webtransport(true) + .enable_connect(true) + .enable_datagram(false) + .max_webtransport_sessions(1) + .send_grease(true) + .build(h3_quinn::Connection::new(connection.clone())) + .await?; + + match h3_conn.accept().await { + Ok(Some((request, stream))) => { + let ext = request.extensions(); + let proto = ext.get::(); + if Some(&Protocol::WEB_TRANSPORT) == proto { + let method = request.method(); + if method != &Method::CONNECT { + return Err(WebtransportConnectingError::UnexpectedMethod(method.clone())) + } + if request.uri().path() != webtransport::WEBTRANSPORT_PATH { + return Err(WebtransportConnectingError::UnexpectedPath(String::from(request.uri().path()))) + } + if request.uri().query() == Some(webtransport::NOISE_QUERY) { + return Err(WebtransportConnectingError::UnexpectedQuery(String::from(request.uri().path()))) + } + let session = WebTransportSession::accept(request, stream, h3_conn).await?; + let arc_session = Arc::new(session); + let webtr_stream = webtransport::accept_webtransport_stream(&arc_session).await?; + + let mut certs = HashSet::new(); + certs.insert(certhashes.first().unwrap().clone()); + let t_noise = noise_config.with_webtransport_certhashes(certs); + t_noise.upgrade_inbound(webtr_stream, "").await?; + + let muxer = webtransport::Connection::new(arc_session, connection); + + return Ok((peer_id, StreamMuxerBox::new(muxer))); + } else { + return Err(WebtransportConnectingError::UnexpectedProtocol(proto.cloned())); + } + } + Ok(None) => { + // indicating no more streams to be received + return Err(WebtransportConnectingError::NoMoreStreams) + } + Err(err) => { + match err.get_error_level() { + ErrorLevel::ConnectionError => return Err(WebtransportConnectingError::Http3Error(err)), + ErrorLevel::StreamError => continue, + } + } + } + } +} + impl Future for Connecting { - type Output = Result<(PeerId, Connection), Error>; + type Output = Result<(PeerId, StreamMuxerBox), Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let connection = match futures::ready!(self.connecting.poll_unpin(cx)) { + let (peer_id, connection) = match futures::ready!(self.connecting.poll_unpin(cx)) { Either::Right(_) => return Poll::Ready(Err(Error::HandshakeTimedOut)), - Either::Left((connection, _)) => connection.map_err(ConnectionError)?, + Either::Left((res, _)) => res?, }; - let peer_id = Self::remote_peer_id(&connection); - let muxer = Connection::new(connection); - Poll::Ready(Ok((peer_id, muxer))) + Poll::Ready(Ok((peer_id, connection))) } -} +} \ No newline at end of file diff --git a/transports/quic/src/lib.rs b/transports/quic/src/lib.rs index 7ae649b6914..fe25eadd6ae 100644 --- a/transports/quic/src/lib.rs +++ b/transports/quic/src/lib.rs @@ -62,6 +62,7 @@ mod connection; mod hole_punching; mod provider; mod transport; +mod webtransport; use std::net::SocketAddr; diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 9bd4c035cec..76bb03f7aaa 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -21,7 +21,7 @@ use crate::config::{Config, QuinnConfig}; use crate::hole_punching::hole_puncher; use crate::provider::Provider; -use crate::{ConnectError, Connecting, Connection, Error}; +use crate::{ConnectError, Connecting, Error}; use futures::channel::oneshot; use futures::future::{BoxFuture, Either}; @@ -49,10 +49,13 @@ use std::{ pin::Pin, task::{Context, Poll, Waker}, }; +use std::fmt::{Debug, Formatter}; +use libp2p_core::multihash::Multihash; +use libp2p_core::muxing::StreamMuxerBox; /// Implementation of the [`Transport`] trait for QUIC. /// -/// By default only QUIC Version 1 (RFC 9000) is supported. In the [`Multiaddr`] this maps to +/// By default, only QUIC Version 1 (RFC 9000) is supported. In the [`Multiaddr`] this maps to /// [`libp2p_core::multiaddr::Protocol::QuicV1`]. /// The [`libp2p_core::multiaddr::Protocol::Quic`] codepoint is interpreted as QUIC version /// draft-29 and only supported if [`Config::support_draft_29`] is set to `true`. @@ -131,19 +134,19 @@ impl GenTransport

{ /// Extract the addr, quic version and peer id from the given [`Multiaddr`]. fn remote_multiaddr_to_socketaddr( &self, - addr: Multiaddr, + addr: &Multiaddr, check_unspecified_addr: bool, ) -> Result< - (SocketAddr, ProtocolVersion, Option), + (SocketAddr, ProtocolVersion, Option, bool), TransportError<::Error>, > { - let (socket_addr, version, peer_id) = multiaddr_to_socketaddr(&addr, self.support_draft_29) + let (socket_addr, version, peer_id, wt) = multiaddr_to_socketaddr(addr, self.support_draft_29) .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?; if check_unspecified_addr && (socket_addr.port() == 0 || socket_addr.ip().is_unspecified()) { - return Err(TransportError::MultiaddrNotSupported(addr)); + return Err(TransportError::MultiaddrNotSupported(addr.clone())); } - Ok((socket_addr, version, peer_id)) + Ok((socket_addr, version, peer_id, wt)) } /// Pick any listener to use for dialing. @@ -198,7 +201,7 @@ impl GenTransport

{ } impl Transport for GenTransport

{ - type Output = (PeerId, Connection); + type Output = (PeerId, StreamMuxerBox); type Error = Error; type ListenerUpgrade = Connecting; type Dial = BoxFuture<'static, Result>; @@ -208,21 +211,42 @@ impl Transport for GenTransport

{ listener_id: ListenerId, addr: Multiaddr, ) -> Result<(), TransportError> { - let (socket_addr, version, _peer_id) = self.remote_multiaddr_to_socketaddr(addr, false)?; - let endpoint_config = self.quinn_config.endpoint_config.clone(); - let server_config = self.quinn_config.server_config.clone(); - let socket = self.create_socket(socket_addr).map_err(Self::Error::from)?; - - let socket_c = socket.try_clone().map_err(Self::Error::from)?; - let endpoint = Self::new_endpoint(endpoint_config, Some(server_config), socket)?; - let listener = Listener::new( - listener_id, - socket_c, - endpoint, - self.handshake_timeout, - version, - )?; - self.listeners.push(listener); + let (socket_addr, version, _peer_id, wt) = self.remote_multiaddr_to_socketaddr(&addr, false)?; + + if wt && self.quinn_config.webtransport_certhashes.is_empty() { + return Err(TransportError::MultiaddrNotSupported(addr)); + } + + let noise_config = libp2p_noise::Config::new(&self.quinn_config.keypair) + .expect("Getting a noise configuration from the node keypair"); + + if let Some(listener) = self.listeners.iter_mut() + .find(|l| !l.is_closed && l.socket_addr() == socket_addr) { + listener.update(wt, &self.quinn_config.webtransport_certhashes, noise_config); + } else { + let socket = self.create_socket(socket_addr).map_err(Self::Error::from)?; + let socket_c = socket.try_clone().map_err(Self::Error::from)?; + let endpoint_config = self.quinn_config.endpoint_config.clone(); + let server_config = self.quinn_config.server_config.clone(); + let endpoint = Self::new_endpoint(endpoint_config, Some(server_config), socket)?; + + let mode = if wt { + ConnectingMode::WebTransport(self.quinn_config.webtransport_certhashes.clone(), noise_config) + } else { + ConnectingMode::QUIC + }; + + let listener = Listener::new( + listener_id, + socket_c, + endpoint, + self.handshake_timeout, + version, + mode, + )?; + + self.listeners.push(listener); + } if let Some(waker) = self.waker.take() { waker.wake(); @@ -257,7 +281,11 @@ impl Transport for GenTransport

{ } fn dial(&mut self, addr: Multiaddr) -> Result> { - let (socket_addr, version, _peer_id) = self.remote_multiaddr_to_socketaddr(addr, true)?; + let (socket_addr, version, _peer_id, wt) = self.remote_multiaddr_to_socketaddr(&addr, true)?; + + if wt { + return Err(TransportError::MultiaddrNotSupported(addr)); + } let endpoint = match self.eligible_listener(&socket_addr) { None => { @@ -298,7 +326,7 @@ impl Transport for GenTransport

{ let connecting = endpoint .connect_with(client_config, socket_addr, "l") .map_err(ConnectError)?; - Connecting::new(connecting, handshake_timeout).await + Connecting::new(connecting, ConnectingMode::QUIC, handshake_timeout).await })) } @@ -306,8 +334,8 @@ impl Transport for GenTransport

{ &mut self, addr: Multiaddr, ) -> Result> { - let (socket_addr, _version, peer_id) = - self.remote_multiaddr_to_socketaddr(addr.clone(), true)?; + let (socket_addr, _version, peer_id, _wt) = + self.remote_multiaddr_to_socketaddr(&addr, true)?; let peer_id = peer_id.ok_or(TransportError::MultiaddrNotSupported(addr.clone()))?; let socket = self @@ -444,6 +472,8 @@ struct Listener { close_listener_waker: Option, listening_addresses: HashSet, + + mode: ConnectingMode, } impl Listener

{ @@ -453,6 +483,7 @@ impl Listener

{ endpoint: quinn::Endpoint, handshake_timeout: Duration, version: ProtocolVersion, + mode: ConnectingMode, ) -> Result { let if_watcher; let pending_event; @@ -464,7 +495,8 @@ impl Listener

{ } else { if_watcher = None; listening_addresses.insert(local_addr.ip()); - let ma = socketaddr_to_multiaddr(&local_addr, version); + let mut ma = socketaddr_to_multiaddr(&local_addr, version); + ma = mode.update_multiaddr(ma); pending_event = Some(TransportEvent::NewAddress { listener_id, listen_addr: ma, @@ -486,6 +518,7 @@ impl Listener

{ pending_event, close_listener_waker: None, listening_addresses, + mode, }) } @@ -528,9 +561,10 @@ impl Listener

{ loop { match ready!(P::poll_if_event(if_watcher, cx)) { Ok(IfEvent::Up(inet)) => { - if let Some(listen_addr) = + if let Some(mut listen_addr) = ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version) { + listen_addr = self.mode.update_multiaddr(listen_addr); tracing::debug!( address=%listen_addr, "New listen address" @@ -543,9 +577,10 @@ impl Listener

{ } } Ok(IfEvent::Down(inet)) => { - if let Some(listen_addr) = + if let Some(mut listen_addr) = ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version) { + listen_addr = self.mode.update_multiaddr(listen_addr); tracing::debug!( address=%listen_addr, "Expired listen address" @@ -566,6 +601,33 @@ impl Listener

{ } } } + + fn update(&mut self, + is_webtransport: bool, + cert_hashes: &Vec>, + noise_cfg: libp2p_noise::Config) { + let cur_mode = &self.mode; + match cur_mode { + ConnectingMode::QUIC => { + if is_webtransport { + self.mode = ConnectingMode::Mixed(cert_hashes.clone(), noise_cfg) + } + } + ConnectingMode::WebTransport(certs, cfg) => { + if !is_webtransport { + self.mode = ConnectingMode::Mixed(certs.clone(), cfg.clone()) + } + } + ConnectingMode::Mixed(_, _) => { + self.mode = ConnectingMode::Mixed(cert_hashes.clone(), noise_cfg) + } + } + } + + fn socketaddr_to_multiaddr(&self, socket_addr: &SocketAddr) -> Multiaddr { + let res = socketaddr_to_multiaddr(socket_addr, self.version); + self.mode.update_multiaddr(res) + } } impl Stream for Listener

{ @@ -597,12 +659,13 @@ impl Stream for Listener

{ } }; - let local_addr = socketaddr_to_multiaddr(&self.socket_addr(), self.version); + let local_addr = self.socketaddr_to_multiaddr(&self.socket_addr()); let remote_addr = connecting.remote_address(); - let send_back_addr = socketaddr_to_multiaddr(&remote_addr, self.version); + let send_back_addr = self.socketaddr_to_multiaddr(&remote_addr); + let mode = self.mode.clone(); let event = TransportEvent::Incoming { - upgrade: Connecting::new(connecting, self.handshake_timeout), + upgrade: Connecting::new(connecting, mode, self.handshake_timeout), local_addr, send_back_addr, listener_id: self.listener_id, @@ -630,6 +693,7 @@ impl fmt::Debug for Listener

{ .field("handshake_timeout", &self.handshake_timeout) .field("is_closed", &self.is_closed) .field("pending_event", &self.pending_event) + .field("mode", &self.mode) .finish() } } @@ -646,6 +710,47 @@ pub(crate) enum SocketFamily { Ipv6, } +#[derive(Clone)] +pub(crate) enum ConnectingMode { + QUIC, + WebTransport(Vec>, libp2p_noise::Config), + Mixed(Vec>, libp2p_noise::Config), +} + +impl ConnectingMode { + pub(crate) fn update_multiaddr(&self, addr: Multiaddr) -> Multiaddr { + fn add_web_transport_protocol(v: &Vec>, addr: Multiaddr) -> Multiaddr { + let mut vec = v.clone(); + let mut res = addr.with(Protocol::WebTransport) + .with(Protocol::Certhash(vec.pop().expect("Gets the last element"))); + if !vec.is_empty() { + res = res.with(Protocol::Certhash(vec.pop().expect("Gets the last element"))); + }; + + res + } + + match self { + ConnectingMode::QUIC => addr, + ConnectingMode::WebTransport(hashes, _) => add_web_transport_protocol(hashes, addr), + ConnectingMode::Mixed(hashes, _) => add_web_transport_protocol(hashes, addr), + } + } +} + +impl Debug for ConnectingMode { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + ConnectingMode::QUIC => + write!(f, "ConnectingMode::QUIC"), + ConnectingMode::WebTransport(certs, _) => + write!(f, "ConnectingMode::WebTransport(hashes:{:?})", certs), + ConnectingMode::Mixed(certs, _) => + write!(f, "ConnectingMode::Mixed(hashes:{:?})", certs), + } + } +} + impl SocketFamily { fn is_same(a: &IpAddr, b: &IpAddr) -> bool { matches!( @@ -690,18 +795,25 @@ fn ip_to_listenaddr( fn multiaddr_to_socketaddr( addr: &Multiaddr, support_draft_29: bool, -) -> Option<(SocketAddr, ProtocolVersion, Option)> { +) -> Option<(SocketAddr, ProtocolVersion, Option, bool)> { let mut iter = addr.iter(); let proto1 = iter.next()?; let proto2 = iter.next()?; let proto3 = iter.next()?; let mut peer_id = None; + let mut is_webtransport = false; for proto in iter { match proto { Protocol::P2p(id) => { peer_id = Some(id); } + Protocol::WebTransport => { + is_webtransport = true; + } + Protocol::Certhash(_) => { + panic!("Cannot listen on a specific certhash for WebTransport addr {addr}"); + } _ => return None, } } @@ -713,10 +825,10 @@ fn multiaddr_to_socketaddr( match (proto1, proto2) { (Protocol::Ip4(ip), Protocol::Udp(port)) => { - Some((SocketAddr::new(ip.into(), port), version, peer_id)) + Some((SocketAddr::new(ip.into(), port), version, peer_id, is_webtransport)) } (Protocol::Ip6(ip), Protocol::Udp(port)) => { - Some((SocketAddr::new(ip.into(), port), version, peer_id)) + Some((SocketAddr::new(ip.into(), port), version, peer_id, is_webtransport)) } _ => None, } @@ -785,7 +897,8 @@ mod tests { Some(( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12345,), ProtocolVersion::V1, - None + None, + false )) ); assert_eq!( @@ -798,7 +911,8 @@ mod tests { Some(( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 8080,), ProtocolVersion::V1, - None + None, + false )) ); assert_eq!( @@ -810,7 +924,8 @@ mod tests { Some((SocketAddr::new( IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 55148, - ), ProtocolVersion::V1, Some("12D3KooW9xk7Zp1gejwfwNpfm6L9zH5NL4Bx5rm94LRYJJHJuARZ".parse().unwrap()))) + ), ProtocolVersion::V1, Some("12D3KooW9xk7Zp1gejwfwNpfm6L9zH5NL4Bx5rm94LRYJJHJuARZ".parse().unwrap()), + false)) ); assert_eq!( multiaddr_to_socketaddr( @@ -820,7 +935,8 @@ mod tests { Some(( SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 12345,), ProtocolVersion::V1, - None + None, + false )) ); assert_eq!( @@ -838,7 +954,8 @@ mod tests { 8080, ), ProtocolVersion::V1, - None + None, + false )) ); @@ -855,7 +972,21 @@ mod tests { Some(( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234,), ProtocolVersion::Draft29, - None + None, + false + )) + ); + + assert_eq!( + multiaddr_to_socketaddr( + &"/ip4/127.0.0.1/udp/1234/quic/webtransport".parse::().unwrap(), + true, + ), + Some(( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234), + ProtocolVersion::Draft29, + None, + true )) ); } @@ -864,7 +995,7 @@ mod tests { #[async_std::test] async fn test_close_listener() { let keypair = libp2p_identity::Keypair::generate_ed25519(); - let config = Config::new(&keypair); + let config = Config::new(&keypair, None); let mut transport = crate::async_std::Transport::new(config); assert!(poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)) .now_or_never() @@ -917,7 +1048,7 @@ mod tests { #[tokio::test] async fn test_dialer_drop() { let keypair = libp2p_identity::Keypair::generate_ed25519(); - let config = Config::new(&keypair); + let config = Config::new(&keypair, None); let mut transport = crate::tokio::Transport::new(config); let _dial = transport @@ -941,7 +1072,7 @@ mod tests { #[tokio::test] async fn test_listens_ipv4_ipv6_separately() { let keypair = libp2p_identity::Keypair::generate_ed25519(); - let config = Config::new(&keypair); + let config = Config::new(&keypair, None); let mut transport = crate::tokio::Transport::new(config); let port = { let socket = UdpSocket::bind("127.0.0.1:0").unwrap(); diff --git a/transports/quic/src/webtransport.rs b/transports/quic/src/webtransport.rs new file mode 100644 index 00000000000..a51fa547c15 --- /dev/null +++ b/transports/quic/src/webtransport.rs @@ -0,0 +1,94 @@ +use std::fmt::{Debug, Display, Formatter, Write}; +use std::io; +use std::io::ErrorKind; + +use h3::ext::Protocol; +use http::Method; + +pub(crate) use certificates::alpn_protocols; +pub(crate) use certificates::Certificate; +pub(crate) use connection::accept_webtransport_stream; +pub(crate) use connection::Connection; + +use crate::Error; + +mod certificates; +mod connection; + +pub(crate) const WEBTRANSPORT_PATH: &str = "/.well-known/libp2p-webtransport"; +pub(crate) const NOISE_QUERY: &str = "type=noise"; + +#[derive(Debug)] +pub enum WebtransportConnectingError { + UnexpectedProtocol(Option), + UnexpectedMethod(Method), + UnexpectedPath(String), + UnexpectedQuery(String), + + ConnectionError(quinn::ConnectionError), + Http3Error(h3::Error), + NoiseError(libp2p_noise::Error), + NoMoreStreams, + CrateError(Error), +} + +impl Display for WebtransportConnectingError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + WebtransportConnectingError::UnexpectedProtocol(p) => + write!(f, "Unexpected request protocol {:?}", p), + WebtransportConnectingError::UnexpectedMethod(m) => + write!(f, "Unexpected initial request method {:?}", m), + WebtransportConnectingError::UnexpectedPath(p) => + write!(f, "Unexpected initial request path {}", p), + WebtransportConnectingError::UnexpectedQuery(q) => + write!(f, "Unexpected initial request query {}", q), + WebtransportConnectingError::ConnectionError(e) => + write!(f, "Connection error: {}", e), + WebtransportConnectingError::Http3Error(e) => + write!(f, "Http3 error: {}", e), + WebtransportConnectingError::NoiseError(e) => + write!(f, "Noise error: {}", e), + WebtransportConnectingError::NoMoreStreams => + write!(f, "No more streams"), + WebtransportConnectingError::CrateError(e) => + write!(f, "{}", e), + } + } +} + +impl std::error::Error for WebtransportConnectingError {} + +impl From for WebtransportConnectingError { + fn from(value: Error) -> Self { + WebtransportConnectingError::CrateError(value) + } +} + +impl From for Error { + fn from(value: WebtransportConnectingError) -> Self { + if let WebtransportConnectingError::CrateError(e) = value { + e + } else { + Error::Io(io::Error::new(ErrorKind::Other, value)) + } + } +} + +impl From for WebtransportConnectingError { + fn from(e: libp2p_noise::Error) -> Self { + WebtransportConnectingError::NoiseError(e) + } +} + +impl From for WebtransportConnectingError { + fn from(e: h3::Error) -> Self { + WebtransportConnectingError::Http3Error(e) + } +} + +impl From for WebtransportConnectingError { + fn from(e: quinn::ConnectionError) -> Self { + WebtransportConnectingError::ConnectionError(e) + } +} \ No newline at end of file diff --git a/transports/quic/src/webtransport/certificates.rs b/transports/quic/src/webtransport/certificates.rs new file mode 100644 index 00000000000..a8ba919b917 --- /dev/null +++ b/transports/quic/src/webtransport/certificates.rs @@ -0,0 +1,166 @@ +use std::io; +use std::io::{Cursor, Read, Write}; +use rustls::pki_types::{CertificateDer, PrivateKeyDer}; +use time::{Duration, OffsetDateTime}; +use libp2p_core::multihash::Multihash; +use libp2p_tls::certificate; +use sha2::Digest; + +const MULTIHASH_SHA256_CODE: u64 = 0x12; +const CERT_VALID_PERIOD: Duration = Duration::days(14); + +pub(crate) fn alpn_protocols() -> Vec> { + vec![b"libp2p".to_vec(), + b"h3".to_vec(), + b"h3-32".to_vec(), + b"h3-31".to_vec(), + b"h3-30".to_vec(), + b"h3-29".to_vec(), ] +} + + +/* +I would like to avoid interacting with the file system as much as possible. +My suggestion would be: +- libp2p::webtransport::Transport::new takes a list of certificates (of type libp2p::webtransport::Certificate) +- libp2p::webtransport::Certificate::generate allows users generate a new certificate with certain parameters (validity date etc) +- libp2p::webtransport::Certificate::{parse,to_bytes} allow users to serialize and deserialize certificates +*/ +#[derive(Debug, PartialEq, Eq)] +pub struct Certificate { + pub cert: CertificateDer<'static>, + pub private_key: PrivateKeyDer<'static>, + pub not_before: OffsetDateTime, + pub not_after: OffsetDateTime, +} + +#[derive(Debug)] +pub enum Error { + GenError(certificate::GenError), + IoError(io::Error), +} + +impl From for Error { + fn from(value: certificate::GenError) -> Self { + Self::GenError(value) + } +} + +impl From for Error { + fn from(value: io::Error) -> Self { + Self::IoError(value) + } +} + +impl Clone for Certificate { + fn clone(&self) -> Self { + Self { + cert: self.cert.clone(), + private_key: self.private_key.clone_key(), + not_before: self.not_before.clone(), + not_after: self.not_after.clone(), + } + } +} + +impl Certificate { + pub fn generate( + identity_keypair: &libp2p_identity::Keypair, + not_before: OffsetDateTime, + ) -> Result { + let not_after = not_before.clone() + .checked_add(CERT_VALID_PERIOD) + .expect("Addition does not overflow"); + let (cert, private_key) = certificate::generate_with_validity_period( + identity_keypair, + not_before.clone(), + not_after.clone(), + )?; + + Ok(Self { cert, private_key, not_before, not_after }) + } + + pub(crate) fn cert_hash(&self) -> Multihash<64> { + Multihash::wrap( + MULTIHASH_SHA256_CODE, sha2::Sha256::digest(&self.cert.as_ref().as_ref()).as_ref(), + ).expect("fingerprint's len to be 32 bytes") + } + + pub fn to_bytes(&self) -> Vec { + let mut bytes = Vec::new(); + + Self::write_data(&mut bytes, self.cert.as_ref()) + .expect("Write cert data"); + Self::write_data(&mut bytes, self.private_key.secret_der()) + .expect("Write private_key data"); + + let nb_buff = self.not_before.unix_timestamp().to_be_bytes(); + std::io::Write::write(&mut bytes, &nb_buff) + .expect("Write not_before"); + + let na_buff = self.not_after.unix_timestamp().to_be_bytes(); + std::io::Write::write(&mut bytes, &na_buff) + .expect("Write not_after"); + + bytes + } + + pub fn parse(data: &[u8]) -> Result { + let mut cursor = Cursor::new(data); + let cert_data = Self::read_data(&mut cursor)?; + let private_key_data = Self::read_data(&mut cursor)?; + let nb = Self::read_i64(&mut cursor).unwrap(); + let na = Self::read_i64(&mut cursor).unwrap(); + + let cert = CertificateDer::from(cert_data); + let private_key = PrivateKeyDer::try_from(private_key_data).unwrap(); + let not_before = OffsetDateTime::from_unix_timestamp(nb).unwrap(); + let not_after = OffsetDateTime::from_unix_timestamp(na).unwrap(); + + Ok(Self { cert, private_key, not_before, not_after }) + } + + fn write_data(w: &mut W, data: &[u8]) -> Result<(), io::Error> { + let size = data.len() as u64; + let mut size_buf = size.to_be_bytes(); + + w.write_all(&mut size_buf)?; + w.write_all(data)?; + + Ok(()) + } + + fn read_data(r: &mut R) -> Result, io::Error> { + let size = Self::read_i64(r)? as usize; + let mut res = vec![0u8; size]; + + r.read(res.as_mut_slice())?; + + Ok(res) + } + + fn read_i64(r: &mut R) -> Result { + let mut buffer = [0u8; 8]; + r.read(&mut buffer)?; + + Ok(i64::from_be_bytes(buffer)) + } +} + +#[cfg(test)] +mod tests { + use time::macros::datetime; + + #[test] + fn test_certificate_parsing() { + let keypair = libp2p_identity::Keypair::generate_ed25519(); + let not_before = datetime!(2025-08-08 0:00 UTC); + let cert = super::Certificate::generate(&keypair, not_before).unwrap(); + + let binary_data = cert.to_bytes(); + let actual = super::Certificate::parse(binary_data.as_slice()) + .unwrap(); + + assert_eq!(actual, cert); + } +} \ No newline at end of file diff --git a/transports/quic/src/webtransport/connection.rs b/transports/quic/src/webtransport/connection.rs new file mode 100644 index 00000000000..d3d5e54b616 --- /dev/null +++ b/transports/quic/src/webtransport/connection.rs @@ -0,0 +1,172 @@ +use std::fmt::{Display, Formatter}; +use std::io; +use std::io::ErrorKind; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use bytes::Bytes; +use futures::{AsyncRead, AsyncWrite, FutureExt}; +use futures::future::BoxFuture; +use h3::quic; +use h3_webtransport::server::{AcceptedBi, WebTransportSession}; +use h3_webtransport::stream::{RecvStream, SendStream}; + +use libp2p_core::muxing::StreamMuxerEvent; +use libp2p_core::StreamMuxer; + +use crate::{ConnectionError, Error}; + +type SendPart = SendStream, Bytes>; +type RecvPart = RecvStream; + +pub(crate) struct Connection { + session: Arc>, + incoming: Option>>, + /// Underlying connection. + connection: quinn::Connection, + /// Future to wait for the connection to be closed. + closing: Option>, +} + +impl Connection { + pub(crate) fn new( + session: Arc>, + connection: quinn::Connection + ) -> Self { + Self { + session, + incoming: None, + connection, + closing: None, + } + } +} + +impl StreamMuxer for Connection { + type Substream = Stream; + type Error = Error; + + fn poll_inbound(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let t_session = Arc::clone(&this.session); + + let inbound = this.incoming.get_or_insert_with(|| { + async move { accept_webtransport_stream(&t_session).await }.boxed() + }); + + let res = futures::ready!(inbound.poll_unpin(cx))?; + this.incoming.take(); + Poll::Ready(Ok(res)) + } + + fn poll_outbound(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + panic!("WebTransport implementation doesn't support outbound streams.") + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + let closing = this.closing.get_or_insert_with(|| { + this.connection.close(From::from(0u32), &[]); + let connection = this.connection.clone(); + async move { connection.closed().await }.boxed() + }); + + match futures::ready!(closing.poll_unpin(cx)) { + // Expected error given that `connection.close` was called above. + quinn::ConnectionError::LocallyClosed => {} + error => return Poll::Ready(Err(Error::Connection(ConnectionError(error)))), + }; + + Poll::Ready(Ok(())) + } + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + // TODO: If connection migration is enabled (currently disabled) address + // change on the connection needs to be handled. + Poll::Pending + } +} + +/// A single stream on a connection +pub(crate) struct Stream { + send: SendPart, + recv: RecvPart, + /// Whether the stream is closed or not + close_result: Option>, +} + +impl Stream { + pub(crate) fn new(send: SendPart, recv: RecvPart) -> Self { + Self { send, recv, close_result: None } + } +} + +impl AsyncRead for Stream { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + if let Some(close_result) = self.close_result { + if close_result.is_err() { + return Poll::Ready(Ok(0)); + } + } + + Pin::new(&mut self.recv).poll_read(cx, buf) + } +} + +impl AsyncWrite for Stream { + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + Pin::new(&mut self.send) + .poll_write(cx, buf) + .map_err(Into::into) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.send).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + if let Some(close_result) = self.close_result { + // For some reason poll_close needs to be 'fuse'able + return Poll::Ready(close_result.map_err(Into::into)); + } + let close_result = futures::ready!(Pin::new(&mut self.send).poll_close(cx)); + self.close_result = Some(close_result.as_ref().map_err(|e| e.kind()).copied()); + Poll::Ready(close_result) + } +} + +#[derive(Debug)] +pub struct WebtransportError(String); + +impl Display for WebtransportError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl std::error::Error for WebtransportError {} + +impl From for Error { + fn from(value: WebtransportError) -> Self { + Error::Io(io::Error::new(ErrorKind::Other, value)) + } +} + +pub(crate) async fn accept_webtransport_stream( + session: &Arc>, +) -> Result { + let t_session = Arc::clone(session); + + match t_session.accept_bi().await { + Ok(Some(AcceptedBi::BidiStream(_, stream))) => { + let (send, recv) = quic::BidiStream::split(stream); + Ok(Stream::new(send, recv)) + } + Ok(_) => Err( + WebtransportError(String::from("Only bidirectional streams are supported")).into() + ), + Err(e) => Err(Error::Io(io::Error::new(ErrorKind::Other, e))), + } +} \ No newline at end of file diff --git a/transports/tls/Cargo.toml b/transports/tls/Cargo.toml index c4b30951e66..0bfa64cc134 100644 --- a/transports/tls/Cargo.toml +++ b/transports/tls/Cargo.toml @@ -19,6 +19,7 @@ thiserror = "1.0.61" webpki = { version = "0.101.4", package = "rustls-webpki", features = ["std"] } x509-parser = "0.16.0" yasna = "0.5.2" +time = "0.3" # Exposed dependencies. Breaking changes to these are breaking changes to us. [dependencies.rustls] diff --git a/transports/tls/src/certificate.rs b/transports/tls/src/certificate.rs index 65b373bcf9b..0d3c56f33b2 100644 --- a/transports/tls/src/certificate.rs +++ b/transports/tls/src/certificate.rs @@ -27,6 +27,7 @@ use libp2p_identity::PeerId; use x509_parser::{prelude::*, signature_algorithm::SignatureAlgorithm}; use std::sync::Arc; +use ::time::OffsetDateTime; /// The libp2p Public Key Extension is a X.509 extension /// with the Object Identifier 1.3.6.1.4.1.53594.1.1, @@ -121,6 +122,46 @@ pub fn generate( Ok((rustls_certificate, rustls_key)) } +pub fn generate_with_validity_period( + identity_keypair: &identity::Keypair, + not_before: OffsetDateTime, + not_after: OffsetDateTime, +) -> Result< + ( + rustls::pki_types::CertificateDer<'static>, + rustls::pki_types::PrivateKeyDer<'static>, + ), + GenError, +> { + // Keypair used to sign the certificate. + // SHOULD NOT be related to the host's key. + // Endpoints MAY generate a new key and certificate + // for every connection attempt, or they MAY reuse the same key + // and certificate for multiple connections. + let certificate_keypair = rcgen::KeyPair::generate(P2P_SIGNATURE_ALGORITHM)?; + let rustls_key = rustls::pki_types::PrivateKeyDer::from( + rustls::pki_types::PrivatePkcs8KeyDer::from(certificate_keypair.serialize_der()), + ); + + let certificate = { + let mut params = rcgen::CertificateParams::new(vec![]); + params.distinguished_name = rcgen::DistinguishedName::new(); + params.custom_extensions.push(make_libp2p_extension( + identity_keypair, + &certificate_keypair, + )?); + params.alg = P2P_SIGNATURE_ALGORITHM; + params.key_pair = Some(certificate_keypair); + params.not_before = not_before; + params.not_after = not_after; + rcgen::Certificate::from_params(params)? + }; + + let rustls_certificate = rustls::pki_types::CertificateDer::from(certificate.serialize_der()?); + + Ok((rustls_certificate, rustls_key)) +} + /// Attempts to parse the provided bytes as a [`P2pCertificate`]. /// /// For this to succeed, the certificate must contain the specified extension and the signature must diff --git a/transports/tls/src/lib.rs b/transports/tls/src/lib.rs index 3aa66db12b3..5539dcdff5c 100644 --- a/transports/tls/src/lib.rs +++ b/transports/tls/src/lib.rs @@ -35,6 +35,7 @@ use libp2p_identity::PeerId; use std::sync::Arc; pub use futures_rustls::TlsStream; +use rustls::pki_types::{CertificateDer, PrivateKeyDer}; pub use upgrade::Config; pub use upgrade::UpgradeError; @@ -91,3 +92,27 @@ pub fn make_server_config( Ok(crypto) } + +/// Create a TLS server configuration for libp2p. +pub fn make_webtransport_server_config( + certificate: &CertificateDer<'static>, + private_key: &PrivateKeyDer, + protocols: Vec>, +) -> Result { + let mut provider = rustls::crypto::ring::default_provider(); + provider.cipher_suites = verifier::CIPHERSUITES.to_vec(); + + let cert_resolver = Arc::new( + AlwaysResolvesCert::new(certificate.clone(), private_key) + .expect("Server cert key DER is valid; qed"), + ); + + let mut crypto = rustls::ServerConfig::builder_with_provider(provider.into()) + .with_protocol_versions(verifier::PROTOCOL_VERSIONS) + .expect("Cipher suites and kx groups are configured; qed") + .with_client_cert_verifier(Arc::new(verifier::Libp2pCertificateVerifier::new())) + .with_cert_resolver(cert_resolver); + crypto.alpn_protocols = protocols.to_vec(); + + Ok(crypto) +} From 07afc3fcbb2c708edf63b940864b93d7018bdeb3 Mon Sep 17 00:00:00 2001 From: dgarus Date: Tue, 20 Aug 2024 16:30:25 +0300 Subject: [PATCH 2/7] WebTransport above QUIC transport --- transports/quic/src/config.rs | 23 +++-- transports/quic/src/connection/connecting.rs | 86 +++++++++++++------ transports/quic/src/transport.rs | 83 +++++++++++------- transports/quic/src/webtransport.rs | 37 ++++---- .../quic/src/webtransport/certificates.rs | 60 +++++++------ .../quic/src/webtransport/connection.rs | 45 +++++++--- transports/tls/src/certificate.rs | 2 +- 7 files changed, 208 insertions(+), 128 deletions(-) diff --git a/transports/quic/src/config.rs b/transports/quic/src/config.rs index d20817d2a49..319d9446b39 100644 --- a/transports/quic/src/config.rs +++ b/transports/quic/src/config.rs @@ -88,21 +88,18 @@ impl Config { ); let server_config = match &webtransport_cert { None => libp2p_tls::make_server_config(keypair).unwrap(), - Some(c) => { - libp2p_tls::make_webtransport_server_config( - &c.cert, - &c.private_key, - webtransport::alpn_protocols() - ).unwrap() - }, + Some(c) => libp2p_tls::make_webtransport_server_config( + &c.cert, + &c.private_key, + webtransport::alpn_protocols(), + ) + .unwrap(), }; - let server_tls_config = Arc::new( - QuicServerConfig::try_from(server_config).unwrap(), - ); + let server_tls_config = Arc::new(QuicServerConfig::try_from(server_config).unwrap()); let webtransport_certhashes: Vec> = match webtransport_cert { - Some(c) => vec!(c.cert_hash()), - None => vec!() + Some(c) => vec![c.cert_hash()], + None => vec![], }; Self { @@ -205,7 +202,7 @@ impl From for QuinnConfig { server_config, endpoint_config, keypair, - webtransport_certhashes + webtransport_certhashes, } } } diff --git a/transports/quic/src/connection/connecting.rs b/transports/quic/src/connection/connecting.rs index c2ddfc2bdff..ddd693064b2 100644 --- a/transports/quic/src/connection/connecting.rs +++ b/transports/quic/src/connection/connecting.rs @@ -20,15 +20,19 @@ //! Future that drives a QUIC connection until is has performed its TLS handshake. -use std::{pin::Pin, task::{Context, Poll}, time::Duration}; use std::collections::HashSet; use std::sync::Arc; +use std::{ + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; +use futures::future::BoxFuture; use futures::{ - future::{Either, FutureExt, select, Select}, + future::{select, Either, FutureExt, Select}, prelude::*, }; -use futures::future::BoxFuture; use futures_timer::Delay; use h3::error::ErrorLevel; use h3::ext::Protocol; @@ -41,19 +45,21 @@ use libp2p_core::muxing::StreamMuxerBox; use libp2p_core::upgrade::InboundConnectionUpgrade; use libp2p_identity::PeerId; -use crate::{Connection, ConnectionError, Error, webtransport}; use crate::transport::ConnectingMode; use crate::webtransport::WebtransportConnectingError; +use crate::{webtransport, Connection, ConnectionError, Error}; /// A QUIC connection currently being negotiated. pub struct Connecting { - connecting: Select< - BoxFuture<'static, Result<(PeerId, StreamMuxerBox), Error>>, Delay - >, + connecting: Select>, Delay>, } impl Connecting { - pub(crate) fn new(connection: quinn::Connecting, mode: ConnectingMode, timeout: Duration) -> Self { + pub(crate) fn new( + connection: quinn::Connecting, + mode: ConnectingMode, + timeout: Duration, + ) -> Self { Connecting { connecting: select(handshake(connection, mode).boxed(), Delay::new(timeout)), } @@ -78,7 +84,10 @@ impl Connecting { } } -async fn handshake(connecting: quinn::Connecting, mode: ConnectingMode) -> Result<(PeerId, StreamMuxerBox), Error> { +async fn handshake( + connecting: quinn::Connecting, + mode: ConnectingMode, +) -> Result<(PeerId, StreamMuxerBox), Error> { match connecting.await { Ok(connection) => { let peer_id = Connecting::remote_peer_id(&connection); @@ -88,12 +97,23 @@ async fn handshake(connecting: quinn::Connecting, mode: ConnectingMode) -> Resul return Ok((peer_id, StreamMuxerBox::new(muxer))); } ConnectingMode::WebTransport(certhashes, noise_config) => { - let (peer_id, muxer) = webtransport_connecting(peer_id, connection.clone(), certhashes, noise_config) - .await?; + let (peer_id, muxer) = webtransport_connecting( + peer_id, + connection.clone(), + certhashes, + noise_config, + ) + .await?; Ok((peer_id, StreamMuxerBox::new(muxer))) } ConnectingMode::Mixed(certhashes, noise_config) => { - let res = webtransport_connecting(peer_id, connection.clone(), certhashes, noise_config).await; + let res = webtransport_connecting( + peer_id, + connection.clone(), + certhashes, + noise_config, + ) + .await; match res { Ok((peer_id, muxer)) => Ok((peer_id, StreamMuxerBox::new(muxer))), Err(WebtransportConnectingError::UnexpectedProtocol(_)) => { @@ -111,10 +131,11 @@ async fn handshake(connecting: quinn::Connecting, mode: ConnectingMode) -> Resul } } -async fn webtransport_connecting(peer_id: PeerId, - connection: quinn::Connection, - certhashes: Vec>, - noise_config: libp2p_noise::Config, +async fn webtransport_connecting( + peer_id: PeerId, + connection: quinn::Connection, + certhashes: Vec>, + noise_config: libp2p_noise::Config, ) -> Result<(PeerId, StreamMuxerBox), WebtransportConnectingError> { loop { let mut h3_conn = h3::server::builder() @@ -133,17 +154,24 @@ async fn webtransport_connecting(peer_id: PeerId, if Some(&Protocol::WEB_TRANSPORT) == proto { let method = request.method(); if method != &Method::CONNECT { - return Err(WebtransportConnectingError::UnexpectedMethod(method.clone())) + return Err(WebtransportConnectingError::UnexpectedMethod( + method.clone(), + )); } if request.uri().path() != webtransport::WEBTRANSPORT_PATH { - return Err(WebtransportConnectingError::UnexpectedPath(String::from(request.uri().path()))) + return Err(WebtransportConnectingError::UnexpectedPath(String::from( + request.uri().path(), + ))); } if request.uri().query() == Some(webtransport::NOISE_QUERY) { - return Err(WebtransportConnectingError::UnexpectedQuery(String::from(request.uri().path()))) + return Err(WebtransportConnectingError::UnexpectedQuery(String::from( + request.uri().path(), + ))); } let session = WebTransportSession::accept(request, stream, h3_conn).await?; let arc_session = Arc::new(session); - let webtr_stream = webtransport::accept_webtransport_stream(&arc_session).await?; + let webtr_stream = + webtransport::accept_webtransport_stream(&arc_session).await?; let mut certs = HashSet::new(); certs.insert(certhashes.first().unwrap().clone()); @@ -154,19 +182,21 @@ async fn webtransport_connecting(peer_id: PeerId, return Ok((peer_id, StreamMuxerBox::new(muxer))); } else { - return Err(WebtransportConnectingError::UnexpectedProtocol(proto.cloned())); + return Err(WebtransportConnectingError::UnexpectedProtocol( + proto.cloned(), + )); } } Ok(None) => { // indicating no more streams to be received - return Err(WebtransportConnectingError::NoMoreStreams) + return Err(WebtransportConnectingError::NoMoreStreams); } - Err(err) => { - match err.get_error_level() { - ErrorLevel::ConnectionError => return Err(WebtransportConnectingError::Http3Error(err)), - ErrorLevel::StreamError => continue, + Err(err) => match err.get_error_level() { + ErrorLevel::ConnectionError => { + return Err(WebtransportConnectingError::Http3Error(err)) } - } + ErrorLevel::StreamError => continue, + }, } } } @@ -182,4 +212,4 @@ impl Future for Connecting { Poll::Ready(Ok((peer_id, connection))) } -} \ No newline at end of file +} diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 76bb03f7aaa..764eea306f9 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -31,6 +31,8 @@ use futures::{prelude::*, stream::SelectAll}; use if_watch::IfEvent; +use libp2p_core::multihash::Multihash; +use libp2p_core::muxing::StreamMuxerBox; use libp2p_core::{ multiaddr::{Multiaddr, Protocol}, transport::{ListenerId, TransportError, TransportEvent}, @@ -40,6 +42,7 @@ use libp2p_identity::PeerId; use socket2::{Domain, Socket, Type}; use std::collections::hash_map::{DefaultHasher, Entry}; use std::collections::{HashMap, HashSet}; +use std::fmt::{Debug, Formatter}; use std::hash::{Hash, Hasher}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, UdpSocket}; use std::time::Duration; @@ -49,9 +52,6 @@ use std::{ pin::Pin, task::{Context, Poll, Waker}, }; -use std::fmt::{Debug, Formatter}; -use libp2p_core::multihash::Multihash; -use libp2p_core::muxing::StreamMuxerBox; /// Implementation of the [`Transport`] trait for QUIC. /// @@ -140,8 +140,9 @@ impl GenTransport

{ (SocketAddr, ProtocolVersion, Option, bool), TransportError<::Error>, > { - let (socket_addr, version, peer_id, wt) = multiaddr_to_socketaddr(addr, self.support_draft_29) - .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?; + let (socket_addr, version, peer_id, wt) = + multiaddr_to_socketaddr(addr, self.support_draft_29) + .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?; if check_unspecified_addr && (socket_addr.port() == 0 || socket_addr.ip().is_unspecified()) { return Err(TransportError::MultiaddrNotSupported(addr.clone())); @@ -211,7 +212,8 @@ impl Transport for GenTransport

{ listener_id: ListenerId, addr: Multiaddr, ) -> Result<(), TransportError> { - let (socket_addr, version, _peer_id, wt) = self.remote_multiaddr_to_socketaddr(&addr, false)?; + let (socket_addr, version, _peer_id, wt) = + self.remote_multiaddr_to_socketaddr(&addr, false)?; if wt && self.quinn_config.webtransport_certhashes.is_empty() { return Err(TransportError::MultiaddrNotSupported(addr)); @@ -220,8 +222,11 @@ impl Transport for GenTransport

{ let noise_config = libp2p_noise::Config::new(&self.quinn_config.keypair) .expect("Getting a noise configuration from the node keypair"); - if let Some(listener) = self.listeners.iter_mut() - .find(|l| !l.is_closed && l.socket_addr() == socket_addr) { + if let Some(listener) = self + .listeners + .iter_mut() + .find(|l| !l.is_closed && l.socket_addr() == socket_addr) + { listener.update(wt, &self.quinn_config.webtransport_certhashes, noise_config); } else { let socket = self.create_socket(socket_addr).map_err(Self::Error::from)?; @@ -231,7 +236,10 @@ impl Transport for GenTransport

{ let endpoint = Self::new_endpoint(endpoint_config, Some(server_config), socket)?; let mode = if wt { - ConnectingMode::WebTransport(self.quinn_config.webtransport_certhashes.clone(), noise_config) + ConnectingMode::WebTransport( + self.quinn_config.webtransport_certhashes.clone(), + noise_config, + ) } else { ConnectingMode::QUIC }; @@ -281,7 +289,8 @@ impl Transport for GenTransport

{ } fn dial(&mut self, addr: Multiaddr) -> Result> { - let (socket_addr, version, _peer_id, wt) = self.remote_multiaddr_to_socketaddr(&addr, true)?; + let (socket_addr, version, _peer_id, wt) = + self.remote_multiaddr_to_socketaddr(&addr, true)?; if wt { return Err(TransportError::MultiaddrNotSupported(addr)); @@ -602,10 +611,12 @@ impl Listener

{ } } - fn update(&mut self, - is_webtransport: bool, - cert_hashes: &Vec>, - noise_cfg: libp2p_noise::Config) { + fn update( + &mut self, + is_webtransport: bool, + cert_hashes: &Vec>, + noise_cfg: libp2p_noise::Config, + ) { let cur_mode = &self.mode; match cur_mode { ConnectingMode::QUIC => { @@ -721,10 +732,13 @@ impl ConnectingMode { pub(crate) fn update_multiaddr(&self, addr: Multiaddr) -> Multiaddr { fn add_web_transport_protocol(v: &Vec>, addr: Multiaddr) -> Multiaddr { let mut vec = v.clone(); - let mut res = addr.with(Protocol::WebTransport) - .with(Protocol::Certhash(vec.pop().expect("Gets the last element"))); + let mut res = addr.with(Protocol::WebTransport).with(Protocol::Certhash( + vec.pop().expect("Gets the last element"), + )); if !vec.is_empty() { - res = res.with(Protocol::Certhash(vec.pop().expect("Gets the last element"))); + res = res.with(Protocol::Certhash( + vec.pop().expect("Gets the last element"), + )); }; res @@ -741,12 +755,13 @@ impl ConnectingMode { impl Debug for ConnectingMode { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { - ConnectingMode::QUIC => - write!(f, "ConnectingMode::QUIC"), - ConnectingMode::WebTransport(certs, _) => - write!(f, "ConnectingMode::WebTransport(hashes:{:?})", certs), - ConnectingMode::Mixed(certs, _) => - write!(f, "ConnectingMode::Mixed(hashes:{:?})", certs), + ConnectingMode::QUIC => write!(f, "ConnectingMode::QUIC"), + ConnectingMode::WebTransport(certs, _) => { + write!(f, "ConnectingMode::WebTransport(hashes:{:?})", certs) + } + ConnectingMode::Mixed(certs, _) => { + write!(f, "ConnectingMode::Mixed(hashes:{:?})", certs) + } } } } @@ -824,12 +839,18 @@ fn multiaddr_to_socketaddr( }; match (proto1, proto2) { - (Protocol::Ip4(ip), Protocol::Udp(port)) => { - Some((SocketAddr::new(ip.into(), port), version, peer_id, is_webtransport)) - } - (Protocol::Ip6(ip), Protocol::Udp(port)) => { - Some((SocketAddr::new(ip.into(), port), version, peer_id, is_webtransport)) - } + (Protocol::Ip4(ip), Protocol::Udp(port)) => Some(( + SocketAddr::new(ip.into(), port), + version, + peer_id, + is_webtransport, + )), + (Protocol::Ip6(ip), Protocol::Udp(port)) => Some(( + SocketAddr::new(ip.into(), port), + version, + peer_id, + is_webtransport, + )), _ => None, } } @@ -979,7 +1000,9 @@ mod tests { assert_eq!( multiaddr_to_socketaddr( - &"/ip4/127.0.0.1/udp/1234/quic/webtransport".parse::().unwrap(), + &"/ip4/127.0.0.1/udp/1234/quic/webtransport" + .parse::() + .unwrap(), true, ), Some(( diff --git a/transports/quic/src/webtransport.rs b/transports/quic/src/webtransport.rs index a51fa547c15..c9d064ab1c0 100644 --- a/transports/quic/src/webtransport.rs +++ b/transports/quic/src/webtransport.rs @@ -35,24 +35,23 @@ pub enum WebtransportConnectingError { impl Display for WebtransportConnectingError { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - WebtransportConnectingError::UnexpectedProtocol(p) => - write!(f, "Unexpected request protocol {:?}", p), - WebtransportConnectingError::UnexpectedMethod(m) => - write!(f, "Unexpected initial request method {:?}", m), - WebtransportConnectingError::UnexpectedPath(p) => - write!(f, "Unexpected initial request path {}", p), - WebtransportConnectingError::UnexpectedQuery(q) => - write!(f, "Unexpected initial request query {}", q), - WebtransportConnectingError::ConnectionError(e) => - write!(f, "Connection error: {}", e), - WebtransportConnectingError::Http3Error(e) => - write!(f, "Http3 error: {}", e), - WebtransportConnectingError::NoiseError(e) => - write!(f, "Noise error: {}", e), - WebtransportConnectingError::NoMoreStreams => - write!(f, "No more streams"), - WebtransportConnectingError::CrateError(e) => - write!(f, "{}", e), + WebtransportConnectingError::UnexpectedProtocol(p) => { + write!(f, "Unexpected request protocol {:?}", p) + } + WebtransportConnectingError::UnexpectedMethod(m) => { + write!(f, "Unexpected initial request method {:?}", m) + } + WebtransportConnectingError::UnexpectedPath(p) => { + write!(f, "Unexpected initial request path {}", p) + } + WebtransportConnectingError::UnexpectedQuery(q) => { + write!(f, "Unexpected initial request query {}", q) + } + WebtransportConnectingError::ConnectionError(e) => write!(f, "Connection error: {}", e), + WebtransportConnectingError::Http3Error(e) => write!(f, "Http3 error: {}", e), + WebtransportConnectingError::NoiseError(e) => write!(f, "Noise error: {}", e), + WebtransportConnectingError::NoMoreStreams => write!(f, "No more streams"), + WebtransportConnectingError::CrateError(e) => write!(f, "{}", e), } } } @@ -91,4 +90,4 @@ impl From for WebtransportConnectingError { fn from(e: quinn::ConnectionError) -> Self { WebtransportConnectingError::ConnectionError(e) } -} \ No newline at end of file +} diff --git a/transports/quic/src/webtransport/certificates.rs b/transports/quic/src/webtransport/certificates.rs index a8ba919b917..52a5a3765f9 100644 --- a/transports/quic/src/webtransport/certificates.rs +++ b/transports/quic/src/webtransport/certificates.rs @@ -1,24 +1,25 @@ -use std::io; -use std::io::{Cursor, Read, Write}; -use rustls::pki_types::{CertificateDer, PrivateKeyDer}; -use time::{Duration, OffsetDateTime}; use libp2p_core::multihash::Multihash; use libp2p_tls::certificate; +use rustls::pki_types::{CertificateDer, PrivateKeyDer}; use sha2::Digest; +use std::io; +use std::io::{Cursor, Read, Write}; +use time::{Duration, OffsetDateTime}; const MULTIHASH_SHA256_CODE: u64 = 0x12; const CERT_VALID_PERIOD: Duration = Duration::days(14); pub(crate) fn alpn_protocols() -> Vec> { - vec![b"libp2p".to_vec(), - b"h3".to_vec(), - b"h3-32".to_vec(), - b"h3-31".to_vec(), - b"h3-30".to_vec(), - b"h3-29".to_vec(), ] + vec![ + b"libp2p".to_vec(), + b"h3".to_vec(), + b"h3-32".to_vec(), + b"h3-31".to_vec(), + b"h3-30".to_vec(), + b"h3-29".to_vec(), + ] } - /* I would like to avoid interacting with the file system as much as possible. My suggestion would be: @@ -68,7 +69,8 @@ impl Certificate { identity_keypair: &libp2p_identity::Keypair, not_before: OffsetDateTime, ) -> Result { - let not_after = not_before.clone() + let not_after = not_before + .clone() .checked_add(CERT_VALID_PERIOD) .expect("Addition does not overflow"); let (cert, private_key) = certificate::generate_with_validity_period( @@ -77,30 +79,34 @@ impl Certificate { not_after.clone(), )?; - Ok(Self { cert, private_key, not_before, not_after }) + Ok(Self { + cert, + private_key, + not_before, + not_after, + }) } pub(crate) fn cert_hash(&self) -> Multihash<64> { Multihash::wrap( - MULTIHASH_SHA256_CODE, sha2::Sha256::digest(&self.cert.as_ref().as_ref()).as_ref(), - ).expect("fingerprint's len to be 32 bytes") + MULTIHASH_SHA256_CODE, + sha2::Sha256::digest(&self.cert.as_ref().as_ref()).as_ref(), + ) + .expect("fingerprint's len to be 32 bytes") } pub fn to_bytes(&self) -> Vec { let mut bytes = Vec::new(); - Self::write_data(&mut bytes, self.cert.as_ref()) - .expect("Write cert data"); + Self::write_data(&mut bytes, self.cert.as_ref()).expect("Write cert data"); Self::write_data(&mut bytes, self.private_key.secret_der()) .expect("Write private_key data"); let nb_buff = self.not_before.unix_timestamp().to_be_bytes(); - std::io::Write::write(&mut bytes, &nb_buff) - .expect("Write not_before"); + std::io::Write::write(&mut bytes, &nb_buff).expect("Write not_before"); let na_buff = self.not_after.unix_timestamp().to_be_bytes(); - std::io::Write::write(&mut bytes, &na_buff) - .expect("Write not_after"); + std::io::Write::write(&mut bytes, &na_buff).expect("Write not_after"); bytes } @@ -117,7 +123,12 @@ impl Certificate { let not_before = OffsetDateTime::from_unix_timestamp(nb).unwrap(); let not_after = OffsetDateTime::from_unix_timestamp(na).unwrap(); - Ok(Self { cert, private_key, not_before, not_after }) + Ok(Self { + cert, + private_key, + not_before, + not_after, + }) } fn write_data(w: &mut W, data: &[u8]) -> Result<(), io::Error> { @@ -158,9 +169,8 @@ mod tests { let cert = super::Certificate::generate(&keypair, not_before).unwrap(); let binary_data = cert.to_bytes(); - let actual = super::Certificate::parse(binary_data.as_slice()) - .unwrap(); + let actual = super::Certificate::parse(binary_data.as_slice()).unwrap(); assert_eq!(actual, cert); } -} \ No newline at end of file +} diff --git a/transports/quic/src/webtransport/connection.rs b/transports/quic/src/webtransport/connection.rs index d3d5e54b616..775485eeba8 100644 --- a/transports/quic/src/webtransport/connection.rs +++ b/transports/quic/src/webtransport/connection.rs @@ -6,8 +6,8 @@ use std::sync::Arc; use std::task::{Context, Poll}; use bytes::Bytes; -use futures::{AsyncRead, AsyncWrite, FutureExt}; use futures::future::BoxFuture; +use futures::{AsyncRead, AsyncWrite, FutureExt}; use h3::quic; use h3_webtransport::server::{AcceptedBi, WebTransportSession}; use h3_webtransport::stream::{RecvStream, SendStream}; @@ -32,7 +32,7 @@ pub(crate) struct Connection { impl Connection { pub(crate) fn new( session: Arc>, - connection: quinn::Connection + connection: quinn::Connection, ) -> Self { Self { session, @@ -47,7 +47,10 @@ impl StreamMuxer for Connection { type Substream = Stream; type Error = Error; - fn poll_inbound(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_inbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { let this = self.get_mut(); let t_session = Arc::clone(&this.session); @@ -60,7 +63,10 @@ impl StreamMuxer for Connection { Poll::Ready(Ok(res)) } - fn poll_outbound(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_outbound( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { panic!("WebTransport implementation doesn't support outbound streams.") } @@ -82,7 +88,10 @@ impl StreamMuxer for Connection { Poll::Ready(Ok(())) } - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { // TODO: If connection migration is enabled (currently disabled) address // change on the connection needs to be handled. Poll::Pending @@ -99,12 +108,20 @@ pub(crate) struct Stream { impl Stream { pub(crate) fn new(send: SendPart, recv: RecvPart) -> Self { - Self { send, recv, close_result: None } + Self { + send, + recv, + close_result: None, + } } } impl AsyncRead for Stream { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { if let Some(close_result) = self.close_result { if close_result.is_err() { return Poll::Ready(Ok(0)); @@ -116,7 +133,11 @@ impl AsyncRead for Stream { } impl AsyncWrite for Stream { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { Pin::new(&mut self.send) .poll_write(cx, buf) .map_err(Into::into) @@ -164,9 +185,9 @@ pub(crate) async fn accept_webtransport_stream( let (send, recv) = quic::BidiStream::split(stream); Ok(Stream::new(send, recv)) } - Ok(_) => Err( - WebtransportError(String::from("Only bidirectional streams are supported")).into() - ), + Ok(_) => { + Err(WebtransportError(String::from("Only bidirectional streams are supported")).into()) + } Err(e) => Err(Error::Io(io::Error::new(ErrorKind::Other, e))), } -} \ No newline at end of file +} diff --git a/transports/tls/src/certificate.rs b/transports/tls/src/certificate.rs index 0d3c56f33b2..6012f6eb464 100644 --- a/transports/tls/src/certificate.rs +++ b/transports/tls/src/certificate.rs @@ -26,8 +26,8 @@ use libp2p_identity as identity; use libp2p_identity::PeerId; use x509_parser::{prelude::*, signature_algorithm::SignatureAlgorithm}; -use std::sync::Arc; use ::time::OffsetDateTime; +use std::sync::Arc; /// The libp2p Public Key Extension is a X.509 extension /// with the Object Identifier 1.3.6.1.4.1.53594.1.1, From 5af4f1aa2f297ae0cf4b8fb8a874191f568b9fe9 Mon Sep 17 00:00:00 2001 From: dgarus Date: Tue, 20 Aug 2024 16:57:31 +0300 Subject: [PATCH 3/7] resolved conflicts --- transports/quic/src/transport.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 51a6cb1a0f5..73bc1fe586d 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -302,7 +302,7 @@ impl Transport for GenTransport

{ dial_opts: DialOpts, ) -> Result> { let (socket_addr, version, peer_id, wt) = - self.remote_multiaddr_to_socketaddr(addr.clone(), true)?; + self.remote_multiaddr_to_socketaddr(&addr, true)?; if wt { return Err(TransportError::MultiaddrNotSupported(addr)); @@ -344,7 +344,7 @@ impl Transport for GenTransport

{ let connecting = endpoint .connect_with(client_config, socket_addr, "l") .map_err(ConnectError)?; - Connecting::new(connecting, handshake_timeout).await + Connecting::new(connecting, ConnectingMode::QUIC, handshake_timeout).await })) } (Endpoint::Listener, _) => { From 589ab88366d78286882c497e9d80ab999aee9faf Mon Sep 17 00:00:00 2001 From: dgarus Date: Fri, 23 Aug 2024 18:12:20 +0300 Subject: [PATCH 4/7] refactoring --- transports/quic/src/connection/connecting.rs | 3 +-- transports/quic/src/webtransport.rs | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/transports/quic/src/connection/connecting.rs b/transports/quic/src/connection/connecting.rs index ddd693064b2..5ec3b830280 100644 --- a/transports/quic/src/connection/connecting.rs +++ b/transports/quic/src/connection/connecting.rs @@ -173,8 +173,7 @@ async fn webtransport_connecting( let webtr_stream = webtransport::accept_webtransport_stream(&arc_session).await?; - let mut certs = HashSet::new(); - certs.insert(certhashes.first().unwrap().clone()); + let certs = certhashes.iter().cloned().collect::>(); let t_noise = noise_config.with_webtransport_certhashes(certs); t_noise.upgrade_inbound(webtr_stream, "").await?; diff --git a/transports/quic/src/webtransport.rs b/transports/quic/src/webtransport.rs index c9d064ab1c0..7b82c8b76aa 100644 --- a/transports/quic/src/webtransport.rs +++ b/transports/quic/src/webtransport.rs @@ -1,4 +1,4 @@ -use std::fmt::{Debug, Display, Formatter, Write}; +use std::fmt::{Debug, Display, Formatter}; use std::io; use std::io::ErrorKind; From d920fc3f8312679a6057dfa33209bdcd1362c833 Mon Sep 17 00:00:00 2001 From: dgarus Date: Sat, 24 Aug 2024 16:03:31 +0300 Subject: [PATCH 5/7] added a close listener test --- transports/quic/src/transport.rs | 61 ++++++++++++++++++++++++++++++++ transports/quic/tests/smoke.rs | 6 ++-- 2 files changed, 64 insertions(+), 3 deletions(-) diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 73bc1fe586d..0e22315d56e 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -879,6 +879,8 @@ fn socketaddr_to_multiaddr(socket_addr: &SocketAddr, version: ProtocolVersion) - mod tests { use super::*; use futures::future::poll_fn; + use time::OffsetDateTime; + use crate::webtransport; #[test] fn multiaddr_to_udp_conversion() { @@ -1047,6 +1049,65 @@ mod tests { } } + #[cfg(feature = "async-std")] + #[async_std::test] + async fn test_close_webtransport_listener() { + let keypair = libp2p_identity::Keypair::generate_ed25519(); + let not_before = OffsetDateTime::now_utc(); + let cert = webtransport::Certificate::generate(&keypair, not_before) + .expect("Certificate generation"); + let certhash = cert.cert_hash(); + let config = Config::new(&keypair, Some(cert)); + let mut transport = crate::async_std::Transport::new(config); + assert!(poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)) + .now_or_never() + .is_none()); + + // Run test twice to check that there is no unexpected behaviour if `Transport.listener` + // is temporarily empty. + for _ in 0..2 { + let id = ListenerId::next(); + transport + .listen_on(id, "/ip4/0.0.0.0/udp/0/quic-v1/webtransport".parse().unwrap()) + .unwrap(); + + match poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)).await { + TransportEvent::NewAddress { + listener_id, + listen_addr, + } => { + assert_eq!(listener_id, id); + assert!( + matches!(listen_addr.iter().next(), Some(Protocol::Ip4(a)) if !a.is_unspecified()) + ); + assert!( + matches!(listen_addr.iter().nth(1), Some(Protocol::Udp(port)) if port != 0) + ); + assert!(matches!(listen_addr.iter().nth(2), Some(Protocol::QuicV1))); + assert!(matches!(listen_addr.iter().nth(3), Some(Protocol::WebTransport))); + assert!(matches!(listen_addr.iter().nth(4), Some(Protocol::Certhash(h)) if h == certhash)); + } + e => panic!("Unexpected event: {e:?}"), + } + assert!(transport.remove_listener(id), "Expect listener to exist."); + match poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)).await { + TransportEvent::ListenerClosed { + listener_id, + reason: Ok(()), + } => { + assert_eq!(listener_id, id); + } + e => panic!("Unexpected event: {e:?}"), + } + // Poll once again so that the listener has the chance to return `Poll::Ready(None)` and + // be removed from the list of listeners. + assert!(poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)) + .now_or_never() + .is_none()); + assert!(transport.listeners.is_empty()); + } + } + #[cfg(feature = "tokio")] #[tokio::test] async fn test_dialer_drop() { diff --git a/transports/quic/tests/smoke.rs b/transports/quic/tests/smoke.rs index 6a760f9997c..338c2d217b4 100644 --- a/transports/quic/tests/smoke.rs +++ b/transports/quic/tests/smoke.rs @@ -221,7 +221,7 @@ async fn wrong_peerid() { fn new_tcp_quic_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox)>) { let keypair = generate_tls_keypair(); let peer_id = keypair.public().to_peer_id(); - let mut config = quic::Config::new(&keypair); + let mut config = quic::Config::new(&keypair, None); config.handshake_timeout = Duration::from_secs(1); let quic_transport = quic::async_std::Transport::new(config); @@ -376,7 +376,7 @@ async fn backpressure() { let _ = tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .try_init(); - let max_stream_data = quic::Config::new(&generate_tls_keypair()).max_stream_data; + let max_stream_data = quic::Config::new(&generate_tls_keypair(), None).max_stream_data; let (mut stream_a, mut stream_b) = build_streams::().await; @@ -567,7 +567,7 @@ fn create_transport( ) -> (PeerId, Boxed<(PeerId, StreamMuxerBox)>) { let keypair = generate_tls_keypair(); let peer_id = keypair.public().to_peer_id(); - let mut config = quic::Config::new(&keypair); + let mut config = quic::Config::new(&keypair, None); with_config(&mut config); let transport = quic::GenTransport::

::new(config) .map(|(p, c), _| (p, StreamMuxerBox::new(c))) From 04bf57538dabe5f0a1c97d91e210130f1e1c1f02 Mon Sep 17 00:00:00 2001 From: dgarus Date: Sat, 24 Aug 2024 16:05:04 +0300 Subject: [PATCH 6/7] fixed format issues --- transports/quic/src/transport.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 0e22315d56e..ed9b2634b16 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -878,9 +878,9 @@ fn socketaddr_to_multiaddr(socket_addr: &SocketAddr, version: ProtocolVersion) - #[cfg(any(feature = "async-std", feature = "tokio"))] mod tests { use super::*; + use crate::webtransport; use futures::future::poll_fn; use time::OffsetDateTime; - use crate::webtransport; #[test] fn multiaddr_to_udp_conversion() { @@ -1068,7 +1068,10 @@ mod tests { for _ in 0..2 { let id = ListenerId::next(); transport - .listen_on(id, "/ip4/0.0.0.0/udp/0/quic-v1/webtransport".parse().unwrap()) + .listen_on( + id, + "/ip4/0.0.0.0/udp/0/quic-v1/webtransport".parse().unwrap(), + ) .unwrap(); match poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)).await { @@ -1084,8 +1087,13 @@ mod tests { matches!(listen_addr.iter().nth(1), Some(Protocol::Udp(port)) if port != 0) ); assert!(matches!(listen_addr.iter().nth(2), Some(Protocol::QuicV1))); - assert!(matches!(listen_addr.iter().nth(3), Some(Protocol::WebTransport))); - assert!(matches!(listen_addr.iter().nth(4), Some(Protocol::Certhash(h)) if h == certhash)); + assert!(matches!( + listen_addr.iter().nth(3), + Some(Protocol::WebTransport) + )); + assert!( + matches!(listen_addr.iter().nth(4), Some(Protocol::Certhash(h)) if h == certhash) + ); } e => panic!("Unexpected event: {e:?}"), } From f017725e3045ef2b9c1ffa14bdfc859ba7d7b5ba Mon Sep 17 00:00:00 2001 From: dgarus Date: Sat, 24 Aug 2024 18:06:46 +0300 Subject: [PATCH 7/7] fixed failed tests --- transports/quic/tests/stream_compliance.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/transports/quic/tests/stream_compliance.rs b/transports/quic/tests/stream_compliance.rs index b0536473215..c29ebd181c6 100644 --- a/transports/quic/tests/stream_compliance.rs +++ b/transports/quic/tests/stream_compliance.rs @@ -4,6 +4,7 @@ use libp2p_core::transport::{DialOpts, ListenerId, PortUse}; use libp2p_core::{Endpoint, Transport}; use libp2p_quic as quic; use std::time::Duration; +use libp2p_core::muxing::StreamMuxerBox; #[async_std::test] async fn close_implies_flush() { @@ -19,7 +20,7 @@ async fn read_after_close() { libp2p_muxer_test_harness::read_after_close(alice, bob).await; } -async fn connected_peers() -> (quic::Connection, quic::Connection) { +async fn connected_peers() -> (StreamMuxerBox, StreamMuxerBox) { let mut dialer = new_transport().boxed(); let mut listener = new_transport().boxed(); @@ -75,7 +76,7 @@ async fn connected_peers() -> (quic::Connection, quic::Connection) { fn new_transport() -> quic::async_std::Transport { let keypair = libp2p_identity::Keypair::generate_ed25519(); - let mut config = quic::Config::new(&keypair); + let mut config = quic::Config::new(&keypair, None); config.handshake_timeout = Duration::from_secs(1); quic::async_std::Transport::new(config)