From c3ee68380d044e7315d2489a52c953c634149279 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Sun, 11 Feb 2024 05:42:53 +0000 Subject: [PATCH 1/3] replay: reload tower if set-identity during startup --- core/src/replay_stage.rs | 70 +++++++++++++++++++++++++++------------- core/src/tvu.rs | 3 ++ core/src/validator.rs | 1 + 3 files changed, 52 insertions(+), 22 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 27c30b9e52eb7b..581ef4149b29f2 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -276,6 +276,7 @@ impl PartitionInfo { } pub struct ReplayStageConfig { + pub startup_identity: Pubkey, pub vote_account: Pubkey, pub authorized_voter_keypairs: Arc>>>, pub exit: Arc, @@ -546,6 +547,7 @@ impl ReplayStage { popular_pruned_forks_receiver: PopularPrunedForksReceiver, ) -> Result { let ReplayStageConfig { + startup_identity, vote_account, authorized_voter_keypairs, exit, @@ -578,6 +580,20 @@ impl ReplayStage { let _exit = Finalizer::new(exit.clone()); let mut identity_keypair = cluster_info.keypair().clone(); let mut my_pubkey = identity_keypair.pubkey(); + if my_pubkey != startup_identity { + // set-identity was called during the startup procedure, ensure the tower is consistent + // before starting the loop. further calls to set-identity will reload the tower in the loop + tower = Self::reload_tower_with_new_identity( + &tower_storage, + &my_pubkey, + &vote_account, + &bank_forks, + ); + warn!( + "Identity changed during startup from {} to {}", + startup_identity, my_pubkey + ); + } let (mut progress, mut heaviest_subtree_fork_choice) = Self::initialize_progress_and_fork_choice_with_locked_bank_forks( &bank_forks, @@ -983,28 +999,12 @@ impl ReplayStage { my_pubkey = identity_keypair.pubkey(); // Load the new identity's tower - tower = Tower::restore(tower_storage.as_ref(), &my_pubkey) - .and_then(|restored_tower| { - let root_bank = bank_forks.read().unwrap().root_bank(); - let slot_history = root_bank.get_slot_history(); - restored_tower.adjust_lockouts_after_replay( - root_bank.slot(), - &slot_history, - ) - }) - .unwrap_or_else(|err| { - if err.is_file_missing() { - Tower::new_from_bankforks( - &bank_forks.read().unwrap(), - &my_pubkey, - &vote_account, - ) - } else { - error!("Failed to load tower for {}: {}", my_pubkey, err); - std::process::exit(1); - } - }); - + tower = Self::reload_tower_with_new_identity( + &tower_storage, + &my_pubkey, + &vote_account, + &bank_forks, + ); // Ensure the validator can land votes with the new identity before // becoming leader has_new_vote_been_rooted = !wait_for_vote_to_start_leader; @@ -1154,6 +1154,32 @@ impl ReplayStage { }) } + fn reload_tower_with_new_identity( + tower_storage: &Arc, + new_identity: &Pubkey, + vote_account: &Pubkey, + bank_forks: &Arc>, + ) -> Tower { + Tower::restore(tower_storage.as_ref(), new_identity) + .and_then(|restored_tower| { + let root_bank = bank_forks.read().unwrap().root_bank(); + let slot_history = root_bank.get_slot_history(); + restored_tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history) + }) + .unwrap_or_else(|err| { + if err.is_file_missing() { + Tower::new_from_bankforks( + &bank_forks.read().unwrap(), + new_identity, + vote_account, + ) + } else { + error!("Failed to load tower for {}: {}", new_identity, err); + std::process::exit(1); + } + }) + } + fn check_for_vote_only_mode( heaviest_bank_slot: Slot, forks_root: Slot, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index d498ab405d39aa..d53f69dc03dccd 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -102,6 +102,7 @@ impl Tvu { /// * `blockstore` - the ledger itself #[allow(clippy::too_many_arguments)] pub fn new( + startup_identity: Pubkey, vote_account: &Pubkey, authorized_voter_keypairs: Arc>>>, bank_forks: &Arc>, @@ -248,6 +249,7 @@ impl Tvu { let (blockstore_cleanup_slot_sender, blockstore_cleanup_slot_receiver) = unbounded(); let replay_stage_config = ReplayStageConfig { + startup_identity, vote_account: *vote_account, authorized_voter_keypairs, exit: exit.clone(), @@ -451,6 +453,7 @@ pub mod tests { let outstanding_repair_requests = Arc::>::default(); let cluster_slots = Arc::new(ClusterSlots::default()); let tvu = Tvu::new( + Pubkey::new_unique(), &vote_keypair.pubkey(), Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])), &bank_forks, diff --git a/core/src/validator.rs b/core/src/validator.rs index f1432d67f397dc..ea8e8d0d1b9d04 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1248,6 +1248,7 @@ impl Validator { Arc::new(crate::cluster_slots_service::cluster_slots::ClusterSlots::default()); let tvu = Tvu::new( + id, vote_account, authorized_voter_keypairs, &bank_forks, From 3c404d91fee4089c7b23af7ce85c770c98fad860 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Tue, 13 Feb 2024 08:38:49 +0000 Subject: [PATCH 2/3] pr feedback: add unit tests --- core/src/consensus.rs | 22 ++++++++++ core/src/replay_stage.rs | 71 ++++++++++++++++++++++++++++--- sdk/program/src/vote/state/mod.rs | 18 ++++++++ 3 files changed, 104 insertions(+), 7 deletions(-) diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 4f129b18282218..3e24f33233863e 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -292,6 +292,28 @@ impl Tower { } } + #[cfg(test)] + pub fn new_random(node_pubkey: Pubkey) -> Self { + use rand::Rng; + + let mut rng = rand::thread_rng(); + let root_slot = rng.gen(); + let vote_state = VoteState::new_rand_for_tests(node_pubkey, root_slot); + let last_vote = VoteStateUpdate::from( + vote_state + .votes + .iter() + .map(|lv| (lv.slot(), lv.confirmation_count())) + .collect::>(), + ); + Self { + node_pubkey, + vote_state, + last_vote: VoteTransaction::CompactVoteStateUpdate(last_vote), + ..Tower::default() + } + } + pub fn new_from_bankforks( bank_forks: &BankForks, node_pubkey: &Pubkey, diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 581ef4149b29f2..23aea602049526 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -584,7 +584,7 @@ impl ReplayStage { // set-identity was called during the startup procedure, ensure the tower is consistent // before starting the loop. further calls to set-identity will reload the tower in the loop tower = Self::reload_tower_with_new_identity( - &tower_storage, + tower_storage.as_ref(), &my_pubkey, &vote_account, &bank_forks, @@ -1000,7 +1000,7 @@ impl ReplayStage { // Load the new identity's tower tower = Self::reload_tower_with_new_identity( - &tower_storage, + tower_storage.as_ref(), &my_pubkey, &vote_account, &bank_forks, @@ -1155,12 +1155,12 @@ impl ReplayStage { } fn reload_tower_with_new_identity( - tower_storage: &Arc, + tower_storage: &dyn TowerStorage, new_identity: &Pubkey, vote_account: &Pubkey, bank_forks: &Arc>, ) -> Tower { - Tower::restore(tower_storage.as_ref(), new_identity) + Tower::restore(tower_storage, new_identity) .and_then(|restored_tower| { let root_bank = bank_forks.read().unwrap().root_bank(); let slot_history = root_bank.get_slot_history(); @@ -4256,9 +4256,9 @@ pub(crate) mod tests { crate::{ consensus::{ progress_map::{ValidatorStakeInfo, RETRANSMIT_BASE_DELAY_MS}, - tower_storage::NullTowerStorage, + tower_storage::{FileTowerStorage, NullTowerStorage}, tree_diff::TreeDiff, - Tower, + Tower, VOTE_THRESHOLD_DEPTH, }, replay_stage::ReplayStage, vote_simulator::{self, VoteSimulator}, @@ -4280,7 +4280,7 @@ pub(crate) mod tests { }, solana_runtime::{ accounts_background_service::AbsRequestSender, - commitment::BlockCommitment, + commitment::{BlockCommitment, VOTE_THRESHOLD_SIZE}, genesis_utils::{GenesisConfigInfo, ValidatorVoteKeypairs}, }, solana_sdk::{ @@ -4304,6 +4304,7 @@ pub(crate) mod tests { iter, sync::{atomic::AtomicU64, Arc, RwLock}, }, + tempfile::tempdir, trees::{tr, Tree}, }; @@ -8624,4 +8625,60 @@ pub(crate) mod tests { assert_eq!(reset_fork, Some(4)); assert_eq!(failures, vec![HeaviestForkFailures::LockedOut(4),]); } + + #[test] + fn test_tower_reload_missing() { + let tower_file = tempdir().unwrap().into_path(); + let tower_storage = FileTowerStorage::new(tower_file); + let identity = Pubkey::new_unique(); + let vote_account = Pubkey::new_unique(); + let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6))))); + let generate_votes = |pubkeys: Vec| { + pubkeys + .into_iter() + .zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2))) + .collect() + }; + let (vote_simulator, _blockstore) = + setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes))); + let bank_forks = vote_simulator.bank_forks; + + let tower = ReplayStage::reload_tower_with_new_identity( + &tower_storage, + &identity, + &vote_account, + &bank_forks, + ); + let expected_tower = Tower::new_for_tests(VOTE_THRESHOLD_DEPTH, VOTE_THRESHOLD_SIZE); + assert_eq!(tower.vote_state, expected_tower.vote_state); + } + + #[test] + fn test_tower_reload() { + let tower_file = tempdir().unwrap().into_path(); + let tower_storage = FileTowerStorage::new(tower_file); + let node_keypair = Keypair::new(); + let identity = node_keypair.pubkey(); + let vote_account = Pubkey::new_unique(); + let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6))))); + let generate_votes = |pubkeys: Vec| { + pubkeys + .into_iter() + .zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2))) + .collect() + }; + let (vote_simulator, _blockstore) = + setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes))); + let bank_forks = vote_simulator.bank_forks; + let expected_tower = Tower::new_random(identity); + expected_tower.save(&tower_storage, &node_keypair).unwrap(); + + let tower = ReplayStage::reload_tower_with_new_identity( + &tower_storage, + &identity, + &vote_account, + &bank_forks, + ); + assert_eq!(tower.vote_state, expected_tower.vote_state); + } } diff --git a/sdk/program/src/vote/state/mod.rs b/sdk/program/src/vote/state/mod.rs index 8cfcd0ef19d9e8..a6e765472750c6 100644 --- a/sdk/program/src/vote/state/mod.rs +++ b/sdk/program/src/vote/state/mod.rs @@ -352,6 +352,24 @@ impl VoteState { } } + pub fn new_rand_for_tests(node_pubkey: Pubkey, root_slot: Slot) -> Self { + let votes = (1..32) + .map(|x| LandedVote { + latency: 0, + lockout: Lockout::new_with_confirmation_count( + u64::from(x).saturating_add(root_slot), + 32_u32.saturating_sub(x), + ), + }) + .collect(); + Self { + node_pubkey, + root_slot: Some(root_slot), + votes, + ..VoteState::default() + } + } + pub fn get_authorized_voter(&self, epoch: Epoch) -> Option { self.authorized_voters.get_authorized_voter(epoch) } From d63fd4f75817b3231490b1e7544c3206724c9a0b Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Tue, 20 Feb 2024 04:20:36 +0000 Subject: [PATCH 3/3] pr feedback: use tower.node_pubkey, more descriptive names --- core/src/replay_stage.rs | 49 +++++++++++++++++----------------------- core/src/tvu.rs | 3 --- core/src/validator.rs | 1 - 3 files changed, 21 insertions(+), 32 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 23aea602049526..1b7b4737f55fc2 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -276,7 +276,6 @@ impl PartitionInfo { } pub struct ReplayStageConfig { - pub startup_identity: Pubkey, pub vote_account: Pubkey, pub authorized_voter_keypairs: Arc>>>, pub exit: Arc, @@ -547,7 +546,6 @@ impl ReplayStage { popular_pruned_forks_receiver: PopularPrunedForksReceiver, ) -> Result { let ReplayStageConfig { - startup_identity, vote_account, authorized_voter_keypairs, exit, @@ -580,10 +578,11 @@ impl ReplayStage { let _exit = Finalizer::new(exit.clone()); let mut identity_keypair = cluster_info.keypair().clone(); let mut my_pubkey = identity_keypair.pubkey(); - if my_pubkey != startup_identity { + if my_pubkey != tower.node_pubkey { // set-identity was called during the startup procedure, ensure the tower is consistent // before starting the loop. further calls to set-identity will reload the tower in the loop - tower = Self::reload_tower_with_new_identity( + let my_old_pubkey = tower.node_pubkey; + tower = Self::load_tower( tower_storage.as_ref(), &my_pubkey, &vote_account, @@ -591,7 +590,7 @@ impl ReplayStage { ); warn!( "Identity changed during startup from {} to {}", - startup_identity, my_pubkey + my_old_pubkey, my_pubkey ); } let (mut progress, mut heaviest_subtree_fork_choice) = @@ -999,7 +998,7 @@ impl ReplayStage { my_pubkey = identity_keypair.pubkey(); // Load the new identity's tower - tower = Self::reload_tower_with_new_identity( + tower = Self::load_tower( tower_storage.as_ref(), &my_pubkey, &vote_account, @@ -1154,13 +1153,13 @@ impl ReplayStage { }) } - fn reload_tower_with_new_identity( + fn load_tower( tower_storage: &dyn TowerStorage, - new_identity: &Pubkey, + node_pubkey: &Pubkey, vote_account: &Pubkey, bank_forks: &Arc>, ) -> Tower { - Tower::restore(tower_storage, new_identity) + Tower::restore(tower_storage, node_pubkey) .and_then(|restored_tower| { let root_bank = bank_forks.read().unwrap().root_bank(); let slot_history = root_bank.get_slot_history(); @@ -1170,11 +1169,11 @@ impl ReplayStage { if err.is_file_missing() { Tower::new_from_bankforks( &bank_forks.read().unwrap(), - new_identity, + node_pubkey, vote_account, ) } else { - error!("Failed to load tower for {}: {}", new_identity, err); + error!("Failed to load tower for {}: {}", node_pubkey, err); std::process::exit(1); } }) @@ -8627,10 +8626,10 @@ pub(crate) mod tests { } #[test] - fn test_tower_reload_missing() { + fn test_tower_load_missing() { let tower_file = tempdir().unwrap().into_path(); let tower_storage = FileTowerStorage::new(tower_file); - let identity = Pubkey::new_unique(); + let node_pubkey = Pubkey::new_unique(); let vote_account = Pubkey::new_unique(); let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6))))); let generate_votes = |pubkeys: Vec| { @@ -8643,22 +8642,19 @@ pub(crate) mod tests { setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes))); let bank_forks = vote_simulator.bank_forks; - let tower = ReplayStage::reload_tower_with_new_identity( - &tower_storage, - &identity, - &vote_account, - &bank_forks, - ); + let tower = + ReplayStage::load_tower(&tower_storage, &node_pubkey, &vote_account, &bank_forks); let expected_tower = Tower::new_for_tests(VOTE_THRESHOLD_DEPTH, VOTE_THRESHOLD_SIZE); assert_eq!(tower.vote_state, expected_tower.vote_state); + assert_eq!(tower.node_pubkey, node_pubkey); } #[test] - fn test_tower_reload() { + fn test_tower_load() { let tower_file = tempdir().unwrap().into_path(); let tower_storage = FileTowerStorage::new(tower_file); let node_keypair = Keypair::new(); - let identity = node_keypair.pubkey(); + let node_pubkey = node_keypair.pubkey(); let vote_account = Pubkey::new_unique(); let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6))))); let generate_votes = |pubkeys: Vec| { @@ -8670,15 +8666,12 @@ pub(crate) mod tests { let (vote_simulator, _blockstore) = setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes))); let bank_forks = vote_simulator.bank_forks; - let expected_tower = Tower::new_random(identity); + let expected_tower = Tower::new_random(node_pubkey); expected_tower.save(&tower_storage, &node_keypair).unwrap(); - let tower = ReplayStage::reload_tower_with_new_identity( - &tower_storage, - &identity, - &vote_account, - &bank_forks, - ); + let tower = + ReplayStage::load_tower(&tower_storage, &node_pubkey, &vote_account, &bank_forks); assert_eq!(tower.vote_state, expected_tower.vote_state); + assert_eq!(tower.node_pubkey, expected_tower.node_pubkey); } } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index d53f69dc03dccd..d498ab405d39aa 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -102,7 +102,6 @@ impl Tvu { /// * `blockstore` - the ledger itself #[allow(clippy::too_many_arguments)] pub fn new( - startup_identity: Pubkey, vote_account: &Pubkey, authorized_voter_keypairs: Arc>>>, bank_forks: &Arc>, @@ -249,7 +248,6 @@ impl Tvu { let (blockstore_cleanup_slot_sender, blockstore_cleanup_slot_receiver) = unbounded(); let replay_stage_config = ReplayStageConfig { - startup_identity, vote_account: *vote_account, authorized_voter_keypairs, exit: exit.clone(), @@ -453,7 +451,6 @@ pub mod tests { let outstanding_repair_requests = Arc::>::default(); let cluster_slots = Arc::new(ClusterSlots::default()); let tvu = Tvu::new( - Pubkey::new_unique(), &vote_keypair.pubkey(), Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])), &bank_forks, diff --git a/core/src/validator.rs b/core/src/validator.rs index ea8e8d0d1b9d04..f1432d67f397dc 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1248,7 +1248,6 @@ impl Validator { Arc::new(crate::cluster_slots_service::cluster_slots::ClusterSlots::default()); let tvu = Tvu::new( - id, vote_account, authorized_voter_keypairs, &bank_forks,