Improve upgradability #2445

Merged (2 commits, Nov 22, 2024)
3 changes: 0 additions & 3 deletions sn_networking/src/event/request_response.rs
@@ -100,9 +100,6 @@ impl SwarmDriver
self.record_metrics(Marker::FlaggedAsBadNode {
flagged_by: &detected_by,
});

// TODO: shall we terminate self after receiving such notifications
// from the majority of close_group nodes around us?
} else {
error!("Received a bad_peer notification from {detected_by:?}, targeting {bad_peer:?}, which is not us.");
}
28 changes: 0 additions & 28 deletions sn_networking/src/lib.rs
@@ -424,8 +424,6 @@ impl Network {
self.send_req_ignore_reply(request, *peer_id);
}

filter_out_bad_nodes(&mut all_costs, record_address);

get_fees_from_store_cost_responses(all_costs)
}

@@ -1189,32 +1187,6 @@ fn get_fees_from_store_cost_responses(
Ok((payee_id, payee.1, payee.2))
}

/// According to the bad_nodes list collected via quotes,
/// candidates that received majority votes from others shall be ignored.
fn filter_out_bad_nodes(
all_costs: &mut Vec<(NetworkAddress, RewardsAddress, PaymentQuote)>,
record_address: NetworkAddress,
) {
let mut bad_node_votes: BTreeMap<NetworkAddress, usize> = BTreeMap::new();
for (peer_addr, _reward_addr, quote) in all_costs.iter() {
let bad_nodes: Vec<NetworkAddress> = match rmp_serde::from_slice(&quote.bad_nodes) {
Ok(bad_nodes) => bad_nodes,
Err(err) => {
error!("For record {record_address:?}, failed to recover bad_nodes from quote of {peer_addr:?} with error {err:?}");
continue;
}
};
for bad_node in bad_nodes {
let entry = bad_node_votes.entry(bad_node).or_default();
*entry += 1;
}
}
all_costs.retain(|(peer_addr, _, _)| {
let entry = bad_node_votes.entry(peer_addr.clone()).or_default();
*entry < close_group_majority()
});
}

/// Get the value of the provided Quorum
pub fn get_quorum_value(quorum: &Quorum) -> usize {
match quorum {
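For context on what this file drops: each quoting peer ships its own bad_nodes list inside its quote, and the removed filter_out_bad_nodes discarded any candidate flagged by a majority of quoters. A minimal, self-contained sketch of that counting pattern (plain strings stand in for NetworkAddress, and MAJORITY is a hypothetical stand-in for close_group_majority()):

use std::collections::BTreeMap;

const MAJORITY: usize = 3; // hypothetical stand-in for close_group_majority()

/// Drop every candidate that at least MAJORITY voters flagged as bad.
fn filter_by_majority(candidates: &mut Vec<String>, votes: &[Vec<String>]) {
    // Count how many voters named each address as bad.
    let mut tally: BTreeMap<String, usize> = BTreeMap::new();
    for bad_list in votes {
        for addr in bad_list {
            *tally.entry(addr.clone()).or_default() += 1;
        }
    }
    // Keep only candidates that stayed below the majority threshold.
    candidates.retain(|addr| tally.get(addr).copied().unwrap_or(0) < MAJORITY);
}

fn main() {
    let mut candidates = vec!["a".to_string(), "b".to_string(), "c".to_string()];
    let votes = vec![
        vec!["b".to_string()],
        vec!["b".to_string()],
        vec!["b".to_string(), "c".to_string()],
    ];
    filter_by_majority(&mut candidates, &votes);
    assert_eq!(candidates, vec!["a".to_string(), "c".to_string()]);
}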
151 changes: 2 additions & 149 deletions sn_node/src/node.rs
@@ -19,8 +19,7 @@ use sn_evm::{AttoTokens, RewardsAddress};
#[cfg(feature = "open-metrics")]
use sn_networking::MetricsRegistries;
use sn_networking::{
close_group_majority, Instant, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue,
SwarmDriver,
Instant, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue, SwarmDriver,
};
use sn_protocol::{
error::Error as ProtocolError,
@@ -36,21 +35,14 @@ use std::{
},
time::Duration,
};
use tokio::{
sync::mpsc::Receiver,
task::{spawn, JoinHandle},
};
use tokio::{sync::mpsc::Receiver, task::spawn};

use sn_evm::EvmNetwork;

/// Interval to trigger replication of all records to all peers.
/// This is the max time it should take. Minimum interval at any node will be half this
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;

/// Interval to trigger bad node detection.
/// This is the max time it should take. Minimum interval at any node will be half this
const PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S: u64 = 600;

/// Max number of attempts that chunk proof verification will be carried out against certain target,
/// before classifying peer as a bad peer.
const MAX_CHUNK_PROOF_VERIFY_ATTEMPTS: usize = 3;
@@ -256,19 +248,6 @@ impl Node {
let mut replication_interval = tokio::time::interval(replication_interval_time);
let _ = replication_interval.tick().await; // first tick completes immediately

// use a random interval to ensure nodes are not in sync when transmitting messages.
let bad_nodes_check_interval: u64 = rng.gen_range(
PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S / 2
..PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S,
);
let bad_nodes_check_time = Duration::from_secs(bad_nodes_check_interval);
debug!("BadNodesCheck interval set to {bad_nodes_check_time:?}");

let mut bad_nodes_check_interval = tokio::time::interval(bad_nodes_check_time);
let _ = bad_nodes_check_interval.tick().await; // first tick completes immediately

let mut rolling_index = 0;

let mut uptime_metrics_update_interval =
tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
let _ = uptime_metrics_update_interval.tick().await; // first tick completes immediately
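For reference, the interval setup removed in this hunk picked a per-node check period at random between half the maximum and the full maximum, so peers do not fire their checks in lockstep. A minimal sketch of that jitter pattern, assuming the rand and tokio crates (the constant mirrors the removed one):

use rand::Rng;
use std::time::Duration;

const PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S: u64 = 600;

#[tokio::main]
async fn main() {
    // Pick a per-node period in [max/2, max) so peers stay out of sync.
    let secs = rand::thread_rng().gen_range(
        PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S / 2..PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S,
    );
    let mut interval = tokio::time::interval(Duration::from_secs(secs));
    let _ = interval.tick().await; // first tick completes immediately
    println!("next bad_nodes check in roughly {secs}s");
}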
@@ -310,24 +289,6 @@ impl Node {
trace!("Periodic replication took {:?}", start.elapsed());
});
}
// runs every bad_nodes_check_time interval
_ = bad_nodes_check_interval.tick() => {
let start = Instant::now();
debug!("Periodic bad_nodes check triggered");
let network = self.network().clone();
self.record_metrics(Marker::IntervalBadNodesCheckTriggered);

let _handle = spawn(async move {
Self::try_bad_nodes_check(network, rolling_index).await;
trace!("Periodic bad_nodes check took {:?}", start.elapsed());
});

if rolling_index == 511 {
rolling_index = 0;
} else {
rolling_index += 1;
}
}
_ = uptime_metrics_update_interval.tick() => {
#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = self.metrics_recorder() {
@@ -524,58 +485,6 @@ impl Node {
);
}

// Query close_group peers to the target to verify whether the target is a bad_node
// Returns true when it is a bad_node, otherwise false
async fn close_nodes_shunning_peer(network: &Network, peer_id: PeerId) -> bool {
// using `client` to exclude self
let closest_peers = match network
.client_get_all_close_peers_in_range_or_close_group(&NetworkAddress::from_peer(peer_id))
.await
{
Ok(peers) => peers,
Err(err) => {
error!("Failed to finding closest_peers to {peer_id:?} client_get_closest_peers errored: {err:?}");
return false;
}
};

// Query the peer's status from the close_group around it,
// raising an alert once alerts are received from a majority (3) of the close_group.
let req = Request::Query(Query::CheckNodeInProblem(NetworkAddress::from_peer(
peer_id,
)));
let mut handles = Vec::new();
for peer in closest_peers {
let req_copy = req.clone();
let network_copy = network.clone();
let handle: JoinHandle<bool> = spawn(async move {
debug!("getting node_status of {peer_id:?} from {peer:?}");
if let Ok(resp) = network_copy.send_request(req_copy, peer).await {
match resp {
Response::Query(QueryResponse::CheckNodeInProblem {
is_in_trouble,
..
}) => is_in_trouble,
other => {
error!("Cannot get node status of {peer_id:?} from node {peer:?}, with response {other:?}");
false
}
}
} else {
false
}
});
handles.push(handle);
}
let results: Vec<_> = futures::future::join_all(handles).await;

results
.iter()
.filter(|r| *r.as_ref().unwrap_or(&false))
.count()
>= close_group_majority()
}
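
The close_nodes_shunning_peer function removed above fanned the CheckNodeInProblem query out to the target's close group in parallel tasks and reported a bad node only once a majority agreed. A minimal sketch of that fan-out-and-count pattern, assuming the tokio and futures crates, with a placeholder check standing in for the real network request and MAJORITY standing in for close_group_majority():

use tokio::task::JoinHandle;

const MAJORITY: usize = 3; // hypothetical stand-in for close_group_majority()

/// Ask every close peer whether the target is in trouble; report true
/// only if a majority of the answers say so.
async fn majority_says_bad(close_peers: Vec<u64>) -> bool {
    let mut handles: Vec<JoinHandle<bool>> = Vec::new();
    for peer in close_peers {
        handles.push(tokio::spawn(async move {
            // Placeholder for sending Query::CheckNodeInProblem to `peer`
            // and reading the is_in_trouble flag from the response.
            peer % 2 == 0
        }));
    }
    let results = futures::future::join_all(handles).await;
    results
        .iter()
        .filter(|r| *r.as_ref().unwrap_or(&false))
        .count()
        >= MAJORITY
}

#[tokio::main]
async fn main() {
    // Peers 2, 4, and 6 answer "in trouble" here, so the vote passes.
    println!("{}", majority_says_bad(vec![2, 4, 6, 7, 9]).await);
}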

// Handle the response that was not awaited at the call site
fn handle_response(&self, response: Response) -> Result<()> {
match response {
@@ -711,62 +620,6 @@ impl Node {
};
Response::Query(resp)
}

async fn try_bad_nodes_check(network: Network, rolling_index: usize) {
if let Ok(kbuckets) = network.get_kbuckets().await {
let total_peers: usize = kbuckets.values().map(|peers| peers.len()).sum();
if total_peers > 100 {
// The `rolling_index` rotates among 0-511,
// while the returned `kbuckets` only holds non-empty buckets.
// Hence a `remainder` calculation is used to achieve a rolling check.
// A further `remainder of 2` is used to select the `upper or lower part`
// within a bucket, to further reduce the concurrent queries.
let mut bucket_index = (rolling_index / 2) % kbuckets.len();
let part_index = rolling_index % 2;

for (distance, peers) in kbuckets.iter() {
if bucket_index == 0 {
let peers_to_query = if peers.len() > 10 {
let split_index = peers.len() / 2;
let (left, right) = peers.split_at(split_index);
if part_index == 0 {
left
} else {
right
}
} else {
peers
};

debug!(
"Undertake bad_nodes check against bucket {distance} having {} peers, {} candidates to be queried",
peers.len(), peers_to_query.len()
);
for peer_id in peers_to_query {
let peer_id_clone = *peer_id;
let network_clone = network.clone();
let _handle = spawn(async move {
let is_bad =
Self::close_nodes_shunning_peer(&network_clone, peer_id_clone)
.await;
if is_bad {
network_clone.record_node_issues(
peer_id_clone,
NodeIssue::CloseNodesShunning,
);
}
});
}
break;
} else {
bucket_index = bucket_index.saturating_sub(1);
}
}
} else {
debug!("Skip bad_nodes check as not having too many nodes in RT");
}
}
}
}
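
For reference, the try_bad_nodes_check removed above walked the routing table a slice at a time: rolling_index rotates through 0-511, maps onto one non-empty bucket, and a further modulo 2 selects the upper or lower half of large buckets so each run only queries a subset of peers. A minimal sketch of that index arithmetic, with plain Vec<u32> buckets standing in for the real kbuckets:

/// Map a rolling index onto (bucket, half) so successive runs walk the
/// routing table a slice at a time.
fn select_slice(buckets: &[Vec<u32>], rolling_index: usize) -> (usize, &[u32]) {
    let bucket_index = (rolling_index / 2) % buckets.len();
    let part_index = rolling_index % 2;
    let peers = &buckets[bucket_index];
    // Large buckets are split so each run only queries half of them.
    let slice = if peers.len() > 10 {
        let (left, right) = peers.split_at(peers.len() / 2);
        if part_index == 0 { left } else { right }
    } else {
        &peers[..]
    };
    (bucket_index, slice)
}

fn main() {
    let buckets = vec![(0..4).collect::<Vec<u32>>(), (0..30).collect()];
    for i in 0..4 {
        let (b, slice) = select_slice(&buckets, i);
        println!("run {i}: bucket {b}, {} peers to query", slice.len());
    }
}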

async fn chunk_proof_verify_peer(network: &Network, peer_id: PeerId, key: &NetworkAddress) -> bool {