Skip to content

Commit

Permalink
feat(kad): add refresh_interval config used to poll bootstrap
Browse files Browse the repository at this point in the history
Previously, users were responsible for calling `bootstrap` on an interval. This was documented but hard to discover for people new to the library. To maintain healthy routing tables, it is advised to regularly call `bootstrap`. By default, we will now do this automatically every 5 minutes and once we add a peer to our routing table, assuming we didn't bootstrap yet. This is especially useful as part of nodes starting up and connecting to bootstrap nodes.

Closes: #4730.

Pull-Request: #4838.

Co-authored-by: stormshield-frb <francois.ribeau.external@stormshield.eu>
  • Loading branch information
2 people authored and guillaumemichel committed Mar 28, 2024
1 parent 47899ef commit 8f7f705
Show file tree
Hide file tree
Showing 9 changed files with 434 additions and 20 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ libp2p-quic = { version = "0.10.2", path = "transports/quic" }
libp2p-relay = { version = "0.17.1", path = "protocols/relay" }
libp2p-rendezvous = { version = "0.14.0", path = "protocols/rendezvous" }
libp2p-request-response = { version = "0.26.2", path = "protocols/request-response" }
libp2p-server = { version = "0.12.6", path = "misc/server" }
libp2p-server = { version = "0.12.7", path = "misc/server" }
libp2p-stream = { version = "0.1.0-alpha.1", path = "protocols/stream" }
libp2p-swarm = { version = "0.44.2", path = "swarm" }
libp2p-swarm-derive = { version = "=0.34.3", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required.
Expand Down
8 changes: 8 additions & 0 deletions misc/server/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
## 0.12.7

### Changed

- Use periodic and automatic bootstrap of Kademlia.
See [PR 4838](https://github.com/libp2p/rust-libp2p/pull/4838).

## 0.12.6

### Changed

- Stop using kad default protocol.
See [PR 5122](https://github.com/libp2p/rust-libp2p/pull/5122)

Expand Down
2 changes: 1 addition & 1 deletion misc/server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "libp2p-server"
version = "0.12.6"
version = "0.12.7"
authors = ["Max Inden <mail@max-inden.de>"]
edition = "2021"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
16 changes: 0 additions & 16 deletions misc/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use base64::Engine;
use clap::Parser;
use futures::stream::StreamExt;
use futures_timer::Delay;
use libp2p::identity;
use libp2p::identity::PeerId;
use libp2p::kad;
Expand All @@ -14,17 +13,13 @@ use prometheus_client::registry::Registry;
use std::error::Error;
use std::path::PathBuf;
use std::str::FromStr;
use std::task::Poll;
use std::time::Duration;
use tracing_subscriber::EnvFilter;
use zeroize::Zeroizing;

mod behaviour;
mod config;
mod http_service;

const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5 * 60);

#[derive(Debug, Parser)]
#[clap(name = "libp2p server", about = "A rust-libp2p server binary.")]
struct Opts {
Expand Down Expand Up @@ -127,18 +122,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
});

let mut bootstrap_timer = Delay::new(BOOTSTRAP_INTERVAL);

loop {
if let Poll::Ready(()) = futures::poll!(&mut bootstrap_timer) {
bootstrap_timer.reset(BOOTSTRAP_INTERVAL);
let _ = swarm
.behaviour_mut()
.kademlia
.as_mut()
.map(|k| k.bootstrap());
}

let event = swarm.next().await.expect("Swarm not to terminate.");
metrics.record(&event);
match event {
Expand Down
59 changes: 59 additions & 0 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
mod test;

use crate::addresses::Addresses;
use crate::bootstrap;
use crate::handler::{Handler, HandlerEvent, HandlerIn, RequestId};
use crate::kbucket::{self, Distance, KBucketsTable, NodeStatus};
use crate::protocol::{ConnectionType, KadPeer, ProtocolConfig};
Expand Down Expand Up @@ -116,6 +117,9 @@ pub struct Behaviour<TStore> {

/// The record storage.
store: TStore,

/// Tracks the status of the current bootstrap.
bootstrap_status: bootstrap::Status,
}

/// The configurable strategies for the insertion of peers
Expand Down Expand Up @@ -181,6 +185,8 @@ pub struct Config {
provider_publication_interval: Option<Duration>,
kbucket_inserts: BucketInserts,
caching: Caching,
periodic_bootstrap_interval: Option<Duration>,
automatic_bootstrap_throttle: Option<Duration>,
}

impl Default for Config {
Expand Down Expand Up @@ -222,6 +228,8 @@ impl Config {
provider_record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
kbucket_inserts: BucketInserts::OnConnected,
caching: Caching::Enabled { max_peers: 1 },
periodic_bootstrap_interval: Some(Duration::from_secs(5 * 60)),
automatic_bootstrap_throttle: Some(bootstrap::DEFAULT_AUTOMATIC_THROTTLE),
}
}

Expand Down Expand Up @@ -408,6 +416,34 @@ impl Config {
self.caching = c;
self
}

/// Sets the interval on which [`Behaviour::bootstrap`] is called periodically.
///
/// * Default to `5` minutes.
/// * Set to `None` to disable periodic bootstrap.
pub fn set_periodic_bootstrap_interval(&mut self, interval: Option<Duration>) -> &mut Self {
self.periodic_bootstrap_interval = interval;
self
}

/// Sets the time to wait before calling [`Behaviour::bootstrap`] after a new peer is inserted in the routing table.
/// This prevent cascading bootstrap requests when multiple peers are inserted into the routing table "at the same time".
/// This also allows to wait a little bit for other potential peers to be inserted into the routing table before
/// triggering a bootstrap, giving more context to the future bootstrap request.
///
/// * Default to `500` ms.
/// * Set to `Some(Duration::ZERO)` to never wait before triggering a bootstrap request when a new peer
/// is inserted in the routing table.
/// * Set to `None` to disable automatic bootstrap (no bootstrap request will be triggered when a new
/// peer is inserted in the routing table).
#[cfg(test)]
pub(crate) fn set_automatic_bootstrap_throttle(
&mut self,
duration: Option<Duration>,
) -> &mut Self {
self.automatic_bootstrap_throttle = duration;
self
}
}

impl<TStore> Behaviour<TStore>
Expand Down Expand Up @@ -465,6 +501,10 @@ where
mode: Mode::Client,
auto_mode: true,
no_events_waker: None,
bootstrap_status: bootstrap::Status::new(
config.periodic_bootstrap_interval,
config.automatic_bootstrap_throttle,
),
}
}

