From a5a208a696bf39f27b9d23a17f1c44ee18d69e0b Mon Sep 17 00:00:00 2001 From: sistemd Date: Thu, 8 Feb 2024 17:41:22 +0100 Subject: [PATCH 1/6] store keyed network group for peers --- crates/p2p/src/behaviour.rs | 135 ++++++++++++++++++++++-------------- crates/p2p/src/lib.rs | 1 + crates/p2p/src/peers.rs | 19 +++++ crates/p2p/src/secret.rs | 22 ++++++ 4 files changed, 126 insertions(+), 51 deletions(-) create mode 100644 crates/p2p/src/secret.rs diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs index b47126e415..2fc5e4327c 100644 --- a/crates/p2p/src/behaviour.rs +++ b/crates/p2p/src/behaviour.rs @@ -3,10 +3,10 @@ use std::hash::{Hash, Hasher}; use std::net::IpAddr; use std::time::{Duration, Instant}; -use crate::peers::{Connectivity, Direction, Peer}; +use crate::peers::{Connectivity, Direction, KeyedNetworkGroup, Peer}; +use crate::secret::Secret; use crate::sync::codec; use crate::{peers::PeerSet, Config}; -use anyhow::anyhow; use libp2p::core::Endpoint; use libp2p::dcutr; use libp2p::gossipsub::{self, IdentTopic, MessageAuthenticity, MessageId}; @@ -42,11 +42,7 @@ pub struct Behaviour { cfg: Config, peers: PeerSet, swarm: crate::Client, - /// Secret value used in the peer eviction process. - /// - /// This value is used to pick the peer to be evicted in a deterministic, but - /// unpredictable way. - eviction_secret: [u8; 32], + secret: Secret, inner: Inner, } @@ -89,17 +85,16 @@ impl NetworkBehaviour for Behaviour { if is_relayed { if self.num_inbound_relayed_peers() >= self.cfg.max_inbound_relayed_peers { tracing::debug!(%peer, %connection_id, "Too many inbound relay peers, closing"); - return Err(ConnectionDenied::new(anyhow!( - "too many inbound relay peers" - ))); + return Err(ConnectionDenied::new("too many inbound relay peers")); } } else if self.num_inbound_direct_peers() >= self.cfg.max_inbound_direct_peers { tracing::debug!(%peer, %connection_id, "Too many inbound direct peers, closing"); - return Err(ConnectionDenied::new(anyhow!( - "too many inbound direct peers" - ))); + return Err(ConnectionDenied::new("too many inbound direct peers")); } + // Disconnect peers without an IP address. + Self::get_ip(remote_addr)?; + self.inner.handle_established_inbound_connection( connection_id, peer, @@ -117,6 +112,10 @@ impl NetworkBehaviour for Behaviour { ) -> Result, ConnectionDenied> { self.check_duplicate_connection(peer)?; self.prevent_evicted_peer_reconnections(peer)?; + + // Disconnect peers without an IP address. + Self::get_ip(addr)?; + self.inner .handle_established_outbound_connection(connection_id, peer, addr, role_override) } @@ -126,6 +125,42 @@ impl NetworkBehaviour for Behaviour { FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, endpoint, .. }) => { + let direction = if endpoint.is_dialer() { + Direction::Outbound + } else { + Direction::Inbound + }; + + // Disconnect peers without an IP address. + let Ok(peer_ip) = Self::get_ip(endpoint.get_remote_address()) else { + tracing::debug!(%peer_id, "Peer has no IP address, disconnecting"); + self.peers.upsert( + peer_id, + |peer| { + peer.connectivity = Connectivity::Disconnecting { + connected_at: Some(Instant::now()), + }; + }, + || Peer { + connectivity: Connectivity::Disconnecting { + connected_at: Some(Instant::now()), + }, + direction, + addr: None, + keyed_network_group: None, + evicted: false, + useful: true, + }, + ); + let swarm = self.swarm.clone(); + tokio::spawn(async move { + if let Err(err) = swarm.disconnect(peer_id).await { + tracing::debug!(%peer_id, %err, "Failed to disconnect peer"); + } + }); + return; + }; + self.peers.upsert( peer_id, |peer| { @@ -133,17 +168,16 @@ impl NetworkBehaviour for Behaviour { connected_at: Instant::now(), }; peer.addr = Some(endpoint.get_remote_address().clone()); + peer.keyed_network_group = + Some(KeyedNetworkGroup::new(&self.secret, peer_ip)); }, || Peer { connectivity: Connectivity::Connected { connected_at: Instant::now(), }, - direction: if endpoint.is_dialer() { - Direction::Outbound - } else { - Direction::Inbound - }, + direction, addr: Some(endpoint.get_remote_address().clone()), + keyed_network_group: Some(KeyedNetworkGroup::new(&self.secret, peer_ip)), evicted: false, useful: true, }, @@ -172,6 +206,7 @@ impl NetworkBehaviour for Behaviour { }, direction: Direction::Outbound, addr: None, + keyed_network_group: None, evicted: false, useful: true, }, @@ -236,23 +271,11 @@ impl NetworkBehaviour for Behaviour { .count(); if num_connected >= self.cfg.inbound_connections_rate_limit.max { tracing::debug!(%connection_id, %remote_addr, "Too many inbound connections, closing"); - return Err(ConnectionDenied::new(anyhow!( - "too many inbound connections" - ))); + return Err(ConnectionDenied::new("too many inbound connections")); } - // Extract the IP address of the peer from his multiaddr. - let peer_ip = remote_addr.iter().find_map(|p| match p { - Protocol::Ip4(ip) => Some(IpAddr::V4(ip)), - Protocol::Ip6(ip) => Some(IpAddr::V6(ip)), - _ => None, - }); - - // If the peer has no IP address, disconnect. - let Some(peer_ip) = peer_ip else { - tracing::debug!(%connection_id, "Disconnected peer without IP"); - return Err(ConnectionDenied::new(anyhow!("peer without IP"))); - }; + // Extract the peer IP from the multiaddr, or disconnect the peer if he doesn't have one. + let peer_ip = Self::get_ip(remote_addr)?; // If the peer is not in the IP whitelist, disconnect. if !self @@ -262,7 +285,7 @@ impl NetworkBehaviour for Behaviour { .any(|net| net.contains(&peer_ip)) { tracing::debug!(%peer_ip, %connection_id, "Peer not in IP whitelist, disconnecting"); - return Err(ConnectionDenied::new(anyhow!("peer not in IP whitelist"))); + return Err(ConnectionDenied::new("peer not in IP whitelist")); } // Is the peer connecting over a relay? @@ -298,7 +321,7 @@ impl NetworkBehaviour for Behaviour { // reconnect too quickly. Close the connection. if recent_peers.any(|ip| ip == peer_ip) { tracing::debug!(%connection_id, "Peer attempted to reconnect too quickly, closing"); - return Err(ConnectionDenied::new(anyhow!("reconnect too quickly"))); + return Err(ConnectionDenied::new("reconnect too quickly")); } // Attempt to extract peer ID from the multiaddr. @@ -324,15 +347,11 @@ impl NetworkBehaviour for Behaviour { if is_relayed { if self.num_inbound_relayed_peers() >= self.cfg.max_inbound_relayed_peers { tracing::debug!(%connection_id, "Too many inbound relay peers, closing"); - return Err(ConnectionDenied::new(anyhow!( - "too many inbound relay peers" - ))); + return Err(ConnectionDenied::new("too many inbound relay peers")); } } else if self.num_inbound_direct_peers() >= self.cfg.max_inbound_direct_peers { tracing::debug!(%connection_id, "Too many inbound direct peers, closing"); - return Err(ConnectionDenied::new(anyhow!( - "too many inbound direct peers" - ))); + return Err(ConnectionDenied::new("too many inbound direct peers")); } drop(recent_peers); @@ -370,6 +389,7 @@ impl NetworkBehaviour for Behaviour { connectivity: Connectivity::Dialing, direction: Direction::Outbound, addr: None, + keyed_network_group: None, evicted: false, useful: true, }, @@ -440,7 +460,7 @@ impl Behaviour { peers: PeerSet::new(cfg.eviction_timeout), cfg, swarm, - eviction_secret: identity.derive_secret(b"eviction").unwrap(), + secret: Secret::new(identity), inner: Inner { relay, autonat: autonat::Behaviour::new(peer_id, Default::default()), @@ -482,15 +502,16 @@ impl Behaviour { Ok(()) } + /// Only allow one connection per peer. If the peer is already connected, close the new + /// connection. fn check_duplicate_connection(&mut self, peer_id: PeerId) -> Result<(), ConnectionDenied> { - // Only allow one connection per peer. if self .peers .get(peer_id) .map_or(false, |peer| peer.is_connected()) { tracing::debug!(%peer_id, "Peer already connected, closing"); - return Err(ConnectionDenied::new(anyhow!("duplicate connection"))); + return Err(ConnectionDenied::new("duplicate connection")); } Ok(()) } @@ -510,15 +531,13 @@ impl Behaviour { candidates.sort_by_key(|(peer_id, _)| { use sha3::{Digest, Sha3_256}; let mut hasher = Sha3_256::default(); - hasher.update(self.eviction_secret); + self.secret.hash_into(&mut hasher); hasher.update(peer_id.to_bytes()); hasher.finalize() }); let (peer_id, _) = candidates.pop().ok_or_else(|| { tracing::debug!("Outbound peer limit reached, but no peers could be evicted"); - ConnectionDenied::new(anyhow!( - "outbound peer limit reached and no peers could be evicted" - )) + ConnectionDenied::new("outbound peer limit reached and no peers could be evicted") })?; drop(candidates); @@ -559,14 +578,28 @@ impl Behaviour { .. }) if disconnected_at.elapsed() < timeout => { tracing::debug!(%peer_id, "Evicted peer attempting to reconnect too quickly, disconnecting"); - Err(ConnectionDenied::new(anyhow!( - "evicted peer reconnecting too quickly" - ))) + Err(ConnectionDenied::new( + "evicted peer reconnecting too quickly", + )) } _ => Ok(()), } } + /// Get the IP address from a multiaddr, or disconnect the peer if it doesn't have one. + fn get_ip(addr: &Multiaddr) -> Result { + addr.iter() + .find_map(|p| match p { + Protocol::Ip4(ip) => Some(IpAddr::V4(ip)), + Protocol::Ip6(ip) => Some(IpAddr::V6(ip)), + _ => None, + }) + .ok_or_else(|| { + tracing::debug!(%addr, "Peer has no IP address, disconnecting"); + ConnectionDenied::new("peer has no IP") + }) + } + pub fn not_useful(&mut self, peer_id: PeerId) { self.peers.update(peer_id, |peer| { peer.useful = false; diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 60e08ebabd..ced4cd7c60 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -24,6 +24,7 @@ mod behaviour; pub mod client; mod main_loop; mod peers; +mod secret; mod sync; #[cfg(test)] mod test_utils; diff --git a/crates/p2p/src/peers.rs b/crates/p2p/src/peers.rs index cfa6723924..5aee84d0b0 100644 --- a/crates/p2p/src/peers.rs +++ b/crates/p2p/src/peers.rs @@ -5,12 +5,16 @@ use std::{ }; use libp2p::{multiaddr::Protocol, Multiaddr, PeerId}; +use sha3::{Digest, Sha3_256}; + +use crate::secret::Secret; #[derive(Debug, Clone)] pub struct Peer { pub connectivity: Connectivity, pub direction: Direction, pub addr: Option, + pub keyed_network_group: Option, pub evicted: bool, pub useful: bool, // TODO are we still able to maintain info about peers' sync heads? @@ -140,3 +144,18 @@ impl PeerSet { }) } } + +#[derive(Debug, Clone)] +pub struct KeyedNetworkGroup(pub [u8; 32]); + +impl KeyedNetworkGroup { + pub fn new(secret: &Secret, addr: IpAddr) -> Self { + let mut hasher = Sha3_256::default(); + secret.hash_into(&mut hasher); + match addr { + IpAddr::V4(ip) => hasher.update(&ip.octets()[..2]), + IpAddr::V6(ip) => hasher.update(&ip.octets()[..4]), + } + Self(hasher.finalize().into()) + } +} diff --git a/crates/p2p/src/secret.rs b/crates/p2p/src/secret.rs new file mode 100644 index 0000000000..b3fa5ec139 --- /dev/null +++ b/crates/p2p/src/secret.rs @@ -0,0 +1,22 @@ +use libp2p::identity::Keypair; +use sha3::Digest; +use zeroize::{Zeroize, ZeroizeOnDrop}; + +/// Secret value used to make certain decisions unpredictable. +/// +/// This value is used to pick the peer to be evicted during the peer eviction process. +#[derive(Zeroize, ZeroizeOnDrop)] +pub struct Secret([u8; 32]); + +impl Secret { + pub fn new(identity: &Keypair) -> Self { + Self(identity.derive_secret(b"pathfinder").unwrap()) + } + + /// Updates the given hasher with the secret. + pub fn hash_into(&self, hasher: &mut impl Digest) { + // Intentionally takes a reference to avoid copying the secret. + #[allow(clippy::needless_borrows_for_generic_args)] + hasher.update(&self.0); + } +} From 654961ec0496da8eeb316fa516935a2b87a2c459 Mon Sep 17 00:00:00 2001 From: sistemd Date: Thu, 8 Feb 2024 19:05:04 +0100 Subject: [PATCH 2/6] track min ping time --- crates/p2p/src/behaviour.rs | 21 +++++++++++++++++++++ crates/p2p/src/main_loop.rs | 6 ++++++ crates/p2p/src/peers.rs | 3 +++ 3 files changed, 30 insertions(+) diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs index 2fc5e4327c..7daddc7010 100644 --- a/crates/p2p/src/behaviour.rs +++ b/crates/p2p/src/behaviour.rs @@ -148,6 +148,7 @@ impl NetworkBehaviour for Behaviour { direction, addr: None, keyed_network_group: None, + min_ping: None, evicted: false, useful: true, }, @@ -178,6 +179,7 @@ impl NetworkBehaviour for Behaviour { direction, addr: Some(endpoint.get_remote_address().clone()), keyed_network_group: Some(KeyedNetworkGroup::new(&self.secret, peer_ip)), + min_ping: None, evicted: false, useful: true, }, @@ -205,6 +207,7 @@ impl NetworkBehaviour for Behaviour { disconnected_at: Instant::now(), }, direction: Direction::Outbound, + min_ping: None, addr: None, keyed_network_group: None, evicted: false, @@ -390,6 +393,7 @@ impl NetworkBehaviour for Behaviour { direction: Direction::Outbound, addr: None, keyed_network_group: None, + min_ping: None, evicted: false, useful: true, }, @@ -502,6 +506,23 @@ impl Behaviour { Ok(()) } + /// Notify the behaviour of a ping event. + pub fn pinged(&mut self, event: ping::Event) { + match event.result { + Ok(duration) => { + self.peers.update(event.peer, |peer| { + peer.min_ping = Some(match peer.min_ping { + Some(min_ping) => min_ping.min(duration), + None => duration, + }); + }); + } + Err(err) => { + tracing::debug!(%err, peer_id = %event.peer, "Ping failed"); + } + } + } + /// Only allow one connection per peer. If the peer is already connected, close the new /// connection. fn check_duplicate_connection(&mut self, peer_id: PeerId) -> Result<(), ConnectionDenied> { diff --git a/crates/p2p/src/main_loop.rs b/crates/p2p/src/main_loop.rs index 5026cc0e38..6d74e6f39b 100644 --- a/crates/p2p/src/main_loop.rs +++ b/crates/p2p/src/main_loop.rs @@ -303,6 +303,12 @@ impl MainLoop { } } // =========================== + // Pings + // =========================== + SwarmEvent::Behaviour(behaviour::Event::Ping(event)) => { + self.swarm.behaviour_mut().pinged(event); + } + // =========================== // Block propagation // =========================== SwarmEvent::Behaviour(behaviour::Event::Gossipsub(gossipsub::Event::Message { diff --git a/crates/p2p/src/peers.rs b/crates/p2p/src/peers.rs index 5aee84d0b0..46982fed92 100644 --- a/crates/p2p/src/peers.rs +++ b/crates/p2p/src/peers.rs @@ -15,6 +15,9 @@ pub struct Peer { pub direction: Direction, pub addr: Option, pub keyed_network_group: Option, + /// All peers send and receive periodic pings. This field holds the smallest ping time from all the + /// pings sent and received from this peer. + pub min_ping: Option, pub evicted: bool, pub useful: bool, // TODO are we still able to maintain info about peers' sync heads? From 6ced82409e7cd836131c955146374a30122984c7 Mon Sep 17 00:00:00 2001 From: sistemd Date: Thu, 8 Feb 2024 20:53:06 +0100 Subject: [PATCH 3/6] inbound peer eviction --- crates/p2p/src/behaviour.rs | 195 +++++++++++++++++++++++++++++++----- crates/p2p/src/peers.rs | 2 +- 2 files changed, 170 insertions(+), 27 deletions(-) diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs index 7daddc7010..2724a6ec63 100644 --- a/crates/p2p/src/behaviour.rs +++ b/crates/p2p/src/behaviour.rs @@ -1,4 +1,5 @@ use std::collections::hash_map::DefaultHasher; +use std::collections::HashMap; use std::hash::{Hash, Hasher}; use std::net::IpAddr; use std::time::{Duration, Instant}; @@ -30,7 +31,7 @@ use p2p_proto::event::{EventsRequest, EventsResponse}; use p2p_proto::receipt::{ReceiptsRequest, ReceiptsResponse}; use p2p_proto::transaction::{TransactionsRequest, TransactionsResponse}; use pathfinder_common::ChainId; -use std::task; +use std::{cmp, task}; pub const IDENTIFY_PROTOCOL_NAME: &str = "/starknet/id/1.0.0"; @@ -83,13 +84,19 @@ impl NetworkBehaviour for Behaviour { // Limit the number of inbound peer connections. Different limits apply to direct peers // and peers connecting over a relay. if is_relayed { - if self.num_inbound_relayed_peers() >= self.cfg.max_inbound_relayed_peers { - tracing::debug!(%peer, %connection_id, "Too many inbound relay peers, closing"); - return Err(ConnectionDenied::new("too many inbound relay peers")); + if self.inbound_relayed_peers().count() >= self.cfg.max_inbound_relayed_peers { + self.evict_inbound_peer( + self.inbound_relayed_peers() + .map(|(peer_id, peer)| (peer_id, peer.clone())) + .collect(), + )?; } - } else if self.num_inbound_direct_peers() >= self.cfg.max_inbound_direct_peers { - tracing::debug!(%peer, %connection_id, "Too many inbound direct peers, closing"); - return Err(ConnectionDenied::new("too many inbound direct peers")); + } else if self.inbound_direct_peers().count() >= self.cfg.max_inbound_direct_peers { + self.evict_inbound_peer( + self.inbound_direct_peers() + .map(|(peer_id, peer)| (peer_id, peer.clone())) + .collect(), + )?; } // Disconnect peers without an IP address. @@ -338,6 +345,8 @@ impl NetworkBehaviour for Behaviour { self.prevent_evicted_peer_reconnections(peer_id)?; } + drop(recent_peers); + // Limit the number of inbound peer connections. Different limits apply to direct peers // and peers connecting over a relay. // @@ -348,16 +357,21 @@ impl NetworkBehaviour for Behaviour { // The check must be repeated when the connection is established due to race conditions, // since multiple peers may be attempting to connect at the same time. if is_relayed { - if self.num_inbound_relayed_peers() >= self.cfg.max_inbound_relayed_peers { - tracing::debug!(%connection_id, "Too many inbound relay peers, closing"); - return Err(ConnectionDenied::new("too many inbound relay peers")); + if self.inbound_relayed_peers().count() >= self.cfg.max_inbound_relayed_peers { + self.evict_inbound_peer( + self.inbound_relayed_peers() + .map(|(peer_id, peer)| (peer_id, peer.clone())) + .collect(), + )?; } - } else if self.num_inbound_direct_peers() >= self.cfg.max_inbound_direct_peers { - tracing::debug!(%connection_id, "Too many inbound direct peers, closing"); - return Err(ConnectionDenied::new("too many inbound direct peers")); + } else if self.inbound_direct_peers().count() >= self.cfg.max_inbound_direct_peers { + self.evict_inbound_peer( + self.inbound_direct_peers() + .map(|(peer_id, peer)| (peer_id, peer.clone())) + .collect(), + )?; } - drop(recent_peers); self.inner .handle_pending_inbound_connection(connection_id, local_addr, remote_addr) } @@ -376,7 +390,9 @@ impl NetworkBehaviour for Behaviour { self.prevent_evicted_peer_reconnections(peer_id)?; - self.evict_outbound_peer()?; + if self.outbound_peers().count() >= self.cfg.max_outbound_peers { + self.evict_outbound_peer()?; + } self.peers.upsert( peer_id, @@ -537,12 +553,12 @@ impl Behaviour { Ok(()) } - /// Evict an outbound peer if the maximum number of outbound peers has been reached. + /// Evict an outbound peer to make space for a new outbound connection. + /// + /// Only peers which are flagged as not useful are considered for eviction. + /// If there are no such peers, the incoming connection gets denied. fn evict_outbound_peer(&mut self) -> Result<(), ConnectionDenied> { let mut candidates: Vec<_> = self.outbound_peers().collect(); - if candidates.len() < self.cfg.max_outbound_peers { - return Ok(()); - } // Only peers which are flagged as not useful are considered for eviction. candidates.retain(|(_, peer)| !peer.useful); @@ -557,7 +573,9 @@ impl Behaviour { hasher.finalize() }); let (peer_id, _) = candidates.pop().ok_or_else(|| { - tracing::debug!("Outbound peer limit reached, but no peers could be evicted"); + tracing::debug!( + "Outbound peer limit reached, but no peers could be evicted, disconnecting" + ); ConnectionDenied::new("outbound peer limit reached and no peers could be evicted") })?; drop(candidates); @@ -582,6 +600,133 @@ impl Behaviour { Ok(()) } + /// Disconnect an inbound peer to make space for a new inbound connection. + /// + /// This method is written with the goal of not allowing an attacker to + /// control which peers are evicted, so that the attacker cannot eclipse + /// our node. + /// + /// If no peer can be evicted, the incoming connection gets denied. + fn evict_inbound_peer( + &mut self, + mut candidates: HashMap, + ) -> Result<(), ConnectionDenied> { + // Group the peers by the keyed network group, and pick 4 groups + // with the smallest value (which is deterministic, but unpredictable + // by the attacker). Pick one peer from each group, and protect that + // peer from eviction. For the attacker to circumvent this step, he + // would need to be able to allocate very specific IPs, and he would + // need to be able to predict which prefixes we are going to protect, + // which is impossible. The goal is to ensure we are connected to a + // diverse set of IP addresses. + + // Group the peers by keyed network group. + let mut grouped = HashMap::>::new(); + for (&peer_id, peer) in candidates.iter() { + grouped + .entry(peer.keyed_network_group.expect("peer is connected")) + .or_default() + .push(peer_id); + } + let grouped = grouped; + + // Pick the peers to protect. + let mut sorted: Vec<_> = grouped.iter().collect(); + sorted.sort_by_key(|&(group, _)| group); + for (_, peers) in sorted.iter().take(4) { + // Pick the peer with the smallest SHA3(eviction_secret || peer_id) value and protect + // it from eviction. This is deterministic but unpredictable by any outside observer. + if let Some(peer_id) = peers.iter().min_by_key(|peer_id| { + use sha3::{Digest, Sha3_256}; + let mut hasher = Sha3_256::default(); + self.secret.hash_into(&mut hasher); + hasher.update(peer_id.to_bytes()); + hasher.finalize() + }) { + candidates.remove(peer_id); + } + } + + // Protect 8 peers with the lowest minimum ping time. To circumvent this + // step, the attacker would have to be able to run nodes that are + // geographically closer to us than these peers, which is difficult to do. + let mut ping_times: Vec<_> = candidates + .iter() + .filter_map(|(&peer_id, peer)| peer.min_ping.map(|ping| (peer_id, ping))) + .collect(); + ping_times.sort_by_key(|&(_, ping)| ping); + for (peer_id, _) in ping_times.iter().take(8) { + candidates.remove(peer_id); + } + + // TODO Move this into a separate issue and link it here. + // TODO Save 4 nodes that have most recently gossiped valid transactions, + // and 8 nodes that have most recently gossiped a valid new head (or any + // other block if we are still syncing). + + // Of the remaining nodes, protect half of them which have been connected + // for the longest time. + let mut connected_at: Vec<_> = candidates + .iter() + .map(|(&peer_id, peer)| (peer_id, peer.connected_at().expect("peer is connected"))) + .collect(); + connected_at.sort_by_key(|&(_, connected_at)| cmp::Reverse(connected_at)); + for (peer_id, _) in connected_at.iter().take(candidates.len() / 2) { + candidates.remove(peer_id); + } + + // Finally, evict the youngest peer in the most populous group. + // This is achieved by sorting all the nodes by a) total number + // of connected nodes which share their keyed network, breaking + // ties with b) keyed network group (in reverse order, since we + // use regular order in the first step of the eviction algorithm), + // breaking ties with c) connection time. Evict the first peer + // after sorting. + let mut candidates: Vec<_> = candidates.into_iter().collect(); + candidates.sort_by(|(_, a), (_, b)| { + match grouped[&a.keyed_network_group.expect("peer is connected")] + .len() + .cmp(&grouped[&b.keyed_network_group.expect("peer is connected")].len()) + { + cmp::Ordering::Equal => match cmp::Reverse(a.keyed_network_group) + .cmp(&cmp::Reverse(b.keyed_network_group)) + { + cmp::Ordering::Equal => a + .connected_at() + .expect("peer is connected") + .cmp(&b.connected_at().expect("peer is connected")), + other => other, + }, + other => other, + } + }); + let (peer_id, _) = candidates.into_iter().next().ok_or_else(|| { + tracing::debug!( + "Inbound peer limit reached, but no peers could be evicted, disconnecting" + ); + ConnectionDenied::new("inbound peer limit reached and no peers could be evicted") + })?; + + // Disconnect the evicted peer. + tracing::debug!(%peer_id, "Evicting inbound peer"); + self.peers.update(peer_id, |peer| { + peer.connectivity = Connectivity::Disconnecting { + connected_at: peer.connected_at(), + }; + peer.evicted = true; + }); + tokio::spawn({ + let swarm = self.swarm.clone(); + async move { + if let Err(e) = swarm.disconnect(peer_id).await { + tracing::debug!(%peer_id, %e, "Failed to disconnect evicted peer"); + } + } + }); + + Ok(()) + } + /// Prevent evicted peers from reconnecting too quickly. fn prevent_evicted_peer_reconnections(&self, peer_id: PeerId) -> Result<(), ConnectionDenied> { let timeout = if cfg!(test) { @@ -666,20 +811,18 @@ impl Behaviour { .filter(|(_, peer)| peer.is_connected() && peer.is_outbound()) } - /// Number of inbound non-relayed peers. - fn num_inbound_direct_peers(&self) -> usize { + /// Inbound non-relayed peers connected to us. + fn inbound_direct_peers(&self) -> impl Iterator { self.peers .iter() .filter(|(_, peer)| peer.is_connected() && peer.is_inbound() && !peer.is_relayed()) - .count() } - /// Number of inbound relayed peers. - fn num_inbound_relayed_peers(&self) -> usize { + /// Inbound relayed peers connected to us. + fn inbound_relayed_peers(&self) -> impl Iterator { self.peers .iter() .filter(|(_, peer)| peer.is_connected() && peer.is_inbound() && peer.is_relayed()) - .count() } } diff --git a/crates/p2p/src/peers.rs b/crates/p2p/src/peers.rs index 46982fed92..e027a053bd 100644 --- a/crates/p2p/src/peers.rs +++ b/crates/p2p/src/peers.rs @@ -148,7 +148,7 @@ impl PeerSet { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct KeyedNetworkGroup(pub [u8; 32]); impl KeyedNetworkGroup { From 077509af08ec619d6319cb09051b240ca499c2ba Mon Sep 17 00:00:00 2001 From: sistemd Date: Thu, 8 Feb 2024 21:30:18 +0100 Subject: [PATCH 4/6] test --- crates/p2p/src/tests.rs | 91 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 88 insertions(+), 3 deletions(-) diff --git a/crates/p2p/src/tests.rs b/crates/p2p/src/tests.rs index 9b046f1a79..3fc26e74a9 100644 --- a/crates/p2p/src/tests.rs +++ b/crates/p2p/src/tests.rs @@ -703,13 +703,12 @@ async fn max_inbound_connections() { peer_4_connection_established.recv().await; } -/// Ensure that outbound peers marked as not useful get evicted if new outbound connections +/// Ensure that outbound peers marked as not useful get evicted if new inbound connections /// are attempted. #[test_log::test(tokio::test)] async fn outbound_peer_eviction() { - const CONNECTION_TIMEOUT: Duration = Duration::from_millis(50); let cfg = Config { - direct_connection_timeout: CONNECTION_TIMEOUT, + direct_connection_timeout: Duration::from_secs(0), relay_connection_timeout: Duration::from_secs(0), ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()], max_inbound_direct_peers: 2, @@ -833,6 +832,92 @@ async fn outbound_peer_eviction() { assert!(peers.contains_key(&inbound2.peer_id)); } +/// Ensure that inbound peers get evicted if new outbound connections +/// are attempted. +#[test_log::test(tokio::test)] +async fn inbound_peer_eviction() { + let cfg = Config { + direct_connection_timeout: Duration::from_secs(0), + relay_connection_timeout: Duration::from_secs(0), + ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()], + max_inbound_direct_peers: 25, + max_inbound_relayed_peers: 0, + max_outbound_peers: 100, + // Don't open connections automatically. + low_watermark: 0, + bootstrap: BootstrapConfig { + period: Duration::from_millis(500), + start_offset: Duration::from_secs(10), + }, + eviction_timeout: Duration::from_secs(15 * 60), + inbound_connections_rate_limit: RateLimit { + max: 1000, + interval: Duration::from_secs(1), + }, + }; + + let mut peer = TestPeer::new(cfg.clone(), Keypair::generate_ed25519()); + let inbound_peers = (0..26) + .map(|_| TestPeer::new(cfg.clone(), Keypair::generate_ed25519())) + .collect::>(); + let mut outbound1 = TestPeer::new(cfg, Keypair::generate_ed25519()); + + let peer_addr = peer.start_listening().await.unwrap(); + tracing::info!(%peer.peer_id, %peer_addr); + let outbound_addr1 = outbound1.start_listening().await.unwrap(); + tracing::info!(%outbound1.peer_id, %outbound_addr1); + + // Open one outbound connection. This connection is never touched. + peer.client + .dial(outbound1.peer_id, outbound_addr1.clone()) + .await + .unwrap(); + + // We can open 25 connections because the limit is 25. + for inbound_peer in inbound_peers.iter().take(25) { + inbound_peer + .client + .dial(peer.peer_id, peer_addr.clone()) + .await + .unwrap(); + } + + let connected = peer.connected().await; + // 25 inbound and 1 outbound peer. + assert_eq!(connected.len(), 26); + assert!(connected.contains_key(&outbound1.peer_id)); + + exhaust_events(&mut peer.event_receiver).await; + + // Trying to open another one causes an eviction. + inbound_peers + .last() + .unwrap() + .client + .dial(peer.peer_id, peer_addr.clone()) + .await + .unwrap(); + + // Ensure that a peer got disconnected. + let disconnected = wait_for_event(&mut peer.event_receiver, |event| match event { + Event::Test(TestEvent::ConnectionClosed { remote, .. }) + if inbound_peers.iter().take(25).any(|p| p.peer_id == remote) => + { + Some(remote) + } + _ => None, + }) + .await + .unwrap(); + + let connected = peer.connected().await; + // 25 inbound and 1 outbound peer. + assert_eq!(connected.len(), 26); + assert!(!connected.contains_key(&disconnected)); + assert!(connected.contains_key(&inbound_peers.last().unwrap().peer_id)); + assert!(connected.contains_key(&outbound1.peer_id)); +} + /// Ensure that evicted peers can't reconnect too quickly. #[test_log::test(tokio::test)] async fn evicted_peer_reconnection() { From e802064d4fd04abf183584b9cbc496f8b4c2f952 Mon Sep 17 00:00:00 2001 From: sistemd Date: Fri, 9 Feb 2024 11:17:52 +0100 Subject: [PATCH 5/6] touchups --- crates/p2p/src/behaviour.rs | 21 ++++++++++----------- crates/p2p/src/peers.rs | 6 ++++++ crates/p2p/src/tests.rs | 4 ++-- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs index 2724a6ec63..e14099bdd5 100644 --- a/crates/p2p/src/behaviour.rs +++ b/crates/p2p/src/behaviour.rs @@ -75,6 +75,9 @@ impl NetworkBehaviour for Behaviour { local_addr: &Multiaddr, remote_addr: &Multiaddr, ) -> Result, ConnectionDenied> { + // Disconnect peers without an IP address. + Self::get_ip(remote_addr)?; + self.check_duplicate_connection(peer)?; self.prevent_evicted_peer_reconnections(peer)?; @@ -99,9 +102,6 @@ impl NetworkBehaviour for Behaviour { )?; } - // Disconnect peers without an IP address. - Self::get_ip(remote_addr)?; - self.inner.handle_established_inbound_connection( connection_id, peer, @@ -117,12 +117,12 @@ impl NetworkBehaviour for Behaviour { addr: &Multiaddr, role_override: Endpoint, ) -> Result, ConnectionDenied> { - self.check_duplicate_connection(peer)?; - self.prevent_evicted_peer_reconnections(peer)?; - // Disconnect peers without an IP address. Self::get_ip(addr)?; + self.check_duplicate_connection(peer)?; + self.prevent_evicted_peer_reconnections(peer)?; + self.inner .handle_established_outbound_connection(connection_id, peer, addr, role_override) } @@ -556,7 +556,7 @@ impl Behaviour { /// Evict an outbound peer to make space for a new outbound connection. /// /// Only peers which are flagged as not useful are considered for eviction. - /// If there are no such peers, the incoming connection gets denied. + /// If there are no such peers, the outgoing connection gets denied. fn evict_outbound_peer(&mut self) -> Result<(), ConnectionDenied> { let mut candidates: Vec<_> = self.outbound_peers().collect(); @@ -659,8 +659,7 @@ impl Behaviour { candidates.remove(peer_id); } - // TODO Move this into a separate issue and link it here. - // TODO Save 4 nodes that have most recently gossiped valid transactions, + // TODO #1754: Save 4 nodes that have most recently gossiped valid transactions, // and 8 nodes that have most recently gossiped a valid new head (or any // other block if we are still syncing). @@ -676,8 +675,8 @@ impl Behaviour { } // Finally, evict the youngest peer in the most populous group. - // This is achieved by sorting all the nodes by a) total number - // of connected nodes which share their keyed network, breaking + // This is achieved by sorting all the peers by a) total number + // of connected peers which share their keyed network, breaking // ties with b) keyed network group (in reverse order, since we // use regular order in the first step of the eviction algorithm), // breaking ties with c) connection time. Evict the first peer diff --git a/crates/p2p/src/peers.rs b/crates/p2p/src/peers.rs index e027a053bd..1e878fe50b 100644 --- a/crates/p2p/src/peers.rs +++ b/crates/p2p/src/peers.rs @@ -148,6 +148,12 @@ impl PeerSet { } } +/// A network group that is keyed by a secret, calculated as SHA3(secret || 16 bit prefix for IPv4 +/// or 32 bit prefix for IPv6 addresses). +/// +/// For a given secret and IP address, the network group is deterministic, but unpredictable +/// by the attacker. The keyed network group is used to ensure that our node is connected to a +/// diverse set of IP addresses. #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct KeyedNetworkGroup(pub [u8; 32]); diff --git a/crates/p2p/src/tests.rs b/crates/p2p/src/tests.rs index 3fc26e74a9..1a5b6056c0 100644 --- a/crates/p2p/src/tests.rs +++ b/crates/p2p/src/tests.rs @@ -703,7 +703,7 @@ async fn max_inbound_connections() { peer_4_connection_established.recv().await; } -/// Ensure that outbound peers marked as not useful get evicted if new inbound connections +/// Ensure that outbound peers marked as not useful get evicted if new outbound connections /// are attempted. #[test_log::test(tokio::test)] async fn outbound_peer_eviction() { @@ -832,7 +832,7 @@ async fn outbound_peer_eviction() { assert!(peers.contains_key(&inbound2.peer_id)); } -/// Ensure that inbound peers get evicted if new outbound connections +/// Ensure that inbound peers get evicted if new inbound connections /// are attempted. #[test_log::test(tokio::test)] async fn inbound_peer_eviction() { From 7a60c8fd5c7e8b37b7b3e9e5f782889657e21da4 Mon Sep 17 00:00:00 2001 From: sistemd Date: Mon, 12 Feb 2024 10:40:30 +0100 Subject: [PATCH 6/6] syntactical improvements --- crates/p2p/src/behaviour.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs index e14099bdd5..62467ae764 100644 --- a/crates/p2p/src/behaviour.rs +++ b/crates/p2p/src/behaviour.rs @@ -572,12 +572,14 @@ impl Behaviour { hasher.update(peer_id.to_bytes()); hasher.finalize() }); - let (peer_id, _) = candidates.pop().ok_or_else(|| { + let Some((peer_id, _)) = candidates.pop() else { tracing::debug!( "Outbound peer limit reached, but no peers could be evicted, disconnecting" ); - ConnectionDenied::new("outbound peer limit reached and no peers could be evicted") - })?; + return Err(ConnectionDenied::new( + "outbound peer limit reached and no peers could be evicted", + )); + }; drop(candidates); // Disconnect the evicted peer. @@ -699,12 +701,14 @@ impl Behaviour { other => other, } }); - let (peer_id, _) = candidates.into_iter().next().ok_or_else(|| { + let Some((peer_id, _)) = candidates.into_iter().next() else { tracing::debug!( "Inbound peer limit reached, but no peers could be evicted, disconnecting" ); - ConnectionDenied::new("inbound peer limit reached and no peers could be evicted") - })?; + return Err(ConnectionDenied::new( + "inbound peer limit reached and no peers could be evicted", + )); + }; // Disconnect the evicted peer. tracing::debug!(%peer_id, "Evicting inbound peer");