From ad65ccc38d91ac58d7c1bcbca3da4f5ba894d154 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 22 Nov 2023 13:39:03 -0700 Subject: [PATCH] fix: do not use bootstrap list with kademlia (#199) * fix: do not use bootstrap list with kademlia Prior to the peer_manager connecting to bootstrap peers we relied on kademlia to connect to them. Now not only is it not necessary but the peer_manager applies an appropriate backoff for redials where kademlia does not. There should be no change in external behavior except that we no longer spam dials to disconnected bootstrap peers. Now the peer_manager (renamed to ceramic_peer_manager) can report of a peer id is a ceramic peer. If we connect to a ceramic peer we add it to the kademlia routing table regardless of its reported protocols. This is because we discovered that peers sometimes do not report they support the kademlia protocol. * fix: typo --- one/src/lib.rs | 13 +- p2p/src/behaviour.rs | 25 +- ...eer_manager.rs => ceramic_peer_manager.rs} | 254 ++++++++---------- p2p/src/behaviour/event.rs | 2 +- p2p/src/config.rs | 6 +- p2p/src/node.rs | 67 ++--- 6 files changed, 158 insertions(+), 209 deletions(-) rename p2p/src/behaviour/{peer_manager.rs => ceramic_peer_manager.rs} (66%) diff --git a/one/src/lib.rs b/one/src/lib.rs index 2a746fe7f..ebcefd6ac 100644 --- a/one/src/lib.rs +++ b/one/src/lib.rs @@ -70,15 +70,15 @@ struct DaemonOpts { )] swarm_addresses: Vec, - /// Extra bootstrap peer addresses to be used in addition to the official bootstrap addresses. + /// Extra addresses of peers that participate in the Ceramic network. /// A best-effort attempt will be made to maintain a connection to these addresses. #[arg( long, use_value_delimiter = true, value_delimiter = ',', - env = "CERAMIC_ONE_EXTRA_BOOTSTRAP_ADDRESSES" + env = "CERAMIC_ONE_EXTRA_CERAMIC_PEER_ADDRESSES" )] - extra_bootstrap_addresses: Vec, + extra_ceramic_peer_addresses: Vec, /// Path to storage directory #[arg(short, long, env = "CERAMIC_ONE_STORE_DIR")] @@ -373,14 +373,13 @@ impl Daemon { max_conns_pending_in: opts.max_conns_pending_in, max_conns_per_peer: opts.max_conns_per_peer, idle_connection_timeout: Duration::from_millis(opts.idle_conns_timeout_ms), - // Add extra bootstrap addresses to the list of official bootstrap addresses, so that our bootstrap nodes - // are always included. - bootstrap_peers: opts + // Add extra ceramic peer addresses to the list of official ceramic peer addresses. + ceramic_peers: opts .network .bootstrap_addresses() .into_iter() .chain( - opts.extra_bootstrap_addresses + opts.extra_ceramic_peer_addresses .iter() .map(|addr| addr.parse()) .collect::, multiaddr::Error>>()?, diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs index d29c74f1b..9724320f2 100644 --- a/p2p/src/behaviour.rs +++ b/p2p/src/behaviour.rs @@ -17,7 +17,6 @@ use libp2p::{ store::{MemoryStore, MemoryStoreConfig}, }, mdns::tokio::Behaviour as Mdns, - multiaddr::Protocol, ping::Behaviour as Ping, relay, swarm::behaviour::toggle::Toggle, @@ -27,14 +26,14 @@ use libp2p_identity::Keypair; use recon::{libp2p::Recon, Sha256a}; use tracing::{info, warn}; +use self::ceramic_peer_manager::CeramicPeerManager; pub use self::event::Event; -use self::peer_manager::PeerManager; use crate::config::Libp2pConfig; use crate::sqliteblockstore::SQLiteBlockStore; use crate::Metrics; +mod ceramic_peer_manager; mod event; -mod peer_manager; pub const PROTOCOL_VERSION: &str = "ipfs/0.1.0"; pub const AGENT_VERSION: &str = concat!("ceramic-one/", env!("CARGO_PKG_VERSION")); @@ -49,6 +48,7 @@ pub(crate) struct NodeBehaviour { // end up being denied because of the limits. // See https://github.com/libp2p/rust-libp2p/pull/4777#discussion_r1391833734 for more context. limits: connection_limits::Behaviour, + pub(crate) peer_manager: CeramicPeerManager, ping: Ping, identify: identify::Behaviour, pub(crate) bitswap: Toggle>, @@ -59,7 +59,6 @@ pub(crate) struct NodeBehaviour { relay_client: Toggle, dcutr: Toggle, pub(crate) gossipsub: Toggle, - pub(crate) peer_manager: PeerManager, recon: Toggle>, } @@ -158,17 +157,11 @@ where // Provider records are re-published via the [`crate::publisher::Publisher`]. .set_provider_publication_interval(None); - let mut kademlia = kad::Behaviour::with_config(pub_key.to_peer_id(), store, kad_config); - for multiaddr in &config.bootstrap_peers { - // TODO: move parsing into config - let mut addr = multiaddr.to_owned(); - if let Some(Protocol::P2p(peer_id)) = addr.pop() { - kademlia.add_address(&peer_id, addr); - } else { - warn!("Could not parse bootstrap addr {}", multiaddr); - } - } - Some(kademlia) + Some(kad::Behaviour::with_config( + pub_key.to_peer_id(), + store, + kad_config, + )) } else { None } @@ -253,7 +246,7 @@ where dcutr: dcutr.into(), relay_client: relay_client.into(), gossipsub, - peer_manager: PeerManager::new(&config.bootstrap_peers, metrics)?, + peer_manager: CeramicPeerManager::new(&config.ceramic_peers, metrics)?, limits, recon: recon.into(), }) diff --git a/p2p/src/behaviour/peer_manager.rs b/p2p/src/behaviour/ceramic_peer_manager.rs similarity index 66% rename from p2p/src/behaviour/peer_manager.rs rename to p2p/src/behaviour/ceramic_peer_manager.rs index 53a637b9f..e4255fa9b 100644 --- a/p2p/src/behaviour/peer_manager.rs +++ b/p2p/src/behaviour/ceramic_peer_manager.rs @@ -1,8 +1,6 @@ use std::{ fmt::{self, Debug, Formatter}, future, - num::NonZeroUsize, - pin::Pin, task::{Context, Poll}, time::Duration, }; @@ -11,27 +9,33 @@ use ahash::AHashMap; use anyhow::{anyhow, Result}; use backoff::{backoff::Backoff, ExponentialBackoff, ExponentialBackoffBuilder}; #[allow(deprecated)] -use ceramic_metrics::core::MRecorder; -use ceramic_metrics::{inc, p2p::P2PMetrics, Recorder}; -use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt}; -use libp2p::swarm::{dial_opts::DialOpts, ToSwarm}; +use ceramic_metrics::Recorder; +use futures_util::{future::BoxFuture, FutureExt}; +use libp2p::swarm::{ + dial_opts::{DialOpts, PeerCondition}, + ToSwarm, +}; use libp2p::{ identify::Info as IdentifyInfo, multiaddr::Protocol, swarm::{dummy, ConnectionId, DialError, NetworkBehaviour, PollParameters}, Multiaddr, PeerId, }; -use lru::LruCache; use tokio::time; use tracing::{info, warn}; use crate::metrics::{self, Metrics}; -pub struct PeerManager { +/// Manages state for Ceramic peers. +/// Ceramic peers are peers that participate in the Ceramic network. +/// +/// Not all connected peers will be Ceramic peers, for example a peer may be participating in the +/// DHT without being a Ceramic peer. +pub struct CeramicPeerManager { + metrics: Metrics, info: AHashMap, - bad_peers: LruCache, - bootstrap_peer_manager: BootstrapPeerManager, supported_protocols: Vec, + ceramic_peers: AHashMap, } #[derive(Default, Debug, Clone)] @@ -47,29 +51,40 @@ impl Info { } } -const DEFAULT_BAD_PEER_CAP: Option = NonZeroUsize::new(10 * 4096); -const BOOTSTRAP_MIN_DIAL_SECS: Duration = Duration::from_secs(1); // 1 second min between redials -const BOOTSTRAP_MAX_DIAL_SECS: Duration = Duration::from_secs(300); // 5 minutes max between redials -const BOOTSTRAP_DIAL_BACKOFF: f64 = 1.4; -const BOOTSTRAP_DIAL_JITTER: f64 = 0.1; +const PEERING_MIN_DIAL_SECS: Duration = Duration::from_secs(1); // 1 second min between redials +const PEERING_MAX_DIAL_SECS: Duration = Duration::from_secs(300); // 5 minutes max between redials +const PEERING_DIAL_BACKOFF: f64 = 1.4; +const PEERING_DIAL_JITTER: f64 = 0.1; #[derive(Debug)] pub enum PeerManagerEvent {} -impl PeerManager { - pub fn new(bootstrap_peers: &[Multiaddr], metrics: Metrics) -> Result { +impl CeramicPeerManager { + pub fn new(ceramic_peers: &[Multiaddr], metrics: Metrics) -> Result { + let ceramic_peers = ceramic_peers + .iter() + // Extract peer id from multiaddr + .map(|multiaddr| { + if let Some(peer) = multiaddr.iter().find_map(|proto| match proto { + Protocol::P2p(peer_id) => { + Some((peer_id, CeramicPeer::new(multiaddr.to_owned()))) + } + _ => None, + }) { + Ok(peer) + } else { + Err(anyhow!("Could not parse bootstrap addr {}", multiaddr)) + } + }) + .collect::, anyhow::Error>>()?; Ok(Self { + metrics, info: Default::default(), - bad_peers: LruCache::new(DEFAULT_BAD_PEER_CAP.unwrap()), - bootstrap_peer_manager: BootstrapPeerManager::new(bootstrap_peers, metrics)?, supported_protocols: Default::default(), + ceramic_peers, }) } - pub fn is_bad_peer(&self, peer_id: &PeerId) -> bool { - self.bad_peers.contains(peer_id) - } - pub fn inject_identify_info(&mut self, peer_id: PeerId, new_info: IdentifyInfo) { self.info.entry(peer_id).or_default().last_info = Some(new_info); } @@ -85,9 +100,46 @@ impl PeerManager { pub fn supported_protocols(&self) -> Vec { self.supported_protocols.clone() } + + pub fn is_ceramic_peer(&self, peer_id: &PeerId) -> bool { + self.ceramic_peers.contains_key(peer_id) + } + + fn handle_connection_established(&mut self, peer_id: &PeerId) { + if let Some(peer) = self.ceramic_peers.get_mut(peer_id) { + info!( + multiaddr = %peer.multiaddr, + "connection established, stop dialing ceramic peer", + ); + peer.stop_redial(); + self.metrics.record(&metrics::PeeringEvent::Connected); + } + } + + fn handle_connection_closed(&mut self, peer_id: &PeerId) { + if let Some(peer) = self.ceramic_peers.get_mut(peer_id) { + warn!( + multiaddr = %peer.multiaddr, + "Connection closed, redial ceramic peer", + ); + peer.start_redial(); + self.metrics.record(&metrics::PeeringEvent::Disconnected); + } + } + + fn handle_dial_failure(&mut self, peer_id: &PeerId) { + if let Some(peer) = self.ceramic_peers.get_mut(peer_id) { + warn!( + multiaddr = %peer.multiaddr, + "Dail failed, redial ceramic peer" + ); + peer.backoff_redial(); + self.metrics.record(&metrics::PeeringEvent::DialFailure); + } + } } -impl NetworkBehaviour for PeerManager { +impl NetworkBehaviour for CeramicPeerManager { type ConnectionHandler = dummy::ConnectionHandler; type ToSwarm = PeerManagerEvent; @@ -96,12 +148,7 @@ impl NetworkBehaviour for PeerManager { libp2p::swarm::FromSwarm::ConnectionEstablished(event) => { // First connection if event.other_established == 0 { - let p = self.bad_peers.pop(&event.peer_id); - if p.is_some() { - inc!(P2PMetrics::BadPeerRemoved); - } - self.bootstrap_peer_manager - .handle_connection_established(&event.peer_id) + self.handle_connection_established(&event.peer_id) } if let Some(info) = self.info.get_mut(&event.peer_id) { @@ -114,23 +161,21 @@ impl NetworkBehaviour for PeerManager { libp2p::swarm::FromSwarm::ConnectionClosed(event) => { // Last connection if event.remaining_established == 0 { - self.bootstrap_peer_manager - .handle_connection_closed(&event.peer_id) + self.handle_connection_closed(&event.peer_id) } } libp2p::swarm::FromSwarm::DialFailure(event) => { if let Some(peer_id) = event.peer_id { match event.error { - // TODO check that the denied cause is because of a connection limit. - DialError::Denied { cause: _ } | DialError::DialPeerConditionFalse(_) => {} - _ => { - if self.bad_peers.put(peer_id, ()).is_none() { - inc!(P2PMetrics::BadPeer); - } - self.info.remove(&peer_id); + DialError::DialPeerConditionFalse(_) => { + // Ignore dial failures that failed because of a peer condition. + // These are not an indication that something was wrong with the peer + // rather we didn't even attempt to dial the peer because we were + // already connected or attempting to dial concurrently etc. } + // For any other dial failures, increase the backoff + _ => self.handle_dial_failure(&peer_id), } - self.bootstrap_peer_manager.handle_dial_failure(&peer_id) } } // Not interested in any other events @@ -173,14 +218,25 @@ impl NetworkBehaviour for PeerManager { .collect(); } - // Check if a bootstrap peer needs to be dialed - match self.bootstrap_peer_manager.poll_next_unpin(cx) { - // TODO: Maybe we don't want to dial if there was an ongoing incoming dial attempt - Poll::Ready(Some(multiaddr)) => Poll::Ready(ToSwarm::Dial { - opts: DialOpts::unknown_peer_id().address(multiaddr).build(), - }), - _ => Poll::Pending, + for (peer_id, peer) in self.ceramic_peers.iter_mut() { + if let Some(mut dial_future) = peer.dial_future.take() { + match dial_future.as_mut().poll_unpin(cx) { + Poll::Ready(()) => { + return Poll::Ready(ToSwarm::Dial { + opts: DialOpts::peer_id(*peer_id) + .addresses(vec![peer.multiaddr.clone()]) + .condition(PeerCondition::Disconnected) + .build(), + }) + } + Poll::Pending => { + // Put the future back + peer.dial_future.replace(dial_future); + } + } + } } + Poll::Pending } fn handle_established_inbound_connection( @@ -204,26 +260,14 @@ impl NetworkBehaviour for PeerManager { } } -pub struct BootstrapPeerManager { - bootstrap_peers: AHashMap, - metrics: Metrics, -} - -impl Debug for BootstrapPeerManager { - fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> { - f.debug_struct("BootstrapPeerManager") - .field("bootstrap_peers", &self.bootstrap_peers) - .finish() - } -} - -pub struct BootstrapPeer { +// State of Ceramic peer. +struct CeramicPeer { multiaddr: Multiaddr, dial_backoff: ExponentialBackoff, dial_future: Option>, } -impl Debug for BootstrapPeer { +impl Debug for CeramicPeer { fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> { f.debug_struct("BootstrapPeer") .field("multiaddr", &self.multiaddr) @@ -233,89 +277,13 @@ impl Debug for BootstrapPeer { } } -impl BootstrapPeerManager { - fn new(bootstrap_peers: &[Multiaddr], metrics: Metrics) -> Result { - let bootstrap_peers = bootstrap_peers - .iter() - .map(|multiaddr| { - if let Some(peer) = multiaddr.iter().find_map(|proto| match proto { - Protocol::P2p(peer_id) => { - Some((peer_id, BootstrapPeer::new(multiaddr.to_owned()))) - } - _ => None, - }) { - Ok(peer) - } else { - Err(anyhow!("Could not parse bootstrap addr {}", multiaddr)) - } - }) - .collect::, anyhow::Error>>()?; - Ok(Self { - bootstrap_peers, - metrics, - }) - } - - fn handle_connection_established(&mut self, peer_id: &PeerId) { - if let Some(peer) = self.bootstrap_peers.get_mut(peer_id) { - info!( - multiaddr = %peer.multiaddr, - "connection established, stop dialing bootstrap peer", - ); - peer.stop_redial(); - self.metrics.record(&metrics::PeeringEvent::Connected); - } - } - - fn handle_connection_closed(&mut self, peer_id: &PeerId) { - if let Some(peer) = self.bootstrap_peers.get_mut(peer_id) { - warn!( - multiaddr = %peer.multiaddr, - "Connection closed, redial bootstrap peer", - ); - peer.start_redial(); - self.metrics.record(&metrics::PeeringEvent::Disconnected); - } - } - - fn handle_dial_failure(&mut self, peer_id: &PeerId) { - if let Some(peer) = self.bootstrap_peers.get_mut(peer_id) { - warn!( - multiaddr = %peer.multiaddr, - "Dail failed, redial bootstrap peer" - ); - peer.backoff_redial(); - self.metrics.record(&metrics::PeeringEvent::DialFailure); - } - } -} - -impl Stream for BootstrapPeerManager { - type Item = Multiaddr; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - for (_, peer) in self.bootstrap_peers.iter_mut() { - if let Some(mut dial_future) = peer.dial_future.take() { - match dial_future.as_mut().poll_unpin(cx) { - Poll::Ready(()) => return Poll::Ready(Some(peer.multiaddr.clone())), - Poll::Pending => { - // Put the future back - peer.dial_future.replace(dial_future); - } - } - } - } - Poll::Pending - } -} - -impl BootstrapPeer { +impl CeramicPeer { fn new(multiaddr: Multiaddr) -> Self { let dial_backoff = ExponentialBackoffBuilder::new() - .with_initial_interval(BOOTSTRAP_MIN_DIAL_SECS) - .with_multiplier(BOOTSTRAP_DIAL_BACKOFF) - .with_randomization_factor(BOOTSTRAP_DIAL_JITTER) - .with_max_interval(BOOTSTRAP_MAX_DIAL_SECS) + .with_initial_interval(PEERING_MIN_DIAL_SECS) + .with_multiplier(PEERING_DIAL_BACKOFF) + .with_randomization_factor(PEERING_DIAL_JITTER) + .with_max_interval(PEERING_MAX_DIAL_SECS) .with_max_elapsed_time(None) .build(); // Expire initial future so that we dial peers immediately diff --git a/p2p/src/behaviour/event.rs b/p2p/src/behaviour/event.rs index cb8b5eedb..9e98e2b3c 100644 --- a/p2p/src/behaviour/event.rs +++ b/p2p/src/behaviour/event.rs @@ -1,7 +1,7 @@ use iroh_bitswap::BitswapEvent; use libp2p::{autonat, dcutr, gossipsub, identify, kad, mdns, ping, relay}; -use super::peer_manager::PeerManagerEvent; +use super::ceramic_peer_manager::PeerManagerEvent; /// Event type which is emitted from the [`NodeBehaviour`]. /// diff --git a/p2p/src/config.rs b/p2p/src/config.rs index 60669987f..dafd2e58c 100644 --- a/p2p/src/config.rs +++ b/p2p/src/config.rs @@ -36,8 +36,8 @@ pub struct Libp2pConfig { pub external_multiaddrs: Vec, /// Local address. pub listening_multiaddrs: Vec, - /// Bootstrap peer list. - pub bootstrap_peers: Vec, + /// Ceramic peer list. + pub ceramic_peers: Vec, /// Mdns discovery enabled. pub mdns: bool, /// Bitswap server mode enabled. @@ -113,7 +113,7 @@ impl Default for Libp2pConfig { "/ip4/0.0.0.0/tcp/4444".parse().unwrap(), "/ip4/0.0.0.0/udp/4445/quic-v1".parse().unwrap(), ], - bootstrap_peers: vec![], + ceramic_peers: vec![], mdns: false, kademlia: true, autonat: true, diff --git a/p2p/src/node.rs b/p2p/src/node.rs index 3649639e1..ad6fad5b2 100644 --- a/p2p/src/node.rs +++ b/p2p/src/node.rs @@ -553,46 +553,25 @@ where QueryResult::StartProviding(result) => { self.publisher.handle_start_providing_result(result); } - QueryResult::GetProviders(Ok(p)) => { - match p { - GetProvidersOk::FoundProviders { key, providers } => { - let behaviour = self.swarm.behaviour_mut(); - // Filter out bad providers. - let providers: HashSet<_> = providers - .into_iter() - .filter(|provider| { - let is_bad = - behaviour.peer_manager.is_bad_peer(provider); - if is_bad { - inc!(P2PMetrics::SkippedPeerKad); - } - !is_bad - }) - .collect(); - - if let Some(kad) = behaviour.kad.as_mut() { - debug!( - "provider results for {:?} last: {}", - key, step.last - ); - - self.providers.handle_get_providers_ok( - id, step.last, key, providers, kad, - ); - } + QueryResult::GetProviders(Ok(p)) => match p { + GetProvidersOk::FoundProviders { key, providers } => { + let behaviour = self.swarm.behaviour_mut(); + if let Some(kad) = behaviour.kad.as_mut() { + debug!("provider results for {:?} last: {}", key, step.last); + + self.providers.handle_get_providers_ok( + id, step.last, key, providers, kad, + ); } - GetProvidersOk::FinishedWithNoAdditionalRecord { .. } => { - let swarm = self.swarm.behaviour_mut(); - if let Some(kad) = swarm.kad.as_mut() { - debug!( - "FinishedWithNoAdditionalRecord for query {:#?}", - id - ); - self.providers.handle_no_additional_records(id, kad); - } + } + GetProvidersOk::FinishedWithNoAdditionalRecord { .. } => { + let swarm = self.swarm.behaviour_mut(); + if let Some(kad) = swarm.kad.as_mut() { + debug!("FinishedWithNoAdditionalRecord for query {:#?}", id); + self.providers.handle_no_additional_records(id, kad); } } - } + }, QueryResult::GetProviders(Err(error)) => { if let Some(kad) = self.swarm.behaviour_mut().kad.as_mut() { self.providers.handle_get_providers_error(id, error, kad); @@ -703,7 +682,17 @@ where }; for protocol in &info.protocols { - if protocol == &kad::PROTOCOL_NAME { + // Sometimes peers do not report that they support the kademlia protocol. + // Here we assume that all ceramic peers do support the protocol. + // Therefore we add all ceramic peers and any peers that explicitly support + // kademlia to the kademlia routing table. + if self + .swarm + .behaviour() + .peer_manager + .is_ceramic_peer(&peer_id) + || protocol == &kad::PROTOCOL_NAME + { for addr in &info.listen_addrs { if let Some(kad) = self.swarm.behaviour_mut().kad.as_mut() { kad.add_address(&peer_id, addr.clone()); @@ -1356,7 +1345,7 @@ mod tests { } if !self.bootstrap { - network_config.libp2p.bootstrap_peers = vec![]; + network_config.libp2p.ceramic_peers = vec![]; } let keypair = if let Some(seed) = self.seed { Ed25519Keypair::random(seed)