Skip to content

Commit

Permalink
--wip-- [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin committed Dec 4, 2024
1 parent ee36239 commit 772ef1d
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 17 deletions.
1 change: 1 addition & 0 deletions ant-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ websockets = ["libp2p/tcp"]

[dependencies]
aes-gcm-siv = "0.11.1"
ant-bootstrap = { path = "../ant-bootstrap", version = "0.1.0" }
ant-build-info = { path = "../ant-build-info", version = "0.1.19" }
ant-evm = { path = "../ant-evm", version = "0.1.4" }
ant-protocol = { path = "../ant-protocol", version = "0.17.15" }
Expand Down
103 changes: 93 additions & 10 deletions ant-networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{
};
use crate::{transport, NodeIssue};

use ant_bootstrap::BootstrapCacheStore;
use ant_evm::PaymentQuote;
use ant_protocol::{
messages::{ChunkProof, Nonce, Request, Response},
Expand Down Expand Up @@ -71,8 +72,11 @@ use std::{
num::NonZeroUsize,
path::PathBuf,
};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Duration;
use tokio::{
sync::{mpsc, oneshot},
time::Interval,
};
use tracing::warn;
use xor_name::XorName;

Expand Down Expand Up @@ -260,13 +264,13 @@ pub(super) struct NodeBehaviour {

#[derive(Debug)]
pub struct NetworkBuilder {
bootstrap_cache: Option<BootstrapCacheStore>,
is_behind_home_network: bool,
keypair: Keypair,
local: bool,
listen_addr: Option<SocketAddr>,
request_timeout: Option<Duration>,
concurrency_limit: Option<usize>,
initial_peers: Vec<Multiaddr>,
#[cfg(feature = "open-metrics")]
metrics_registries: Option<MetricsRegistries>,
#[cfg(feature = "open-metrics")]
Expand All @@ -278,13 +282,13 @@ pub struct NetworkBuilder {
impl NetworkBuilder {
pub fn new(keypair: Keypair, local: bool) -> Self {
Self {
bootstrap_cache: None,
is_behind_home_network: false,
keypair,
local,
listen_addr: None,
request_timeout: None,
concurrency_limit: None,
initial_peers: Default::default(),
#[cfg(feature = "open-metrics")]
metrics_registries: None,
#[cfg(feature = "open-metrics")]
Expand All @@ -294,6 +298,10 @@ impl NetworkBuilder {
}
}

/// Set the bootstrap cache store held by the builder.
/// Any previously set store is replaced.
pub fn bootstrap_cache(&mut self, bootstrap_cache: BootstrapCacheStore) {
self.bootstrap_cache = Some(bootstrap_cache);
}

/// Set the `is_behind_home_network` flag on the builder.
/// The flag is `false` on a freshly created builder.
pub fn is_behind_home_network(&mut self, enable: bool) {
self.is_behind_home_network = enable;
}
Expand All @@ -310,10 +318,6 @@ impl NetworkBuilder {
self.concurrency_limit = Some(concurrency_limit);
}

/// Replace the builder's list of initial peer addresses with `initial_peers`.
pub fn initial_peers(&mut self, initial_peers: Vec<Multiaddr>) {
self.initial_peers = initial_peers;
}

/// Set the registries used inside the metrics server.
/// Configure the `metrics_server_port` to enable the metrics server.
#[cfg(feature = "open-metrics")]
Expand Down Expand Up @@ -720,6 +724,7 @@ impl NetworkBuilder {
close_group: Vec::with_capacity(CLOSE_GROUP_SIZE),
peers_in_rt: 0,
bootstrap,
bootstrap_cache: self.bootstrap_cache,
relay_manager,
connected_relay_clients: Default::default(),
external_address_manager,
Expand Down Expand Up @@ -815,6 +820,7 @@ pub struct SwarmDriver {
pub(crate) close_group: Vec<PeerId>,
pub(crate) peers_in_rt: usize,
pub(crate) bootstrap: ContinuousNetworkDiscover,
pub(crate) bootstrap_cache: Option<BootstrapCacheStore>,
pub(crate) external_address_manager: Option<ExternalAddressManager>,
pub(crate) relay_manager: Option<RelayManager>,
/// The peers that are using our relay service.
Expand Down Expand Up @@ -843,7 +849,7 @@ pub struct SwarmDriver {
pub(crate) bootstrap_peers: BTreeMap<Option<u32>, HashSet<PeerId>>,
// Peers that we have a live connection to. Any peer contacted during a kad network query
// will have a live connection established, and such peers may not appear in the RT.
pub(crate) live_connected_peers: BTreeMap<ConnectionId, (PeerId, Instant)>,
pub(crate) live_connected_peers: BTreeMap<ConnectionId, (PeerId, Multiaddr, Instant)>,
/// The list of recently established connections ids.
/// This is used to prevent log spamming.
pub(crate) latest_established_connection_ids: HashMap<usize, (IpAddr, Instant)>,
Expand Down Expand Up @@ -876,6 +882,17 @@ impl SwarmDriver {
let mut set_farthest_record_interval = interval(CLOSET_RECORD_CHECK_INTERVAL);
let mut relay_manager_reservation_interval = interval(RELAY_MANAGER_RESERVATION_INTERVAL);

let mut bootstrap_cache_save_interval = self.bootstrap_cache.as_ref().and_then(|cache| {
if cache.config().disable_cache_writing {
None
} else {
// add a variance of 10% to the interval, to avoid all nodes writing to disk at the same time.
let duration =
Self::duration_with_variance(cache.config().min_cache_save_duration, 10);
Some(interval(duration))
}
});

// temporarily skip processing IncomingConnectionError swarm event to avoid log spamming
let mut previous_incoming_connection_error_event = None;
loop {
Expand Down Expand Up @@ -1005,6 +1022,36 @@ impl SwarmDriver {
relay_manager.try_connecting_to_relay(&mut self.swarm, &self.bad_nodes)
}
},
Some(_) = Self::conditional_interval(&mut bootstrap_cache_save_interval) => {
let Some(bootstrap_cache) = self.bootstrap_cache.as_mut() else {
continue;
};
let Some(current_interval) = bootstrap_cache_save_interval.as_mut() else {
continue;
};

if let Err(err) = bootstrap_cache.sync_and_save_to_disk(true).await {
error!("Failed to save bootstrap cache: {err}");
}

if current_interval.period() >= bootstrap_cache.config().max_cache_save_duration {
continue;
}

// add a variance of 1% to the max interval to avoid all nodes writing to disk at the same time.
let max_cache_save_duration =
Self::duration_with_variance(bootstrap_cache.config().max_cache_save_duration, 1);

// scale up the interval until we reach the max
let new_duration = Duration::from_secs(
std::cmp::min(
current_interval.period().as_secs() * bootstrap_cache.config().cache_save_scaling_factor,
max_cache_save_duration.as_secs(),
));
debug!("Scaling up the bootstrap cache save interval to {new_duration:?}");
*current_interval = interval(new_duration);

},
}
}
}
Expand Down Expand Up @@ -1156,13 +1203,35 @@ impl SwarmDriver {
info!("Listening on {id:?} with addr: {addr:?}");
Ok(())
}

/// Returns a new duration that is within +/- `duration / variance` of the provided duration.
///
/// `variance` is a divisor, not a percentage: `10` yields a jitter of up to 10% of
/// `duration`, `100` yields up to 1%, and `1` yields up to 100%.
///
/// The adjustment is a whole number of seconds drawn from `0..(duration / variance)`.
/// Even adjustments are subtracted from `duration`, odd ones are added.
///
/// `duration` is returned unchanged when no jitter can be computed:
/// - `variance` is `0` (the division would otherwise panic), or
/// - `duration / variance` is shorter than one second (the sampling range would be
///   empty, which makes `gen_range` panic).
fn duration_with_variance(duration: Duration, variance: u32) -> Duration {
    // Guard against a division by zero instead of panicking.
    let Some(max_adjustment) = duration.checked_div(variance) else {
        return duration;
    };

    // Guard against sampling from the empty range `0..0`.
    let max_adjustment_secs = max_adjustment.as_secs();
    if max_adjustment_secs == 0 {
        return duration;
    }

    let random_adjustment =
        Duration::from_secs(rand::thread_rng().gen_range(0..max_adjustment_secs));

    // The parity of the sampled value decides the direction of the adjustment.
    if random_adjustment.as_secs() % 2 == 0 {
        duration.saturating_sub(random_adjustment)
    } else {
        duration.saturating_add(random_adjustment)
    }
}

/// Ticks an optional interval so it can be polled inside `tokio::select!` without
/// looping forever.
///
/// Resolves to `Some(())` once the interval has ticked, or to `None` straight away
/// when no interval is set.
async fn conditional_interval(i: &mut Option<Interval>) -> Option<()> {
    let ticker = i.as_mut()?;
    ticker.tick().await;
    Some(())
}
}

#[cfg(test)]
mod tests {
use super::check_and_wipe_storage_dir_if_necessary;

use std::{fs, io::Read};
use std::{fs, io::Read, time::Duration};

#[tokio::test]
async fn version_file_update() {
Expand Down Expand Up @@ -1219,4 +1288,18 @@ mod tests {
// The storage_dir shall be removed as version_key changed
assert!(fs::metadata(storage_dir.clone()).is_err());
}

#[tokio::test]
async fn test_duration_variance_fn() {
    let base = Duration::from_secs(100);
    let divisor = 10;
    // Every sampled value must stay within `base +/- base / divisor`.
    let lower_bound = base - base / divisor;
    let upper_bound = base + base / divisor;

    for _ in 0..10000 {
        let new_duration = crate::SwarmDriver::duration_with_variance(base, divisor);
        assert!(
            (lower_bound..=upper_bound).contains(&new_duration),
            "new_duration: {new_duration:?} is not within the expected range"
        );
    }
}
}
3 changes: 2 additions & 1 deletion ant-networking/src/event/kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,12 @@ impl SwarmDriver {
peer,
is_new_peer,
old_peer,
addresses,
..
} => {
event_string = "kad_event::RoutingUpdated";
if is_new_peer {
self.update_on_peer_addition(peer);
self.update_on_peer_addition(peer, addresses);

// This should only happen once
if self.bootstrap.notify_new_peer() {
Expand Down
10 changes: 8 additions & 2 deletions ant-networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use custom_debug::Debug as CustomDebug;
#[cfg(feature = "local")]
use libp2p::mdns;
use libp2p::{
kad::{Record, RecordKey, K_VALUE},
kad::{Addresses, Record, RecordKey, K_VALUE},
request_response::ResponseChannel as PeerResponseChannel,
Multiaddr, PeerId,
};
Expand Down Expand Up @@ -232,14 +232,20 @@ impl SwarmDriver {
}

/// Update state on addition of a peer to the routing table.
pub(crate) fn update_on_peer_addition(&mut self, added_peer: PeerId) {
pub(crate) fn update_on_peer_addition(&mut self, added_peer: PeerId, addresses: Addresses) {
self.peers_in_rt = self.peers_in_rt.saturating_add(1);
let n_peers = self.peers_in_rt;
info!("New peer added to routing table: {added_peer:?}, now we have #{n_peers} connected peers");

#[cfg(feature = "loud")]
println!("New peer added to routing table: {added_peer:?}, now we have #{n_peers} connected peers");

if let Some(bootstrap_cache) = &mut self.bootstrap_cache {
for addr in addresses.iter() {
bootstrap_cache.add_addr(addr.clone());
}
}

self.log_kbuckets(&added_peer);
self.send_event(NetworkEvent::PeerAdded(added_peer, self.peers_in_rt));

Expand Down
24 changes: 21 additions & 3 deletions ant-networking/src/event/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,17 @@ impl SwarmDriver {

let _ = self.live_connected_peers.insert(
connection_id,
(peer_id, Instant::now() + Duration::from_secs(60)),
(
peer_id,
endpoint.get_remote_address().clone(),
Instant::now() + Duration::from_secs(60),
),
);

if let Some(bootstrap_cache) = self.bootstrap_cache.as_mut() {
bootstrap_cache.update_addr_status(endpoint.get_remote_address(), true);
}

self.insert_latest_established_connection_ids(
connection_id,
endpoint.get_remote_address(),
Expand Down Expand Up @@ -406,7 +415,7 @@ impl SwarmDriver {
} => {
event_string = "OutgoingConnErr";
warn!("OutgoingConnectionError to {failed_peer_id:?} on {connection_id:?} - {error:?}");
let _ = self.live_connected_peers.remove(&connection_id);
let connection_details = self.live_connected_peers.remove(&connection_id);
self.record_connection_metrics();

// we need to decide if this was a critical error and the peer should be removed from the routing table
Expand Down Expand Up @@ -506,6 +515,15 @@ impl SwarmDriver {
}
};

// Just track failures during outgoing connection with `failed_peer_id` inside the bootstrap cache.
// OutgoingConnectionError without peer_id can happen when dialing multiple addresses of a peer.
// And similarly IncomingConnectionError can happen when a peer has multiple transports/listen addrs.
if let (Some((_, failed_addr, _)), Some(bootstrap_cache)) =
(connection_details, self.bootstrap_cache.as_mut())
{
bootstrap_cache.update_addr_status(&failed_addr, false);
}

if should_clean_peer {
warn!("Tracking issue of {failed_peer_id:?}. Clearing it out for now");

Expand Down Expand Up @@ -641,7 +659,7 @@ impl SwarmDriver {
self.last_connection_pruning_time = Instant::now();

let mut removed_conns = 0;
self.live_connected_peers.retain(|connection_id, (peer_id, timeout_time)| {
self.live_connected_peers.retain(|connection_id, (peer_id, _addr, timeout_time)| {

// skip if timeout isn't reached yet
if Instant::now() < *timeout_time {
Expand Down
1 change: 1 addition & 0 deletions ant-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ upnp = ["ant-networking/upnp"]
websockets = ["ant-networking/websockets"]

[dependencies]
ant-bootstrap = { path = "../ant-bootstrap", version = "0.1.0" }
ant-build-info = { path = "../ant-build-info", version = "0.1.19" }
ant-evm = { path = "../ant-evm", version = "0.1.4" }
ant-logging = { path = "../ant-logging", version = "0.2.40" }
Expand Down
1 change: 0 additions & 1 deletion ant-node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ impl NodeBuilder {
network_builder.listen_addr(self.addr);
#[cfg(feature = "open-metrics")]
network_builder.metrics_server_port(self.metrics_server_port);
network_builder.initial_peers(self.initial_peers.clone());
network_builder.is_behind_home_network(self.is_behind_home_network);

#[cfg(feature = "upnp")]
Expand Down

0 comments on commit 772ef1d

Please sign in to comment.