Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add reserved-peers protocol for DSN. #1474

Merged
merged 5 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/subspace-networking/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ mod tests;
use crate::request_responses::{
Event as RequestResponseEvent, RequestHandler, RequestResponsesBehaviour,
};
use crate::reserved_peers::{
Behaviour as ReservedPeersBehaviour, Config as ReservedPeersConfig, Event as ReservedPeersEvent,
};
use derive_more::From;
use libp2p::allow_block_list::{Behaviour as AllowBlockListBehaviour, BlockedPeers};
use libp2p::connection_limits::{Behaviour as ConnectionLimitsBehaviour, ConnectionLimits};
Expand Down Expand Up @@ -37,6 +40,8 @@ pub(crate) struct BehaviorConfig<RecordStore> {
pub(crate) request_response_protocols: Vec<Box<dyn RequestHandler>>,
/// Connection limits for the swarm.
pub(crate) connection_limits: ConnectionLimits,
/// The configuration for the [`ReservedPeersBehaviour`].
pub(crate) reserved_peers: ReservedPeersConfig,
}

#[derive(NetworkBehaviour)]
Expand All @@ -50,6 +55,7 @@ pub(crate) struct Behavior<RecordStore> {
pub(crate) request_response: RequestResponsesBehaviour,
pub(crate) connection_limits: ConnectionLimitsBehaviour,
pub(crate) block_list: BlockListBehaviour,
pub(crate) reserved_peers: ReservedPeersBehaviour,
}

impl<RecordStore> Behavior<RecordStore>
Expand Down Expand Up @@ -87,6 +93,7 @@ where
.expect("RequestResponse protocols registration failed."),
connection_limits: ConnectionLimitsBehaviour::new(config.connection_limits),
block_list: BlockListBehaviour::default(),
reserved_peers: ReservedPeersBehaviour::new(config.reserved_peers),
}
}
}
Expand All @@ -100,4 +107,5 @@ pub(crate) enum Event {
RequestResponse(RequestResponseEvent),
/// Event stub for connection limits and block list behaviours. We won't receive such events.
VoidEventStub(VoidEvent),
ReservedPeers(ReservedPeersEvent),
}
6 changes: 6 additions & 0 deletions crates/subspace-networking/src/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::create::transport::build_transport;
use crate::node::{CircuitRelayClientError, Node};
use crate::node_runner::{NodeRunner, NodeRunnerConfig};
use crate::request_responses::RequestHandler;
use crate::reserved_peers::Config as ReservedPeersConfig;
use crate::shared::Shared;
use crate::utils::{convert_multiaddresses, ResizableSemaphore};
use backoff::{ExponentialBackoff, SystemClock};
Expand Down Expand Up @@ -46,6 +47,7 @@ use tracing::{debug, error, info};
const DEFAULT_NETWORK_PROTOCOL_VERSION: &str = "dev";
const KADEMLIA_PROTOCOL: &[u8] = b"/subspace/kad/0.1.0";
const GOSSIPSUB_PROTOCOL_PREFIX: &str = "subspace/gossipsub";
const RESERVED_PEERS_PROTOCOL_NAME: &[u8] = b"/subspace/reserved-peers/1.0.0";

// Defines max_negotiating_inbound_streams constant for the swarm.
// It must be set for large plots.
Expand Down Expand Up @@ -428,6 +430,10 @@ where
record_store: ProviderOnlyRecordStore::new(provider_storage),
request_response_protocols,
connection_limits,
reserved_peers: ReservedPeersConfig {
reserved_peers: reserved_peers.clone(),
protocol_name: RESERVED_PEERS_PROTOCOL_NAME,
},
});

let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id)
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod node;
mod node_runner;
mod request_handlers;
mod request_responses;
mod reserved_peers;
mod shared;
pub mod utils;

Expand Down
22 changes: 3 additions & 19 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use libp2p::{futures, Multiaddr, PeerId, Swarm, TransportError};
use nohash_hasher::IntMap;
use parking_lot::Mutex;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::fmt::Debug;
use std::num::NonZeroUsize;
use std::pin::Pin;
Expand Down Expand Up @@ -214,24 +214,6 @@ where
let local_peer_id = *self.swarm.local_peer_id();
let connected_peers = self.swarm.connected_peers().cloned().collect::<Vec<_>>();

