From 8f8eab0ea003d02f844905bbb120186713c9fe45 Mon Sep 17 00:00:00 2001 From: sistemd Date: Tue, 6 Feb 2024 11:30:39 +0100 Subject: [PATCH] inbound connections rate limiting --- crates/p2p/src/behaviour.rs | 22 +++- crates/p2p/src/lib.rs | 7 ++ crates/p2p/src/tests.rs | 102 +++++++++++++++++-- crates/pathfinder/src/bin/pathfinder/main.rs | 4 + 4 files changed, 122 insertions(+), 13 deletions(-) diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs index 66bc10aa4a..6621115998 100644 --- a/crates/p2p/src/behaviour.rs +++ b/crates/p2p/src/behaviour.rs @@ -81,13 +81,13 @@ impl NetworkBehaviour for Behaviour { // and peers connecting over a relay. if is_relayed { if self.num_inbound_relayed_peers() >= self.cfg.max_inbound_relayed_peers { - tracing::debug!(%connection_id, "Too many inbound relay peers, closing"); + tracing::debug!(%peer, %connection_id, "Too many inbound relay peers, closing"); return Err(ConnectionDenied::new(anyhow!( "too many inbound relay peers" ))); } } else if self.num_inbound_direct_peers() >= self.cfg.max_inbound_direct_peers { - tracing::debug!(%connection_id, "Too many inbound direct peers, closing"); + tracing::debug!(%peer, %connection_id, "Too many inbound direct peers, closing"); return Err(ConnectionDenied::new(anyhow!( "too many inbound direct peers" ))); @@ -217,6 +217,22 @@ impl NetworkBehaviour for Behaviour { local_addr: &Multiaddr, remote_addr: &Multiaddr, ) -> Result<(), ConnectionDenied> { + // Apply rate limiting to inbound connections. + let rate_limit_interval = + Instant::now() - self.cfg.inbound_connections_rate_limit.interval..Instant::now(); + let num_connected = self + .peers() + .filter(|(_, peer)| peer.is_inbound()) + .filter_map(|(_, peer)| peer.connected_at()) + .filter(|t| rate_limit_interval.contains(t)) + .count(); + if num_connected >= self.cfg.inbound_connections_rate_limit.max { + tracing::debug!(%connection_id, %remote_addr, "Too many inbound connections, closing"); + return Err(ConnectionDenied::new(anyhow!( + "too many inbound connections" + ))); + } + // Extract the IP address of the peer from his multiaddr. let peer_ip = remote_addr.iter().find_map(|p| match p { Protocol::Ip4(ip) => Some(IpAddr::V4(ip)), @@ -237,7 +253,7 @@ impl NetworkBehaviour for Behaviour { .iter() .any(|net| net.contains(&peer_ip)) { - tracing::debug!(%peer_ip, %connection_id, "Disconnected peer not in IP whitelist"); + tracing::debug!(%peer_ip, %connection_id, "Peer not in IP whitelist, disconnecting"); return Err(ConnectionDenied::new(anyhow!("peer not in IP whitelist"))); } diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 85569f6b84..605c7f55e0 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -89,6 +89,13 @@ pub struct Config { pub eviction_timeout: Duration, pub ip_whitelist: Vec, pub bootstrap: BootstrapConfig, + pub inbound_connections_rate_limit: RateLimit, +} + +#[derive(Debug, Clone)] +pub struct RateLimit { + pub max: usize, + pub interval: Duration, } #[derive(Copy, Clone, Debug)] diff --git a/crates/p2p/src/tests.rs b/crates/p2p/src/tests.rs index da87bdde5f..9c22300324 100644 --- a/crates/p2p/src/tests.rs +++ b/crates/p2p/src/tests.rs @@ -20,7 +20,7 @@ use rstest::rstest; use tokio::task::JoinHandle; use crate::peers::Peer; -use crate::{BootstrapConfig, Config, Event, EventReceiver, TestEvent}; +use crate::{BootstrapConfig, Config, Event, EventReceiver, RateLimit, TestEvent}; #[allow(dead_code)] #[derive(Debug)] @@ -83,14 +83,18 @@ impl Default for TestPeer { fn default() -> Self { Self::new( Config { - direct_connection_timeout: Duration::from_secs(30), - relay_connection_timeout: Duration::from_secs(10), + direct_connection_timeout: Duration::from_secs(0), + relay_connection_timeout: Duration::from_secs(0), max_inbound_direct_peers: 10, max_inbound_relayed_peers: 10, low_watermark: 10, ip_whitelist: vec!["::/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()], bootstrap: Default::default(), eviction_timeout: Duration::from_secs(15 * 60), + inbound_connections_rate_limit: RateLimit { + max: 1000, + interval: Duration::from_secs(1), + }, }, Keypair::generate_ed25519(), ) @@ -246,6 +250,10 @@ async fn periodic_bootstrap() { start_offset: Duration::from_secs(1), }, eviction_timeout: Duration::from_secs(15 * 60), + inbound_connections_rate_limit: RateLimit { + max: 1000, + interval: Duration::from_secs(1), + }, }; let mut boot = TestPeer::new(cfg.clone(), Keypair::generate_ed25519()); let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519()); @@ -352,7 +360,7 @@ async fn reconnect_too_quickly() { const CONNECTION_TIMEOUT: Duration = Duration::from_secs(1); let cfg = Config { direct_connection_timeout: CONNECTION_TIMEOUT, - relay_connection_timeout: Duration::from_millis(500), + relay_connection_timeout: Duration::from_secs(0), ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()], max_inbound_direct_peers: 10, max_inbound_relayed_peers: 10, @@ -362,6 +370,10 @@ async fn reconnect_too_quickly() { start_offset: Duration::from_secs(10), }, eviction_timeout: Duration::from_secs(15 * 60), + inbound_connections_rate_limit: RateLimit { + max: 1000, + interval: Duration::from_secs(1), + }, }; let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519()); @@ -448,7 +460,7 @@ async fn duplicate_connection() { const CONNECTION_TIMEOUT: Duration = Duration::from_millis(50); let cfg = Config { direct_connection_timeout: CONNECTION_TIMEOUT, - relay_connection_timeout: Duration::from_millis(500), + relay_connection_timeout: Duration::from_secs(0), ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()], max_inbound_direct_peers: 10, max_inbound_relayed_peers: 10, @@ -459,6 +471,10 @@ async fn duplicate_connection() { start_offset: Duration::from_secs(10), }, eviction_timeout: Duration::from_secs(15 * 60), + inbound_connections_rate_limit: RateLimit { + max: 1000, + interval: Duration::from_secs(1), + }, }; let keypair = Keypair::generate_ed25519(); let mut peer1 = TestPeer::new(cfg.clone(), keypair.clone()); @@ -530,7 +546,7 @@ async fn max_inbound_connections() { const CONNECTION_TIMEOUT: Duration = Duration::from_millis(50); let cfg = Config { direct_connection_timeout: CONNECTION_TIMEOUT, - relay_connection_timeout: Duration::from_millis(500), + relay_connection_timeout: Duration::from_secs(0), ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()], max_inbound_direct_peers: 2, max_inbound_relayed_peers: 0, @@ -541,6 +557,10 @@ async fn max_inbound_connections() { start_offset: Duration::from_secs(10), }, eviction_timeout: Duration::from_secs(15 * 60), + inbound_connections_rate_limit: RateLimit { + max: 1000, + interval: Duration::from_secs(1), + }, }; let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519()); let mut peer2 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519()); @@ -647,8 +667,8 @@ async fn max_inbound_connections() { #[test_log::test(tokio::test)] async fn ip_whitelist() { let cfg = Config { - direct_connection_timeout: Duration::from_millis(50), - relay_connection_timeout: Duration::from_millis(50), + direct_connection_timeout: Duration::from_secs(0), + relay_connection_timeout: Duration::from_secs(0), ip_whitelist: vec!["127.0.0.2/32".parse().unwrap()], max_inbound_direct_peers: 10, max_inbound_relayed_peers: 10, @@ -659,6 +679,10 @@ async fn ip_whitelist() { start_offset: Duration::from_secs(10), }, eviction_timeout: Duration::from_secs(15 * 60), + inbound_connections_rate_limit: RateLimit { + max: 1000, + interval: Duration::from_secs(1), + }, }; let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519()); let peer2 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519()); @@ -675,8 +699,8 @@ async fn ip_whitelist() { // Start another peer accepting connections from 127.0.0.1. let cfg = Config { - direct_connection_timeout: Duration::from_millis(50), - relay_connection_timeout: Duration::from_millis(50), + direct_connection_timeout: Duration::from_secs(0), + relay_connection_timeout: Duration::from_secs(0), ip_whitelist: vec!["127.0.0.1/32".parse().unwrap()], max_inbound_direct_peers: 10, max_inbound_relayed_peers: 10, @@ -687,6 +711,10 @@ async fn ip_whitelist() { start_offset: Duration::from_secs(10), }, eviction_timeout: Duration::from_secs(15 * 60), + inbound_connections_rate_limit: RateLimit { + max: 1000, + interval: Duration::from_secs(1), + }, }; let mut peer3 = TestPeer::new(cfg, Keypair::generate_ed25519()); @@ -698,6 +726,60 @@ async fn ip_whitelist() { assert!(result.is_ok()); } +/// Check that inbound connections get rate limited. +#[test_log::test(tokio::test)] +async fn rate_limit() { + const RATE_LIMIT_INTERVAL: Duration = Duration::from_secs(1); + + let cfg = Config { + direct_connection_timeout: Duration::from_secs(0), + relay_connection_timeout: Duration::from_secs(0), + ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()], + max_inbound_direct_peers: 10, + max_inbound_relayed_peers: 10, + // Don't open connections automatically. + low_watermark: 0, + bootstrap: BootstrapConfig { + period: Duration::from_millis(500), + start_offset: Duration::from_secs(10), + }, + eviction_timeout: Duration::from_secs(15 * 60), + inbound_connections_rate_limit: RateLimit { + max: 2, + interval: RATE_LIMIT_INTERVAL, + }, + }; + + let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519()); + let peer2 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519()); + let peer3 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519()); + let peer4 = TestPeer::new(cfg, Keypair::generate_ed25519()); + + let addr1 = peer1.start_listening().await.unwrap(); + tracing::info!(%peer1.peer_id, %addr1); + + consume_events(peer1.event_receiver); + consume_events(peer2.event_receiver); + consume_events(peer3.event_receiver); + consume_events(peer4.event_receiver); + + // Two connections can be opened, but the third one is rate limited. + + peer2 + .client + .dial(peer1.peer_id, addr1.clone()) + .await + .unwrap(); + peer3 + .client + .dial(peer1.peer_id, addr1.clone()) + .await + .unwrap(); + + let result = peer4.client.dial(peer1.peer_id, addr1.clone()).await; + assert!(result.is_err()); +} + #[rstest] #[case::server_to_client(server_to_client().await)] #[case::client_to_server(client_to_server().await)] diff --git a/crates/pathfinder/src/bin/pathfinder/main.rs b/crates/pathfinder/src/bin/pathfinder/main.rs index cebb032738..0416577d8e 100644 --- a/crates/pathfinder/src/bin/pathfinder/main.rs +++ b/crates/pathfinder/src/bin/pathfinder/main.rs @@ -390,6 +390,10 @@ async fn start_p2p( ip_whitelist: config.ip_whitelist, bootstrap: Default::default(), eviction_timeout: Duration::from_secs(15 * 60), + inbound_connections_rate_limit: p2p::RateLimit { + max: 10, + interval: Duration::from_secs(1), + }, }, chain_id, storage,