Skip to content

Commit

Permalink
filters out inactive nodes from push options
Browse files Browse the repository at this point in the history
solana-labs#12620
patched the DDOS issue with nodes which go offline:
solana-labs#12409

However, offline nodes still see (much lesser) traffic spike, likely
because no origins are pruned from their bloom filter in active set:
https://github.com/solana-labs/solana/blob/aaf3790d8/core/src/crds_gossip_push.rs#L276-L286
and so multiple nodes push redundant duplicate messages to them
simultaneously:
https://github.com/solana-labs/solana/blob/aaf3790d8/core/src/crds_gossip_push.rs#L254-L255

This commit will filter out inactive peers from potential push targets
entirely. To mitigate eclipse attacks, staked nodes are retried
periodically.
  • Loading branch information
behzadnouri committed Oct 5, 2020
1 parent 0571882 commit e243757
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 26 deletions.
3 changes: 1 addition & 2 deletions core/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,7 @@ impl CrdsGossipPull {
) {
requests.into_iter().for_each(|(caller, _)| {
let key = caller.label().pubkey();
let old = crds.insert(caller, now);
if let Some(val) = old.ok().and_then(|opt| opt) {
if let Ok(Some(val)) = crds.insert(caller, now) {
self.purged_values
.push_back((val.value_hash, val.local_timestamp));
}
Expand Down
70 changes: 46 additions & 24 deletions core/src/crds_gossip_push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
use bincode::serialized_size;
use indexmap::map::IndexMap;
use itertools::Itertools;
use rand::{self, seq::SliceRandom, thread_rng, RngCore};
use rand::{seq::SliceRandom, Rng};
use solana_runtime::bloom::Bloom;
use solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp};
use std::{
Expand All @@ -36,6 +36,8 @@ pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000;
pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15;
pub const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2;
// Do not push to peers which have not been updated for this long.
const PUSH_ACTIVE_TIMEOUT_MS: u64 = 60_000;

// 10 minutes
const MAX_PUSHED_TO_TIMEOUT_MS: u64 = 10 * 60 * 1000;
Expand Down Expand Up @@ -126,7 +128,7 @@ impl CrdsGossipPush {
.collect();

let mut seed = [0; 32];
seed[0..8].copy_from_slice(&thread_rng().next_u64().to_le_bytes());
rand::thread_rng().fill(&mut seed[..]);
let shuffle = weighted_shuffle(
staked_peers.iter().map(|(_, stake)| *stake).collect_vec(),
seed,
Expand Down Expand Up @@ -302,6 +304,7 @@ impl CrdsGossipPush {
network_size: usize,
ratio: usize,
) {
let mut rng = rand::thread_rng();
let need = Self::compute_need(self.num_active, self.active_set.len(), ratio);
let mut new_items = HashMap::new();

Expand All @@ -317,7 +320,7 @@ impl CrdsGossipPush {
}

let mut seed = [0; 32];
seed[0..8].copy_from_slice(&thread_rng().next_u64().to_le_bytes());
rng.fill(&mut seed[..]);
let mut shuffle = weighted_shuffle(
options.iter().map(|weighted| weighted.0).collect_vec(),
seed,
Expand All @@ -343,7 +346,7 @@ impl CrdsGossipPush {
}
}
let mut keys: Vec<Pubkey> = self.active_set.keys().cloned().collect();
keys.shuffle(&mut rand::thread_rng());
keys.shuffle(&mut rng);
let num = keys.len() / ratio;
for k in &keys[..num] {
self.active_set.swap_remove(k);
Expand All @@ -361,22 +364,36 @@ impl CrdsGossipPush {
stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>,
) -> Vec<(f32, &'a ContactInfo)> {
let now = timestamp();
let mut rng = rand::thread_rng();
let max_weight = u16::MAX as f32 - 1.0;
let active_cutoff = now.saturating_sub(PUSH_ACTIVE_TIMEOUT_MS);
crds.table
.values()
.filter(|v| v.value.contact_info().is_some())
.map(|v| (v.value.contact_info().unwrap(), v))
.filter(|(info, _)| {
.filter_map(|value| {
let info = value.value.contact_info()?;
// Stop pushing to nodes which have not been active recently.
if value.local_timestamp < active_cutoff {
// In order to mitigate eclipse attack, for staked nodes
// continue retrying periodically.
let stake = stakes.get(&info.id).unwrap_or(&0);
if *stake == 0 || rng.gen_ratio(7, 8) {
return None;
}
}
Some(info)
})
.filter(|info| {
info.id != *self_id
&& ContactInfo::is_valid_address(&info.gossip)
&& self_shred_version == info.shred_version
&& gossip_validators.map_or(true, |gossip_validators| {
gossip_validators.contains(&info.id)
})
})
.map(|(info, _value)| {
let max_weight = f32::from(u16::max_value()) - 1.0;
.map(|info| {
let last_pushed_to: u64 = *self.last_pushed_to.get(&info.id).unwrap_or(&0);
let since = ((timestamp() - last_pushed_to) / 1024) as u32;
let since = ((now - last_pushed_to) / 1024) as u32;
let stake = get_stake(&info.id, stakes);
let weight = get_weight(max_weight, since, stake);
(weight, info)
Expand Down Expand Up @@ -556,14 +573,15 @@ mod test {
#[test]
fn test_refresh_active_set() {
solana_logger::setup();
let now = timestamp();
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let value1 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));

assert_eq!(crds.insert(value1.clone(), 0), Ok(None));
assert_eq!(crds.insert(value1.clone(), now), Ok(None));
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);

assert!(push.active_set.get(&value1.label().pubkey()).is_some());
Expand All @@ -572,7 +590,7 @@ mod test {
0,
)));
assert!(push.active_set.get(&value2.label().pubkey()).is_none());
assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
assert_eq!(crds.insert(value2.clone(), now), Ok(None));
for _ in 0..30 {
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
if push.active_set.get(&value2.label().pubkey()).is_some() {
Expand All @@ -585,7 +603,7 @@ mod test {
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(
ContactInfo::new_localhost(&Pubkey::new_rand(), 0),
));
assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
assert_eq!(crds.insert(value2.clone(), now), Ok(None));
}
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
assert_eq!(push.active_set.len(), push.num_active);
Expand Down Expand Up @@ -619,6 +637,7 @@ mod test {

#[test]
fn test_no_pushes_to_from_different_shred_versions() {
let now = timestamp();
let mut crds = Crds::default();
let stakes = HashMap::new();
let node = CrdsGossipPush::default();
Expand Down Expand Up @@ -650,10 +669,10 @@ mod test {
..ContactInfo::default()
}));

crds.insert(me.clone(), 0).unwrap();
crds.insert(spy.clone(), 0).unwrap();
crds.insert(node_123.clone(), 0).unwrap();
crds.insert(node_456, 0).unwrap();
crds.insert(me.clone(), now).unwrap();
crds.insert(spy.clone(), now).unwrap();
crds.insert(node_123.clone(), now).unwrap();
crds.insert(node_456, now).unwrap();

// shred version 123 should ignore nodes with versions 0 and 456
let options = node
Expand All @@ -676,6 +695,7 @@ mod test {

#[test]
fn test_pushes_only_to_allowed() {
let now = timestamp();
let mut crds = Crds::default();
let stakes = HashMap::new();
let node = CrdsGossipPush::default();
Expand All @@ -693,7 +713,7 @@ mod test {
}));

crds.insert(me.clone(), 0).unwrap();
crds.insert(node_123.clone(), 0).unwrap();
crds.insert(node_123.clone(), now).unwrap();

// Unknown pubkey in gossip_validators -- will push to nobody
let mut gossip_validators = HashSet::new();
Expand Down Expand Up @@ -734,13 +754,14 @@ mod test {

#[test]
fn test_new_push_messages() {
let now = timestamp();
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
assert_eq!(crds.insert(peer.clone(), now), Ok(None));
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);

let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
Expand All @@ -758,24 +779,25 @@ mod test {
}
#[test]
fn test_personalized_push_messages() {
let now = timestamp();
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let peer_1 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer_1.clone(), 0), Ok(None));
assert_eq!(crds.insert(peer_1.clone(), now), Ok(None));
let peer_2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer_2.clone(), 0), Ok(None));
assert_eq!(crds.insert(peer_2.clone(), now), Ok(None));
let peer_3 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
now,
)));
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), peer_3.clone(), 0),
push.process_push_message(&mut crds, &Pubkey::default(), peer_3.clone(), now),
Ok(None)
);
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
Expand All @@ -789,7 +811,7 @@ mod test {
expected.insert(peer_1.pubkey(), vec![new_msg.clone()]);
expected.insert(peer_2.pubkey(), vec![new_msg]);
assert_eq!(push.active_set.len(), 3);
assert_eq!(push.new_push_messages(&crds, 0), expected);
assert_eq!(push.new_push_messages(&crds, now), expected);
}
#[test]
fn test_process_prune() {
Expand Down

0 comments on commit e243757

Please sign in to comment.