diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 305f4e253834d6..f65090b160171f 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -281,8 +281,7 @@ impl CrdsGossipPull { ) { requests.into_iter().for_each(|(caller, _)| { let key = caller.label().pubkey(); - let old = crds.insert(caller, now); - if let Some(val) = old.ok().and_then(|opt| opt) { + if let Ok(Some(val)) = crds.insert(caller, now) { self.purged_values .push_back((val.value_hash, val.local_timestamp)); } diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 4be6f0f8772edf..fbcd012233a1a3 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -19,7 +19,7 @@ use crate::{ use bincode::serialized_size; use indexmap::map::IndexMap; use itertools::Itertools; -use rand::{self, seq::SliceRandom, thread_rng, RngCore}; +use rand::{seq::SliceRandom, Rng}; use solana_runtime::bloom::Bloom; use solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp}; use std::{ @@ -36,6 +36,8 @@ pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000; pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500; pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15; pub const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2; +// Do not push to peers which have not been updated for this long. +const PUSH_ACTIVE_TIMEOUT_MS: u64 = 60_000; // 10 minutes const MAX_PUSHED_TO_TIMEOUT_MS: u64 = 10 * 60 * 1000; @@ -126,7 +128,7 @@ impl CrdsGossipPush { .collect(); let mut seed = [0; 32]; - seed[0..8].copy_from_slice(&thread_rng().next_u64().to_le_bytes()); + rand::thread_rng().fill(&mut seed[..]); let shuffle = weighted_shuffle( staked_peers.iter().map(|(_, stake)| *stake).collect_vec(), seed, @@ -302,6 +304,7 @@ impl CrdsGossipPush { network_size: usize, ratio: usize, ) { + let mut rng = rand::thread_rng(); let need = Self::compute_need(self.num_active, self.active_set.len(), ratio); let mut new_items = HashMap::new(); @@ -317,7 +320,7 @@ impl CrdsGossipPush { } let mut seed = [0; 32]; - seed[0..8].copy_from_slice(&thread_rng().next_u64().to_le_bytes()); + rng.fill(&mut seed[..]); let mut shuffle = weighted_shuffle( options.iter().map(|weighted| weighted.0).collect_vec(), seed, @@ -343,7 +346,7 @@ impl CrdsGossipPush { } } let mut keys: Vec = self.active_set.keys().cloned().collect(); - keys.shuffle(&mut rand::thread_rng()); + keys.shuffle(&mut rng); let num = keys.len() / ratio; for k in &keys[..num] { self.active_set.swap_remove(k); @@ -361,11 +364,26 @@ impl CrdsGossipPush { stakes: &HashMap, gossip_validators: Option<&HashSet>, ) -> Vec<(f32, &'a ContactInfo)> { + let now = timestamp(); + let mut rng = rand::thread_rng(); + let max_weight = u16::MAX as f32 - 1.0; + let active_cutoff = now.saturating_sub(PUSH_ACTIVE_TIMEOUT_MS); crds.table .values() - .filter(|v| v.value.contact_info().is_some()) - .map(|v| (v.value.contact_info().unwrap(), v)) - .filter(|(info, _)| { + .filter_map(|value| { + let info = value.value.contact_info()?; + // Stop pushing to nodes which have not been active recently. + if value.local_timestamp < active_cutoff { + // In order to mitigate eclipse attack, for staked nodes + // continue retrying periodically. + let stake = stakes.get(&info.id).unwrap_or(&0); + if *stake == 0 || rng.gen_ratio(7, 8) { + return None; + } + } + Some(info) + }) + .filter(|info| { info.id != *self_id && ContactInfo::is_valid_address(&info.gossip) && self_shred_version == info.shred_version @@ -373,10 +391,9 @@ impl CrdsGossipPush { gossip_validators.contains(&info.id) }) }) - .map(|(info, _value)| { - let max_weight = f32::from(u16::max_value()) - 1.0; + .map(|info| { let last_pushed_to: u64 = *self.last_pushed_to.get(&info.id).unwrap_or(&0); - let since = ((timestamp() - last_pushed_to) / 1024) as u32; + let since = ((now - last_pushed_to) / 1024) as u32; let stake = get_stake(&info.id, stakes); let weight = get_weight(max_weight, since, stake); (weight, info) @@ -556,6 +573,7 @@ mod test { #[test] fn test_refresh_active_set() { solana_logger::setup(); + let now = timestamp(); let mut crds = Crds::default(); let mut push = CrdsGossipPush::default(); let value1 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( @@ -563,7 +581,7 @@ mod test { 0, ))); - assert_eq!(crds.insert(value1.clone(), 0), Ok(None)); + assert_eq!(crds.insert(value1.clone(), now), Ok(None)); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); assert!(push.active_set.get(&value1.label().pubkey()).is_some()); @@ -572,7 +590,7 @@ mod test { 0, ))); assert!(push.active_set.get(&value2.label().pubkey()).is_none()); - assert_eq!(crds.insert(value2.clone(), 0), Ok(None)); + assert_eq!(crds.insert(value2.clone(), now), Ok(None)); for _ in 0..30 { push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); if push.active_set.get(&value2.label().pubkey()).is_some() { @@ -585,7 +603,7 @@ mod test { let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo( ContactInfo::new_localhost(&Pubkey::new_rand(), 0), )); - assert_eq!(crds.insert(value2.clone(), 0), Ok(None)); + assert_eq!(crds.insert(value2.clone(), now), Ok(None)); } push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); assert_eq!(push.active_set.len(), push.num_active); @@ -619,6 +637,7 @@ mod test { #[test] fn test_no_pushes_to_from_different_shred_versions() { + let now = timestamp(); let mut crds = Crds::default(); let stakes = HashMap::new(); let node = CrdsGossipPush::default(); @@ -650,10 +669,10 @@ mod test { ..ContactInfo::default() })); - crds.insert(me.clone(), 0).unwrap(); - crds.insert(spy.clone(), 0).unwrap(); - crds.insert(node_123.clone(), 0).unwrap(); - crds.insert(node_456, 0).unwrap(); + crds.insert(me.clone(), now).unwrap(); + crds.insert(spy.clone(), now).unwrap(); + crds.insert(node_123.clone(), now).unwrap(); + crds.insert(node_456, now).unwrap(); // shred version 123 should ignore nodes with versions 0 and 456 let options = node @@ -676,6 +695,7 @@ mod test { #[test] fn test_pushes_only_to_allowed() { + let now = timestamp(); let mut crds = Crds::default(); let stakes = HashMap::new(); let node = CrdsGossipPush::default(); @@ -693,7 +713,7 @@ mod test { })); crds.insert(me.clone(), 0).unwrap(); - crds.insert(node_123.clone(), 0).unwrap(); + crds.insert(node_123.clone(), now).unwrap(); // Unknown pubkey in gossip_validators -- will push to nobody let mut gossip_validators = HashSet::new(); @@ -734,13 +754,14 @@ mod test { #[test] fn test_new_push_messages() { + let now = timestamp(); let mut crds = Crds::default(); let mut push = CrdsGossipPush::default(); let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), 0, ))); - assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); + assert_eq!(crds.insert(peer.clone(), now), Ok(None)); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( @@ -758,24 +779,25 @@ mod test { } #[test] fn test_personalized_push_messages() { + let now = timestamp(); let mut crds = Crds::default(); let mut push = CrdsGossipPush::default(); let peer_1 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), 0, ))); - assert_eq!(crds.insert(peer_1.clone(), 0), Ok(None)); + assert_eq!(crds.insert(peer_1.clone(), now), Ok(None)); let peer_2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), 0, ))); - assert_eq!(crds.insert(peer_2.clone(), 0), Ok(None)); + assert_eq!(crds.insert(peer_2.clone(), now), Ok(None)); let peer_3 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), - 0, + now, ))); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), peer_3.clone(), 0), + push.process_push_message(&mut crds, &Pubkey::default(), peer_3.clone(), now), Ok(None) ); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); @@ -789,7 +811,7 @@ mod test { expected.insert(peer_1.pubkey(), vec![new_msg.clone()]); expected.insert(peer_2.pubkey(), vec![new_msg]); assert_eq!(push.active_set.len(), 3); - assert_eq!(push.new_push_messages(&crds, 0), expected); + assert_eq!(push.new_push_messages(&crds, now), expected); } #[test] fn test_process_prune() {