Skip to content

Commit

Permalink
feat: add peering support (#194)
Browse files Browse the repository at this point in the history
* feat: add peering support

* fix: updates from review

* fix: allow deprecated code to make clippy happy

* fix: final review comments
  • Loading branch information
smrz2001 authored Nov 21, 2023
1 parent b2df35c commit 1e9cba6
Show file tree
Hide file tree
Showing 10 changed files with 910 additions and 478 deletions.
1,047 changes: 601 additions & 446 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions beetle/iroh-bitswap/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ impl ConnectionHandler for BitswapHandler {
}
}

// TODO(WS1-1344): Remove uses of ConnectionHandlerEvent::Close
#[allow(deprecated)]
#[tracing::instrument(skip(substream))]
fn inbound_substream(
// Include remote_peer_id for tracing context only
Expand Down
24 changes: 23 additions & 1 deletion one/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ struct DaemonOpts {
)]
swarm_addresses: Vec<String>,

/// Extra bootstrap peer addresses to be used in addition to the official bootstrap addresses.
/// A best-effort attempt will be made to maintain a connection to these addresses.
#[arg(
long,
use_value_delimiter = true,
value_delimiter = ',',
env = "CERAMIC_ONE_EXTRA_BOOTSTRAP_ADDRESSES"
)]
extra_bootstrap_addresses: Vec<String>,

