Skip to content

Commit

Permalink
Restore rate limiting of connections opening (#1340)
Browse files Browse the repository at this point in the history
* Restore rate limiting of connections opening

* PR link
  • Loading branch information
tomaka authored Nov 15, 2023
1 parent bdbd88a commit f1e52da
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 4 deletions.
4 changes: 3 additions & 1 deletion light-base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
extern crate alloc;

use alloc::{borrow::ToOwned as _, boxed::Box, format, string::String, sync::Arc, vec, vec::Vec};
use core::{num::NonZeroU32, ops, pin};
use core::{num::NonZeroU32, ops, pin, time::Duration};
use futures_util::FutureExt as _;
use hashbrown::{hash_map::Entry, HashMap};
use itertools::Itertools as _;
Expand Down Expand Up @@ -1098,6 +1098,8 @@ fn start_services<TPlat: platform::PlatformRef>(
platform: platform.clone(),
num_events_receivers: 1, // Configures the length of `network_event_receivers`
identify_agent_version: network_identify_agent_version,
connections_open_pool_size: 5,
connections_open_pool_restore_delay: Duration::from_secs(1),
chains: vec![network_service::ConfigChain {
log_name: log_name.clone(),
num_out_slots: 4,
Expand Down
60 changes: 57 additions & 3 deletions light-base/src/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ pub struct Config<TPlat> {

/// List of chains to connect to. Chains are later referred to by their index in this list.
pub chains: Vec<ConfigChain>,

/// Maximum number of connections that the service can open simultaneously. After this value
/// has been reached, a new connection can be opened after each
/// [`Config::connections_open_pool_restore_delay`].
pub connections_open_pool_size: u32,

/// Delay after which the service can open a new connection.
/// The delay is cumulative. If no connection has been opened for example for twice this
/// duration, then two connections can be opened at the same time, up to a maximum of
/// [`Config::connections_open_pool_size`].
pub connections_open_pool_restore_delay: Duration,
}

/// See [`Config::chains`].
Expand Down Expand Up @@ -251,6 +262,10 @@ impl<TPlat: PlatformRef> NetworkService<TPlat> {
},
),
network,
connections_open_pool_size: config.connections_open_pool_size,
connections_open_pool_restore_delay: config.connections_open_pool_restore_delay,
num_recent_connection_opening: 0,
next_recent_connection_restore: None,
platform: config.platform.clone(),
event_pending_send: None,
event_senders: either::Left(event_senders),
Expand Down Expand Up @@ -880,6 +895,20 @@ struct BackgroundTask<TPlat: PlatformRef> {
/// All known peers and their addresses.
peering_strategy: basic_peering_strategy::BasicPeeringStrategy<ChainId, TPlat::Instant>,

/// See [`Config::connections_open_pool_size`].
connections_open_pool_size: u32,

/// See [`Config::connections_open_pool_restore_delay`].
connections_open_pool_restore_delay: Duration,

/// Every time a connection is opened, the value in this field is increased by one. After
/// [`BackgroundTask::next_recent_connection_restore`] has yielded, the value is reduced by
/// one.
num_recent_connection_opening: u32,

/// Delay after which [`BackgroundTask::num_recent_connection_opening`] is increased by one.
next_recent_connection_restore: Option<Pin<Box<TPlat::Delay>>>,

/// List of nodes that are considered as important for logging purposes.
// TODO: should also detect whenever we fail to open a block announces substream with any of these peers
important_nodes: HashSet<PeerId, fnv::FnvBuildHasher>,
Expand Down Expand Up @@ -938,6 +967,7 @@ async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
Message(ToBackground),
NetworkEvent(service::Event<async_channel::Sender<service::CoordinatorToConnection>>),
CanAssignSlot(PeerId, ChainId),
NextRecentConnectionRestore,
CanStartConnect(PeerId),
CanOpenGossip(PeerId, ChainId),
MessageToConnection {
Expand All @@ -959,7 +989,9 @@ async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
{
WakeUpReason::NetworkEvent(event)
} else if let Some(start_connect) = {
let x = task.network.unconnected_desired().next().cloned();
let x = (task.num_recent_connection_opening < task.connections_open_pool_size)
.then(|| task.network.unconnected_desired().next().cloned())
.flatten();
x
} {
WakeUpReason::CanStartConnect(start_connect)
Expand Down Expand Up @@ -1021,6 +1053,23 @@ async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
}
}
};
let next_recent_connection_restore = async {
if task.num_recent_connection_opening != 0
&& task.next_recent_connection_restore.is_none()
{
task.next_recent_connection_restore = Some(Box::pin(
task.platform
.sleep(task.connections_open_pool_restore_delay),
));
}
if let Some(delay) = task.next_recent_connection_restore.as_mut() {
delay.await;
task.next_recent_connection_restore = None;
WakeUpReason::NextRecentConnectionRestore
} else {
future::pending().await
}
};
let finished_sending_event = async {
if let either::Right(event_sending_future) = &mut task.event_senders {
let event_senders = event_sending_future.await;
Expand All @@ -1035,6 +1084,7 @@ async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {

message_received
.or(service_event)
.or(next_recent_connection_restore)
.or(finished_sending_event)
.await
};
Expand Down Expand Up @@ -1853,9 +1903,11 @@ async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
service::GossipKind::ConsensusTransactions,
);
}
WakeUpReason::NextRecentConnectionRestore => {
task.num_recent_connection_opening =
task.num_recent_connection_opening.saturating_sub(1);
}
WakeUpReason::CanStartConnect(expected_peer_id) => {
// TODO: restore rate limiting

let Some(multiaddr) = task.peering_strategy.addr_to_connected(&expected_peer_id)
else {
// There is no address for that peer in the address book.
Expand Down Expand Up @@ -1921,6 +1973,8 @@ async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
peer_id::PublicKey::Ed25519(*noise_key.libp2p_public_ed25519_key()).into_peer_id(),
);

task.num_recent_connection_opening += 1;

let (coordinator_to_connection_tx, coordinator_to_connection_rx) =
async_channel::bounded(8);
let task_name = format!("connection-{}", multiaddr);
Expand Down
4 changes: 4 additions & 0 deletions wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Changed

- Smoldot will now only try opening a maximum of five connections simultaneously, then one per second. This avoids possible situations where a server is being accidentally hammered by smoldot, and avoids potentially making traffic suspicious to some ISPs. ([#1340](https://github.com/smol-dot/smoldot/pull/1340))

## 2.0.8 - 2023-11-15

### Changed
Expand Down

0 comments on commit f1e52da

Please sign in to comment.