Skip to content

Commit

Permalink
replay: reload tower if set-identity during startup
Browse files Browse the repository at this point in the history
  • Loading branch information
AshwinSekar committed Feb 13, 2024
1 parent bf1becb commit cdedc75
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 22 deletions.
70 changes: 48 additions & 22 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ impl PartitionInfo {
}

pub struct ReplayStageConfig {
pub startup_identity: Pubkey,
pub vote_account: Pubkey,
pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
pub exit: Arc<AtomicBool>,
Expand Down Expand Up @@ -546,6 +547,7 @@ impl ReplayStage {
popular_pruned_forks_receiver: PopularPrunedForksReceiver,
) -> Result<Self, String> {
let ReplayStageConfig {
startup_identity,
vote_account,
authorized_voter_keypairs,
exit,
Expand Down Expand Up @@ -578,6 +580,20 @@ impl ReplayStage {
let _exit = Finalizer::new(exit.clone());
let mut identity_keypair = cluster_info.keypair().clone();
let mut my_pubkey = identity_keypair.pubkey();
if my_pubkey != startup_identity {
// set-identity was called during the startup procedure, ensure the tower is consistent
// before starting the loop. further calls to set-identity will reload the tower in the loop
tower = Self::reload_tower_with_new_identity(
&tower_storage,
&my_pubkey,
&vote_account,
&bank_forks,
);
warn!(
"Identity changed from {} to {}",
startup_identity, my_pubkey
);
}
let (mut progress, mut heaviest_subtree_fork_choice) =
Self::initialize_progress_and_fork_choice_with_locked_bank_forks(
&bank_forks,
Expand Down Expand Up @@ -983,28 +999,12 @@ impl ReplayStage {
my_pubkey = identity_keypair.pubkey();

// Load the new identity's tower
tower = Tower::restore(tower_storage.as_ref(), &my_pubkey)
.and_then(|restored_tower| {
let root_bank = bank_forks.read().unwrap().root_bank();
let slot_history = root_bank.get_slot_history();
restored_tower.adjust_lockouts_after_replay(
root_bank.slot(),
&slot_history,
)
})
.unwrap_or_else(|err| {
if err.is_file_missing() {
Tower::new_from_bankforks(
&bank_forks.read().unwrap(),
&my_pubkey,
&vote_account,
)
} else {
error!("Failed to load tower for {}: {}", my_pubkey, err);
std::process::exit(1);
}
});

tower = Self::reload_tower_with_new_identity(
&tower_storage,
&my_pubkey,
&vote_account,
&bank_forks,
);
// Ensure the validator can land votes with the new identity before
// becoming leader
has_new_vote_been_rooted = !wait_for_vote_to_start_leader;
Expand Down Expand Up @@ -1154,6 +1154,32 @@ impl ReplayStage {
})
}

fn reload_tower_with_new_identity(
tower_storage: &Arc<dyn TowerStorage>,
new_identity: &Pubkey,
vote_account: &Pubkey,
bank_forks: &Arc<RwLock<BankForks>>,
) -> Tower {
Tower::restore(tower_storage.as_ref(), new_identity)
.and_then(|restored_tower| {
let root_bank = bank_forks.read().unwrap().root_bank();
let slot_history = root_bank.get_slot_history();
restored_tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history)
})
.unwrap_or_else(|err| {
if err.is_file_missing() {
Tower::new_from_bankforks(
&bank_forks.read().unwrap(),
new_identity,
vote_account,
)
} else {
error!("Failed to load tower for {}: {}", new_identity, err);
std::process::exit(1);
}
})
}

fn check_for_vote_only_mode(
heaviest_bank_slot: Slot,
forks_root: Slot,
Expand Down
3 changes: 3 additions & 0 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ impl Tvu {
/// * `blockstore` - the ledger itself
#[allow(clippy::too_many_arguments)]
pub fn new(
startup_identity: Pubkey,
vote_account: &Pubkey,
authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
bank_forks: &Arc<RwLock<BankForks>>,
Expand Down Expand Up @@ -248,6 +249,7 @@ impl Tvu {

let (blockstore_cleanup_slot_sender, blockstore_cleanup_slot_receiver) = unbounded();
let replay_stage_config = ReplayStageConfig {
startup_identity,
vote_account: *vote_account,
authorized_voter_keypairs,
exit: exit.clone(),
Expand Down Expand Up @@ -451,6 +453,7 @@ pub mod tests {
let outstanding_repair_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
let cluster_slots = Arc::new(ClusterSlots::default());
let tvu = Tvu::new(
Pubkey::new_unique(),
&vote_keypair.pubkey(),
Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
&bank_forks,
Expand Down
1 change: 1 addition & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,7 @@ impl Validator {
Arc::new(crate::cluster_slots_service::cluster_slots::ClusterSlots::default());

let tvu = Tvu::new(
id,
vote_account,
authorized_voter_keypairs,
&bank_forks,
Expand Down

0 comments on commit cdedc75

Please sign in to comment.