diff --git a/Cargo.lock b/Cargo.lock index 9bc11d5057..38ae1a7b41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5913,6 +5913,7 @@ dependencies = [ "serde", "serde_json", "sha2", + "sha3", "test-log", "tokio", "tracing", diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml index 83713f3a78..a6bbe3fd0b 100644 --- a/crates/p2p/Cargo.toml +++ b/crates/p2p/Cargo.toml @@ -43,6 +43,7 @@ rayon = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } sha2 = "0.10.7" +sha3 = { workspace = true } tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread", "sync"] } tracing = "0.1.37" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs index 6621115998..2ccff35a31 100644 --- a/crates/p2p/src/behaviour.rs +++ b/crates/p2p/src/behaviour.rs @@ -41,6 +41,12 @@ pub fn kademlia_protocol_name(chain_id: ChainId) -> String { pub struct Behaviour { cfg: Config, peers: PeerSet, + swarm: crate::Client, + /// Secret value used in the peer eviction process. + /// + /// This value is used to pick the peer to be evicted in a deterministic, but + /// unpredictable way. + eviction_secret: [u8; 32], inner: Inner, } @@ -332,6 +338,9 @@ impl NetworkBehaviour for Behaviour { if effective_role.is_dialer() { // This really is an outbound connection, and not a connection that requires // hole-punching. 
+ + self.evict_outbound_peer()?; + self.peers.upsert( peer_id, |peer| { @@ -365,6 +374,7 @@ impl Behaviour { pub fn new( identity: &identity::Keypair, chain_id: ChainId, + swarm: crate::Client, cfg: Config, ) -> (Self, relay::client::Transport) { const PROVIDER_PUBLICATION_INTERVAL: Duration = Duration::from_secs(600); @@ -414,6 +424,8 @@ impl Behaviour { Self { peers: PeerSet::new(cfg.eviction_timeout), cfg, + swarm, + eviction_secret: identity.derive_secret(b"eviction").unwrap(), inner: Inner { relay, autonat: autonat::Behaviour::new(peer_id, Default::default()), @@ -468,6 +480,58 @@ impl Behaviour { Ok(()) } + /// Evict an outbound peer if the maximum number of outbound peers has been reached. + fn evict_outbound_peer(&mut self) -> Result<(), ConnectionDenied> { + let mut candidates: Vec<_> = self.outbound_peers().collect(); + + // Only evict a peer if the maximum number of outbound peers has been reached. + if candidates.len() < self.cfg.max_outbound_peers { + return Ok(()); + } + + // Only peers which are flagged as not useful are considered for eviction. + candidates.retain(|(_, peer)| !peer.useful); + + // The peer to be evicted is the one with the highest SHA3-256(eviction_secret || peer_id) value. + candidates.sort_by_key(|(peer_id, _)| { + use sha3::{Digest, Sha3_256}; + let mut hasher = Sha3_256::default(); + hasher.update(self.eviction_secret); + hasher.update(peer_id.to_bytes()); + hasher.finalize() + }); + let (peer_id, _) = candidates.pop().ok_or_else(|| { + ConnectionDenied::new(anyhow!( + "outbound peer limit reached and no peers could be evicted" + )) + })?; + drop(candidates); + self.peers.update(peer_id, |peer| { + peer.connectivity = Connectivity::Disconnecting { + connected_at: peer.connected_at(), + }; + peer.evicted = true; + }); + + // Disconnect the evicted peer. 
+ tokio::spawn({ + let swarm = self.swarm.clone(); + async move { + if let Err(e) = swarm.disconnect(peer_id).await { + tracing::debug!(%peer_id, %e, "Failed to disconnect evicted peer"); + } + } + }); + + Ok(()) + } + + pub fn not_useful(&mut self, peer_id: PeerId) { + self.peers.update(peer_id, |peer| { + peer.useful = false; + }); + } + pub fn kademlia_mut(&mut self) -> &mut kad::Behaviour { &mut self.inner.kademlia } @@ -500,26 +564,26 @@ impl Behaviour { self.peers.iter() } - pub fn num_outbound_peers(&self) -> usize { + /// Outbound peers connected to us. + pub fn outbound_peers(&self) -> impl Iterator<Item = (PeerId, &Peer)> { self.peers .iter() - .filter(|(_, peer)| peer.is_outbound()) - .count() + .filter(|(_, peer)| peer.is_connected() && peer.is_outbound()) } - /// Number of inbound non-relayed peers. + /// Number of inbound non-relayed peers connected to us. fn num_inbound_direct_peers(&self) -> usize { self.peers .iter() - .filter(|(_, peer)| peer.is_inbound() && !peer.is_relayed()) + .filter(|(_, peer)| peer.is_connected() && peer.is_inbound() && !peer.is_relayed()) .count() } - /// Number of inbound relayed peers. + /// Number of inbound relayed peers connected to us. fn num_inbound_relayed_peers(&self) -> usize { self.peers .iter() - .filter(|(_, peer)| peer.is_inbound() && peer.is_relayed()) + .filter(|(_, peer)| peer.is_connected() && peer.is_inbound() && peer.is_relayed()) .count() } } diff --git a/crates/p2p/src/client/peer_aware.rs b/crates/p2p/src/client/peer_aware.rs index a278d27693..25d0a118d3 100644 --- a/crates/p2p/src/client/peer_aware.rs +++ b/crates/p2p/src/client/peer_aware.rs @@ -179,6 +179,18 @@ impl Client { receiver.await.expect("Sender not to be dropped") } + /// Mark a peer as not useful. + /// + /// These peers will be candidates for outbound peer eviction. 
+ pub async fn not_useful(&self, peer_id: PeerId) { + let (sender, receiver) = oneshot::channel(); + self.sender + .send(Command::NotUseful { peer_id, sender }) + .await + .expect("Command receiver not to be dropped"); + receiver.await.expect("Sender not to be dropped") + } + #[cfg(test)] pub(crate) fn for_test(&self) -> test_utils::Client { test_utils::Client::new(self.sender.clone()) diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 605c7f55e0..3535b8ea81 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -43,7 +43,11 @@ pub use behaviour::{kademlia_protocol_name, IDENTIFY_PROTOCOL_NAME}; pub fn new(keypair: Keypair, cfg: Config, chain_id: ChainId) -> (Client, EventReceiver, MainLoop) { let local_peer_id = keypair.public().to_peer_id(); - let (behaviour, relay_transport) = behaviour::Behaviour::new(&keypair, chain_id, cfg.clone()); + let (command_sender, command_receiver) = mpsc::channel(1); + let client = Client::new(command_sender, local_peer_id); + + let (behaviour, relay_transport) = + behaviour::Behaviour::new(&keypair, chain_id, client.clone(), cfg.clone()); let swarm = Swarm::new( transport::create(&keypair, relay_transport), @@ -61,11 +65,10 @@ pub fn new(keypair: Keypair, cfg: Config, chain_id: ChainId) -> (Client, EventRe swarm::Config::with_tokio_executor().with_idle_connection_timeout(Duration::MAX), ); - let (command_sender, command_receiver) = mpsc::channel(1); let (event_sender, event_receiver) = mpsc::channel(1); ( - Client::new(command_sender, local_peer_id), + client, event_receiver, MainLoop::new(swarm, command_receiver, event_sender, cfg, chain_id), ) @@ -78,10 +81,12 @@ pub struct Config { pub direct_connection_timeout: Duration, /// A relayed peer can only connect once in this period. pub relay_connection_timeout: Duration, - /// Maximum number of direct (non-relayed) peers. + /// Maximum number of direct (non-relayed) inbound peers. pub max_inbound_direct_peers: usize, - /// Maximum number of relayed peers. 
+ /// Maximum number of relayed inbound peers. pub max_inbound_relayed_peers: usize, + /// Maximum number of outbound peers. + pub max_outbound_peers: usize, /// The minimum number of peers to maintain. If the number of outbound peers drops below this /// number, the node will attempt to connect to more peers. pub low_watermark: usize, @@ -175,6 +180,10 @@ enum Command { new_block: NewBlock, sender: EmptyResultSender, }, + NotUseful { + peer_id: PeerId, + sender: oneshot::Sender<()>, + }, /// For testing purposes only _Test(TestCommand), } diff --git a/crates/p2p/src/main_loop.rs b/crates/p2p/src/main_loop.rs index de0856c509..ab8dc970a4 100644 --- a/crates/p2p/src/main_loop.rs +++ b/crates/p2p/src/main_loop.rs @@ -172,7 +172,7 @@ impl MainLoop { _ => self.ongoing_bootstrap = None, } } - if self.swarm.behaviour_mut().num_outbound_peers() < self.cfg.low_watermark { + if self.swarm.behaviour_mut().outbound_peers().count() < self.cfg.low_watermark { if let Ok(query_id) = self.swarm.behaviour_mut().kademlia_mut().bootstrap() { self.ongoing_bootstrap = Some(query_id); } @@ -857,6 +857,10 @@ impl MainLoop { let result = self.publish_data(topic, &data); let _ = sender.send(result); } + Command::NotUseful { peer_id, sender } => { + self.swarm.behaviour_mut().not_useful(peer_id); + let _ = sender.send(()); + } Command::_Test(command) => self.handle_test_command(command).await, }; } diff --git a/crates/p2p/src/peers.rs b/crates/p2p/src/peers.rs index 9f7fda85e2..01a7f041ef 100644 --- a/crates/p2p/src/peers.rs +++ b/crates/p2p/src/peers.rs @@ -48,7 +48,8 @@ impl Peer { pub fn connected_at(&self) -> Option { match self.connectivity { - Connectivity::Connected { connected_at } => Some(connected_at), + Connectivity::Connected { connected_at, .. } => Some(connected_at), + Connectivity::Disconnecting { connected_at, .. } => connected_at, Connectivity::Disconnected { connected_at, .. 
} => connected_at, Connectivity::Dialing => None, } @@ -62,6 +63,10 @@ pub enum Connectivity { /// When the peer was connected. connected_at: Instant, }, + Disconnecting { + /// When the peer was connected, if he was connected. + connected_at: Option, + }, Disconnected { /// When the peer was connected, if he was connected. connected_at: Option, diff --git a/crates/p2p/src/tests.rs b/crates/p2p/src/tests.rs index 9c22300324..ccd8d35920 100644 --- a/crates/p2p/src/tests.rs +++ b/crates/p2p/src/tests.rs @@ -36,14 +36,24 @@ impl TestPeer { #[must_use] pub fn new(cfg: Config, keypair: Keypair) -> Self { let peer_id = keypair.public().to_peer_id(); - let (client, event_receiver, main_loop) = + let (client, mut event_receiver, main_loop) = crate::new(keypair.clone(), cfg, ChainId::GOERLI_TESTNET); + + // Ensure that the channel keeps being polled to move the main loop forward. + // Store the polled events into a buffered channel instead. + let (buf_sender, buf_receiver) = tokio::sync::mpsc::channel(1024); + tokio::spawn(async move { + while let Some(event) = event_receiver.recv().await { + buf_sender.send(event).await.unwrap(); + } + }); + let main_loop_jh = tokio::spawn(main_loop.run()); Self { keypair, peer_id, client, - event_receiver, + event_receiver: buf_receiver, main_loop_jh, } } @@ -87,6 +97,7 @@ impl Default for TestPeer { relay_connection_timeout: Duration::from_secs(0), max_inbound_direct_peers: 10, max_inbound_relayed_peers: 10, + max_outbound_peers: 10, low_watermark: 10, ip_whitelist: vec!["::/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()], bootstrap: Default::default(), @@ -244,6 +255,7 @@ async fn periodic_bootstrap() { ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()], max_inbound_direct_peers: 10, max_inbound_relayed_peers: 10, + max_outbound_peers: 10, low_watermark: 3, bootstrap: BootstrapConfig { period: BOOTSTRAP_PERIOD, @@ -364,6 +376,7 @@ async fn reconnect_too_quickly() { ip_whitelist: vec!["::1/0".parse().unwrap(), 
"0.0.0.0/0".parse().unwrap()], max_inbound_direct_peers: 10, max_inbound_relayed_peers: 10, + max_outbound_peers: 10, low_watermark: 0, bootstrap: BootstrapConfig { period: Duration::from_millis(500), @@ -464,6 +477,7 @@ async fn duplicate_connection() { ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()], max_inbound_direct_peers: 10, max_inbound_relayed_peers: 10, + max_outbound_peers: 10, // Don't open connections automatically. low_watermark: 0, bootstrap: BootstrapConfig { @@ -550,6 +564,7 @@ async fn max_inbound_connections() { ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()], max_inbound_direct_peers: 2, max_inbound_relayed_peers: 0, + max_outbound_peers: 10, // Don't open connections automatically. low_watermark: 0, bootstrap: BootstrapConfig { @@ -663,6 +678,123 @@ async fn max_inbound_connections() { peer_4_connection_established.recv().await; } +/// Ensure that outbound peers marked as not useful get evicted if new connections are attempted. +#[test_log::test(tokio::test)] +async fn outbound_peer_eviction() { + const CONNECTION_TIMEOUT: Duration = Duration::from_millis(50); + let cfg = Config { + direct_connection_timeout: CONNECTION_TIMEOUT, + relay_connection_timeout: Duration::from_secs(0), + ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()], + max_inbound_direct_peers: 2, + max_inbound_relayed_peers: 0, + max_outbound_peers: 2, + // Don't open connections automatically. 
+ low_watermark: 0, + bootstrap: BootstrapConfig { + period: Duration::from_millis(500), + start_offset: Duration::from_secs(10), + }, + eviction_timeout: Duration::from_secs(15 * 60), + inbound_connections_rate_limit: RateLimit { + max: 1000, + interval: Duration::from_secs(1), + }, + }; + + let mut peer = TestPeer::new(cfg.clone(), Keypair::generate_ed25519()); + let mut outbound1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519()); + let mut outbound2 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519()); + let mut outbound3 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519()); + let mut outbound4 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519()); + let inbound = TestPeer::new(cfg, Keypair::generate_ed25519()); + + let peer_addr = peer.start_listening().await.unwrap(); + tracing::info!(%peer.peer_id, %peer_addr); + let outbound_addr1 = outbound1.start_listening().await.unwrap(); + tracing::info!(%outbound1.peer_id, %outbound_addr1); + let outbound_addr2 = outbound2.start_listening().await.unwrap(); + tracing::info!(%outbound2.peer_id, %outbound_addr2); + let outbound_addr3 = outbound3.start_listening().await.unwrap(); + tracing::info!(%outbound3.peer_id, %outbound_addr3); + let outbound_addr4 = outbound4.start_listening().await.unwrap(); + tracing::info!(%outbound4.peer_id, %outbound_addr4); + + consume_events(outbound1.event_receiver); + consume_events(outbound2.event_receiver); + consume_events(outbound3.event_receiver); + consume_events(outbound4.event_receiver); + consume_events(inbound.event_receiver); + + // Open one inbound connection. This connection is never touched. + inbound + .client + .dial(peer.peer_id, peer_addr.clone()) + .await + .unwrap(); + + // We can open two connections because the limit is 2. 
+ peer.client + .dial(outbound1.peer_id, outbound_addr1.clone()) + .await + .unwrap(); + peer.client + .dial(outbound2.peer_id, outbound_addr2.clone()) + .await + .unwrap(); + + exhaust_events(&mut peer.event_receiver).await; + + // Trying to open another one fails, because no peers are marked as not useful, and hence no + // peer can be evicted. + let result = peer + .client + .dial(outbound3.peer_id, outbound_addr3.clone()) + .await; + assert!(result.is_err()); + + let peers = peer.connected().await; + assert_eq!(peers.len(), 3); + assert!(peers.contains_key(&outbound1.peer_id)); + assert!(peers.contains_key(&outbound2.peer_id)); + assert!(peers.contains_key(&inbound.peer_id)); + + // Mark one of the connected peers as not useful. + peer.client.not_useful(outbound1.peer_id).await; + + // Now the connection to outbound3 can be opened, because outbound1 is marked as not useful and will be + // evicted. + peer.client + .dial(outbound3.peer_id, outbound_addr3.clone()) + .await + .unwrap(); + + // No longer connected to outbound1. + let peers = peer.connected().await; + assert_eq!(peers.len(), 3); + assert!(!peers.contains_key(&outbound1.peer_id)); + assert!(peers.contains_key(&outbound2.peer_id)); + assert!(peers.contains_key(&outbound3.peer_id)); + assert!(peers.contains_key(&inbound.peer_id)); + + // Ensure that outbound1 actually got disconnected. + wait_for_event(&mut peer.event_receiver, |event| match event { + Event::Test(TestEvent::ConnectionClosed { remote, .. }) if remote == outbound1.peer_id => { + Some(()) + } + _ => None, + }) + .await + .unwrap(); + + // The limit is reached again, so no new connections can be opened. + let result = peer + .client + .dial(outbound4.peer_id, outbound_addr4.clone()) + .await; + assert!(result.is_err()); +} + /// Test that peers can only connect if they are whitelisted. 
#[test_log::test(tokio::test)] async fn ip_whitelist() { @@ -672,6 +804,7 @@ async fn ip_whitelist() { ip_whitelist: vec!["127.0.0.2/32".parse().unwrap()], max_inbound_direct_peers: 10, max_inbound_relayed_peers: 10, + max_outbound_peers: 10, // Don't open connections automatically. low_watermark: 0, bootstrap: BootstrapConfig { @@ -704,6 +837,7 @@ async fn ip_whitelist() { ip_whitelist: vec!["127.0.0.1/32".parse().unwrap()], max_inbound_direct_peers: 10, max_inbound_relayed_peers: 10, + max_outbound_peers: 10, // Don't open connections automatically. low_watermark: 0, bootstrap: BootstrapConfig { @@ -737,6 +871,7 @@ async fn rate_limit() { ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()], max_inbound_direct_peers: 10, max_inbound_relayed_peers: 10, + max_outbound_peers: 10, // Don't open connections automatically. low_watermark: 0, bootstrap: BootstrapConfig { diff --git a/crates/pathfinder/src/bin/pathfinder/config.rs b/crates/pathfinder/src/bin/pathfinder/config.rs index 76c4e9efc2..4e79167bf1 100644 --- a/crates/pathfinder/src/bin/pathfinder/config.rs +++ b/crates/pathfinder/src/bin/pathfinder/config.rs @@ -370,6 +370,15 @@ Example: )] max_inbound_relayed_connections: u32, + #[arg( + long = "p2p.max-outbound-connections", + long_help = "The maximum number of outbound connections.", + value_name = "MAX_OUTBOUND_CONNECTIONS", + env = "PATHFINDER_MAX_OUTBOUND_CONNECTIONS", + default_value = "50" + )] + max_outbound_connections: u32, + #[arg( long = "p2p.low-watermark", long_help = "The minimum number of outbound peers to maintain. 
If the number of outbound peers drops below this number, the node will attempt to connect to more peers.", @@ -558,6 +567,7 @@ pub struct P2PConfig { pub predefined_peers: Vec, pub max_inbound_direct_connections: usize, pub max_inbound_relayed_connections: usize, + pub max_outbound_connections: usize, pub ip_whitelist: Vec, pub low_watermark: usize, } @@ -646,6 +656,7 @@ impl P2PConfig { .max_inbound_relayed_connections .try_into() .unwrap(), + max_outbound_connections: args.max_outbound_connections.try_into().unwrap(), proxy: args.proxy, identity_config_file: args.identity_config_file, listen_on: args.listen_on, diff --git a/crates/pathfinder/src/bin/pathfinder/main.rs b/crates/pathfinder/src/bin/pathfinder/main.rs index 0416577d8e..ab9b5b3a92 100644 --- a/crates/pathfinder/src/bin/pathfinder/main.rs +++ b/crates/pathfinder/src/bin/pathfinder/main.rs @@ -386,6 +386,7 @@ async fn start_p2p( relay_connection_timeout: Duration::from_secs(10), max_inbound_direct_peers: config.max_inbound_direct_connections, max_inbound_relayed_peers: config.max_inbound_relayed_connections, + max_outbound_peers: config.max_outbound_connections, low_watermark: config.low_watermark, ip_whitelist: config.ip_whitelist, bootstrap: Default::default(),