diff --git a/crates/subspace-networking/src/behavior.rs b/crates/subspace-networking/src/behavior.rs index dfa9f5cb24..a28e1598b8 100644 --- a/crates/subspace-networking/src/behavior.rs +++ b/crates/subspace-networking/src/behavior.rs @@ -6,6 +6,9 @@ mod tests; use crate::request_responses::{ Event as RequestResponseEvent, RequestHandler, RequestResponsesBehaviour, }; +use crate::reserved_peers::{ + Behaviour as ReservedPeersBehaviour, Config as ReservedPeersConfig, Event as ReservedPeersEvent, +}; use derive_more::From; use libp2p::allow_block_list::{Behaviour as AllowBlockListBehaviour, BlockedPeers}; use libp2p::connection_limits::{Behaviour as ConnectionLimitsBehaviour, ConnectionLimits}; @@ -37,6 +40,8 @@ pub(crate) struct BehaviorConfig { pub(crate) request_response_protocols: Vec>, /// Connection limits for the swarm. pub(crate) connection_limits: ConnectionLimits, + /// The configuration for the [`ReservedPeersBehaviour`]. + pub(crate) reserved_peers: ReservedPeersConfig, } #[derive(NetworkBehaviour)] @@ -50,6 +55,7 @@ pub(crate) struct Behavior { pub(crate) request_response: RequestResponsesBehaviour, pub(crate) connection_limits: ConnectionLimitsBehaviour, pub(crate) block_list: BlockListBehaviour, + pub(crate) reserved_peers: ReservedPeersBehaviour, } impl Behavior @@ -87,6 +93,7 @@ where .expect("RequestResponse protocols registration failed."), connection_limits: ConnectionLimitsBehaviour::new(config.connection_limits), block_list: BlockListBehaviour::default(), + reserved_peers: ReservedPeersBehaviour::new(config.reserved_peers), } } } @@ -100,4 +107,5 @@ pub(crate) enum Event { RequestResponse(RequestResponseEvent), /// Event stub for connection limits and block list behaviours. We won't receive such events. VoidEventStub(VoidEvent), + ReservedPeers(ReservedPeersEvent), } diff --git a/crates/subspace-networking/src/create.rs b/crates/subspace-networking/src/create.rs index f37dff7e80..e6bc11038b 100644 --- a/crates/subspace-networking/src/create.rs +++ b/crates/subspace-networking/src/create.rs @@ -11,6 +11,7 @@ use crate::create::transport::build_transport; use crate::node::{CircuitRelayClientError, Node}; use crate::node_runner::{NodeRunner, NodeRunnerConfig}; use crate::request_responses::RequestHandler; +use crate::reserved_peers::Config as ReservedPeersConfig; use crate::shared::Shared; use crate::utils::{convert_multiaddresses, ResizableSemaphore}; use backoff::{ExponentialBackoff, SystemClock}; @@ -46,6 +47,7 @@ use tracing::{debug, error, info}; const DEFAULT_NETWORK_PROTOCOL_VERSION: &str = "dev"; const KADEMLIA_PROTOCOL: &[u8] = b"/subspace/kad/0.1.0"; const GOSSIPSUB_PROTOCOL_PREFIX: &str = "subspace/gossipsub"; +const RESERVED_PEERS_PROTOCOL_NAME: &[u8] = b"/subspace/reserved-peers/1.0.0"; // Defines max_negotiating_inbound_streams constant for the swarm. // It must be set for large plots. @@ -428,6 +430,10 @@ where record_store: ProviderOnlyRecordStore::new(provider_storage), request_response_protocols, connection_limits, + reserved_peers: ReservedPeersConfig { + reserved_peers: reserved_peers.clone(), + protocol_name: RESERVED_PEERS_PROTOCOL_NAME, + }, }); let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id) diff --git a/crates/subspace-networking/src/lib.rs b/crates/subspace-networking/src/lib.rs index 8b4ad2272a..abc5d358c0 100644 --- a/crates/subspace-networking/src/lib.rs +++ b/crates/subspace-networking/src/lib.rs @@ -23,6 +23,7 @@ mod node; mod node_runner; mod request_handlers; mod request_responses; +mod reserved_peers; mod shared; pub mod utils; diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index 750a7a9dfb..3469673d7a 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -28,7 +28,7 @@ use libp2p::{futures, Multiaddr, PeerId, Swarm, TransportError}; use nohash_hasher::IntMap; use parking_lot::Mutex; use std::collections::hash_map::Entry; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::fmt::Debug; use std::num::NonZeroUsize; use std::pin::Pin; @@ -214,24 +214,6 @@ where let local_peer_id = *self.swarm.local_peer_id(); let connected_peers = self.swarm.connected_peers().cloned().collect::>(); - // Handle reserved peers first. - if !self.reserved_peers.is_empty() { - trace!(%local_peer_id, "Checking reserved peers connection: {:?}", self.reserved_peers); - - let connected_peers_id_set = connected_peers.iter().cloned().collect(); - let reserved_peers_id_set = self.reserved_peers.keys().cloned().collect::>(); - - let missing_reserved_peer_ids = - reserved_peers_id_set.difference(&connected_peers_id_set); - - // Establish missing connections to reserved peers. - for peer_id in missing_reserved_peer_ids { - if let Some(addr) = self.reserved_peers.get(peer_id) { - self.dial_peer(*peer_id, addr.clone()); - } - } - } - // Maintain target connection number. let (total_current_connections, established_connections) = { let network_info = self.swarm.network_info(); @@ -538,6 +520,8 @@ where let local_peer_id = *self.swarm.local_peer_id(); if let IdentifyEvent::Received { peer_id, mut info } = event { + debug!(?peer_id, protocols=?info.protocols, "IdentifyEvent::Received"); + // Check for network partition if info.protocol_version != self.protocol_version { debug!( diff --git a/crates/subspace-networking/src/reserved_peers.rs b/crates/subspace-networking/src/reserved_peers.rs new file mode 100644 index 0000000000..33a2c42946 --- /dev/null +++ b/crates/subspace-networking/src/reserved_peers.rs @@ -0,0 +1,252 @@ +mod handler; + +use handler::Handler; +use libp2p::core::{Endpoint, Multiaddr}; +use libp2p::swarm::behaviour::{ConnectionEstablished, FromSwarm}; +use libp2p::swarm::dial_opts::DialOpts; +use libp2p::swarm::{ + ConnectionClosed, ConnectionDenied, ConnectionId, DialFailure, NetworkBehaviour, + PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, +}; +use libp2p::PeerId; +use std::collections::HashMap; +use std::ops::Add; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; +use tracing::{debug, trace}; + +use crate::utils::convert_multiaddresses; + +/// `Behaviour` controls and maintains the state of connections to a predefined set of peers. +/// +/// The `Behaviour` struct is part of our custom protocol that aims to maintain persistent +/// connections to a predefined set of peers. It encapsulates the logic of managing the connections, +/// dialing, and handling various states of these connections. +/// +/// ## How it works +/// +/// Each `ReservedPeerState` can be in one of the following states, represented by the +/// `ConnectionStatus` enum: +/// 1. `NotConnected`: This state indicates that the peer is currently not connected. +/// The time for the next connection attempt is scheduled and can be queried. +/// 2. `PendingConnection`: This state means that a connection attempt to the peer is currently +/// in progress. +/// 3. `Connected`: This state signals that the peer is currently connected. +/// +/// The protocol will attempt to establish a connection to a `NotConnected` peer after a set delay, +/// specified by `DIALING_INTERVAL_IN_SECS`, to prevent multiple simultaneous connection attempts +/// to offline peers. This delay not only conserves resources, but also reduces the amount of +/// log output. +/// +/// ## Comments +/// +/// The protocol will establish one or two connections between each pair of reserved peers. +/// +/// IMPORTANT NOTE: For the maintenance of a persistent connection, both peers should have each +/// other in their `reserved peers set`. This is necessary because if only one peer has the other +/// in its `reserved peers set`, regular connection attempts will occur, but these connections will +/// be dismissed on the other side due to the `KeepAlive` policy. +/// +#[derive(Debug)] +pub struct Behaviour { + /// Protocol name. + protocol_name: &'static [u8], + /// A mapping from `PeerId` to `ReservedPeerState`, where each `ReservedPeerState` + /// represents the current state of the connection to a reserved peer. + reserved_peers_state: HashMap, +} + +/// Reserved peers protocol configuration. +#[derive(Debug, Clone)] +pub struct Config { + /// Protocol name. + pub protocol_name: &'static [u8], + /// Predefined set of reserved peers with addresses. + pub reserved_peers: Vec, +} + +/// Reserved peer connection status. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConnectionStatus { + /// Reserved peer is not connected. The next connection attempt is scheduled. + NotConnected { scheduled_at: Instant }, + /// Reserved peer dialing is in progress. + PendingConnection, + /// Reserved peer is connected. + Connected, +} + +/// We pause between reserved peers dialing otherwise we could do multiple dials to offline peers +/// wasting resources and producing a ton of log records. +const DIALING_INTERVAL_IN_SECS: Duration = Duration::from_secs(1); + +/// Helper-function to schedule a connection attempt. +#[inline] +fn schedule_connection() -> Instant { + Instant::now().add(DIALING_INTERVAL_IN_SECS) +} + +/// Defines the state of a reserved peer connection state. +#[derive(Debug, Clone)] +struct ReservedPeerState { + connection_status: ConnectionStatus, + peer_id: PeerId, + address: Multiaddr, +} + +/// Reserved peer connection events. +/// Initially the "reserved peers behaviour" doesn't produce events. However, we could pass +/// reserved peer state changes to the swarm using this struct in the future. +#[derive(Debug, Clone)] +pub struct Event; + +impl Behaviour { + /// Creates a new `Behaviour` with a predefined set of reserved peers. + pub fn new(config: Config) -> Self { + debug!( + reserved_peers=?config.reserved_peers, + "Reserved peers protocol initialization...." + ); + + let peer_addresses = convert_multiaddresses(config.reserved_peers); + + let reserved_peers_state = peer_addresses + .into_iter() + .map(|(peer_id, address)| { + ( + peer_id, + ReservedPeerState { + peer_id, + address, + connection_status: ConnectionStatus::NotConnected { + scheduled_at: schedule_connection(), + }, + }, + ) + }) + .collect(); + + Self { + protocol_name: config.protocol_name, + reserved_peers_state, + } + } + + /// Create a connection handler for the reserved peers protocol. + #[inline] + fn new_reserved_peers_handler(&self, peer_id: &PeerId) -> Handler { + Handler::new( + self.protocol_name, + self.reserved_peers_state.contains_key(peer_id), + ) + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = Handler; + type OutEvent = Event; + + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + peer_id: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(self.new_reserved_peers_handler(&peer_id)) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + peer_id: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(self.new_reserved_peers_handler(&peer_id)) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => { + if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) { + state.connection_status = ConnectionStatus::Connected; + + debug!(peer_id=%state.peer_id, "Reserved peer connected."); + } + } + FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + remaining_established, + .. + }) => { + if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) { + if remaining_established == 0 { + state.connection_status = ConnectionStatus::NotConnected { + scheduled_at: schedule_connection(), + }; + + debug!(%state.peer_id, "Reserved peer disconnected."); + } + } + } + FromSwarm::DialFailure(DialFailure { peer_id, .. }) => { + if let Some(peer_id) = peer_id { + if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) { + if state.connection_status == ConnectionStatus::PendingConnection { + state.connection_status = ConnectionStatus::NotConnected { + scheduled_at: schedule_connection(), + }; + }; + + debug!(peer_id=%state.peer_id, "Reserved peer dialing failed."); + } + } + } + FromSwarm::AddressChange(_) + | FromSwarm::ListenFailure(_) + | FromSwarm::NewListener(_) + | FromSwarm::NewListenAddr(_) + | FromSwarm::ExpiredListenAddr(_) + | FromSwarm::ListenerError(_) + | FromSwarm::ListenerClosed(_) + | FromSwarm::NewExternalAddr(_) + | FromSwarm::ExpiredExternalAddr(_) => {} + } + } + + fn on_connection_handler_event( + &mut self, + _: PeerId, + _: ConnectionId, + _: THandlerOutEvent, + ) { + } + + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll>> { + for (_, state) in self.reserved_peers_state.iter_mut() { + trace!(?state, "Reserved peer state."); + + if let ConnectionStatus::NotConnected { scheduled_at } = state.connection_status { + if Instant::now() > scheduled_at { + state.connection_status = ConnectionStatus::PendingConnection; + + debug!(peer_id=%state.peer_id, "Dialing the reserved peer...."); + + let dial_opts = + DialOpts::peer_id(state.peer_id).addresses(vec![state.address.clone()]); + + return Poll::Ready(ToSwarm::Dial { + opts: dial_opts.build(), + }); + } + } + } + + Poll::Pending + } +} diff --git a/crates/subspace-networking/src/reserved_peers/handler.rs b/crates/subspace-networking/src/reserved_peers/handler.rs new file mode 100644 index 0000000000..d34ab8fa46 --- /dev/null +++ b/crates/subspace-networking/src/reserved_peers/handler.rs @@ -0,0 +1,90 @@ +use libp2p::core::upgrade::ReadyUpgrade; +use libp2p::swarm::handler::ConnectionEvent; +use libp2p::swarm::{ConnectionHandler, ConnectionHandlerEvent, KeepAlive, SubstreamProtocol}; +use std::error::Error; +use std::fmt; +use std::task::{Context, Poll}; +use void::Void; + +/// Connection handler for managing connections within our `reserved peers` protocol. +/// +/// This `Handler` is part of our custom protocol designed to maintain persistent connections +/// with a set of predefined peers. +/// +/// ## Connection Handling +/// +/// The `Handler` manages the lifecycle of a connection to each peer. If it's connected to a +/// reserved peer, it maintains the connection alive (`KeepAlive::Yes`). If not, it allows the +/// connection to close (`KeepAlive::No`). +/// +/// This behavior ensures that connections to reserved peers are maintained persistently, +/// while connections to non-reserved peers are allowed to close. +pub struct Handler { + /// Protocol name. + protocol_name: &'static [u8], + /// A boolean flag indicating whether the handler is currently connected to a reserved peer. + connected_to_reserved_peer: bool, +} + +impl Handler { + /// Builds a new [`Handler`]. + pub fn new(protocol_name: &'static [u8], connected_to_reserved_peer: bool) -> Self { + Handler { + protocol_name, + connected_to_reserved_peer, + } + } +} + +#[derive(Debug)] +pub struct ReservedPeersError; + +impl fmt::Display for ReservedPeersError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Reserved peers error.") + } +} + +impl Error for ReservedPeersError {} + +impl ConnectionHandler for Handler { + type InEvent = Void; + type OutEvent = (); + type Error = ReservedPeersError; + type InboundProtocol = ReadyUpgrade<&'static [u8]>; + type OutboundProtocol = ReadyUpgrade<&'static [u8]>; + type OutboundOpenInfo = (); + type InboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol, ()> { + SubstreamProtocol::new(ReadyUpgrade::new(self.protocol_name), ()) + } + + fn on_behaviour_event(&mut self, _: Void) {} + + fn connection_keep_alive(&self) -> KeepAlive { + if self.connected_to_reserved_peer { + KeepAlive::Yes + } else { + KeepAlive::No + } + } + + fn poll( + &mut self, + _: &mut Context<'_>, + ) -> Poll, (), (), Self::Error>> { + Poll::Pending + } + + fn on_connection_event( + &mut self, + _: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + } +} diff --git a/crates/subspace-service/src/dsn.rs b/crates/subspace-service/src/dsn.rs index 66ccba5638..b4e15b741d 100644 --- a/crates/subspace-service/src/dsn.rs +++ b/crates/subspace-service/src/dsn.rs @@ -240,6 +240,7 @@ where max_pending_incoming_connections: dsn_config.max_pending_in_connections, max_pending_outgoing_connections: dsn_config.max_pending_out_connections, target_connections: dsn_config.target_connections, + reserved_peers: dsn_config.reserved_peers, ..default_networking_config };