Skip to content

Commit

Permalink
Merge pull request #1746 from eqlabs/sistemd/inbound-connection-rate-…
Browse files Browse the repository at this point in the history
…limiting

feat(p2p): inbound connections rate limiting
  • Loading branch information
sistemd authored Feb 7, 2024
2 parents 93eb6c4 + 8f8eab0 commit eaba589
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 13 deletions.
22 changes: 19 additions & 3 deletions crates/p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ impl NetworkBehaviour for Behaviour {
// and peers connecting over a relay.
if is_relayed {
if self.num_inbound_relayed_peers() >= self.cfg.max_inbound_relayed_peers {
tracing::debug!(%connection_id, "Too many inbound relay peers, closing");
tracing::debug!(%peer, %connection_id, "Too many inbound relay peers, closing");
return Err(ConnectionDenied::new(anyhow!(
"too many inbound relay peers"
)));
}
} else if self.num_inbound_direct_peers() >= self.cfg.max_inbound_direct_peers {
tracing::debug!(%connection_id, "Too many inbound direct peers, closing");
tracing::debug!(%peer, %connection_id, "Too many inbound direct peers, closing");
return Err(ConnectionDenied::new(anyhow!(
"too many inbound direct peers"
)));
Expand Down Expand Up @@ -217,6 +217,22 @@ impl NetworkBehaviour for Behaviour {
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<(), ConnectionDenied> {
// Apply rate limiting to inbound connections.
let rate_limit_interval =
Instant::now() - self.cfg.inbound_connections_rate_limit.interval..Instant::now();
let num_connected = self
.peers()
.filter(|(_, peer)| peer.is_inbound())
.filter_map(|(_, peer)| peer.connected_at())
.filter(|t| rate_limit_interval.contains(t))
.count();
if num_connected >= self.cfg.inbound_connections_rate_limit.max {
tracing::debug!(%connection_id, %remote_addr, "Too many inbound connections, closing");
return Err(ConnectionDenied::new(anyhow!(
"too many inbound connections"
)));
}

// Extract the IP address of the peer from his multiaddr.
let peer_ip = remote_addr.iter().find_map(|p| match p {
Protocol::Ip4(ip) => Some(IpAddr::V4(ip)),
Expand All @@ -237,7 +253,7 @@ impl NetworkBehaviour for Behaviour {
.iter()
.any(|net| net.contains(&peer_ip))
{
tracing::debug!(%peer_ip, %connection_id, "Disconnected peer not in IP whitelist");
tracing::debug!(%peer_ip, %connection_id, "Peer not in IP whitelist, disconnecting");
return Err(ConnectionDenied::new(anyhow!("peer not in IP whitelist")));
}

Expand Down
7 changes: 7 additions & 0 deletions crates/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ pub struct Config {
pub eviction_timeout: Duration,
pub ip_whitelist: Vec<IpNet>,
pub bootstrap: BootstrapConfig,
pub inbound_connections_rate_limit: RateLimit,
}

#[derive(Debug, Clone)]
pub struct RateLimit {
pub max: usize,
pub interval: Duration,
}

#[derive(Copy, Clone, Debug)]
Expand Down
102 changes: 92 additions & 10 deletions crates/p2p/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use rstest::rstest;
use tokio::task::JoinHandle;

use crate::peers::Peer;
use crate::{BootstrapConfig, Config, Event, EventReceiver, TestEvent};
use crate::{BootstrapConfig, Config, Event, EventReceiver, RateLimit, TestEvent};

#[allow(dead_code)]
#[derive(Debug)]
Expand Down Expand Up @@ -83,14 +83,18 @@ impl Default for TestPeer {
fn default() -> Self {
Self::new(
Config {
direct_connection_timeout: Duration::from_secs(30),
relay_connection_timeout: Duration::from_secs(10),
direct_connection_timeout: Duration::from_secs(0),
relay_connection_timeout: Duration::from_secs(0),
max_inbound_direct_peers: 10,
max_inbound_relayed_peers: 10,
low_watermark: 10,
ip_whitelist: vec!["::/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
bootstrap: Default::default(),
eviction_timeout: Duration::from_secs(15 * 60),
inbound_connections_rate_limit: RateLimit {
max: 1000,
interval: Duration::from_secs(1),
},
},
Keypair::generate_ed25519(),
)
Expand Down Expand Up @@ -246,6 +250,10 @@ async fn periodic_bootstrap() {
start_offset: Duration::from_secs(1),
},
eviction_timeout: Duration::from_secs(15 * 60),
inbound_connections_rate_limit: RateLimit {
max: 1000,
interval: Duration::from_secs(1),
},
};
let mut boot = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
Expand Down Expand Up @@ -352,7 +360,7 @@ async fn reconnect_too_quickly() {
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(1);
let cfg = Config {
direct_connection_timeout: CONNECTION_TIMEOUT,
relay_connection_timeout: Duration::from_millis(500),
relay_connection_timeout: Duration::from_secs(0),
ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
max_inbound_direct_peers: 10,
max_inbound_relayed_peers: 10,
Expand All @@ -362,6 +370,10 @@ async fn reconnect_too_quickly() {
start_offset: Duration::from_secs(10),
},
eviction_timeout: Duration::from_secs(15 * 60),
inbound_connections_rate_limit: RateLimit {
max: 1000,
interval: Duration::from_secs(1),
},
};

let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
Expand Down Expand Up @@ -448,7 +460,7 @@ async fn duplicate_connection() {
const CONNECTION_TIMEOUT: Duration = Duration::from_millis(50);
let cfg = Config {
direct_connection_timeout: CONNECTION_TIMEOUT,
relay_connection_timeout: Duration::from_millis(500),
relay_connection_timeout: Duration::from_secs(0),
ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
max_inbound_direct_peers: 10,
max_inbound_relayed_peers: 10,
Expand All @@ -459,6 +471,10 @@ async fn duplicate_connection() {
start_offset: Duration::from_secs(10),
},
eviction_timeout: Duration::from_secs(15 * 60),
inbound_connections_rate_limit: RateLimit {
max: 1000,
interval: Duration::from_secs(1),
},
};
let keypair = Keypair::generate_ed25519();
let mut peer1 = TestPeer::new(cfg.clone(), keypair.clone());
Expand Down Expand Up @@ -530,7 +546,7 @@ async fn max_inbound_connections() {
const CONNECTION_TIMEOUT: Duration = Duration::from_millis(50);
let cfg = Config {
direct_connection_timeout: CONNECTION_TIMEOUT,
relay_connection_timeout: Duration::from_millis(500),
relay_connection_timeout: Duration::from_secs(0),
ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
max_inbound_direct_peers: 2,
max_inbound_relayed_peers: 0,
Expand All @@ -541,6 +557,10 @@ async fn max_inbound_connections() {
start_offset: Duration::from_secs(10),
},
eviction_timeout: Duration::from_secs(15 * 60),
inbound_connections_rate_limit: RateLimit {
max: 1000,
interval: Duration::from_secs(1),
},
};
let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
let mut peer2 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
Expand Down Expand Up @@ -647,8 +667,8 @@ async fn max_inbound_connections() {
#[test_log::test(tokio::test)]
async fn ip_whitelist() {
let cfg = Config {
direct_connection_timeout: Duration::from_millis(50),
relay_connection_timeout: Duration::from_millis(50),
direct_connection_timeout: Duration::from_secs(0),
relay_connection_timeout: Duration::from_secs(0),
ip_whitelist: vec!["127.0.0.2/32".parse().unwrap()],
max_inbound_direct_peers: 10,
max_inbound_relayed_peers: 10,
Expand All @@ -659,6 +679,10 @@ async fn ip_whitelist() {
start_offset: Duration::from_secs(10),
},
eviction_timeout: Duration::from_secs(15 * 60),
inbound_connections_rate_limit: RateLimit {
max: 1000,
interval: Duration::from_secs(1),
},
};
let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
let peer2 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
Expand All @@ -675,8 +699,8 @@ async fn ip_whitelist() {

// Start another peer accepting connections from 127.0.0.1.
let cfg = Config {
direct_connection_timeout: Duration::from_millis(50),
relay_connection_timeout: Duration::from_millis(50),
direct_connection_timeout: Duration::from_secs(0),
relay_connection_timeout: Duration::from_secs(0),
ip_whitelist: vec!["127.0.0.1/32".parse().unwrap()],
max_inbound_direct_peers: 10,
max_inbound_relayed_peers: 10,
Expand All @@ -687,6 +711,10 @@ async fn ip_whitelist() {
start_offset: Duration::from_secs(10),
},
eviction_timeout: Duration::from_secs(15 * 60),
inbound_connections_rate_limit: RateLimit {
max: 1000,
interval: Duration::from_secs(1),
},
};
let mut peer3 = TestPeer::new(cfg, Keypair::generate_ed25519());

Expand All @@ -698,6 +726,60 @@ async fn ip_whitelist() {
assert!(result.is_ok());
}

/// Check that inbound connections get rate limited.
#[test_log::test(tokio::test)]
async fn rate_limit() {
const RATE_LIMIT_INTERVAL: Duration = Duration::from_secs(1);

let cfg = Config {
direct_connection_timeout: Duration::from_secs(0),
relay_connection_timeout: Duration::from_secs(0),
ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
max_inbound_direct_peers: 10,
max_inbound_relayed_peers: 10,
// Don't open connections automatically.
low_watermark: 0,
bootstrap: BootstrapConfig {
period: Duration::from_millis(500),
start_offset: Duration::from_secs(10),
},
eviction_timeout: Duration::from_secs(15 * 60),
inbound_connections_rate_limit: RateLimit {
max: 2,
interval: RATE_LIMIT_INTERVAL,
},
};

let mut peer1 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
let peer2 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
let peer3 = TestPeer::new(cfg.clone(), Keypair::generate_ed25519());
let peer4 = TestPeer::new(cfg, Keypair::generate_ed25519());

let addr1 = peer1.start_listening().await.unwrap();
tracing::info!(%peer1.peer_id, %addr1);

consume_events(peer1.event_receiver);
consume_events(peer2.event_receiver);
consume_events(peer3.event_receiver);
consume_events(peer4.event_receiver);

// Two connections can be opened, but the third one is rate limited.

peer2
.client
.dial(peer1.peer_id, addr1.clone())
.await
.unwrap();
peer3
.client
.dial(peer1.peer_id, addr1.clone())
.await
.unwrap();

let result = peer4.client.dial(peer1.peer_id, addr1.clone()).await;
assert!(result.is_err());
}

#[rstest]
#[case::server_to_client(server_to_client().await)]
#[case::client_to_server(client_to_server().await)]
Expand Down
4 changes: 4 additions & 0 deletions crates/pathfinder/src/bin/pathfinder/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,10 @@ async fn start_p2p(
ip_whitelist: config.ip_whitelist,
bootstrap: Default::default(),
eviction_timeout: Duration::from_secs(15 * 60),
inbound_connections_rate_limit: p2p::RateLimit {
max: 10,
interval: Duration::from_secs(1),
},
},
chain_id,
storage,
Expand Down

0 comments on commit eaba589

Please sign in to comment.