/// Path to storage directory
#[arg(short, long, env = "CERAMIC_ONE_STORE_DIR")]
store_dir: Option<PathBuf>,
Expand Down Expand Up @@ -363,7 +373,19 @@ impl Daemon {
max_conns_pending_in: opts.max_conns_pending_in,
max_conns_per_peer: opts.max_conns_per_peer,
idle_connection_timeout: Duration::from_millis(opts.idle_conns_timeout_ms),
bootstrap_peers: opts.network.bootstrap_addresses(),
// Add extra bootstrap addresses to the list of official bootstrap addresses, so that our bootstrap nodes
// are always included.
bootstrap_peers: opts
.network
.bootstrap_addresses()
.into_iter()
.chain(
opts.extra_bootstrap_addresses
.iter()
.map(|addr| addr.parse())
.collect::<Result<Vec<Multiaddr>, multiaddr::Error>>()?,
)
.collect(),
listening_multiaddrs: opts
.swarm_addresses
.iter()
Expand Down
1 change: 1 addition & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ ahash.workspace = true
anyhow.workspace = true
async-stream.workspace = true
async-trait.workspace = true
backoff.workspace = true
bytes.workspace = true
ceramic-core.workspace = true
ceramic-metrics.workspace = true
Expand Down
5 changes: 3 additions & 2 deletions p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub use self::event::Event;
use self::peer_manager::PeerManager;
use crate::config::Libp2pConfig;
use crate::sqliteblockstore::SQLiteBlockStore;
use crate::Metrics;

mod event;
mod peer_manager;
Expand Down Expand Up @@ -106,8 +107,8 @@ where
relay_client: Option<relay::client::Behaviour>,
recons: Option<(I, M)>,
block_store: SQLiteBlockStore,
metrics: Metrics,
) -> Result<Self> {
let peer_manager = PeerManager::default();
let pub_key = local_key.public();
let peer_id = pub_key.to_peer_id();

Expand Down Expand Up @@ -252,7 +253,7 @@ where
dcutr: dcutr.into(),
relay_client: relay_client.into(),
gossipsub,
peer_manager,
peer_manager: PeerManager::new(&config.bootstrap_peers, metrics)?,
limits,
recon: recon.into(),
})
Expand Down
209 changes: 195 additions & 14 deletions p2p/src/behaviour/peer_manager.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,36 @@
use std::{
fmt::{self, Debug, Formatter},
future,
num::NonZeroUsize,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use ahash::AHashMap;
use anyhow::{anyhow, Result};
use backoff::{backoff::Backoff, ExponentialBackoff, ExponentialBackoffBuilder};
#[allow(deprecated)]
use ceramic_metrics::core::MRecorder;
use ceramic_metrics::{inc, p2p::P2PMetrics};
use ceramic_metrics::{inc, p2p::P2PMetrics, Recorder};
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
use libp2p::swarm::{dial_opts::DialOpts, ToSwarm};
use libp2p::{
identify::Info as IdentifyInfo,
multiaddr::Protocol,
swarm::{dummy, ConnectionId, DialError, NetworkBehaviour, PollParameters},
PeerId,
Multiaddr, PeerId,
};
use lru::LruCache;
use tokio::time;
use tracing::{info, warn};

use crate::metrics::{self, Metrics};

pub struct PeerManager {
info: AHashMap<PeerId, Info>,
bad_peers: LruCache<PeerId, ()>,
bootstrap_peer_manager: BootstrapPeerManager,
supported_protocols: Vec<String>,
}

Expand All @@ -35,21 +48,24 @@ impl Info {
}

const DEFAULT_BAD_PEER_CAP: Option<NonZeroUsize> = NonZeroUsize::new(10 * 4096);
const BOOTSTRAP_MIN_DIAL_SECS: Duration = Duration::from_secs(1); // 1 second min between redials
const BOOTSTRAP_MAX_DIAL_SECS: Duration = Duration::from_secs(300); // 5 minutes max between redials
const BOOTSTRAP_DIAL_BACKOFF: f64 = 1.4;
const BOOTSTRAP_DIAL_JITTER: f64 = 0.1;

#[derive(Debug)]
pub enum PeerManagerEvent {}

impl Default for PeerManager {
fn default() -> Self {
PeerManager {
impl PeerManager {
pub fn new(bootstrap_peers: &[Multiaddr], metrics: Metrics) -> Result<Self> {
Ok(Self {
info: Default::default(),
bad_peers: LruCache::new(DEFAULT_BAD_PEER_CAP.unwrap()),
bootstrap_peer_manager: BootstrapPeerManager::new(bootstrap_peers, metrics)?,
supported_protocols: Default::default(),
}
})
}
}

#[derive(Debug)]
pub enum PeerManagerEvent {}

impl PeerManager {
pub fn is_bad_peer(&self, peer_id: &PeerId) -> bool {
self.bad_peers.contains(peer_id)
}
Expand Down Expand Up @@ -78,11 +94,14 @@ impl NetworkBehaviour for PeerManager {
fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm<Self::ConnectionHandler>) {
match event {
libp2p::swarm::FromSwarm::ConnectionEstablished(event) => {
// First connection
if event.other_established == 0 {
let p = self.bad_peers.pop(&event.peer_id);
if p.is_some() {
inc!(P2PMetrics::BadPeerRemoved);
}
self.bootstrap_peer_manager
.handle_connection_established(&event.peer_id)
}

if let Some(info) = self.info.get_mut(&event.peer_id) {
Expand All @@ -92,7 +111,13 @@ impl NetworkBehaviour for PeerManager {
}
}
}

libp2p::swarm::FromSwarm::ConnectionClosed(event) => {
// Last connection
if event.remaining_established == 0 {
self.bootstrap_peer_manager
.handle_connection_closed(&event.peer_id)
}
}
libp2p::swarm::FromSwarm::DialFailure(event) => {
if let Some(peer_id) = event.peer_id {
match event.error {
Expand All @@ -105,6 +130,7 @@ impl NetworkBehaviour for PeerManager {
self.info.remove(&peer_id);
}
}
self.bootstrap_peer_manager.handle_dial_failure(&peer_id)
}
}
// Not interested in any other events
Expand All @@ -122,7 +148,7 @@ impl NetworkBehaviour for PeerManager {

fn poll(
&mut self,
_cx: &mut Context<'_>,
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
// TODO(nathanielc):
Expand All @@ -147,7 +173,14 @@ impl NetworkBehaviour for PeerManager {
.collect();
}

Poll::Pending
// Check if a bootstrap peer needs to be dialed
match self.bootstrap_peer_manager.poll_next_unpin(cx) {
// TODO: Maybe we don't want to dial if there was an ongoing incoming dial attempt
Poll::Ready(Some(multiaddr)) => Poll::Ready(ToSwarm::Dial {
opts: DialOpts::unknown_peer_id().address(multiaddr).build(),
}),
_ => Poll::Pending,
}
}

fn handle_established_inbound_connection(
Expand All @@ -170,3 +203,151 @@ impl NetworkBehaviour for PeerManager {
Ok(dummy::ConnectionHandler)
}
}

pub struct BootstrapPeerManager {
bootstrap_peers: AHashMap<PeerId, BootstrapPeer>,
metrics: Metrics,
}

impl Debug for BootstrapPeerManager {
fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
f.debug_struct("BootstrapPeerManager")
.field("bootstrap_peers", &self.bootstrap_peers)
.finish()
}
}

pub struct BootstrapPeer {
multiaddr: Multiaddr,
dial_backoff: ExponentialBackoff,
dial_future: Option<BoxFuture<'static, ()>>,
}

impl Debug for BootstrapPeer {
fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
f.debug_struct("BootstrapPeer")
.field("multiaddr", &self.multiaddr)
.field("dial_backoff", &self.dial_backoff)
.field("dial_future", &self.dial_future.is_some())
.finish()
}
}

impl BootstrapPeerManager {
fn new(bootstrap_peers: &[Multiaddr], metrics: Metrics) -> Result<Self> {
let bootstrap_peers = bootstrap_peers
.iter()
.map(|multiaddr| {
if let Some(peer) = multiaddr.iter().find_map(|proto| match proto {
Protocol::P2p(peer_id) => {
Some((peer_id, BootstrapPeer::new(multiaddr.to_owned())))
}
_ => None,
}) {
Ok(peer)
} else {
Err(anyhow!("Could not parse bootstrap addr {}", multiaddr))
}
})
.collect::<Result<AHashMap<PeerId, BootstrapPeer>, anyhow::Error>>()?;
Ok(Self {
bootstrap_peers,
metrics,
})
}

fn handle_connection_established(&mut self, peer_id: &PeerId) {
if let Some(peer) = self.bootstrap_peers.get_mut(peer_id) {
info!(
multiaddr = %peer.multiaddr,
"connection established, stop dialing bootstrap peer",
);
peer.stop_redial();
self.metrics.record(&metrics::PeeringEvent::Connected);
}
}

fn handle_connection_closed(&mut self, peer_id: &PeerId) {
if let Some(peer) = self.bootstrap_peers.get_mut(peer_id) {
warn!(
multiaddr = %peer.multiaddr,
"Connection closed, redial bootstrap peer",
);
peer.start_redial();
self.metrics.record(&metrics::PeeringEvent::Disconnected);
}
}

fn handle_dial_failure(&mut self, peer_id: &PeerId) {
if let Some(peer) = self.bootstrap_peers.get_mut(peer_id) {
warn!(
multiaddr = %peer.multiaddr,
"Dail failed, redial bootstrap peer"
);
peer.backoff_redial();
self.metrics.record(&metrics::PeeringEvent::DialFailure);
}
}
}

impl Stream for BootstrapPeerManager {
type Item = Multiaddr;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
for (_, peer) in self.bootstrap_peers.iter_mut() {
if let Some(mut dial_future) = peer.dial_future.take() {
match dial_future.as_mut().poll_unpin(cx) {
Poll::Ready(()) => return Poll::Ready(Some(peer.multiaddr.clone())),
Poll::Pending => {
// Put the future back
peer.dial_future.replace(dial_future);
}
}
}
}
Poll::Pending
}
}

impl BootstrapPeer {
fn new(multiaddr: Multiaddr) -> Self {
let dial_backoff = ExponentialBackoffBuilder::new()
.with_initial_interval(BOOTSTRAP_MIN_DIAL_SECS)
.with_multiplier(BOOTSTRAP_DIAL_BACKOFF)
.with_randomization_factor(BOOTSTRAP_DIAL_JITTER)
.with_max_interval(BOOTSTRAP_MAX_DIAL_SECS)
.with_max_elapsed_time(None)
.build();
// Expire initial future so that we dial peers immediately
let dial_future = Some(future::ready(()).boxed());
Self {
multiaddr,
dial_backoff,
dial_future,
}
}

fn start_redial(&mut self) {
self.dial_backoff.reset();
let next_backoff = self.dial_backoff.next_backoff();
self.update_dial_future(next_backoff);
}

fn stop_redial(&mut self) {
self.dial_backoff.reset();
self.update_dial_future(None);
}

fn backoff_redial(&mut self) {
let next_backoff = self.dial_backoff.next_backoff();
self.update_dial_future(next_backoff);
}

fn update_dial_future(&mut self, duration: Option<Duration>) {
// This will drop the existing sleep future, if present, thereby canceling it.
self.dial_future = None;
if let Some(duration) = duration {
self.dial_future = Some(Box::pin(time::sleep(duration)));
}
}
}
Loading

0 comments on commit 1e9cba6

Please sign in to comment.