From 2238861c1c6d5097e820f4a8400b8ea378f94d2f Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Fri, 14 Feb 2020 09:17:04 -0800 Subject: [PATCH 1/8] Update epoch slots to include all missing slots --- Cargo.lock | 13 ++++ core/Cargo.toml | 1 + core/src/cluster_info.rs | 72 +++++++++++++++++- core/src/crds_value.rs | 10 +++ core/src/repair_service.rs | 151 ++++++++++++++++++++++++++++++++++++- 5 files changed, 244 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a0e4f435bb6e48..dcc68b578b49f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -560,6 +560,17 @@ dependencies = [ "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "compression" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "console" version = "0.9.2" @@ -3827,6 +3838,7 @@ dependencies = [ "bs58 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", + "compression 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "core_affinity 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "ed25519-dalek 1.0.0-pre.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -6072,6 +6084,7 @@ dependencies = [ "checksum codespan-reporting 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ab081a14ab8f9598ce826890fe896d0addee68c7a58ab49008369ccbb51510a8" "checksum colored 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6cdb90b60f2927f8d76139c72dbde7e10c3a2bc47c8594c9c7a66529f2687c03" "checksum combine 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1645a65a99c7c8d345761f4b75a6ffe5be3b3b27a93ee731fccc5050ba6be97c" +"checksum compression 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3a82b366ae14633c67a1cbb4aa3738210a23f77d2868a0fd50faa23a956f9ec4" "checksum console 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "45e0f3986890b3acbc782009e2629dfe2baa430ac091519ce3be26164a2ae6c0" "checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e" "checksum cookie 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "888604f00b3db336d2af898ec3c1d5d0ddf5e6d462220f2ededc33a87ac4bbd5" diff --git a/core/Cargo.toml b/core/Cargo.toml index fb0a7a92d964fe..ab41dde0f473ea 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -18,6 +18,7 @@ bincode = "1.2.1" bs58 = "0.3.0" byteorder = "1.3.2" chrono = { version = "0.4.10", features = ["serde"] } +compression = "0.1.5" core_affinity = "0.5.10" crossbeam-channel = "0.3" fs_extra = "1.1.0" diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index ee5e1ecd43d6ae..b6904cdcead7a7 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -26,6 +26,7 @@ use crate::{ weighted_shuffle::{weighted_best, weighted_shuffle}, }; use bincode::{serialize, serialized_size}; +use compression::prelude::*; use core::cmp; use itertools::Itertools; use solana_ledger::{bank_forks::BankForks, staking_utils}; @@ -307,10 +308,75 @@ impl ClusterInfo { ) } - pub fn push_epoch_slots(&mut self, id: Pubkey, root: Slot, min: Slot, slots: BTreeSet) { + pub fn compress_incomplete_slots(incomplete_slots: &BTreeSet) -> (Slot, Vec) { + if !incomplete_slots.is_empty() { + let first_slot = incomplete_slots + .iter() + .next() + .expect("expected to find at least one slot"); + let last_slot = incomplete_slots + .iter() + .last() + .expect("expected to find last slot"); + let mut uncompressed = vec![0u8; (last_slot.saturating_sub(*first_slot) + 1) as usize]; + incomplete_slots.iter().for_each(|slot| { + uncompressed[slot.saturating_sub(*first_slot) as usize] = 1; + }); + if let Ok(compressed) = uncompressed + .iter() + .cloned() + .encode(&mut GZipEncoder::new(), Action::Finish) + .collect::, _>>() + { + (*first_slot, compressed) + } else { + (0, vec![]) + } + } else { + (0, vec![]) + } + } + + pub fn decompress_incomplete_slots(first_slot: u64, compressed: &[u8]) -> BTreeSet { + let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); + + if let Ok(decompressed) = compressed + .iter() + .cloned() + .decode(&mut GZipDecoder::new()) + .collect::, _>>() + { + decompressed.iter().enumerate().for_each(|(i, val)| { + if *val == 1 { + old_incomplete_slots.insert(first_slot + i as u64); + } + }) + } + + old_incomplete_slots + } + + pub fn push_epoch_slots( + &mut self, + id: Pubkey, + root: Slot, + min: Slot, + slots: BTreeSet, + incomplete_slots: &BTreeSet, + ) { + let (first_missing_slot, compressed_map) = + Self::compress_incomplete_slots(incomplete_slots); let now = timestamp(); let entry = CrdsValue::new_signed( - CrdsData::EpochSlots(EpochSlots::new(id, root, min, slots, now)), + CrdsData::EpochSlots(EpochSlots::new( + id, + root, + min, + slots, + first_missing_slot, + compressed_map, + now, + )), &self.keypair, ); self.gossip @@ -2313,6 +2379,8 @@ mod tests { peer_root, peer_lowest, BTreeSet::new(), + 0, + vec![], timestamp(), ))); let _ = cluster_info.gossip.crds.insert(value, timestamp()); diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index 06d44e35e10d15..93a5d4d11c04e4 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -67,6 +67,8 @@ pub struct EpochSlots { pub root: Slot, pub lowest: Slot, pub slots: BTreeSet, + pub first_missing: Slot, + pub stash: Vec, pub wallclock: u64, } @@ -76,6 +78,8 @@ impl EpochSlots { root: Slot, lowest: Slot, slots: BTreeSet, + first_missing: Slot, + stash: Vec, wallclock: u64, ) -> Self { Self { @@ -83,6 +87,8 @@ impl EpochSlots { root, lowest, slots, + first_missing, + stash, wallclock, } } @@ -283,6 +289,8 @@ mod test { 0, BTreeSet::new(), 0, + vec![], + 0, ))); assert_eq!(v.wallclock(), 0); let key = v.clone().epoch_slots().unwrap().from; @@ -309,6 +317,8 @@ mod test { 0, 0, btreeset, + 0, + vec![], timestamp(), ))); verify_signatures(&mut v, &keypair, &wrong_keypair); diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index d6a84996466c28..e91c11b1441d7d 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -9,11 +9,12 @@ use solana_ledger::{ bank_forks::BankForks, blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta}, }; +use solana_sdk::clock::DEFAULT_SLOTS_PER_EPOCH; use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey}; use std::{ collections::BTreeSet, net::UdpSocket, - ops::Bound::{Excluded, Unbounded}, + ops::Bound::{Excluded, Included, Unbounded}, sync::atomic::{AtomicBool, Ordering}, sync::{Arc, RwLock}, thread::sleep, @@ -85,6 +86,7 @@ impl RepairService { ) { let serve_repair = ServeRepair::new(cluster_info.clone()); let mut epoch_slots: BTreeSet = BTreeSet::new(); + let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); let id = cluster_info.read().unwrap().id(); let mut current_root = 0; if let RepairStrategy::RepairAll { @@ -96,6 +98,7 @@ impl RepairService { id, blockstore, &mut epoch_slots, + &old_incomplete_slots, current_root, epoch_schedule, cluster_info, @@ -129,6 +132,7 @@ impl RepairService { lowest_slot, &mut current_root, &mut epoch_slots, + &mut old_incomplete_slots, &cluster_info, completed_slots_receiver, ); @@ -292,6 +296,7 @@ impl RepairService { id: Pubkey, blockstore: &Blockstore, slots_in_gossip: &mut BTreeSet, + old_incomplete_slots: &BTreeSet, root: Slot, epoch_schedule: &EpochSchedule, cluster_info: &RwLock, @@ -307,6 +312,7 @@ impl RepairService { root, blockstore.lowest_slot(), slots_in_gossip.clone(), + old_incomplete_slots, ); } @@ -318,6 +324,7 @@ impl RepairService { lowest_slot: Slot, prev_root: &mut Slot, slots_in_gossip: &mut BTreeSet, + old_incomplete_slots: &mut BTreeSet, cluster_info: &RwLock, completed_slots_receiver: &CompletedSlotsReceiver, ) { @@ -325,6 +332,7 @@ impl RepairService { let mut should_update = latest_known_root != *prev_root; while let Ok(completed_slots) = completed_slots_receiver.try_recv() { for slot in completed_slots { + old_incomplete_slots.remove(&slot); // If the newly completed slot > root, and the set did not contain this value // before, we should update gossip. if slot > latest_known_root { @@ -336,6 +344,12 @@ impl RepairService { if should_update { // Filter out everything <= root if latest_known_root != *prev_root { + Self::retain_old_incomplete_slots( + slots_in_gossip, + *prev_root, + latest_known_root, + old_incomplete_slots, + ); *prev_root = latest_known_root; Self::retain_slots_greater_than_root(slots_in_gossip, latest_known_root); } @@ -345,10 +359,40 @@ impl RepairService { latest_known_root, lowest_slot, slots_in_gossip.clone(), + old_incomplete_slots, ); } } + fn retain_old_incomplete_slots( + slots_in_gossip: &BTreeSet, + prev_root: Slot, + new_root: Slot, + old_incomplete_slots: &mut BTreeSet, + ) { + // Prev root and new root are not included in incomplete slot list. + (prev_root + 1..new_root).into_iter().for_each(|slot| { + if !slots_in_gossip.contains(&slot) { + old_incomplete_slots.insert(slot); + } + }); + if let Some(oldest_incomplete_slot) = old_incomplete_slots.iter().next() { + // Prune old slots + // Prune in batches to reduce overhead. Pruning starts when oldest slot is 1.5 epochs + // earlier than the new root. But, we prune all the slots that are older than 1 epoch. + // So slots in a batch of half epoch are getting pruned + if oldest_incomplete_slot + DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2 + < new_root + { + let oldest_slot_to_retain = new_root.saturating_sub(DEFAULT_SLOTS_PER_EPOCH); + *old_incomplete_slots = old_incomplete_slots + .range((Included(&oldest_slot_to_retain), Unbounded)) + .cloned() + .collect(); + } + } + } + fn retain_slots_greater_than_root(slot_set: &mut BTreeSet, root: Slot) { *slot_set = slot_set .range((Excluded(&root), Unbounded)) @@ -703,6 +747,7 @@ mod test { node_info.info.clone(), )); + let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); while completed_slots.len() < num_slots as usize { RepairService::update_epoch_slots( Pubkey::default(), @@ -710,6 +755,7 @@ mod test { blockstore.lowest_slot(), &mut root.clone(), &mut completed_slots, + &mut old_incomplete_slots, &cluster_info, &completed_slots_receiver, ); @@ -728,6 +774,7 @@ mod test { 0, &mut 0, &mut completed_slots, + &mut old_incomplete_slots, &cluster_info, &completed_slots_receiver, ); @@ -751,6 +798,7 @@ mod test { let my_pubkey = Pubkey::new_rand(); let (completed_slots_sender, completed_slots_receiver) = channel(); + let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); // Send a new slot before the root is updated let newly_completed_slot = 63; completed_slots_sender @@ -762,6 +810,7 @@ mod test { 0, &mut current_root.clone(), &mut completed_slots, + &mut old_incomplete_slots, &cluster_info, &completed_slots_receiver, ); @@ -793,6 +842,7 @@ mod test { 0, &mut current_root, &mut completed_slots, + &mut old_incomplete_slots, &cluster_info, &completed_slots_receiver, ); @@ -812,6 +862,7 @@ mod test { 0, &mut current_root, &mut completed_slots, + &mut old_incomplete_slots, &cluster_info, &completed_slots_receiver, ); @@ -831,4 +882,102 @@ mod test { .slots .contains(&newly_completed_slot)); } + + #[test] + fn test_retain_old_incomplete_slots() { + let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); + let mut slots_in_gossip: BTreeSet = BTreeSet::new(); + + // When slots_in_gossip is empty. All slots between prev and new root + // should be incomplete + RepairService::retain_old_incomplete_slots( + &slots_in_gossip, + 10, + 100, + &mut old_incomplete_slots, + ); + + assert!(!old_incomplete_slots.contains(&10)); + assert!(!old_incomplete_slots.contains(&100)); + (11..100u64) + .into_iter() + .for_each(|i| assert!(old_incomplete_slots.contains(&i))); + + // Insert some slots in slots_in_gossip. + slots_in_gossip.insert(101); + slots_in_gossip.insert(102); + slots_in_gossip.insert(104); + slots_in_gossip.insert(105); + + RepairService::retain_old_incomplete_slots( + &slots_in_gossip, + 100, + 105, + &mut old_incomplete_slots, + ); + + assert!(!old_incomplete_slots.contains(&10)); + assert!(!old_incomplete_slots.contains(&100)); + (11..100u64) + .into_iter() + .for_each(|i| assert!(old_incomplete_slots.contains(&i))); + assert!(!old_incomplete_slots.contains(&101)); + assert!(!old_incomplete_slots.contains(&102)); + assert!(old_incomplete_slots.contains(&103)); + assert!(!old_incomplete_slots.contains(&104)); + assert!(!old_incomplete_slots.contains(&105)); + + // Insert some slots that are 1 epoch away. It should not trigger + // pruning, as we wait 1.5 epoch. + slots_in_gossip.insert(105 + DEFAULT_SLOTS_PER_EPOCH); + + RepairService::retain_old_incomplete_slots( + &slots_in_gossip, + 100 + DEFAULT_SLOTS_PER_EPOCH, + 105 + DEFAULT_SLOTS_PER_EPOCH, + &mut old_incomplete_slots, + ); + + assert!(!old_incomplete_slots.contains(&10)); + assert!(!old_incomplete_slots.contains(&100)); + (11..100u64) + .into_iter() + .for_each(|i| assert!(old_incomplete_slots.contains(&i))); + assert!(!old_incomplete_slots.contains(&101)); + assert!(!old_incomplete_slots.contains(&102)); + assert!(old_incomplete_slots.contains(&103)); + assert!(!old_incomplete_slots.contains(&104)); + assert!(!old_incomplete_slots.contains(&105)); + assert!(old_incomplete_slots.contains(&(101 + DEFAULT_SLOTS_PER_EPOCH))); + assert!(old_incomplete_slots.contains(&(102 + DEFAULT_SLOTS_PER_EPOCH))); + assert!(old_incomplete_slots.contains(&(103 + DEFAULT_SLOTS_PER_EPOCH))); + assert!(old_incomplete_slots.contains(&(104 + DEFAULT_SLOTS_PER_EPOCH))); + + // Insert some slots that are 1.5 epoch away. It should trigger + // pruning, as we wait 1.5 epoch. + let one_and_half_epoch_slots = DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2; + slots_in_gossip.insert(100 + one_and_half_epoch_slots); + + RepairService::retain_old_incomplete_slots( + &slots_in_gossip, + 99 + one_and_half_epoch_slots, + 100 + one_and_half_epoch_slots, + &mut old_incomplete_slots, + ); + + assert!(!old_incomplete_slots.contains(&10)); + assert!(!old_incomplete_slots.contains(&100)); + (11..100u64) + .into_iter() + .for_each(|i| assert!(!old_incomplete_slots.contains(&i))); + assert!(!old_incomplete_slots.contains(&101)); + assert!(!old_incomplete_slots.contains(&102)); + assert!(!old_incomplete_slots.contains(&103)); + assert!(!old_incomplete_slots.contains(&104)); + assert!(!old_incomplete_slots.contains(&105)); + assert!(old_incomplete_slots.contains(&(101 + DEFAULT_SLOTS_PER_EPOCH))); + assert!(old_incomplete_slots.contains(&(102 + DEFAULT_SLOTS_PER_EPOCH))); + assert!(old_incomplete_slots.contains(&(103 + DEFAULT_SLOTS_PER_EPOCH))); + assert!(old_incomplete_slots.contains(&(104 + DEFAULT_SLOTS_PER_EPOCH))); + } } From a3ed3ef620e45ec5291faa976f624c140b0e386b Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Fri, 14 Feb 2020 10:05:37 -0800 Subject: [PATCH 2/8] fix clippy --- core/src/repair_service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index e91c11b1441d7d..0807434e695cbd 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -371,7 +371,7 @@ impl RepairService { old_incomplete_slots: &mut BTreeSet, ) { // Prev root and new root are not included in incomplete slot list. - (prev_root + 1..new_root).into_iter().for_each(|slot| { + (prev_root + 1..new_root).for_each(|slot| { if !slots_in_gossip.contains(&slot) { old_incomplete_slots.insert(slot); } From 499722e36e16aee9d0a907d46a440b672eb07a0d Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Fri, 14 Feb 2020 10:14:58 -0800 Subject: [PATCH 3/8] fix tests --- core/src/cluster_info.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index b6904cdcead7a7..cda0ec61b48a1d 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -2198,6 +2198,8 @@ mod tests { root: 0, lowest: 0, slots: btree_slots, + first_missing: 0, + stash: vec![], wallclock: 0, })); test_split_messages(value); @@ -2215,6 +2217,8 @@ mod tests { root: 0, lowest: 0, slots: BTreeSet::new(), + first_missing: 0, + stash: vec![], wallclock: 0, })); @@ -2233,6 +2237,8 @@ mod tests { root: 0, lowest: 0, slots, + first_missing: 0, + stash: vec![], wallclock: 0, }); i += 1; From ceab564e8eba3fa8673a1de72c07d61f3cc839b4 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Fri, 14 Feb 2020 10:28:19 -0800 Subject: [PATCH 4/8] new test for compress/decompress --- core/src/cluster_info.rs | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index cda0ec61b48a1d..4fbb539ddc4a71 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -2448,4 +2448,32 @@ mod tests { serialized_size(&protocol).expect("unable to serialize gossip protocol") as usize; PACKET_DATA_SIZE - (protocol_size - filter_size) } + + #[test] + fn test_compress_incomplete_slots() { + let mut incomplete_slots: BTreeSet = BTreeSet::new(); + + assert_eq!( + (0, vec![]), + ClusterInfo::compress_incomplete_slots(&incomplete_slots) + ); + + incomplete_slots.insert(100); + let (first, compressed) = ClusterInfo::compress_incomplete_slots(&incomplete_slots); + assert_eq!(100, first); + let decompressed = ClusterInfo::decompress_incomplete_slots(first, &compressed); + assert_eq!(incomplete_slots, decompressed); + + incomplete_slots.insert(104); + let (first, compressed) = ClusterInfo::compress_incomplete_slots(&incomplete_slots); + assert_eq!(100, first); + let decompressed = ClusterInfo::decompress_incomplete_slots(first, &compressed); + assert_eq!(incomplete_slots, decompressed); + + incomplete_slots.insert(80); + let (first, compressed) = ClusterInfo::compress_incomplete_slots(&incomplete_slots); + assert_eq!(80, first); + let decompressed = ClusterInfo::decompress_incomplete_slots(first, &compressed); + assert_eq!(incomplete_slots, decompressed); + } } From 8180d073fe8859ee905aafa3687a5ae603ee7ef9 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Fri, 14 Feb 2020 12:31:59 -0800 Subject: [PATCH 5/8] address review comments --- core/src/cluster_info.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 4fbb539ddc4a71..988e4b1e637d1a 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -316,7 +316,7 @@ impl ClusterInfo { .expect("expected to find at least one slot"); let last_slot = incomplete_slots .iter() - .last() + .next_back() .expect("expected to find last slot"); let mut uncompressed = vec![0u8; (last_slot.saturating_sub(*first_slot) + 1) as usize]; incomplete_slots.iter().for_each(|slot| { From 8dce5a6cc5b34f8eea49528ae672321b56eecf26 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Mon, 17 Feb 2020 10:10:45 -0800 Subject: [PATCH 6/8] limit cache based on size, instead of comparing roots --- core/src/repair_service.rs | 424 ++++++++++++++++--------------------- 1 file changed, 185 insertions(+), 239 deletions(-) diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 0807434e695cbd..5d54daa38be60d 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -14,7 +14,7 @@ use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey}; use std::{ collections::BTreeSet, net::UdpSocket, - ops::Bound::{Excluded, Included, Unbounded}, + ops::Bound::{Included, Unbounded}, sync::atomic::{AtomicBool, Ordering}, sync::{Arc, RwLock}, thread::sleep, @@ -26,6 +26,9 @@ pub const MAX_REPAIR_LENGTH: usize = 512; pub const REPAIR_MS: u64 = 100; pub const MAX_ORPHANS: usize = 5; +const MAX_COMPLETED_SLOT_CACHE_LEN: usize = 256; +const COMPLETED_SLOT_CACHE_FLUSH_TRIGGER: usize = 512; + pub enum RepairStrategy { RepairRange(RepairSlotRange), RepairAll { @@ -88,12 +91,11 @@ impl RepairService { let mut epoch_slots: BTreeSet = BTreeSet::new(); let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); let id = cluster_info.read().unwrap().id(); - let mut current_root = 0; if let RepairStrategy::RepairAll { ref epoch_schedule, .. } = repair_strategy { - current_root = blockstore.last_root(); + let current_root = blockstore.last_root(); Self::initialize_epoch_slots( id, blockstore, @@ -130,7 +132,6 @@ impl RepairService { id, new_root, lowest_slot, - &mut current_root, &mut epoch_slots, &mut old_incomplete_slots, &cluster_info, @@ -322,70 +323,90 @@ impl RepairService { id: Pubkey, latest_known_root: Slot, lowest_slot: Slot, - prev_root: &mut Slot, - slots_in_gossip: &mut BTreeSet, - old_incomplete_slots: &mut BTreeSet, + completed_slot_cache: &mut BTreeSet, + incomplete_slot_stash: &mut BTreeSet, cluster_info: &RwLock, completed_slots_receiver: &CompletedSlotsReceiver, ) { - // If the latest known root is different, update gossip. - let mut should_update = latest_known_root != *prev_root; + let mut should_update = false; while let Ok(completed_slots) = completed_slots_receiver.try_recv() { for slot in completed_slots { - old_incomplete_slots.remove(&slot); - // If the newly completed slot > root, and the set did not contain this value - // before, we should update gossip. - if slot > latest_known_root { - should_update |= slots_in_gossip.insert(slot); + let last_slot_in_stash = incomplete_slot_stash + .iter() + .next_back() + .unwrap_or(&0) + .clone(); + let removed_from_stash = incomplete_slot_stash.remove(&slot); + // If the newly completed slot was not being tracked in stash, and is > last + // slot being tracked in stash, add it to cache. Also, update gossip + if !removed_from_stash && slot >= last_slot_in_stash { + should_update |= completed_slot_cache.insert(slot); } + // If the slot was removed from stash, update gossip + should_update |= removed_from_stash; } } if should_update { - // Filter out everything <= root - if latest_known_root != *prev_root { - Self::retain_old_incomplete_slots( - slots_in_gossip, - *prev_root, - latest_known_root, - old_incomplete_slots, + if completed_slot_cache.len() >= COMPLETED_SLOT_CACHE_FLUSH_TRIGGER { + Self::stash_old_incomplete_slots(completed_slot_cache, incomplete_slot_stash); + let lowest_completed_slot_in_cache = + completed_slot_cache.iter().next().unwrap_or(&0).clone(); + Self::prune_incomplete_slot_stash( + incomplete_slot_stash, + lowest_completed_slot_in_cache, ); - *prev_root = latest_known_root; - Self::retain_slots_greater_than_root(slots_in_gossip, latest_known_root); } cluster_info.write().unwrap().push_epoch_slots( id, latest_known_root, lowest_slot, - slots_in_gossip.clone(), - old_incomplete_slots, + completed_slot_cache.clone(), + incomplete_slot_stash, ); } } - fn retain_old_incomplete_slots( - slots_in_gossip: &BTreeSet, - prev_root: Slot, - new_root: Slot, - old_incomplete_slots: &mut BTreeSet, - ) { - // Prev root and new root are not included in incomplete slot list. - (prev_root + 1..new_root).for_each(|slot| { - if !slots_in_gossip.contains(&slot) { - old_incomplete_slots.insert(slot); + fn stash_old_incomplete_slots(cache: &mut BTreeSet, stash: &mut BTreeSet) { + if cache.len() >= MAX_COMPLETED_SLOT_CACHE_LEN + 1 { + let mut prev = cache + .iter() + .next() + .expect("Expected to find some slot") + .clone(); + cache.remove(&prev); + while cache.len() >= MAX_COMPLETED_SLOT_CACHE_LEN { + let next = cache + .iter() + .next() + .expect("Expected to find some slot") + .clone(); + cache.remove(&next); + // Prev slot and next slot are not included in incomplete slot list. + (prev + 1..next).for_each(|slot| { + stash.insert(slot); + }); + prev = next; } - }); - if let Some(oldest_incomplete_slot) = old_incomplete_slots.iter().next() { + } + } + + fn prune_incomplete_slot_stash( + stash: &mut BTreeSet, + lowest_completed_slot_in_cache: Slot, + ) { + if let Some(oldest_incomplete_slot) = stash.iter().next() { // Prune old slots // Prune in batches to reduce overhead. Pruning starts when oldest slot is 1.5 epochs // earlier than the new root. But, we prune all the slots that are older than 1 epoch. // So slots in a batch of half epoch are getting pruned if oldest_incomplete_slot + DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2 - < new_root + < lowest_completed_slot_in_cache { - let oldest_slot_to_retain = new_root.saturating_sub(DEFAULT_SLOTS_PER_EPOCH); - *old_incomplete_slots = old_incomplete_slots + let oldest_slot_to_retain = + lowest_completed_slot_in_cache.saturating_sub(DEFAULT_SLOTS_PER_EPOCH); + *stash = stash .range((Included(&oldest_slot_to_retain), Unbounded)) .cloned() .collect(); @@ -393,13 +414,6 @@ impl RepairService { } } - fn retain_slots_greater_than_root(slot_set: &mut BTreeSet, root: Slot) { - *slot_set = slot_set - .range((Excluded(&root), Unbounded)) - .cloned() - .collect(); - } - pub fn join(self) -> thread::Result<()> { self.t_repair.join() } @@ -417,7 +431,6 @@ mod test { }; use solana_ledger::shred::max_ticks_per_n_shreds; use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}; - use std::sync::mpsc::channel; use std::thread::Builder; #[test] @@ -753,7 +766,6 @@ mod test { Pubkey::default(), root, blockstore.lowest_slot(), - &mut root.clone(), &mut completed_slots, &mut old_incomplete_slots, &cluster_info, @@ -772,14 +784,12 @@ mod test { Pubkey::default(), root, 0, - &mut 0, &mut completed_slots, &mut old_incomplete_slots, &cluster_info, &completed_slots_receiver, ); expected.insert(num_slots + 2); - RepairService::retain_slots_greater_than_root(&mut expected, root); assert_eq!(completed_slots, expected); writer.join().unwrap(); } @@ -787,197 +797,133 @@ mod test { } #[test] - fn test_update_epoch_slots_new_root() { - let mut current_root = 0; - - let mut completed_slots = BTreeSet::new(); - let node_info = Node::new_localhost_with_pubkey(&Pubkey::default()); - let cluster_info = RwLock::new(ClusterInfo::new_with_invalid_keypair( - node_info.info.clone(), - )); - let my_pubkey = Pubkey::new_rand(); - let (completed_slots_sender, completed_slots_receiver) = channel(); - - let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); - // Send a new slot before the root is updated - let newly_completed_slot = 63; - completed_slots_sender - .send(vec![newly_completed_slot]) - .unwrap(); - RepairService::update_epoch_slots( - my_pubkey.clone(), - current_root, - 0, - &mut current_root.clone(), - &mut completed_slots, - &mut old_incomplete_slots, - &cluster_info, - &completed_slots_receiver, - ); - - // We should see epoch state update - let (my_epoch_slots_in_gossip, updated_ts) = { - let r_cluster_info = cluster_info.read().unwrap(); - - let (my_epoch_slots_in_gossip, updated_ts) = r_cluster_info - .get_epoch_state_for_node(&my_pubkey, None) - .clone() - .unwrap(); - - (my_epoch_slots_in_gossip.clone(), updated_ts) - }; - - assert_eq!(my_epoch_slots_in_gossip.root, 0); - assert_eq!(current_root, 0); - assert_eq!(my_epoch_slots_in_gossip.slots.len(), 1); - assert!(my_epoch_slots_in_gossip - .slots - .contains(&newly_completed_slot)); - - // Calling update again with no updates to either the roots or set of completed slots - // should not update gossip - RepairService::update_epoch_slots( - my_pubkey.clone(), - current_root, - 0, - &mut current_root, - &mut completed_slots, - &mut old_incomplete_slots, - &cluster_info, - &completed_slots_receiver, - ); - - assert!(cluster_info - .read() - .unwrap() - .get_epoch_state_for_node(&my_pubkey, Some(updated_ts)) - .is_none()); - - sleep(Duration::from_millis(10)); - // Updating just the root again should update gossip (simulates replay stage updating root - // after a slot has been signaled as completed) - RepairService::update_epoch_slots( - my_pubkey.clone(), - current_root + 1, - 0, - &mut current_root, - &mut completed_slots, - &mut old_incomplete_slots, - &cluster_info, - &completed_slots_receiver, - ); - - let r_cluster_info = cluster_info.read().unwrap(); - - let (my_epoch_slots_in_gossip, _) = r_cluster_info - .get_epoch_state_for_node(&my_pubkey, Some(updated_ts)) - .clone() - .unwrap(); - - // Check the root was updated correctly - assert_eq!(my_epoch_slots_in_gossip.root, 1); - assert_eq!(current_root, 1); - assert_eq!(my_epoch_slots_in_gossip.slots.len(), 1); - assert!(my_epoch_slots_in_gossip - .slots - .contains(&newly_completed_slot)); - } - - #[test] - fn test_retain_old_incomplete_slots() { - let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); - let mut slots_in_gossip: BTreeSet = BTreeSet::new(); - - // When slots_in_gossip is empty. All slots between prev and new root - // should be incomplete - RepairService::retain_old_incomplete_slots( - &slots_in_gossip, - 10, - 100, - &mut old_incomplete_slots, - ); - - assert!(!old_incomplete_slots.contains(&10)); - assert!(!old_incomplete_slots.contains(&100)); - (11..100u64) + fn test_stash_old_incomplete_slots() { + let mut cache: BTreeSet = BTreeSet::new(); + let mut stash: BTreeSet = BTreeSet::new(); + + // When cache is empty. + RepairService::stash_old_incomplete_slots(&mut cache, &mut stash); + assert_eq!(stash.len(), 0); + + // Insert some slots in cache ( < MAX_COMPLETED_SLOT_CACHE_LEN + 1) + cache.insert(101); + cache.insert(102); + cache.insert(104); + cache.insert(105); + + // Not enough slots in cache. So stash should remain empty. + RepairService::stash_old_incomplete_slots(&mut cache, &mut stash); + assert_eq!(stash.len(), 0); + assert_eq!(cache.len(), 4); + + // Insert slots in cache ( = MAX_COMPLETED_SLOT_CACHE_LEN) + let mut cache: BTreeSet = BTreeSet::new(); + (0..MAX_COMPLETED_SLOT_CACHE_LEN as u64) .into_iter() - .for_each(|i| assert!(old_incomplete_slots.contains(&i))); - - // Insert some slots in slots_in_gossip. - slots_in_gossip.insert(101); - slots_in_gossip.insert(102); - slots_in_gossip.insert(104); - slots_in_gossip.insert(105); - - RepairService::retain_old_incomplete_slots( - &slots_in_gossip, - 100, - 105, - &mut old_incomplete_slots, - ); - - assert!(!old_incomplete_slots.contains(&10)); - assert!(!old_incomplete_slots.contains(&100)); - (11..100u64) + .for_each(|slot| { + cache.insert(slot); + }); + + // Not enough slots in cache. So stash should remain empty. + RepairService::stash_old_incomplete_slots(&mut cache, &mut stash); + assert_eq!(stash.len(), 0); + assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN); + + // Insert 1 more to cross the threshold + cache.insert(MAX_COMPLETED_SLOT_CACHE_LEN as u64); + RepairService::stash_old_incomplete_slots(&mut cache, &mut stash); + // Stash is still empty, as no missing slots + assert_eq!(stash.len(), 0); + // It removed some entries from cache + assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN - 1); + + // Insert more slots to create a missing slot + let mut cache: BTreeSet = BTreeSet::new(); + cache.insert(0); + (2..=MAX_COMPLETED_SLOT_CACHE_LEN as u64 + 2) .into_iter() - .for_each(|i| assert!(old_incomplete_slots.contains(&i))); - assert!(!old_incomplete_slots.contains(&101)); - assert!(!old_incomplete_slots.contains(&102)); - assert!(old_incomplete_slots.contains(&103)); - assert!(!old_incomplete_slots.contains(&104)); - assert!(!old_incomplete_slots.contains(&105)); - - // Insert some slots that are 1 epoch away. It should not trigger - // pruning, as we wait 1.5 epoch. - slots_in_gossip.insert(105 + DEFAULT_SLOTS_PER_EPOCH); - - RepairService::retain_old_incomplete_slots( - &slots_in_gossip, - 100 + DEFAULT_SLOTS_PER_EPOCH, - 105 + DEFAULT_SLOTS_PER_EPOCH, - &mut old_incomplete_slots, - ); - - assert!(!old_incomplete_slots.contains(&10)); - assert!(!old_incomplete_slots.contains(&100)); - (11..100u64) + .for_each(|slot| { + cache.insert(slot); + }); + RepairService::stash_old_incomplete_slots(&mut cache, &mut stash); + + // Stash is not empty + assert!(stash.contains(&1)); + // It removed some entries from cache + assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN - 1); + + // Test multiple missing slots at dispersed locations + let mut cache: BTreeSet = BTreeSet::new(); + (0..MAX_COMPLETED_SLOT_CACHE_LEN as u64 * 2) .into_iter() - .for_each(|i| assert!(old_incomplete_slots.contains(&i))); - assert!(!old_incomplete_slots.contains(&101)); - assert!(!old_incomplete_slots.contains(&102)); - assert!(old_incomplete_slots.contains(&103)); - assert!(!old_incomplete_slots.contains(&104)); - assert!(!old_incomplete_slots.contains(&105)); - assert!(old_incomplete_slots.contains(&(101 + DEFAULT_SLOTS_PER_EPOCH))); - assert!(old_incomplete_slots.contains(&(102 + DEFAULT_SLOTS_PER_EPOCH))); - assert!(old_incomplete_slots.contains(&(103 + DEFAULT_SLOTS_PER_EPOCH))); - assert!(old_incomplete_slots.contains(&(104 + DEFAULT_SLOTS_PER_EPOCH))); - - // Insert some slots that are 1.5 epoch away. It should trigger - // pruning, as we wait 1.5 epoch. - let one_and_half_epoch_slots = DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2; - slots_in_gossip.insert(100 + one_and_half_epoch_slots); - - RepairService::retain_old_incomplete_slots( - &slots_in_gossip, - 99 + one_and_half_epoch_slots, - 100 + one_and_half_epoch_slots, - &mut old_incomplete_slots, - ); - - assert!(!old_incomplete_slots.contains(&10)); - assert!(!old_incomplete_slots.contains(&100)); - (11..100u64) + .for_each(|slot| { + cache.insert(slot); + }); + + cache.remove(&10); + cache.remove(&11); + + cache.remove(&28); + cache.remove(&29); + + cache.remove(&148); + cache.remove(&149); + cache.remove(&150); + cache.remove(&151); + + RepairService::stash_old_incomplete_slots(&mut cache, &mut stash); + + // Stash is not empty + assert!(stash.contains(&10)); + assert!(stash.contains(&11)); + assert!(stash.contains(&28)); + assert!(stash.contains(&29)); + assert!(stash.contains(&148)); + assert!(stash.contains(&149)); + assert!(stash.contains(&150)); + assert!(stash.contains(&151)); + + assert!(!stash.contains(&147)); + assert!(!stash.contains(&152)); + // It removed some entries from cache + assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN - 1); + (MAX_COMPLETED_SLOT_CACHE_LEN + 1..MAX_COMPLETED_SLOT_CACHE_LEN * 2) .into_iter() - .for_each(|i| assert!(!old_incomplete_slots.contains(&i))); - assert!(!old_incomplete_slots.contains(&101)); - assert!(!old_incomplete_slots.contains(&102)); - assert!(!old_incomplete_slots.contains(&103)); - assert!(!old_incomplete_slots.contains(&104)); - assert!(!old_incomplete_slots.contains(&105)); - assert!(old_incomplete_slots.contains(&(101 + DEFAULT_SLOTS_PER_EPOCH))); - assert!(old_incomplete_slots.contains(&(102 + DEFAULT_SLOTS_PER_EPOCH))); - assert!(old_incomplete_slots.contains(&(103 + DEFAULT_SLOTS_PER_EPOCH))); - assert!(old_incomplete_slots.contains(&(104 + DEFAULT_SLOTS_PER_EPOCH))); + .for_each(|slot| { + let slot: u64 = slot as u64; + assert!(cache.contains(&slot)); + }); + } + + #[test] + fn test_prune_incomplete_slot_stash() { + // Prune empty stash + let mut stash: BTreeSet = BTreeSet::new(); + RepairService::prune_incomplete_slot_stash(&mut stash, 0); + assert!(stash.is_empty()); + + // Prune stash with slots < DEFAULT_SLOTS_PER_EPOCH + stash.insert(0); + stash.insert(10); + stash.insert(11); + stash.insert(50); + assert_eq!(stash.len(), 4); + RepairService::prune_incomplete_slot_stash(&mut stash, 100); + assert_eq!(stash.len(), 4); + + // Prune stash with slots > DEFAULT_SLOTS_PER_EPOCH, but < 1.5 * DEFAULT_SLOTS_PER_EPOCH + stash.insert(DEFAULT_SLOTS_PER_EPOCH + 50); + assert_eq!(stash.len(), 5); + RepairService::prune_incomplete_slot_stash(&mut stash, DEFAULT_SLOTS_PER_EPOCH + 100); + assert_eq!(stash.len(), 5); + + // Prune stash with slots > 1.5 * DEFAULT_SLOTS_PER_EPOCH + stash.insert(DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2); + assert_eq!(stash.len(), 6); + RepairService::prune_incomplete_slot_stash( + &mut stash, + DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2 + 1, + ); + assert_eq!(stash.len(), 2); } } From 4d1d206972ff57e9854a793abb0253d5bc886e89 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Mon, 17 Feb 2020 10:14:41 -0800 Subject: [PATCH 7/8] fix clippy --- core/src/repair_service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 5d54daa38be60d..57db2593f70048 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -369,7 +369,7 @@ impl RepairService { } fn stash_old_incomplete_slots(cache: &mut BTreeSet, stash: &mut BTreeSet) { - if cache.len() >= MAX_COMPLETED_SLOT_CACHE_LEN + 1 { + if cache.len() > MAX_COMPLETED_SLOT_CACHE_LEN { let mut prev = cache .iter() .next() From 65ef911573d624dd906863a8ef9f2ab78b012579 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Mon, 17 Feb 2020 10:18:34 -0800 Subject: [PATCH 8/8] fix clippy --- core/src/repair_service.rs | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 57db2593f70048..a31f22de660184 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -331,11 +331,7 @@ impl RepairService { let mut should_update = false; while let Ok(completed_slots) = completed_slots_receiver.try_recv() { for slot in completed_slots { - let last_slot_in_stash = incomplete_slot_stash - .iter() - .next_back() - .unwrap_or(&0) - .clone(); + let last_slot_in_stash = *incomplete_slot_stash.iter().next_back().unwrap_or(&0); let removed_from_stash = incomplete_slot_stash.remove(&slot); // If the newly completed slot was not being tracked in stash, and is > last // slot being tracked in stash, add it to cache. Also, update gossip @@ -351,7 +347,7 @@ impl RepairService { if completed_slot_cache.len() >= COMPLETED_SLOT_CACHE_FLUSH_TRIGGER { Self::stash_old_incomplete_slots(completed_slot_cache, incomplete_slot_stash); let lowest_completed_slot_in_cache = - completed_slot_cache.iter().next().unwrap_or(&0).clone(); + *completed_slot_cache.iter().next().unwrap_or(&0); Self::prune_incomplete_slot_stash( incomplete_slot_stash, lowest_completed_slot_in_cache, @@ -370,18 +366,10 @@ impl RepairService { fn stash_old_incomplete_slots(cache: &mut BTreeSet, stash: &mut BTreeSet) { if cache.len() > MAX_COMPLETED_SLOT_CACHE_LEN { - let mut prev = cache - .iter() - .next() - .expect("Expected to find some slot") - .clone(); + let mut prev = *cache.iter().next().expect("Expected to find some slot"); cache.remove(&prev); while cache.len() >= MAX_COMPLETED_SLOT_CACHE_LEN { - let next = cache - .iter() - .next() - .expect("Expected to find some slot") - .clone(); + let next = *cache.iter().next().expect("Expected to find some slot"); cache.remove(&next); // Prev slot and next slot are not included in incomplete slot list. (prev + 1..next).for_each(|slot| {