Expand Down Expand Up @@ -566,6 +606,7 @@ where
};
match entry.insert(addresses.clone(), status) {
kbucket::InsertResult::Inserted => {
self.bootstrap_status.on_new_peer_in_routing_table();
self.queued_events.push_back(ToSwarm::GenerateEvent(
Event::RoutingUpdated {
peer: *peer,
Expand Down Expand Up @@ -884,6 +925,13 @@ where
///
/// > **Note**: Bootstrapping requires at least one node of the DHT to be known.
/// > See [`Behaviour::add_address`].
///
/// > **Note**: Bootstrap does not require to be called manually. It is periodically
/// invoked at regular intervals based on the configured `periodic_bootstrap_interval` (see
/// [`Config::set_periodic_bootstrap_interval`] for details) and it is also automatically invoked
/// when a new peer is inserted in the routing table.
/// This parameter is used to call [`Behaviour::bootstrap`] periodically and automatically
/// to ensure a healthy routing table.
pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
let local_key = self.kbuckets.local_key().clone();
let info = QueryInfo::Bootstrap {
Expand All @@ -895,6 +943,7 @@ where
if peers.is_empty() {
Err(NoKnownPeers())
} else {
self.bootstrap_status.on_started();
let inner = QueryInner::new(info);
Ok(self.queries.add_iter_closest(local_key, peers, inner))
}
Expand Down Expand Up @@ -1291,6 +1340,7 @@ where
let addresses = Addresses::new(a);
match entry.insert(addresses.clone(), new_status) {
kbucket::InsertResult::Inserted => {
self.bootstrap_status.on_new_peer_in_routing_table();
let event = Event::RoutingUpdated {
peer,
is_new_peer: true,
Expand Down Expand Up @@ -1406,6 +1456,7 @@ where
.continue_iter_closest(query_id, target.clone(), peers, inner);
} else {
step.last = true;
self.bootstrap_status.on_finish();
};

Some(Event::OutboundQueryProgressed {
Expand Down Expand Up @@ -1608,6 +1659,7 @@ where
.continue_iter_closest(query_id, target.clone(), peers, inner);
} else {
step.last = true;
self.bootstrap_status.on_finish();
}

Some(Event::OutboundQueryProgressed {
Expand Down Expand Up @@ -2480,6 +2532,13 @@ where
self.put_record_job = Some(job);
}

// Poll bootstrap periodically and automatically.
if let Poll::Ready(()) = self.bootstrap_status.poll_next_bootstrap(cx) {
if let Err(e) = self.bootstrap() {
tracing::warn!("Failed to trigger bootstrap: {e}");
}
}

loop {
// Drain queued events first.
if let Some(event) = self.queued_events.pop_front() {
Expand Down
18 changes: 17 additions & 1 deletion protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ fn bootstrap() {
let num_group = rng.gen_range(1..(num_total % K_VALUE.get()) + 2);

let mut cfg = Config::new(PROTOCOL_NAME);
// Disabling periodic bootstrap and automatic bootstrap to prevent the bootstrap from triggering automatically.
cfg.set_periodic_bootstrap_interval(None);
cfg.set_automatic_bootstrap_throttle(None);
if rng.gen() {
cfg.disjoint_query_paths(true);
}
Expand Down Expand Up @@ -252,7 +255,11 @@ fn query_iter() {

fn run(rng: &mut impl Rng) {
let num_total = rng.gen_range(2..20);
let mut swarms = build_connected_nodes(num_total, 1)
let mut config = Config::new(PROTOCOL_NAME);
// Disabling periodic bootstrap and automatic bootstrap to prevent the bootstrap from triggering automatically.
config.set_periodic_bootstrap_interval(None);
config.set_automatic_bootstrap_throttle(None);
let mut swarms = build_connected_nodes_with_config(num_total, 1, config)
.into_iter()
.map(|(_a, s)| s)
.collect::<Vec<_>>();
Expand Down Expand Up @@ -500,6 +507,9 @@ fn put_record() {

let mut config = Config::new(PROTOCOL_NAME);
config.set_replication_factor(replication_factor);
// Disabling periodic bootstrap and automatic bootstrap to prevent the bootstrap from triggering automatically.
config.set_periodic_bootstrap_interval(None);
config.set_automatic_bootstrap_throttle(None);
if rng.gen() {
config.disjoint_query_paths(true);
}
Expand Down Expand Up @@ -869,6 +879,9 @@ fn add_provider() {

let mut config = Config::new(PROTOCOL_NAME);
config.set_replication_factor(replication_factor);
// Disabling periodic bootstrap and automatic bootstrap to prevent the bootstrap from triggering automatically.
config.set_periodic_bootstrap_interval(None);
config.set_automatic_bootstrap_throttle(None);
if rng.gen() {
config.disjoint_query_paths(true);
}
Expand Down Expand Up @@ -1094,6 +1107,9 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
config.disjoint_query_paths(true);
// I.e. setting the amount disjoint paths to be explored to 2.
config.set_parallelism(NonZeroUsize::new(2).unwrap());
// Disabling periodic bootstrap and automatic bootstrap to prevent the bootstrap from triggering automatically.
config.set_periodic_bootstrap_interval(None);
config.set_automatic_bootstrap_throttle(None);

let mut alice = build_node_with_config(config);
let mut trudy = build_node(); // Trudy the intrudor, an adversary.
Expand Down
Loading

0 comments on commit 8f7f705

Please sign in to comment.