replay: do not start leader for a block we already have shreds for (#2416)

* replay: do not start leader for a block we already have shreds for

* pr feedback: comment, move existing check to blockstore fn

* move blockstore read after tick height check

* pr feedback: reuse blockstore fn in next_leader_slot
AshwinSekar authored Aug 6, 2024
1 parent f1de5c0 commit 15dbe7f
Showing 4 changed files with 157 additions and 15 deletions.
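
Before the per-file diffs, a minimal self-contained sketch of the idea behind the change: a leader must not start producing a block for a slot the blockstore already holds shreds for, since doing so would create a duplicate block. The MockBlockstore, SlotMeta, LeaderStatus, and check_leader_slot names below are illustrative stand-ins rather than the real agave types; only has_existing_shreds_for_slot mirrors the helper added in this commit.

use std::collections::HashMap;

type Slot = u64;

// Stand-in for the per-slot metadata the blockstore tracks.
#[derive(Default)]
struct SlotMeta {
    // Number of shreds received for this slot.
    received: u64,
}

// Stand-in for the blockstore: slot -> metadata.
#[derive(Default)]
struct MockBlockstore {
    metas: HashMap<Slot, SlotMeta>,
}

impl MockBlockstore {
    // Same shape as the helper added in ledger/src/blockstore.rs: true if any
    // shreds have already been received for `slot`.
    fn has_existing_shreds_for_slot(&self, slot: Slot) -> bool {
        self.metas.get(&slot).map_or(false, |meta| meta.received > 0)
    }
}

// Analogue of PohLeaderStatus for this sketch.
#[derive(Debug, PartialEq)]
enum LeaderStatus {
    NotReached,
    Reached { poh_slot: Slot },
}

// Analogue of the new early return in PohRecorder::reached_leader_slot: never
// report a leader slot as reached if the blockstore already has shreds for it.
fn check_leader_slot(blockstore: &MockBlockstore, next_poh_slot: Slot) -> LeaderStatus {
    if blockstore.has_existing_shreds_for_slot(next_poh_slot) {
        return LeaderStatus::NotReached;
    }
    LeaderStatus::Reached { poh_slot: next_poh_slot }
}

fn main() {
    let mut blockstore = MockBlockstore::default();
    blockstore.metas.insert(7, SlotMeta { received: 3 });
    // Slot 7 already has shreds, so leader production is skipped for it.
    assert_eq!(check_leader_slot(&blockstore, 7), LeaderStatus::NotReached);
    // Slot 8 has no shreds, so it is safe to build a block there.
    assert_eq!(
        check_leader_slot(&blockstore, 8),
        LeaderStatus::Reached { poh_slot: 8 }
    );
}
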
144 changes: 137 additions & 7 deletions core/src/replay_stage.rs
@@ -1992,6 +1992,14 @@ impl ReplayStage {
}
}

/// Checks if it is time for us to start producing a leader block.
/// Fails if:
/// - Current PoH has not satisfied criteria to start my leader block
/// - Startup verification is not complete
/// - Bank forks already contains a bank for this leader slot
/// - We have not landed a vote yet and the `wait_for_vote_to_start_leader` flag is set
/// - We have failed the propagated check
/// Returns whether a new working bank was created and inserted into bank forks.
#[allow(clippy::too_many_arguments)]
fn maybe_start_leader(
my_pubkey: &Pubkey,
@@ -2005,7 +2013,7 @@
banking_tracer: &Arc<BankingTracer>,
has_new_vote_been_rooted: bool,
track_transaction_indexes: bool,
) {
) -> bool {
// all the individual calls to poh_recorder.read() are designed to
// increase granularity, decrease contention

@@ -2019,7 +2027,7 @@
} => (poh_slot, parent_slot),
PohLeaderStatus::NotReached => {
trace!("{} poh_recorder hasn't reached_leader_slot", my_pubkey);
return;
return false;
}
};

@@ -2035,12 +2043,12 @@

if !parent.is_startup_verification_complete() {
info!("startup verification incomplete, so skipping my leader slot");
return;
return false;
}

if bank_forks.read().unwrap().get(poh_slot).is_some() {
warn!("{} already have bank in forks at {}?", my_pubkey, poh_slot);
return;
return false;
}
trace!(
"{} poh_slot {} parent_slot {}",
@@ -2052,7 +2060,7 @@
if let Some(next_leader) = leader_schedule_cache.slot_leader_at(poh_slot, Some(&parent)) {
if !has_new_vote_been_rooted {
info!("Haven't landed a vote, so skipping my leader slot");
return;
return false;
}

trace!(
Expand All @@ -2064,7 +2072,7 @@ impl ReplayStage {

// I guess I missed my slot
if next_leader != *my_pubkey {
return;
return false;
}

datapoint_info!(
@@ -2098,7 +2106,7 @@
latest_unconfirmed_leader_slot,
);
}
return;
return false;
}

let root_slot = bank_forks.read().unwrap().root();
@@ -2133,8 +2141,10 @@
.write()
.unwrap()
.set_bank(tpu_bank, track_transaction_indexes);
true
} else {
error!("{} No next leader found", my_pubkey);
false
}
}

@@ -9097,4 +9107,124 @@ pub(crate) mod tests {
.is_candidate(&(5, bank_forks.bank_hash(5).unwrap()))
.unwrap());
}

#[test]
fn test_skip_leader_slot_for_existing_slot() {
solana_logger::setup();

let ReplayBlockstoreComponents {
blockstore,
my_pubkey,
leader_schedule_cache,
poh_recorder,
vote_simulator,
rpc_subscriptions,
..
} = replay_blockstore_components(None, 1, None);
let VoteSimulator {
bank_forks,
mut progress,
..
} = vote_simulator;

let working_bank = bank_forks.read().unwrap().working_bank();
assert!(working_bank.is_complete());
assert!(working_bank.is_frozen());
// Mark startup verification as complete to avoid skipping leader slots
working_bank.set_startup_verification_complete();

// Insert a block two slots greater than the current bank. This slot does
// not have a corresponding Bank in BankForks; this emulates a scenario
// where the block had previously been created and added to BankForks,
// but then got removed. This could be the case if the Bank was not on
// the major fork.
let dummy_slot = working_bank.slot() + 2;
let initial_slot = working_bank.slot();
let num_entries = 10;
let merkle_variant = true;
let (shreds, _) = make_slot_entries(dummy_slot, initial_slot, num_entries, merkle_variant);
blockstore.insert_shreds(shreds, None, false).unwrap();

// Reset PoH recorder to the completed bank to ensure consistent state
ReplayStage::reset_poh_recorder(
&my_pubkey,
&blockstore,
working_bank.clone(),
&poh_recorder,
&leader_schedule_cache,
);

// Register just over one slot's worth of ticks directly with the PoH recorder
let num_poh_ticks =
(working_bank.ticks_per_slot() * working_bank.hashes_per_tick().unwrap()) + 1;
poh_recorder
.write()
.map(|mut poh_recorder| {
for _ in 0..num_poh_ticks + 1 {
poh_recorder.tick();
}
})
.unwrap();

let poh_recorder = Arc::new(poh_recorder);
let (retransmit_slots_sender, _) = unbounded();
let (banking_tracer, _) = BankingTracer::new(None).unwrap();
// A vote has not technically been rooted, but it doesn't matter for
// this test; use true to avoid skipping the leader slot
let has_new_vote_been_rooted = true;
let track_transaction_indexes = false;

// We should not attempt to start leader for the dummy_slot
assert_matches!(
poh_recorder.read().unwrap().reached_leader_slot(&my_pubkey),
PohLeaderStatus::NotReached
);
assert!(!ReplayStage::maybe_start_leader(
&my_pubkey,
&bank_forks,
&poh_recorder,
&leader_schedule_cache,
&rpc_subscriptions,
&mut progress,
&retransmit_slots_sender,
&mut SkippedSlotsInfo::default(),
&banking_tracer,
has_new_vote_been_rooted,
track_transaction_indexes,
));

// Register another slot's worth of ticks with the PoH recorder
poh_recorder
.write()
.map(|mut poh_recorder| {
for _ in 0..num_poh_ticks + 1 {
poh_recorder.tick();
}
})
.unwrap();

// We should now start leader for dummy_slot + 1
let good_slot = dummy_slot + 1;
assert!(ReplayStage::maybe_start_leader(
&my_pubkey,
&bank_forks,
&poh_recorder,
&leader_schedule_cache,
&rpc_subscriptions,
&mut progress,
&retransmit_slots_sender,
&mut SkippedSlotsInfo::default(),
&banking_tracer,
has_new_vote_been_rooted,
track_transaction_indexes,
));
// Get the new working bank, which is also the new leader bank/slot
let working_bank = bank_forks.read().unwrap().working_bank();
// The new bank's slot must NOT be dummy_slot as the blockstore already
// had a shred inserted for dummy_slot prior to maybe_start_leader().
// maybe_start_leader() must not pick dummy_slot to avoid creating a
// duplicate block.
assert_eq!(working_bank.slot(), good_slot);
assert_eq!(working_bank.parent_slot(), initial_slot);
}
}
7 changes: 7 additions & 0 deletions ledger/src/blockstore.rs
@@ -4049,6 +4049,13 @@ impl Blockstore {
Ok(duplicate_slots_iterator.map(|(slot, _)| slot))
}

pub fn has_existing_shreds_for_slot(&self, slot: Slot) -> bool {
match self.meta(slot).unwrap() {
Some(meta) => meta.received > 0,
None => false,
}
}

/// Returns the max root or 0 if it does not exist
pub fn max_root(&self) -> Slot {
self.max_root.load(Ordering::Relaxed)
12 changes: 4 additions & 8 deletions ledger/src/leader_schedule_cache.rs
@@ -139,14 +139,10 @@ impl LeaderScheduleCache {
.map(move |i| i as Slot + first_slot)
})
.skip_while(|slot| {
match blockstore {
None => false,
// Skip slots we have already sent a shred for.
Some(blockstore) => match blockstore.meta(*slot).unwrap() {
Some(meta) => meta.received > 0,
None => false,
},
}
// Skip slots we already have shreds for
blockstore
.map(|bs| bs.has_existing_shreds_for_slot(*slot))
.unwrap_or(false)
});
let first_slot = schedule.next()?;
let max_slot = first_slot.saturating_add(max_slot_range);
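
For context on the refactored skip_while above, a tiny standalone sketch of the pattern it implements: leading candidate leader slots that already have shreds are dropped before the first slot of the leader range is chosen. The slot numbers and the has_shreds closure are made up for illustration.

fn main() {
    // Candidate leader slots for this validator, in schedule order.
    let candidate_slots = [10u64, 11, 12, 13];
    // Pretend slots 10 and 11 already have shreds in the blockstore.
    let has_shreds = |slot: &u64| *slot == 10 || *slot == 11;

    // Mirrors the skip_while in next_leader_slot: skip leading slots that
    // already have shreds, then take the first remaining slot as the start
    // of the leader range.
    let mut schedule = candidate_slots.iter().copied().skip_while(has_shreds);
    let first_slot = schedule.next();

    assert_eq!(first_slot, Some(12));
}
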
9 changes: 9 additions & 0 deletions poh/src/poh_recorder.rs
@@ -576,6 +576,15 @@ impl PohRecorder {
return PohLeaderStatus::NotReached;
}

if self.blockstore.has_existing_shreds_for_slot(next_poh_slot) {
// We already have existing shreds for this slot. This can happen when this block was previously
// created and added to BankForks, however a recent PoH reset caused this bank to be removed
// as it was not part of the rooted fork. If this slot is not the first slot for this leader,
// and the first slot was previously ticked over, the check in `leader_schedule_cache::next_leader_slot`
// will not suffice, as it only checks if there are shreds for the first slot.
return PohLeaderStatus::NotReached;
}

assert!(next_tick_height >= self.start_tick_height);
let poh_slot = next_poh_slot;
let parent_slot = self.start_slot();
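
A short illustration of the scenario the comment in the hunk above describes; the slot numbers are made up and a plain HashMap stands in for the blockstore's per-slot SlotMeta::received counts. Checking only the first slot of the leader range (as next_leader_slot does) is not enough when shreds exist for a later slot of the same range, which is exactly what the new per-slot check in reached_leader_slot guards against.

use std::collections::HashMap;

// Received-shred count per slot, standing in for SlotMeta::received.
fn has_existing_shreds(received: &HashMap<u64, u64>, slot: u64) -> bool {
    received.get(&slot).map_or(false, |count| *count > 0)
}

fn main() {
    // Leader range: slots 10..=13. A block for slot 11 was built earlier, its
    // shreds were stored, and its bank was later dropped from BankForks after
    // a PoH reset onto a different fork.
    let mut received: HashMap<u64, u64> = HashMap::new();
    received.insert(11, 5);

    // A first-slot-only check on the range still passes:
    assert!(!has_existing_shreds(&received, 10));

    // ...but the per-slot check rejects slot 11 once PoH reaches it,
    // so no duplicate block is produced there.
    assert!(has_existing_shreds(&received, 11));
}
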
