From b8ceeccdc6a4d38ebe77d74cab3c66be072f16db Mon Sep 17 00:00:00 2001 From: Denis Garus Date: Wed, 14 Jun 2023 20:49:23 +0300 Subject: [PATCH] refactor(rendezvous): rewrite using `libp2p-request-response` Fixes [#3878](https://github.com/libp2p/rust-libp2p/issues/3878). Pull-Request: #4051. Co-Authored-By: Thomas Eizinger --- Cargo.lock | 1 + protocols/rendezvous/Cargo.toml | 3 +- protocols/rendezvous/src/client.rs | 462 ++++++++------- protocols/rendezvous/src/codec.rs | 123 +++- protocols/rendezvous/src/handler.rs | 50 -- protocols/rendezvous/src/handler/inbound.rs | 192 ------ protocols/rendezvous/src/handler/outbound.rs | 134 ----- protocols/rendezvous/src/lib.rs | 10 +- protocols/rendezvous/src/server.rs | 361 ++++++------ protocols/rendezvous/src/substream_handler.rs | 556 ------------------ 10 files changed, 555 insertions(+), 1337 deletions(-) delete mode 100644 protocols/rendezvous/src/handler.rs delete mode 100644 protocols/rendezvous/src/handler/inbound.rs delete mode 100644 protocols/rendezvous/src/handler/outbound.rs delete mode 100644 protocols/rendezvous/src/substream_handler.rs diff --git a/Cargo.lock b/Cargo.lock index 4055073ced9..c0f35edd86e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2881,6 +2881,7 @@ dependencies = [ "libp2p-identity", "libp2p-noise", "libp2p-ping", + "libp2p-request-response", "libp2p-swarm", "libp2p-swarm-test", "libp2p-tcp", diff --git a/protocols/rendezvous/Cargo.toml b/protocols/rendezvous/Cargo.toml index e066686a437..b1cf7db044d 100644 --- a/protocols/rendezvous/Cargo.toml +++ b/protocols/rendezvous/Cargo.toml @@ -12,6 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] asynchronous-codec = "0.6" +async-trait = "0.1" bimap = "0.6.3" futures = { version = "0.3", default-features = false, features = ["std"] } futures-timer = "3.0.2" @@ -19,6 +20,7 @@ instant = "0.1.12" libp2p-core = { workspace = true } libp2p-swarm = { workspace = true } libp2p-identity = { workspace = true } +libp2p-request-response = { workspace = true } log = "0.4" quick-protobuf = "0.8" quick-protobuf-codec = { workspace = true } @@ -27,7 +29,6 @@ thiserror = "1" void = "1" [dev-dependencies] -async-trait = "0.1" env_logger = "0.10.0" libp2p-swarm = { workspace = true, features = ["macros", "tokio"] } libp2p-noise = { workspace = true } diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs index d1a514f1820..363e31965eb 100644 --- a/protocols/rendezvous/src/client.rs +++ b/protocols/rendezvous/src/client.rs @@ -18,32 +18,33 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::codec::{Cookie, ErrorCode, Namespace, NewRegistration, Registration, Ttl}; -use crate::handler; -use crate::handler::outbound; -use crate::handler::outbound::OpenInfo; -use crate::substream_handler::{InEvent, SubstreamConnectionHandler}; +use crate::codec::Message::*; +use crate::codec::{Cookie, ErrorCode, Message, Namespace, NewRegistration, Registration, Ttl}; use futures::future::BoxFuture; use futures::future::FutureExt; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; -use instant::Duration; use libp2p_core::{Endpoint, Multiaddr, PeerRecord}; use libp2p_identity::{Keypair, PeerId, SigningError}; -use libp2p_swarm::behaviour::FromSwarm; +use libp2p_request_response::{ProtocolSupport, RequestId}; use libp2p_swarm::{ - CloseConnection, ConnectionDenied, ConnectionId, ExternalAddresses, NetworkBehaviour, - NotifyHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + ConnectionDenied, ConnectionId, ExternalAddresses, FromSwarm, NetworkBehaviour, PollParameters, + THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use std::collections::{HashMap, VecDeque}; -use std::iter::FromIterator; +use std::iter; use std::task::{Context, Poll}; -use void::Void; +use std::time::Duration; pub struct Behaviour { - events: VecDeque>>, + inner: libp2p_request_response::Behaviour, + keypair: Keypair, - pending_register_requests: Vec<(Namespace, PeerId, Option)>, + + error_events: VecDeque, + + waiting_for_register: HashMap, + waiting_for_discovery: HashMap)>, /// Hold addresses of all peers that we have discovered so far. /// @@ -60,9 +61,15 @@ impl Behaviour { /// Create a new instance of the rendezvous [`NetworkBehaviour`]. pub fn new(keypair: Keypair) -> Self { Self { - events: Default::default(), + inner: libp2p_request_response::Behaviour::with_codec( + crate::codec::Codec::default(), + iter::once((crate::PROTOCOL_IDENT, ProtocolSupport::Outbound)), + libp2p_request_response::Config::default(), + ), + error_events: Default::default(), keypair, - pending_register_requests: vec![], + waiting_for_register: Default::default(), + waiting_for_discovery: Default::default(), discovered_peers: Default::default(), expiring_registrations: FuturesUnordered::from_iter(vec![ futures::future::pending().boxed() @@ -76,19 +83,35 @@ impl Behaviour { /// External addresses are either manually added via [`libp2p_swarm::Swarm::add_external_address`] or reported /// by other [`NetworkBehaviour`]s via [`ToSwarm::ExternalAddrConfirmed`]. pub fn register(&mut self, namespace: Namespace, rendezvous_node: PeerId, ttl: Option) { - self.pending_register_requests - .push((namespace, rendezvous_node, ttl)); + let external_addresses = self.external_addresses.iter().cloned().collect::>(); + if external_addresses.is_empty() { + self.error_events + .push_back(Event::RegisterFailed(RegisterError::NoExternalAddresses)); + + return; + } + + match PeerRecord::new(&self.keypair, external_addresses) { + Ok(peer_record) => { + let req_id = self.inner.send_request( + &rendezvous_node, + Register(NewRegistration::new(namespace.clone(), peer_record, ttl)), + ); + self.waiting_for_register + .insert(req_id, (rendezvous_node, namespace)); + } + Err(signing_error) => { + self.error_events.push_back(Event::RegisterFailed( + RegisterError::FailedToMakeRecord(signing_error), + )); + } + }; } /// Unregister ourselves from the given namespace with the given rendezvous peer. pub fn unregister(&mut self, namespace: Namespace, rendezvous_node: PeerId) { - self.events.push_back(ToSwarm::NotifyHandler { - peer_id: rendezvous_node, - event: handler::OutboundInEvent::NewSubstream { - open_info: OpenInfo::UnregisterRequest(namespace), - }, - handler: NotifyHandler::Any, - }); + self.inner + .send_request(&rendezvous_node, Unregister(namespace)); } /// Discover other peers at a given rendezvous peer. @@ -100,22 +123,22 @@ impl Behaviour { /// the cookie was acquired. pub fn discover( &mut self, - ns: Option, + namespace: Option, cookie: Option, limit: Option, rendezvous_node: PeerId, ) { - self.events.push_back(ToSwarm::NotifyHandler { - peer_id: rendezvous_node, - event: handler::OutboundInEvent::NewSubstream { - open_info: OpenInfo::DiscoverRequest { - namespace: ns, - cookie, - limit, - }, + let req_id = self.inner.send_request( + &rendezvous_node, + Discover { + namespace: namespace.clone(), + cookie, + limit, }, - handler: NotifyHandler::Any, - }); + ); + + self.waiting_for_discovery + .insert(req_id, (rendezvous_node, namespace)); } } @@ -161,55 +184,36 @@ pub enum Event { } impl NetworkBehaviour for Behaviour { - type ConnectionHandler = - SubstreamConnectionHandler; + type ConnectionHandler = as NetworkBehaviour>::ConnectionHandler; + type ToSwarm = Event; fn handle_established_inbound_connection( &mut self, - _: ConnectionId, - _: PeerId, - _: &Multiaddr, - _: &Multiaddr, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, ) -> Result, ConnectionDenied> { - Ok(SubstreamConnectionHandler::new_outbound_only( - Duration::from_secs(30), - )) - } - - fn handle_pending_outbound_connection( - &mut self, - _connection_id: ConnectionId, - maybe_peer: Option, - _addresses: &[Multiaddr], - _effective_role: Endpoint, - ) -> Result, ConnectionDenied> { - let peer = match maybe_peer { - None => return Ok(vec![]), - Some(peer) => peer, - }; - - let addresses = self - .discovered_peers - .iter() - .filter_map(|((candidate, _), addresses)| (candidate == &peer).then_some(addresses)) - .flatten() - .cloned() - .collect(); - - Ok(addresses) + self.inner.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) } fn handle_established_outbound_connection( &mut self, - _: ConnectionId, - _: PeerId, - _: &Multiaddr, - _: Endpoint, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, ) -> Result, ConnectionDenied> { - Ok(SubstreamConnectionHandler::new_outbound_only( - Duration::from_secs(30), - )) + self.inner + .handle_established_outbound_connection(connection_id, peer, addr, role_override) } fn on_connection_handler_event( @@ -218,159 +222,213 @@ impl NetworkBehaviour for Behaviour { connection_id: ConnectionId, event: THandlerOutEvent, ) { - let new_events = match event { - handler::OutboundOutEvent::InboundEvent { message, .. } => void::unreachable(message), - handler::OutboundOutEvent::OutboundEvent { message, .. } => handle_outbound_event( - message, - peer_id, - &mut self.discovered_peers, - &mut self.expiring_registrations, - ), - handler::OutboundOutEvent::InboundError { error, .. } => void::unreachable(error), - handler::OutboundOutEvent::OutboundError { error, .. } => { - log::warn!("Connection with peer {} failed: {}", peer_id, error); - - vec![ToSwarm::CloseConnection { - peer_id, - connection: CloseConnection::One(connection_id), - }] - } - }; + self.inner + .on_connection_handler_event(peer_id, connection_id, event); + } - self.events.extend(new_events); + fn on_swarm_event(&mut self, event: FromSwarm) { + self.external_addresses.on_swarm_event(&event); + + self.inner.on_swarm_event(event); } fn poll( &mut self, cx: &mut Context<'_>, - _: &mut impl PollParameters, + params: &mut impl PollParameters, ) -> Poll>> { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(event); - } + use libp2p_request_response as req_res; - if let Some((namespace, rendezvous_node, ttl)) = self.pending_register_requests.pop() { - // Update our external addresses based on the Swarm's current knowledge. - // It doesn't make sense to register addresses on which we are not reachable, hence this should not be configurable from the outside. - - let external_addresses = self.external_addresses.iter().cloned().collect::>(); - - if external_addresses.is_empty() { - return Poll::Ready(ToSwarm::GenerateEvent(Event::RegisterFailed( - RegisterError::NoExternalAddresses, - ))); - } + if let Some(event) = self.error_events.pop_front() { + return Poll::Ready(ToSwarm::GenerateEvent(event)); + } - let action = match PeerRecord::new(&self.keypair, external_addresses) { - Ok(peer_record) => ToSwarm::NotifyHandler { - peer_id: rendezvous_node, - event: handler::OutboundInEvent::NewSubstream { - open_info: OpenInfo::RegisterRequest(NewRegistration { - namespace, - record: peer_record, - ttl, - }), + loop { + match self.inner.poll(cx, params) { + Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::Message { + message: + req_res::Message::Response { + request_id, + response, + }, + .. + })) => { + if let Some(event) = self.handle_response(&request_id, response) { + return Poll::Ready(ToSwarm::GenerateEvent(event)); + } + + continue; // not a request we care about + } + Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::OutboundFailure { + request_id, + .. + })) => { + if let Some(event) = self.event_for_outbound_failure(&request_id) { + return Poll::Ready(ToSwarm::GenerateEvent(event)); + } + + continue; // not a request we care about + } + Poll::Ready(ToSwarm::GenerateEvent( + req_res::Event::InboundFailure { .. } + | req_res::Event::ResponseSent { .. } + | req_res::Event::Message { + message: req_res::Message::Request { .. }, + .. }, - handler: NotifyHandler::Any, - }, - Err(signing_error) => ToSwarm::GenerateEvent(Event::RegisterFailed( - RegisterError::FailedToMakeRecord(signing_error), - )), - }; + )) => { + unreachable!("rendezvous clients never receive requests") + } + Poll::Ready( + other @ (ToSwarm::ExternalAddrConfirmed(_) + | ToSwarm::ExternalAddrExpired(_) + | ToSwarm::NewExternalAddrCandidate(_) + | ToSwarm::NotifyHandler { .. } + | ToSwarm::Dial { .. } + | ToSwarm::CloseConnection { .. } + | ToSwarm::ListenOn { .. } + | ToSwarm::RemoveListener { .. }), + ) => { + let new_to_swarm = + other.map_out(|_| unreachable!("we manually map `GenerateEvent` variants")); + + return Poll::Ready(new_to_swarm); + } + Poll::Pending => {} + } - return Poll::Ready(action); - } + if let Poll::Ready(Some(expired_registration)) = + self.expiring_registrations.poll_next_unpin(cx) + { + self.discovered_peers.remove(&expired_registration); + return Poll::Ready(ToSwarm::GenerateEvent(Event::Expired { + peer: expired_registration.0, + })); + } - if let Some(expired_registration) = - futures::ready!(self.expiring_registrations.poll_next_unpin(cx)) - { - self.discovered_peers.remove(&expired_registration); - return Poll::Ready(ToSwarm::GenerateEvent(Event::Expired { - peer: expired_registration.0, - })); + return Poll::Pending; } - - Poll::Pending } - fn on_swarm_event(&mut self, event: FromSwarm) { - self.external_addresses.on_swarm_event(&event); + fn handle_pending_outbound_connection( + &mut self, + _connection_id: ConnectionId, + maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + let peer = match maybe_peer { + None => return Ok(vec![]), + Some(peer) => peer, + }; - match event { - FromSwarm::ConnectionEstablished(_) - | FromSwarm::ConnectionClosed(_) - | FromSwarm::AddressChange(_) - | FromSwarm::DialFailure(_) - | FromSwarm::ListenFailure(_) - | FromSwarm::NewListener(_) - | FromSwarm::NewListenAddr(_) - | FromSwarm::ExpiredListenAddr(_) - | FromSwarm::ListenerError(_) - | FromSwarm::ListenerClosed(_) - | FromSwarm::NewExternalAddrCandidate(_) - | FromSwarm::ExternalAddrExpired(_) - | FromSwarm::ExternalAddrConfirmed(_) => {} - } + let addresses = self + .discovered_peers + .iter() + .filter_map(|((candidate, _), addresses)| (candidate == &peer).then_some(addresses)) + .flatten() + .cloned() + .collect(); + + Ok(addresses) } } -fn handle_outbound_event( - event: outbound::OutEvent, - peer_id: PeerId, - discovered_peers: &mut HashMap<(PeerId, Namespace), Vec>, - expiring_registrations: &mut FuturesUnordered>, -) -> Vec>> { - match event { - outbound::OutEvent::Registered { namespace, ttl } => { - vec![ToSwarm::GenerateEvent(Event::Registered { - rendezvous_node: peer_id, - ttl, +impl Behaviour { + fn event_for_outbound_failure(&mut self, req_id: &RequestId) -> Option { + if let Some((rendezvous_node, namespace)) = self.waiting_for_register.remove(req_id) { + return Some(Event::RegisterFailed(RegisterError::Remote { + rendezvous_node, namespace, - })] - } - outbound::OutEvent::RegisterFailed(namespace, error) => { - vec![ToSwarm::GenerateEvent(Event::RegisterFailed( - RegisterError::Remote { - rendezvous_node: peer_id, - namespace, - error, - }, - ))] - } - outbound::OutEvent::Discovered { - registrations, - cookie, - } => { - discovered_peers.extend(registrations.iter().map(|registration| { - let peer_id = registration.record.peer_id(); - let namespace = registration.namespace.clone(); + error: ErrorCode::Unavailable, + })); + }; - let addresses = registration.record.addresses().to_vec(); + if let Some((rendezvous_node, namespace)) = self.waiting_for_discovery.remove(req_id) { + return Some(Event::DiscoverFailed { + rendezvous_node, + namespace, + error: ErrorCode::Unavailable, + }); + }; - ((peer_id, namespace), addresses) - })); - expiring_registrations.extend(registrations.iter().cloned().map(|registration| { - async move { - // if the timer errors we consider it expired - futures_timer::Delay::new(Duration::from_secs(registration.ttl)).await; + None + } - (registration.record.peer_id(), registration.namespace) + fn handle_response(&mut self, request_id: &RequestId, response: Message) -> Option { + match response { + RegisterResponse(Ok(ttl)) => { + if let Some((rendezvous_node, namespace)) = + self.waiting_for_register.remove(request_id) + { + return Some(Event::Registered { + rendezvous_node, + ttl, + namespace, + }); } - .boxed() - })); - vec![ToSwarm::GenerateEvent(Event::Discovered { - rendezvous_node: peer_id, - registrations, - cookie, - })] - } - outbound::OutEvent::DiscoverFailed { namespace, error } => { - vec![ToSwarm::GenerateEvent(Event::DiscoverFailed { - rendezvous_node: peer_id, - namespace, - error, - })] + None + } + RegisterResponse(Err(error_code)) => { + if let Some((rendezvous_node, namespace)) = + self.waiting_for_register.remove(request_id) + { + return Some(Event::RegisterFailed(RegisterError::Remote { + rendezvous_node, + namespace, + error: error_code, + })); + } + + None + } + DiscoverResponse(Ok((registrations, cookie))) => { + if let Some((rendezvous_node, _ns)) = self.waiting_for_discovery.remove(request_id) + { + self.discovered_peers + .extend(registrations.iter().map(|registration| { + let peer_id = registration.record.peer_id(); + let namespace = registration.namespace.clone(); + + let addresses = registration.record.addresses().to_vec(); + + ((peer_id, namespace), addresses) + })); + + self.expiring_registrations + .extend(registrations.iter().cloned().map(|registration| { + async move { + // if the timer errors we consider it expired + futures_timer::Delay::new(Duration::from_secs(registration.ttl)) + .await; + + (registration.record.peer_id(), registration.namespace) + } + .boxed() + })); + + return Some(Event::Discovered { + rendezvous_node, + registrations, + cookie, + }); + } + + None + } + DiscoverResponse(Err(error_code)) => { + if let Some((rendezvous_node, ns)) = self.waiting_for_discovery.remove(request_id) { + return Some(Event::DiscoverFailed { + rendezvous_node, + namespace: ns, + error: error_code, + }); + } + + None + } + _ => unreachable!("rendezvous clients never receive requests"), } } } diff --git a/protocols/rendezvous/src/codec.rs b/protocols/rendezvous/src/codec.rs index 716ad79893f..bfc3cf275fc 100644 --- a/protocols/rendezvous/src/codec.rs +++ b/protocols/rendezvous/src/codec.rs @@ -19,16 +19,24 @@ // DEALINGS IN THE SOFTWARE. use crate::DEFAULT_TTL; +use async_trait::async_trait; use asynchronous_codec::{BytesMut, Decoder, Encoder}; +use asynchronous_codec::{FramedRead, FramedWrite}; +use futures::{AsyncRead, AsyncWrite, SinkExt, StreamExt}; use libp2p_core::{peer_record, signed_envelope, PeerRecord, SignedEnvelope}; +use libp2p_swarm::StreamProtocol; +use quick_protobuf_codec::Codec as ProtobufCodec; use rand::RngCore; use std::convert::{TryFrom, TryInto}; -use std::fmt; +use std::{fmt, io}; pub type Ttl = u64; +pub(crate) type Limit = u64; + +const MAX_MESSAGE_LEN_BYTES: usize = 1024 * 1024; #[allow(clippy::large_enum_variant)] -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum Message { Register(NewRegistration), RegisterResponse(Result), @@ -36,7 +44,7 @@ pub enum Message { Discover { namespace: Option, cookie: Option, - limit: Option, + limit: Option, }, DiscoverResponse(Result<(Vec, Cookie), ErrorCode>), } @@ -49,7 +57,7 @@ impl Namespace { /// /// This will panic if the namespace is too long. We accepting panicking in this case because we are enforcing a `static lifetime which means this value can only be a constant in the program and hence we hope the developer checked that it is of an acceptable length. pub fn from_static(value: &'static str) -> Self { - if value.len() > 255 { + if value.len() > crate::MAX_NAMESPACE { panic!("Namespace '{value}' is too long!") } @@ -57,7 +65,7 @@ impl Namespace { } pub fn new(value: String) -> Result { - if value.len() > 255 { + if value.len() > crate::MAX_NAMESPACE { return Err(NamespaceTooLong); } @@ -160,7 +168,7 @@ impl Cookie { #[error("The cookie was malformed")] pub struct InvalidCookie; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct NewRegistration { pub namespace: Namespace, pub record: PeerRecord, @@ -199,35 +207,27 @@ pub enum ErrorCode { Unavailable, } -pub struct RendezvousCodec { - inner: quick_protobuf_codec::Codec, -} - -impl Default for RendezvousCodec { - fn default() -> Self { - Self { - inner: quick_protobuf_codec::Codec::new(1024 * 1024), // 1MB - } - } -} - -impl Encoder for RendezvousCodec { +impl Encoder for Codec { type Item = Message; type Error = Error; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - self.inner.encode(proto::Message::from(item), dst)?; + let mut pb: ProtobufCodec = ProtobufCodec::new(MAX_MESSAGE_LEN_BYTES); + + pb.encode(proto::Message::from(item), dst)?; Ok(()) } } -impl Decoder for RendezvousCodec { +impl Decoder for Codec { type Item = Message; type Error = Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - let message = match self.inner.decode(src)? { + let mut pb: ProtobufCodec = ProtobufCodec::new(MAX_MESSAGE_LEN_BYTES); + + let message = match pb.decode(src)? { Some(p) => p, None => return Ok(None), }; @@ -236,6 +236,72 @@ impl Decoder for RendezvousCodec { } } +#[derive(Clone, Default)] +pub struct Codec {} + +#[async_trait] +impl libp2p_request_response::Codec for Codec { + type Protocol = StreamProtocol; + type Request = Message; + type Response = Message; + + async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let message = FramedRead::new(io, self.clone()) + .next() + .await + .ok_or(io::ErrorKind::UnexpectedEof)??; + + Ok(message) + } + + async fn read_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let message = FramedRead::new(io, self.clone()) + .next() + .await + .ok_or(io::ErrorKind::UnexpectedEof)??; + + Ok(message) + } + + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + FramedWrite::new(io, self.clone()).send(req).await?; + + Ok(()) + } + + async fn write_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + FramedWrite::new(io, self.clone()).send(res).await?; + + Ok(()) + } +} + #[derive(Debug, thiserror::Error)] pub enum Error { #[error(transparent)] @@ -246,6 +312,16 @@ pub enum Error { Conversion(#[from] ConversionError), } +impl From for std::io::Error { + fn from(value: Error) -> Self { + match value { + Error::Io(e) => e, + Error::Codec(e) => io::Error::from(e), + Error::Conversion(e) => io::Error::new(io::ErrorKind::InvalidInput, e), + } + } +} + impl From for proto::Message { fn from(message: Message) -> Self { match message { @@ -528,7 +604,7 @@ impl TryFrom for ErrorCode { E_UNAVAILABLE => ErrorCode::Unavailable, }; - Result::Ok(code) + Ok(code) } } @@ -567,6 +643,7 @@ mod proto { #[cfg(test)] mod tests { use super::*; + use crate::Namespace; #[test] fn cookie_wire_encoding_roundtrip() { diff --git a/protocols/rendezvous/src/handler.rs b/protocols/rendezvous/src/handler.rs deleted file mode 100644 index ccf765c9c65..00000000000 --- a/protocols/rendezvous/src/handler.rs +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2021 COMIT Network. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use crate::codec; -use crate::codec::Message; -use libp2p_swarm::StreamProtocol; -use void::Void; - -const PROTOCOL_IDENT: StreamProtocol = StreamProtocol::new("/rendezvous/1.0.0"); - -pub(crate) mod inbound; -pub(crate) mod outbound; -/// Errors that can occur while interacting with a substream. -#[allow(clippy::large_enum_variant)] -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("Reading message {0:?} at this stage is a protocol violation")] - BadMessage(Message), - #[error("Failed to write message to substream")] - WriteMessage(#[source] codec::Error), - #[error("Failed to read message from substream")] - ReadMessage(#[source] codec::Error), - #[error("Substream ended unexpectedly mid-protocol")] - UnexpectedEndOfStream, -} - -pub(crate) type OutboundInEvent = crate::substream_handler::InEvent; -pub(crate) type OutboundOutEvent = - crate::substream_handler::OutEvent; - -pub(crate) type InboundInEvent = crate::substream_handler::InEvent<(), inbound::InEvent, Void>; -pub(crate) type InboundOutEvent = - crate::substream_handler::OutEvent; diff --git a/protocols/rendezvous/src/handler/inbound.rs b/protocols/rendezvous/src/handler/inbound.rs deleted file mode 100644 index bf0083780c5..00000000000 --- a/protocols/rendezvous/src/handler/inbound.rs +++ /dev/null @@ -1,192 +0,0 @@ -// Copyright 2021 COMIT Network. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use crate::codec::{ - Cookie, ErrorCode, Message, Namespace, NewRegistration, Registration, RendezvousCodec, Ttl, -}; -use crate::handler::Error; -use crate::handler::PROTOCOL_IDENT; -use crate::substream_handler::{Next, PassthroughProtocol, SubstreamHandler}; -use asynchronous_codec::Framed; -use futures::{SinkExt, StreamExt}; -use libp2p_swarm::SubstreamProtocol; -use std::fmt; -use std::task::{Context, Poll}; - -/// The state of an inbound substream (i.e. the remote node opened it). -#[allow(clippy::large_enum_variant)] -#[allow(clippy::enum_variant_names)] -pub enum Stream { - /// We are in the process of reading a message from the substream. - PendingRead(Framed), - /// We read a message, dispatched it to the behaviour and are waiting for the response. - PendingBehaviour(Framed), - /// We are in the process of sending a response. - PendingSend(Framed, Message), - /// We've sent the message and are now closing down the substream. - PendingClose(Framed), -} - -impl fmt::Debug for Stream { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Stream::PendingRead(_) => write!(f, "Inbound::PendingRead"), - Stream::PendingBehaviour(_) => write!(f, "Inbound::PendingBehaviour"), - Stream::PendingSend(_, _) => write!(f, "Inbound::PendingSend"), - Stream::PendingClose(_) => write!(f, "Inbound::PendingClose"), - } - } -} - -#[allow(clippy::large_enum_variant)] -#[allow(clippy::enum_variant_names)] -#[derive(Debug, Clone)] -pub enum OutEvent { - RegistrationRequested(NewRegistration), - UnregisterRequested(Namespace), - DiscoverRequested { - namespace: Option, - cookie: Option, - limit: Option, - }, -} - -#[derive(Debug)] -pub enum InEvent { - RegisterResponse { - ttl: Ttl, - }, - DeclineRegisterRequest(ErrorCode), - DiscoverResponse { - discovered: Vec, - cookie: Cookie, - }, - DeclineDiscoverRequest(ErrorCode), -} - -impl SubstreamHandler for Stream { - type InEvent = InEvent; - type OutEvent = OutEvent; - type Error = Error; - type OpenInfo = (); - - fn upgrade( - open_info: Self::OpenInfo, - ) -> SubstreamProtocol { - SubstreamProtocol::new(PassthroughProtocol::new(PROTOCOL_IDENT), open_info) - } - - fn new(substream: libp2p_swarm::Stream, _: Self::OpenInfo) -> Self { - Stream::PendingRead(Framed::new(substream, RendezvousCodec::default())) - } - - fn on_event(self, event: Self::InEvent) -> Self { - match (event, self) { - (InEvent::RegisterResponse { ttl }, Stream::PendingBehaviour(substream)) => { - Stream::PendingSend(substream, Message::RegisterResponse(Ok(ttl))) - } - (InEvent::DeclineRegisterRequest(error), Stream::PendingBehaviour(substream)) => { - Stream::PendingSend(substream, Message::RegisterResponse(Err(error))) - } - ( - InEvent::DiscoverResponse { discovered, cookie }, - Stream::PendingBehaviour(substream), - ) => Stream::PendingSend( - substream, - Message::DiscoverResponse(Ok((discovered, cookie))), - ), - (InEvent::DeclineDiscoverRequest(error), Stream::PendingBehaviour(substream)) => { - Stream::PendingSend(substream, Message::DiscoverResponse(Err(error))) - } - (event, inbound) => { - debug_assert!(false, "{inbound:?} cannot handle event {event:?}"); - - inbound - } - } - } - - fn advance(self, cx: &mut Context<'_>) -> Result, Self::Error> { - let next_state = match self { - Stream::PendingRead(mut substream) => { - match substream.poll_next_unpin(cx).map_err(Error::ReadMessage)? { - Poll::Ready(Some(msg)) => { - let event = match msg { - Message::Register(registration) => { - OutEvent::RegistrationRequested(registration) - } - Message::Discover { - cookie, - namespace, - limit, - } => OutEvent::DiscoverRequested { - cookie, - namespace, - limit, - }, - Message::Unregister(namespace) => { - OutEvent::UnregisterRequested(namespace) - } - other => return Err(Error::BadMessage(other)), - }; - - Next::EmitEvent { - event, - next_state: Stream::PendingBehaviour(substream), - } - } - Poll::Ready(None) => return Err(Error::UnexpectedEndOfStream), - Poll::Pending => Next::Pending { - next_state: Stream::PendingRead(substream), - }, - } - } - Stream::PendingBehaviour(substream) => Next::Pending { - next_state: Stream::PendingBehaviour(substream), - }, - Stream::PendingSend(mut substream, message) => match substream - .poll_ready_unpin(cx) - .map_err(Error::WriteMessage)? - { - Poll::Ready(()) => { - substream - .start_send_unpin(message) - .map_err(Error::WriteMessage)?; - - Next::Continue { - next_state: Stream::PendingClose(substream), - } - } - Poll::Pending => Next::Pending { - next_state: Stream::PendingSend(substream, message), - }, - }, - Stream::PendingClose(mut substream) => match substream.poll_close_unpin(cx) { - Poll::Ready(Ok(())) => Next::Done, - Poll::Ready(Err(_)) => Next::Done, // there is nothing we can do about an error during close - Poll::Pending => Next::Pending { - next_state: Stream::PendingClose(substream), - }, - }, - }; - - Ok(next_state) - } -} diff --git a/protocols/rendezvous/src/handler/outbound.rs b/protocols/rendezvous/src/handler/outbound.rs deleted file mode 100644 index dd44bf8c2b4..00000000000 --- a/protocols/rendezvous/src/handler/outbound.rs +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright 2021 COMIT Network. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use crate::codec::{Cookie, Message, NewRegistration, RendezvousCodec}; -use crate::handler::Error; -use crate::handler::PROTOCOL_IDENT; -use crate::substream_handler::{FutureSubstream, Next, PassthroughProtocol, SubstreamHandler}; -use crate::{ErrorCode, Namespace, Registration, Ttl}; -use asynchronous_codec::Framed; -use futures::{SinkExt, TryFutureExt, TryStreamExt}; -use libp2p_swarm::SubstreamProtocol; -use std::task::Context; -use void::Void; - -pub struct Stream(FutureSubstream); - -impl SubstreamHandler for Stream { - type InEvent = Void; - type OutEvent = OutEvent; - type Error = Error; - type OpenInfo = OpenInfo; - - fn upgrade( - open_info: Self::OpenInfo, - ) -> SubstreamProtocol { - SubstreamProtocol::new(PassthroughProtocol::new(PROTOCOL_IDENT), open_info) - } - - fn new(substream: libp2p_swarm::Stream, info: Self::OpenInfo) -> Self { - let mut stream = Framed::new(substream, RendezvousCodec::default()); - let sent_message = match info { - OpenInfo::RegisterRequest(new_registration) => Message::Register(new_registration), - OpenInfo::UnregisterRequest(namespace) => Message::Unregister(namespace), - OpenInfo::DiscoverRequest { - namespace, - cookie, - limit, - } => Message::Discover { - namespace, - cookie, - limit, - }, - }; - - Self(FutureSubstream::new(async move { - use Message::*; - use OutEvent::*; - - stream - .send(sent_message.clone()) - .map_err(Error::WriteMessage) - .await?; - let received_message = stream.try_next().map_err(Error::ReadMessage).await?; - let received_message = received_message.ok_or(Error::UnexpectedEndOfStream)?; - - let event = match (sent_message, received_message) { - (Register(registration), RegisterResponse(Ok(ttl))) => Registered { - namespace: registration.namespace, - ttl, - }, - (Register(registration), RegisterResponse(Err(error))) => { - RegisterFailed(registration.namespace, error) - } - (Discover { .. }, DiscoverResponse(Ok((registrations, cookie)))) => Discovered { - registrations, - cookie, - }, - (Discover { namespace, .. }, DiscoverResponse(Err(error))) => { - DiscoverFailed { namespace, error } - } - (.., other) => return Err(Error::BadMessage(other)), - }; - - stream.close().map_err(Error::WriteMessage).await?; - - Ok(event) - })) - } - - fn on_event(self, event: Self::InEvent) -> Self { - void::unreachable(event) - } - - fn advance(self, cx: &mut Context<'_>) -> Result, Self::Error> { - Ok(self.0.advance(cx)?.map_state(Stream)) - } -} - -#[derive(Debug, Clone)] -pub enum OutEvent { - Registered { - namespace: Namespace, - ttl: Ttl, - }, - RegisterFailed(Namespace, ErrorCode), - Discovered { - registrations: Vec, - cookie: Cookie, - }, - DiscoverFailed { - namespace: Option, - error: ErrorCode, - }, -} - -#[allow(clippy::large_enum_variant)] -#[allow(clippy::enum_variant_names)] -#[derive(Debug)] -pub enum OpenInfo { - RegisterRequest(NewRegistration), - UnregisterRequest(Namespace), - DiscoverRequest { - namespace: Option, - cookie: Option, - limit: Option, - }, -} diff --git a/protocols/rendezvous/src/lib.rs b/protocols/rendezvous/src/lib.rs index 337e554ea00..7c607085f20 100644 --- a/protocols/rendezvous/src/lib.rs +++ b/protocols/rendezvous/src/lib.rs @@ -23,10 +23,9 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] pub use self::codec::{Cookie, ErrorCode, Namespace, NamespaceTooLong, Registration, Ttl}; +use libp2p_swarm::StreamProtocol; mod codec; -mod handler; -mod substream_handler; /// If unspecified, rendezvous nodes should assume a TTL of 2h. /// @@ -43,5 +42,12 @@ pub const MIN_TTL: Ttl = 60 * 60 * 2; /// . pub const MAX_TTL: Ttl = 60 * 60 * 72; +/// The maximum namespace length. +/// +/// . +pub const MAX_NAMESPACE: usize = 255; + +pub(crate) const PROTOCOL_IDENT: StreamProtocol = StreamProtocol::new("/rendezvous/1.0.0"); + pub mod client; pub mod server; diff --git a/protocols/rendezvous/src/server.rs b/protocols/rendezvous/src/server.rs index 6d64938ca3d..44f2f97a6a0 100644 --- a/protocols/rendezvous/src/server.rs +++ b/protocols/rendezvous/src/server.rs @@ -18,30 +18,29 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::codec::{Cookie, ErrorCode, Namespace, NewRegistration, Registration, Ttl}; -use crate::handler::inbound; -use crate::substream_handler::{InEvent, InboundSubstreamId, SubstreamConnectionHandler}; -use crate::{handler, MAX_TTL, MIN_TTL}; +use crate::codec::{Cookie, ErrorCode, Message, Namespace, NewRegistration, Registration, Ttl}; +use crate::{MAX_TTL, MIN_TTL}; use bimap::BiMap; use futures::future::BoxFuture; -use futures::ready; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use libp2p_core::{Endpoint, Multiaddr}; use libp2p_identity::PeerId; +use libp2p_request_response::ProtocolSupport; use libp2p_swarm::behaviour::FromSwarm; use libp2p_swarm::{ - CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, - PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters, THandler, THandlerInEvent, + THandlerOutEvent, ToSwarm, }; -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, HashSet}; +use std::iter; use std::iter::FromIterator; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use std::time::Duration; -use void::Void; pub struct Behaviour { - events: VecDeque>>, + inner: libp2p_request_response::Behaviour, + registrations: Registrations, } @@ -75,7 +74,12 @@ impl Behaviour { /// Create a new instance of the rendezvous [`NetworkBehaviour`]. pub fn new(config: Config) -> Self { Self { - events: Default::default(), + inner: libp2p_request_response::Behaviour::with_codec( + crate::codec::Codec::default(), + iter::once((crate::PROTOCOL_IDENT, ProtocolSupport::Inbound)), + libp2p_request_response::Config::default(), + ), + registrations: Registrations::with_config(config), } } @@ -109,31 +113,36 @@ pub enum Event { } impl NetworkBehaviour for Behaviour { - type ConnectionHandler = SubstreamConnectionHandler; + type ConnectionHandler = as NetworkBehaviour>::ConnectionHandler; + type ToSwarm = Event; fn handle_established_inbound_connection( &mut self, - _: ConnectionId, - _: PeerId, - _: &Multiaddr, - _: &Multiaddr, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, ) -> Result, ConnectionDenied> { - Ok(SubstreamConnectionHandler::new_inbound_only( - Duration::from_secs(30), - )) + self.inner.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) } fn handle_established_outbound_connection( &mut self, - _: ConnectionId, - _: PeerId, - _: &Multiaddr, - _: Endpoint, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, ) -> Result, ConnectionDenied> { - Ok(SubstreamConnectionHandler::new_inbound_only( - Duration::from_secs(30), - )) + self.inner + .handle_established_outbound_connection(connection_id, peer, addr, role_override) } fn on_connection_handler_event( @@ -142,29 +151,14 @@ impl NetworkBehaviour for Behaviour { connection: ConnectionId, event: THandlerOutEvent, ) { - let new_events = match event { - handler::InboundOutEvent::InboundEvent { id, message } => { - handle_inbound_event(message, peer_id, connection, id, &mut self.registrations) - } - handler::InboundOutEvent::OutboundEvent { message, .. } => void::unreachable(message), - handler::InboundOutEvent::InboundError { error, .. } => { - log::warn!("Connection with peer {} failed: {}", peer_id, error); - - vec![ToSwarm::CloseConnection { - peer_id, - connection: CloseConnection::One(connection), - }] - } - handler::InboundOutEvent::OutboundError { error, .. } => void::unreachable(error), - }; - - self.events.extend(new_events); + self.inner + .on_connection_handler_event(peer_id, connection, event); } fn poll( &mut self, cx: &mut Context<'_>, - _: &mut impl PollParameters, + params: &mut impl PollParameters, ) -> Poll>> { if let Poll::Ready(ExpiredRegistration(registration)) = self.registrations.poll(cx) { return Poll::Ready(ToSwarm::GenerateEvent(Event::RegistrationExpired( @@ -172,106 +166,134 @@ impl NetworkBehaviour for Behaviour { ))); } - if let Some(event) = self.events.pop_front() { - return Poll::Ready(event); - } + loop { + if let Poll::Ready(to_swarm) = self.inner.poll(cx, params) { + match to_swarm { + ToSwarm::GenerateEvent(libp2p_request_response::Event::Message { + peer: peer_id, + message: + libp2p_request_response::Message::Request { + request, channel, .. + }, + }) => { + if let Some((event, response)) = + handle_request(peer_id, request, &mut self.registrations) + { + if let Some(resp) = response { + self.inner + .send_response(channel, resp) + .expect("Send response"); + } + + return Poll::Ready(ToSwarm::GenerateEvent(event)); + } + + continue; + } + ToSwarm::GenerateEvent(libp2p_request_response::Event::InboundFailure { + peer, + request_id, + error, + }) => { + log::warn!("Inbound request {request_id} with peer {peer} failed: {error}"); + + continue; + } + ToSwarm::GenerateEvent(libp2p_request_response::Event::ResponseSent { + .. + }) + | ToSwarm::GenerateEvent(libp2p_request_response::Event::Message { + peer: _, + message: libp2p_request_response::Message::Response { .. }, + }) + | ToSwarm::GenerateEvent(libp2p_request_response::Event::OutboundFailure { + .. + }) => { + continue; + } + ToSwarm::Dial { .. } + | ToSwarm::ListenOn { .. } + | ToSwarm::RemoveListener { .. } + | ToSwarm::NotifyHandler { .. } + | ToSwarm::NewExternalAddrCandidate(_) + | ToSwarm::ExternalAddrConfirmed(_) + | ToSwarm::ExternalAddrExpired(_) + | ToSwarm::CloseConnection { .. } => { + let new_to_swarm = to_swarm + .map_out(|_| unreachable!("we manually map `GenerateEvent` variants")); + + return Poll::Ready(new_to_swarm); + } + }; + } - Poll::Pending + return Poll::Pending; + } } fn on_swarm_event(&mut self, event: FromSwarm) { - match event { - FromSwarm::ConnectionEstablished(_) - | FromSwarm::ConnectionClosed(_) - | FromSwarm::AddressChange(_) - | FromSwarm::DialFailure(_) - | FromSwarm::ListenFailure(_) - | FromSwarm::NewListener(_) - | FromSwarm::NewListenAddr(_) - | FromSwarm::ExpiredListenAddr(_) - | FromSwarm::ListenerError(_) - | FromSwarm::ListenerClosed(_) - | FromSwarm::NewExternalAddrCandidate(_) - | FromSwarm::ExternalAddrExpired(_) - | FromSwarm::ExternalAddrConfirmed(_) => {} - } + self.inner.on_swarm_event(event); } } -fn handle_inbound_event( - event: inbound::OutEvent, +fn handle_request( peer_id: PeerId, - connection: ConnectionId, - id: InboundSubstreamId, + message: Message, registrations: &mut Registrations, -) -> Vec>> { - match event { - // bad registration - inbound::OutEvent::RegistrationRequested(registration) - if registration.record.peer_id() != peer_id => - { - let error = ErrorCode::NotAuthorized; - - vec![ - ToSwarm::NotifyHandler { - peer_id, - handler: NotifyHandler::One(connection), - event: handler::InboundInEvent::NotifyInboundSubstream { - id, - message: inbound::InEvent::DeclineRegisterRequest(error), - }, - }, - ToSwarm::GenerateEvent(Event::PeerNotRegistered { +) -> Option<(Event, Option)> { + match message { + Message::Register(registration) => { + if registration.record.peer_id() != peer_id { + let error = ErrorCode::NotAuthorized; + + let event = Event::PeerNotRegistered { peer: peer_id, namespace: registration.namespace, error, - }), - ] - } - inbound::OutEvent::RegistrationRequested(registration) => { + }; + + return Some((event, Some(Message::RegisterResponse(Err(error))))); + } + let namespace = registration.namespace.clone(); match registrations.add(registration) { Ok(registration) => { - vec![ - ToSwarm::NotifyHandler { - peer_id, - handler: NotifyHandler::One(connection), - event: handler::InboundInEvent::NotifyInboundSubstream { - id, - message: inbound::InEvent::RegisterResponse { - ttl: registration.ttl, - }, - }, - }, - ToSwarm::GenerateEvent(Event::PeerRegistered { - peer: peer_id, - registration, - }), - ] + let response = Message::RegisterResponse(Ok(registration.ttl)); + + let event = Event::PeerRegistered { + peer: peer_id, + registration, + }; + + Some((event, Some(response))) } Err(TtlOutOfRange::TooLong { .. }) | Err(TtlOutOfRange::TooShort { .. }) => { let error = ErrorCode::InvalidTtl; - vec![ - ToSwarm::NotifyHandler { - peer_id, - handler: NotifyHandler::One(connection), - event: handler::InboundInEvent::NotifyInboundSubstream { - id, - message: inbound::InEvent::DeclineRegisterRequest(error), - }, - }, - ToSwarm::GenerateEvent(Event::PeerNotRegistered { - peer: peer_id, - namespace, - error, - }), - ] + let response = Message::RegisterResponse(Err(error)); + + let event = Event::PeerNotRegistered { + peer: peer_id, + namespace, + error, + }; + + Some((event, Some(response))) } } } - inbound::OutEvent::DiscoverRequested { + Message::Unregister(namespace) => { + registrations.remove(namespace.clone(), peer_id); + + let event = Event::PeerUnregistered { + peer: peer_id, + namespace, + }; + + Some((event, None)) + } + Message::Discover { namespace, cookie, limit, @@ -279,51 +301,30 @@ fn handle_inbound_event( Ok((registrations, cookie)) => { let discovered = registrations.cloned().collect::>(); - vec![ - ToSwarm::NotifyHandler { - peer_id, - handler: NotifyHandler::One(connection), - event: handler::InboundInEvent::NotifyInboundSubstream { - id, - message: inbound::InEvent::DiscoverResponse { - discovered: discovered.clone(), - cookie, - }, - }, - }, - ToSwarm::GenerateEvent(Event::DiscoverServed { - enquirer: peer_id, - registrations: discovered, - }), - ] + let response = Message::DiscoverResponse(Ok((discovered.clone(), cookie))); + + let event = Event::DiscoverServed { + enquirer: peer_id, + registrations: discovered, + }; + + Some((event, Some(response))) } Err(_) => { let error = ErrorCode::InvalidCookie; - vec![ - ToSwarm::NotifyHandler { - peer_id, - handler: NotifyHandler::One(connection), - event: handler::InboundInEvent::NotifyInboundSubstream { - id, - message: inbound::InEvent::DeclineDiscoverRequest(error), - }, - }, - ToSwarm::GenerateEvent(Event::DiscoverNotServed { - enquirer: peer_id, - error, - }), - ] + let response = Message::DiscoverResponse(Err(error)); + + let event = Event::DiscoverNotServed { + enquirer: peer_id, + error, + }; + + Some((event, Some(response))) } }, - inbound::OutEvent::UnregisterRequested(namespace) => { - registrations.remove(namespace.clone(), peer_id); - - vec![ToSwarm::GenerateEvent(Event::PeerUnregistered { - peer: peer_id, - namespace, - })] - } + Message::RegisterResponse(_) => None, + Message::DiscoverResponse(_) => None, } } @@ -488,32 +489,38 @@ impl Registrations { self.cookies .insert(new_cookie.clone(), reggos_of_last_discover); - let reggos = &self.registrations; + let regs = &self.registrations; let registrations = ids .into_iter() - .map(move |id| reggos.get(&id).expect("bad internal datastructure")); + .map(move |id| regs.get(&id).expect("bad internal data structure")); Ok((registrations, new_cookie)) } fn poll(&mut self, cx: &mut Context<'_>) -> Poll { - let expired_registration = ready!(self.next_expiry.poll_next_unpin(cx)).expect( - "This stream should never finish because it is initialised with a pending future", - ); + loop { + let expired_registration = ready!(self.next_expiry.poll_next_unpin(cx)).expect( + "This stream should never finish because it is initialised with a pending future", + ); - // clean up our cookies - self.cookies.retain(|_, registrations| { - registrations.remove(&expired_registration); + // clean up our cookies + self.cookies.retain(|_, registrations| { + registrations.remove(&expired_registration); - // retain all cookies where there are still registrations left - !registrations.is_empty() - }); + // retain all cookies where there are still registrations left + !registrations.is_empty() + }); - self.registrations_for_peer - .remove_by_right(&expired_registration); - match self.registrations.remove(&expired_registration) { - None => self.poll(cx), - Some(registration) => Poll::Ready(ExpiredRegistration(registration)), + self.registrations_for_peer + .remove_by_right(&expired_registration); + match self.registrations.remove(&expired_registration) { + None => { + continue; + } + Some(registration) => { + return Poll::Ready(ExpiredRegistration(registration)); + } + } } } } diff --git a/protocols/rendezvous/src/substream_handler.rs b/protocols/rendezvous/src/substream_handler.rs deleted file mode 100644 index 2434b691fb2..00000000000 --- a/protocols/rendezvous/src/substream_handler.rs +++ /dev/null @@ -1,556 +0,0 @@ -// Copyright 2021 COMIT Network. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! A generic [`ConnectionHandler`] that delegates the handling of substreams to [`SubstreamHandler`]s. -//! -//! This module is an attempt to simplify the implementation of protocols by freeing implementations from dealing with aspects such as concurrent substreams. -//! Particularly for outbound substreams, it greatly simplifies the definition of protocols through the [`FutureSubstream`] helper. -//! -//! At the moment, this module is an implementation detail of the rendezvous protocol but the intent is for it to be provided as a generic module that is accessible to other protocols as well. - -use futures::future::{self, BoxFuture, Fuse, FusedFuture}; -use futures::FutureExt; -use instant::Instant; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use libp2p_swarm::handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound}; -use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream, StreamProtocol, SubstreamProtocol, -}; -use std::collections::{HashMap, VecDeque}; -use std::fmt; -use std::future::Future; -use std::hash::Hash; -use std::task::{Context, Poll}; -use std::time::Duration; -use void::Void; - -/// Handles a substream throughout its lifetime. -pub trait SubstreamHandler: Sized { - type InEvent; - type OutEvent; - type Error; - type OpenInfo; - - fn upgrade(open_info: Self::OpenInfo) - -> SubstreamProtocol; - fn new(substream: Stream, info: Self::OpenInfo) -> Self; - fn on_event(self, event: Self::InEvent) -> Self; - fn advance(self, cx: &mut Context<'_>) -> Result, Self::Error>; -} - -/// The result of advancing a [`SubstreamHandler`]. -pub enum Next { - /// Return the given event and set the handler into `next_state`. - EmitEvent { event: TEvent, next_state: TState }, - /// The handler currently cannot do any more work, set its state back into `next_state`. - Pending { next_state: TState }, - /// The handler performed some work and wants to continue in the given state. - /// - /// This variant is useful because it frees the handler from implementing a loop internally. - Continue { next_state: TState }, - /// The handler finished. - Done, -} - -impl Next { - pub fn map_state( - self, - map: impl FnOnce(TState) -> TNextState, - ) -> Next { - match self { - Next::EmitEvent { event, next_state } => Next::EmitEvent { - event, - next_state: map(next_state), - }, - Next::Pending { next_state } => Next::Pending { - next_state: map(next_state), - }, - Next::Continue { next_state } => Next::Pending { - next_state: map(next_state), - }, - Next::Done => Next::Done, - } - } -} - -#[derive(Debug, Hash, Eq, PartialEq, Clone, Copy)] -pub struct InboundSubstreamId(u64); - -impl InboundSubstreamId { - fn fetch_and_increment(&mut self) -> Self { - let next_id = *self; - self.0 += 1; - - next_id - } -} - -impl fmt::Display for InboundSubstreamId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.0) - } -} - -#[derive(Debug, Hash, Eq, PartialEq, Clone, Copy)] -pub struct OutboundSubstreamId(u64); - -impl OutboundSubstreamId { - fn fetch_and_increment(&mut self) -> Self { - let next_id = *self; - self.0 += 1; - - next_id - } -} - -impl fmt::Display for OutboundSubstreamId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.0) - } -} - -pub struct PassthroughProtocol { - ident: Option, -} - -impl PassthroughProtocol { - pub fn new(ident: StreamProtocol) -> Self { - Self { ident: Some(ident) } - } -} - -impl UpgradeInfo for PassthroughProtocol { - type Info = StreamProtocol; - type InfoIter = std::option::IntoIter; - - fn protocol_info(&self) -> Self::InfoIter { - self.ident.clone().into_iter() - } -} - -impl InboundUpgrade for PassthroughProtocol { - type Output = C; - type Error = Void; - type Future = BoxFuture<'static, Result>; - - fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { - match self.ident { - Some(_) => future::ready(Ok(socket)).boxed(), - None => future::pending().boxed(), - } - } -} - -impl OutboundUpgrade for PassthroughProtocol { - type Output = C; - type Error = Void; - type Future = BoxFuture<'static, Result>; - - fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { - match self.ident { - Some(_) => future::ready(Ok(socket)).boxed(), - None => future::pending().boxed(), - } - } -} - -/// An implementation of [`ConnectionHandler`] that delegates to individual [`SubstreamHandler`]s. -pub struct SubstreamConnectionHandler { - inbound_substreams: HashMap, - outbound_substreams: HashMap, - next_inbound_substream_id: InboundSubstreamId, - next_outbound_substream_id: OutboundSubstreamId, - - new_substreams: VecDeque, - - initial_keep_alive_deadline: Instant, -} - -impl - SubstreamConnectionHandler -{ - pub fn new(initial_keep_alive: Duration) -> Self { - Self { - inbound_substreams: Default::default(), - outbound_substreams: Default::default(), - next_inbound_substream_id: InboundSubstreamId(0), - next_outbound_substream_id: OutboundSubstreamId(0), - new_substreams: Default::default(), - initial_keep_alive_deadline: Instant::now() + initial_keep_alive, - } - } -} - -impl - SubstreamConnectionHandler -{ - pub fn new_outbound_only(initial_keep_alive: Duration) -> Self { - Self { - inbound_substreams: Default::default(), - outbound_substreams: Default::default(), - next_inbound_substream_id: InboundSubstreamId(0), - next_outbound_substream_id: OutboundSubstreamId(0), - new_substreams: Default::default(), - initial_keep_alive_deadline: Instant::now() + initial_keep_alive, - } - } -} - -impl - SubstreamConnectionHandler -{ - pub fn new_inbound_only(initial_keep_alive: Duration) -> Self { - Self { - inbound_substreams: Default::default(), - outbound_substreams: Default::default(), - next_inbound_substream_id: InboundSubstreamId(0), - next_outbound_substream_id: OutboundSubstreamId(0), - new_substreams: Default::default(), - initial_keep_alive_deadline: Instant::now() + initial_keep_alive, - } - } -} - -/// Poll all substreams within the given HashMap. -/// -/// This is defined as a separate function because we call it with two different fields stored within [`SubstreamConnectionHandler`]. -fn poll_substreams( - substreams: &mut HashMap, - cx: &mut Context<'_>, -) -> Poll> -where - TSubstream: SubstreamHandler, - TId: Copy + Eq + Hash + fmt::Display, -{ - let substream_ids = substreams.keys().copied().collect::>(); - - 'loop_substreams: for id in substream_ids { - let mut handler = substreams - .remove(&id) - .expect("we just got the key out of the map"); - - let (next_state, poll) = 'loop_handler: loop { - match handler.advance(cx) { - Ok(Next::EmitEvent { next_state, event }) => { - break (next_state, Poll::Ready(Ok((id, event)))) - } - Ok(Next::Pending { next_state }) => break (next_state, Poll::Pending), - Ok(Next::Continue { next_state }) => { - handler = next_state; - continue 'loop_handler; - } - Ok(Next::Done) => { - log::debug!("Substream handler {} finished", id); - continue 'loop_substreams; - } - Err(e) => return Poll::Ready(Err((id, e))), - } - }; - - substreams.insert(id, next_state); - - return poll; - } - - Poll::Pending -} - -/// Event sent from the [`libp2p_swarm::NetworkBehaviour`] to the [`SubstreamConnectionHandler`]. -#[allow(clippy::enum_variant_names)] -#[derive(Debug)] -pub enum InEvent { - /// Open a new substream using the provided `open_info`. - /// - /// For "client-server" protocols, this is typically the initial message to be sent to the other party. - NewSubstream { open_info: I }, - NotifyInboundSubstream { - id: InboundSubstreamId, - message: TInboundEvent, - }, - NotifyOutboundSubstream { - id: OutboundSubstreamId, - message: TOutboundEvent, - }, -} - -/// Event produced by the [`SubstreamConnectionHandler`] for the corresponding [`libp2p_swarm::NetworkBehaviour`]. -#[derive(Debug)] -pub enum OutEvent { - /// An inbound substream produced an event. - InboundEvent { - id: InboundSubstreamId, - message: TInbound, - }, - /// An outbound substream produced an event. - OutboundEvent { - id: OutboundSubstreamId, - message: TOutbound, - }, - /// An inbound substream errored irrecoverably. - InboundError { - id: InboundSubstreamId, - error: TInboundError, - }, - /// An outbound substream errored irrecoverably. - OutboundError { - id: OutboundSubstreamId, - error: TOutboundError, - }, -} - -impl< - TInboundInEvent, - TInboundOutEvent, - TOutboundInEvent, - TOutboundOutEvent, - TOutboundOpenInfo, - TInboundError, - TOutboundError, - TInboundSubstreamHandler, - TOutboundSubstreamHandler, - > ConnectionHandler - for SubstreamConnectionHandler< - TInboundSubstreamHandler, - TOutboundSubstreamHandler, - TOutboundOpenInfo, - > -where - TInboundSubstreamHandler: SubstreamHandler< - InEvent = TInboundInEvent, - OutEvent = TInboundOutEvent, - Error = TInboundError, - OpenInfo = (), - >, - TOutboundSubstreamHandler: SubstreamHandler< - InEvent = TOutboundInEvent, - OutEvent = TOutboundOutEvent, - Error = TOutboundError, - OpenInfo = TOutboundOpenInfo, - >, - TInboundInEvent: fmt::Debug + Send + 'static, - TInboundOutEvent: fmt::Debug + Send + 'static, - TOutboundInEvent: fmt::Debug + Send + 'static, - TOutboundOutEvent: fmt::Debug + Send + 'static, - TOutboundOpenInfo: fmt::Debug + Send + 'static, - TInboundError: fmt::Debug + Send + 'static, - TOutboundError: fmt::Debug + Send + 'static, - TInboundSubstreamHandler: Send + 'static, - TOutboundSubstreamHandler: Send + 'static, -{ - type FromBehaviour = InEvent; - type ToBehaviour = OutEvent; - type Error = Void; - type InboundProtocol = PassthroughProtocol; - type OutboundProtocol = PassthroughProtocol; - type InboundOpenInfo = (); - type OutboundOpenInfo = TOutboundOpenInfo; - - fn listen_protocol(&self) -> SubstreamProtocol { - TInboundSubstreamHandler::upgrade(()) - } - - fn on_connection_event( - &mut self, - event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, - Self::InboundOpenInfo, - Self::OutboundOpenInfo, - >, - ) { - match event { - ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { - protocol, .. - }) => { - self.inbound_substreams.insert( - self.next_inbound_substream_id.fetch_and_increment(), - TInboundSubstreamHandler::new(protocol, ()), - ); - } - ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { - protocol, - info, - }) => { - self.outbound_substreams.insert( - self.next_outbound_substream_id.fetch_and_increment(), - TOutboundSubstreamHandler::new(protocol, info), - ); - } - // TODO: Handle upgrade errors properly - ConnectionEvent::AddressChange(_) - | ConnectionEvent::ListenUpgradeError(_) - | ConnectionEvent::DialUpgradeError(_) - | ConnectionEvent::LocalProtocolsChange(_) - | ConnectionEvent::RemoteProtocolsChange(_) => {} - } - } - - fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { - match event { - InEvent::NewSubstream { open_info } => self.new_substreams.push_back(open_info), - InEvent::NotifyInboundSubstream { id, message } => { - match self.inbound_substreams.remove(&id) { - Some(handler) => { - let new_handler = handler.on_event(message); - - self.inbound_substreams.insert(id, new_handler); - } - None => { - log::debug!("Substream with ID {} not found", id); - } - } - } - InEvent::NotifyOutboundSubstream { id, message } => { - match self.outbound_substreams.remove(&id) { - Some(handler) => { - let new_handler = handler.on_event(message); - - self.outbound_substreams.insert(id, new_handler); - } - None => { - log::debug!("Substream with ID {} not found", id); - } - } - } - } - } - - fn connection_keep_alive(&self) -> KeepAlive { - // Rudimentary keep-alive handling, to be extended as needed as this abstraction is used more by other protocols. - - if Instant::now() < self.initial_keep_alive_deadline { - return KeepAlive::Yes; - } - - if self.inbound_substreams.is_empty() - && self.outbound_substreams.is_empty() - && self.new_substreams.is_empty() - { - return KeepAlive::No; - } - - KeepAlive::Yes - } - - fn poll( - &mut self, - cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, - > { - if let Some(open_info) = self.new_substreams.pop_front() { - return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: TOutboundSubstreamHandler::upgrade(open_info), - }); - } - - match poll_substreams(&mut self.inbound_substreams, cx) { - Poll::Ready(Ok((id, message))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - OutEvent::InboundEvent { id, message }, - )) - } - Poll::Ready(Err((id, error))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - OutEvent::InboundError { id, error }, - )) - } - Poll::Pending => {} - } - - match poll_substreams(&mut self.outbound_substreams, cx) { - Poll::Ready(Ok((id, message))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - OutEvent::OutboundEvent { id, message }, - )) - } - Poll::Ready(Err((id, error))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - OutEvent::OutboundError { id, error }, - )) - } - Poll::Pending => {} - } - - Poll::Pending - } -} - -/// A helper struct for substream handlers that can be implemented as async functions. -/// -/// This only works for substreams without an `InEvent` because - once constructed - the state of an inner future is opaque. -pub(crate) struct FutureSubstream { - future: Fuse>>, -} - -impl FutureSubstream { - pub(crate) fn new( - future: impl Future> + Send + 'static, - ) -> Self { - Self { - future: future.boxed().fuse(), - } - } - - pub(crate) fn advance(mut self, cx: &mut Context<'_>) -> Result, TError> { - if self.future.is_terminated() { - return Ok(Next::Done); - } - - match self.future.poll_unpin(cx) { - Poll::Ready(Ok(event)) => Ok(Next::EmitEvent { - event, - next_state: self, - }), - Poll::Ready(Err(error)) => Err(error), - Poll::Pending => Ok(Next::Pending { next_state: self }), - } - } -} - -impl SubstreamHandler for void::Void { - type InEvent = void::Void; - type OutEvent = void::Void; - type Error = void::Void; - type OpenInfo = (); - - fn new(_: Stream, _: Self::OpenInfo) -> Self { - unreachable!("we should never yield a substream") - } - - fn on_event(self, event: Self::InEvent) -> Self { - void::unreachable(event) - } - - fn advance(self, _: &mut Context<'_>) -> Result, Self::Error> { - void::unreachable(self) - } - - fn upgrade( - open_info: Self::OpenInfo, - ) -> SubstreamProtocol { - SubstreamProtocol::new(PassthroughProtocol { ident: None }, open_info) - } -}