Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replay: reload tower if set-identity during startup #35173

Merged
merged 3 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions core/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,28 @@ impl Tower {
}
}

#[cfg(test)]
pub fn new_random(node_pubkey: Pubkey) -> Self {
use rand::Rng;

let mut rng = rand::thread_rng();
let root_slot = rng.gen();
let vote_state = VoteState::new_rand_for_tests(node_pubkey, root_slot);
let last_vote = VoteStateUpdate::from(
vote_state
.votes
.iter()
.map(|lv| (lv.slot(), lv.confirmation_count()))
.collect::<Vec<_>>(),
);
Self {
node_pubkey,
vote_state,
last_vote: VoteTransaction::CompactVoteStateUpdate(last_vote),
..Tower::default()
}
}

pub fn new_from_bankforks(
bank_forks: &BankForks,
node_pubkey: &Pubkey,
Expand Down
126 changes: 101 additions & 25 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,21 @@ impl ReplayStage {
let _exit = Finalizer::new(exit.clone());
let mut identity_keypair = cluster_info.keypair().clone();
let mut my_pubkey = identity_keypair.pubkey();
if my_pubkey != tower.node_pubkey {
// set-identity was called during the startup procedure, ensure the tower is consistent
// before starting the loop. further calls to set-identity will reload the tower in the loop
let my_old_pubkey = tower.node_pubkey;
tower = Self::load_tower(
tower_storage.as_ref(),
&my_pubkey,
&vote_account,
&bank_forks,
);
warn!(
"Identity changed during startup from {} to {}",
my_old_pubkey, my_pubkey
);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a warn! here but not in the loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do warn in loop see line 1011

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, if you always warn you can put that in the common function I guess

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would prefer not to, as the helper is just a generic load of the tower. it doesn't need to know anything about the previous pubkey.

let (mut progress, mut heaviest_subtree_fork_choice) =
Self::initialize_progress_and_fork_choice_with_locked_bank_forks(
&bank_forks,
Expand Down Expand Up @@ -983,28 +998,12 @@ impl ReplayStage {
my_pubkey = identity_keypair.pubkey();

// Load the new identity's tower
tower = Tower::restore(tower_storage.as_ref(), &my_pubkey)
.and_then(|restored_tower| {
let root_bank = bank_forks.read().unwrap().root_bank();
let slot_history = root_bank.get_slot_history();
restored_tower.adjust_lockouts_after_replay(
root_bank.slot(),
&slot_history,
)
})
.unwrap_or_else(|err| {
if err.is_file_missing() {
Tower::new_from_bankforks(
&bank_forks.read().unwrap(),
&my_pubkey,
&vote_account,
)
} else {
error!("Failed to load tower for {}: {}", my_pubkey, err);
std::process::exit(1);
}
});

tower = Self::load_tower(
tower_storage.as_ref(),
&my_pubkey,
&vote_account,
&bank_forks,
);
// Ensure the validator can land votes with the new identity before
// becoming leader
has_new_vote_been_rooted = !wait_for_vote_to_start_leader;
Expand Down Expand Up @@ -1154,6 +1153,32 @@ impl ReplayStage {
})
}

fn load_tower(
tower_storage: &dyn TowerStorage,
node_pubkey: &Pubkey,
vote_account: &Pubkey,
bank_forks: &Arc<RwLock<BankForks>>,
) -> Tower {
Tower::restore(tower_storage, node_pubkey)
.and_then(|restored_tower| {
let root_bank = bank_forks.read().unwrap().root_bank();
let slot_history = root_bank.get_slot_history();
restored_tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history)
})
.unwrap_or_else(|err| {
if err.is_file_missing() {
Tower::new_from_bankforks(
&bank_forks.read().unwrap(),
node_pubkey,
vote_account,
)
} else {
error!("Failed to load tower for {}: {}", node_pubkey, err);
std::process::exit(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why exit instead of panic! ? Would returning an error be better?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's just rust style guidelines:
"For planned shutdowns, use exit(). Do note that a known error is considered a planned shutdown
For unplanned shutdowns (i.e. exceptional failures) consider panic!(), as you'll both benefit from being able to get a stack trace when this happens, and the failure case should be exceptional enough that it is effectively unaccounted for and stems from an unplanned scenario"
not being able to load tower due to corrupted disk is an expected failure condition, we should always shutdown if we do not have a working tower.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

however i haven't really had to debug these scenarios before, if you think the unwind from panic is helpful we can go with that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the difference is small, but panic! is probably better style because we use it more:
https://stackoverflow.com/questions/39228685/when-is-stdprocessexit-o-k-to-use

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good, i'll change to panic in a separate PR, don't want to backport changes to existing behavior.

}
})
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a test maybe?

fn check_for_vote_only_mode(
heaviest_bank_slot: Slot,
forks_root: Slot,
Expand Down Expand Up @@ -4230,9 +4255,9 @@ pub(crate) mod tests {
crate::{
consensus::{
progress_map::{ValidatorStakeInfo, RETRANSMIT_BASE_DELAY_MS},
tower_storage::NullTowerStorage,
tower_storage::{FileTowerStorage, NullTowerStorage},
tree_diff::TreeDiff,
Tower,
Tower, VOTE_THRESHOLD_DEPTH,
},
replay_stage::ReplayStage,
vote_simulator::{self, VoteSimulator},
Expand All @@ -4254,7 +4279,7 @@ pub(crate) mod tests {
},
solana_runtime::{
accounts_background_service::AbsRequestSender,
commitment::BlockCommitment,
commitment::{BlockCommitment, VOTE_THRESHOLD_SIZE},
genesis_utils::{GenesisConfigInfo, ValidatorVoteKeypairs},
},
solana_sdk::{
Expand All @@ -4278,6 +4303,7 @@ pub(crate) mod tests {
iter,
sync::{atomic::AtomicU64, Arc, RwLock},
},
tempfile::tempdir,
trees::{tr, Tree},
};

Expand Down Expand Up @@ -8598,4 +8624,54 @@ pub(crate) mod tests {
assert_eq!(reset_fork, Some(4));
assert_eq!(failures, vec![HeaviestForkFailures::LockedOut(4),]);
}

#[test]
fn test_tower_load_missing() {
let tower_file = tempdir().unwrap().into_path();
let tower_storage = FileTowerStorage::new(tower_file);
let node_pubkey = Pubkey::new_unique();
let vote_account = Pubkey::new_unique();
let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6)))));
let generate_votes = |pubkeys: Vec<Pubkey>| {
pubkeys
.into_iter()
.zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2)))
.collect()
};
let (vote_simulator, _blockstore) =
setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes)));
let bank_forks = vote_simulator.bank_forks;