// Handle reserved peers first.
if !self.reserved_peers.is_empty() {
trace!(%local_peer_id, "Checking reserved peers connection: {:?}", self.reserved_peers);

let connected_peers_id_set = connected_peers.iter().cloned().collect();
let reserved_peers_id_set = self.reserved_peers.keys().cloned().collect::<HashSet<_>>();

let missing_reserved_peer_ids =
reserved_peers_id_set.difference(&connected_peers_id_set);

// Establish missing connections to reserved peers.
for peer_id in missing_reserved_peer_ids {
if let Some(addr) = self.reserved_peers.get(peer_id) {
self.dial_peer(*peer_id, addr.clone());
}
}
}

// Maintain target connection number.
let (total_current_connections, established_connections) = {
let network_info = self.swarm.network_info();
Expand Down Expand Up @@ -538,6 +520,8 @@ where
let local_peer_id = *self.swarm.local_peer_id();

if let IdentifyEvent::Received { peer_id, mut info } = event {
debug!(?peer_id, protocols=?info.protocols, "IdentifyEvent::Received");

// Check for network partition
if info.protocol_version != self.protocol_version {
debug!(
Expand Down
252 changes: 252 additions & 0 deletions crates/subspace-networking/src/reserved_peers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
mod handler;

use handler::Handler;
use libp2p::core::{Endpoint, Multiaddr};
use libp2p::swarm::behaviour::{ConnectionEstablished, FromSwarm};
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::{
ConnectionClosed, ConnectionDenied, ConnectionId, DialFailure, NetworkBehaviour,
PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use libp2p::PeerId;
use std::collections::HashMap;
use std::ops::Add;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tracing::{debug, trace};

use crate::utils::convert_multiaddresses;

/// `Behaviour` controls and maintains the state of connections to a predefined set of peers.
///
/// The `Behaviour` struct is part of our custom protocol that aims to maintain persistent
/// connections to a predefined set of peers. It encapsulates the logic of managing the connections,
/// dialing, and handling various states of these connections.
///
/// ## How it works
///
/// Each `ReservedPeerState` can be in one of the following states, represented by the
/// `ConnectionStatus` enum:
/// 1. `NotConnected`: This state indicates that the peer is currently not connected.
/// The time for the next connection attempt is scheduled and can be queried.
/// 2. `PendingConnection`: This state means that a connection attempt to the peer is currently
/// in progress.
/// 3. `Connected`: This state signals that the peer is currently connected.
///
/// The protocol will attempt to establish a connection to a `NotConnected` peer after a set delay,
/// specified by `DIALING_INTERVAL_IN_SECS`, to prevent multiple simultaneous connection attempts
/// to offline peers. This delay not only conserves resources, but also reduces the amount of
/// log output.
///
/// ## Comments
///
/// The protocol will establish one or two connections between each pair of reserved peers.
///
/// IMPORTANT NOTE: For the maintenance of a persistent connection, both peers should have each
/// other in their `reserved peers set`. This is necessary because if only one peer has the other
/// in its `reserved peers set`, regular connection attempts will occur, but these connections will
/// be dismissed on the other side due to the `KeepAlive` policy.
///
#[derive(Debug)]
pub struct Behaviour {
/// Protocol name.
protocol_name: &'static [u8],
/// A mapping from `PeerId` to `ReservedPeerState`, where each `ReservedPeerState`
/// represents the current state of the connection to a reserved peer.
reserved_peers_state: HashMap<PeerId, ReservedPeerState>,
}

/// Reserved peers protocol configuration.
#[derive(Debug, Clone)]
pub struct Config {
/// Protocol name.
pub protocol_name: &'static [u8],
/// Predefined set of reserved peers with addresses.
pub reserved_peers: Vec<Multiaddr>,
}

/// Reserved peer connection status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionStatus {
/// Reserved peer is not connected. The next connection attempt is scheduled.
NotConnected { scheduled_at: Instant },
/// Reserved peer dialing is in progress.
PendingConnection,
/// Reserved peer is connected.
Connected,
}

/// We pause between reserved peers dialing otherwise we could do multiple dials to offline peers
/// wasting resources and producing a ton of log records.
const DIALING_INTERVAL_IN_SECS: Duration = Duration::from_secs(1);

/// Helper-function to schedule a connection attempt.
#[inline]
nazar-pc marked this conversation as resolved.
Show resolved Hide resolved
fn schedule_connection() -> Instant {
Instant::now().add(DIALING_INTERVAL_IN_SECS)
}

/// Defines the state of a reserved peer connection state.
#[derive(Debug, Clone)]
struct ReservedPeerState {
pub connection_status: ConnectionStatus,
pub peer_id: PeerId,
pub address: Multiaddr,
}

/// Reserved peer connection events.
/// Initially the "reserved peers behaviour" doesn't produce events. However, we could pass
/// reserved peer state changes to the swarm using this struct in the future.
nazar-pc marked this conversation as resolved.
Show resolved Hide resolved
#[derive(Debug, Clone)]
pub struct Event;

impl Behaviour {
/// Creates a new `Behaviour` with a predefined set of reserved peers.
pub fn new(config: Config) -> Self {
debug!(
reserved_peers=?config.reserved_peers,
"Reserved peers protocol initialization...."
);

let peer_addresses = convert_multiaddresses(config.reserved_peers);

let reserved_peers_state = peer_addresses
.into_iter()
.map(|(peer_id, address)| {
(
peer_id,
ReservedPeerState {
peer_id,
address,
connection_status: ConnectionStatus::NotConnected {
scheduled_at: schedule_connection(),
},
},
)
})
.collect();

Self {
protocol_name: config.protocol_name,
reserved_peers_state,
}
}

/// Create a connection handler for the reserved peers protocol.
#[inline]
fn new_reserved_peers_handler(&self, peer_id: &PeerId) -> Handler {
Handler::new(
self.protocol_name,
self.reserved_peers_state.contains_key(peer_id),
)
}
}

impl NetworkBehaviour for Behaviour {
type ConnectionHandler = Handler;
type OutEvent = Event;

fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
peer_id: PeerId,
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(self.new_reserved_peers_handler(&peer_id))
}

fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
peer_id: PeerId,
_: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(self.new_reserved_peers_handler(&peer_id))
}

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => {
if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) {
state.connection_status = ConnectionStatus::Connected;

debug!(peer_id=%state.peer_id, "Reserved peer connected.");
}
}
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
remaining_established,
..
}) => {
if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) {
if remaining_established == 0 {
state.connection_status = ConnectionStatus::NotConnected {
scheduled_at: schedule_connection(),
};

debug!(%state.peer_id, "Reserved peer disconnected.");
}
}
}
FromSwarm::DialFailure(DialFailure { peer_id, .. }) => {
if let Some(peer_id) = peer_id {
if let Some(state) = self.reserved_peers_state.get_mut(&peer_id) {
if state.connection_status == ConnectionStatus::PendingConnection {
state.connection_status = ConnectionStatus::NotConnected {
scheduled_at: schedule_connection(),
};
};

debug!(peer_id=%state.peer_id, "Reserved peer dialing failed.");
}
}
}
FromSwarm::AddressChange(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::NewListenAddr(_)
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
| FromSwarm::NewExternalAddr(_)
| FromSwarm::ExpiredExternalAddr(_) => {}
}
}

fn on_connection_handler_event(
&mut self,
_: PeerId,
_: ConnectionId,
_: THandlerOutEvent<Self>,
) {
}

fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
for (_, state) in self.reserved_peers_state.iter_mut() {
trace!(?state, "Reserved peer state.");

if let ConnectionStatus::NotConnected { scheduled_at } = state.connection_status {
if Instant::now() > scheduled_at {
state.connection_status = ConnectionStatus::PendingConnection;

debug!(peer_id=%state.peer_id, "Dialing the reserved peer....");

let dial_opts =
DialOpts::peer_id(state.peer_id).addresses(vec![state.address.clone()]);

return Poll::Ready(ToSwarm::Dial {
opts: dial_opts.build(),
});
}
}
}

Poll::Pending
}
}
Loading