let tower =
ReplayStage::load_tower(&tower_storage, &node_pubkey, &vote_account, &bank_forks);
let expected_tower = Tower::new_for_tests(VOTE_THRESHOLD_DEPTH, VOTE_THRESHOLD_SIZE);
assert_eq!(tower.vote_state, expected_tower.vote_state);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, do we test that the new tower has new identity somewhere?

assert_eq!(tower.node_pubkey, node_pubkey);
}

#[test]
fn test_tower_load() {
let tower_file = tempdir().unwrap().into_path();
let tower_storage = FileTowerStorage::new(tower_file);
let node_keypair = Keypair::new();
let node_pubkey = node_keypair.pubkey();
let vote_account = Pubkey::new_unique();
let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6)))));
let generate_votes = |pubkeys: Vec<Pubkey>| {
pubkeys
.into_iter()
.zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2)))
.collect()
};
let (vote_simulator, _blockstore) =
setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes)));
let bank_forks = vote_simulator.bank_forks;
let expected_tower = Tower::new_random(node_pubkey);
expected_tower.save(&tower_storage, &node_keypair).unwrap();

let tower =
ReplayStage::load_tower(&tower_storage, &node_pubkey, &vote_account, &bank_forks);
assert_eq!(tower.vote_state, expected_tower.vote_state);
assert_eq!(tower.node_pubkey, expected_tower.node_pubkey);
}
}
18 changes: 18 additions & 0 deletions sdk/program/src/vote/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,24 @@ impl VoteState {
}
}

pub fn new_rand_for_tests(node_pubkey: Pubkey, root_slot: Slot) -> Self {
let votes = (1..32)
.map(|x| LandedVote {
latency: 0,
lockout: Lockout::new_with_confirmation_count(
u64::from(x).saturating_add(root_slot),
32_u32.saturating_sub(x),
),
})
.collect();
Self {
node_pubkey,
root_slot: Some(root_slot),
votes,
..VoteState::default()
}
}

pub fn get_authorized_voter(&self, epoch: Epoch) -> Option<Pubkey> {
self.authorized_voters.get_authorized_voter(epoch)
}
Expand Down
Loading