Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't vote for empty leader transmissions #3248

Merged
merged 9 commits into from
Mar 13, 2019
70 changes: 34 additions & 36 deletions core/src/poh_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,6 @@ impl PohRecorder {
// synchronize PoH with a bank
pub fn reset(&mut self, tick_height: u64, blockhash: Hash, start_slot: u64) {
self.clear_bank();
let existing = self.tick_cache.iter().any(|(entry, entry_tick_height)| {
if entry.hash == blockhash {
assert_eq!(*entry_tick_height, tick_height);
}
entry.hash == blockhash
});
if existing {
info!(
"reset skipped for: {},{}",
self.poh.hash, self.poh.tick_height
);
return;
}
let mut cache = vec![];
info!(
"reset poh from: {},{} to: {},{}",
Expand Down Expand Up @@ -159,6 +146,7 @@ impl PohRecorder {
"poh_record: max_tick_height reached, setting working bank {} to None",
working_bank.bank.slot()
);
self.start_slot = working_bank.max_tick_height / working_bank.bank.ticks_per_slot();
self.clear_bank();
}
if e.is_err() {
Expand Down Expand Up @@ -461,7 +449,7 @@ mod tests {
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 2);
poh_recorder.reset(poh_recorder.poh.tick_height, poh_recorder.poh.hash, 0);
assert_eq!(poh_recorder.tick_cache.len(), 2);
assert_eq!(poh_recorder.tick_cache.len(), 0);
}

#[test]
Expand All @@ -475,28 +463,7 @@ mod tests {
poh_recorder.tick_cache[0].0.hash,
0,
);
assert_eq!(poh_recorder.tick_cache.len(), 2);
poh_recorder.reset(
poh_recorder.tick_cache[1].1,
poh_recorder.tick_cache[1].0.hash,
0,
);
assert_eq!(poh_recorder.tick_cache.len(), 2);
}

#[test]
#[should_panic]
fn test_reset_with_cached_bad_height() {
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0);
poh_recorder.tick();
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 2);
//mixed up heights
poh_recorder.reset(
poh_recorder.tick_cache[0].1,
poh_recorder.tick_cache[1].0.hash,
0,
);
assert_eq!(poh_recorder.tick_cache.len(), 0);
}

#[test]
Expand Down Expand Up @@ -539,4 +506,35 @@ mod tests {
poh_recorder.clear_bank();
assert!(receiver.try_recv().is_ok());
}

#[test]
fn test_poh_recorder_reset_start_slot() {
let ticks_per_slot = 5;
let (mut genesis_block, _mint_keypair) = GenesisBlock::new(2);
genesis_block.ticks_per_slot = ticks_per_slot;
let bank = Arc::new(Bank::new(&genesis_block));

let prev_hash = bank.last_blockhash();
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash, 0);

let end_slot = 3;
let max_tick_height = (end_slot + 1) * ticks_per_slot - 1;
let working_bank = WorkingBank {
bank,
min_tick_height: 1,
max_tick_height,
};

poh_recorder.set_working_bank(working_bank);
for _ in 0..max_tick_height {
poh_recorder.tick();
}

let tx = test_tx();
let h1 = hash(b"hello world!");
assert!(poh_recorder.record(h1, vec![tx.clone()]).is_err());
assert!(poh_recorder.working_bank.is_none());
// Make sure the starting slot is updated
assert_eq!(poh_recorder.start_slot(), end_slot);
}
}
72 changes: 62 additions & 10 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use solana_sdk::timing::duration_as_ms;
use solana_vote_api::vote_transaction::VoteTransaction;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
Expand Down Expand Up @@ -108,15 +108,13 @@ impl ReplayStage {
}
let max_tick_height = (*bank_slot + 1) * bank.ticks_per_slot() - 1;
if bank.tick_height() == max_tick_height {
bank.freeze();
info!("bank frozen {}", bank.slot());
progress.remove(bank_slot);
if let Err(e) =
slot_full_sender.send((bank.slot(), bank.collector_id()))
{
info!("{} slot_full alert failed: {:?}", my_id, e);
}
votable.push(bank);
Self::process_completed_bank(
&my_id,
bank,
&mut progress,
&mut votable,
&slot_full_sender,
);
}
}

Expand Down Expand Up @@ -315,6 +313,24 @@ impl ReplayStage {
Ok(())
}

fn process_completed_bank(
my_id: &Pubkey,
bank: Arc<Bank>,
progress: &mut HashMap<u64, (Hash, usize)>,
votable: &mut Vec<Arc<Bank>>,
slot_full_sender: &Sender<(u64, Pubkey)>,
) {
bank.freeze();
info!("bank frozen {}", bank.slot());
progress.remove(&bank.slot());
if let Err(e) = slot_full_sender.send((bank.slot(), bank.collector_id())) {
info!("{} slot_full alert failed: {:?}", my_id, e);
}
if bank.is_votable() {
votable.push(bank);
}
}

fn generate_new_bank_forks(blocktree: &Blocktree, forks: &mut BankForks) {
// Find the next slot that chains to the old slot
let frozen_banks = forks.frozen_banks();
Expand Down Expand Up @@ -439,6 +455,42 @@ mod test {
let _ignored = remove_dir_all(&my_ledger_path);
}

#[test]
fn test_no_vote_empty_transmission() {
let genesis_block = GenesisBlock::new(10_000).0;
let bank = Arc::new(Bank::new(&genesis_block));
let mut blockhash = bank.last_blockhash();
let mut entries = Vec::new();
for _ in 0..genesis_block.ticks_per_slot {
let entry = next_entry_mut(&mut blockhash, 1, vec![]); //just ticks
entries.push(entry);
}
let (sender, _receiver) = channel();

let mut progress = HashMap::new();
let (forward_entry_sender, _forward_entry_receiver) = channel();
ReplayStage::replay_entries_into_bank(
&bank,
entries.clone(),
&mut progress,
&forward_entry_sender,
0,
)
.unwrap();

let mut votable = vec![];
ReplayStage::process_completed_bank(
&Pubkey::default(),
bank,
&mut progress,
&mut votable,
&sender,
);
assert!(progress.is_empty());
// Don't vote on slot that only contained ticks
assert!(votable.is_empty());
}

#[test]
fn test_replay_stage_poh_ok_entry_receiver() {
let (forward_entry_sender, forward_entry_receiver) = channel();
Expand Down
16 changes: 13 additions & 3 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use solana_sdk::transaction::Transaction;
use solana_vote_api::vote_instruction::Vote;
use solana_vote_api::vote_state::{Lockout, VoteState};
use std::result;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Instant;

Expand Down Expand Up @@ -192,6 +192,10 @@ pub struct Bank {
/// staked nodes on epoch boundaries, saved off when a bank.slot() is at
/// a leader schedule boundary
epoch_vote_accounts: HashMap<u64, HashMap<Pubkey, Account>>,

/// A boolean reflecting whether any entries were recorded into the PoH
/// stream for the slot == self.slot
is_delta: AtomicBool,
}

impl Default for HashQueue {
Expand Down Expand Up @@ -223,7 +227,6 @@ impl Bank {
/// Create a new bank that points to an immutable checkpoint of another bank.
pub fn new_from_parent(parent: &Arc<Bank>, collector_id: &Pubkey, slot: u64) -> Self {
parent.freeze();

assert_ne!(slot, parent.slot());

let mut bank = Self::default();
Expand Down Expand Up @@ -684,6 +687,9 @@ impl Bank {
if self.is_frozen() {
warn!("=========== FIXME: commit_transactions() working on a frozen bank! ================");
}

self.is_delta.store(true, Ordering::Relaxed);

// TODO: put this assert back in
// assert!(!self.is_frozen());
let now = Instant::now();
Expand Down Expand Up @@ -871,6 +877,11 @@ impl Bank {
pub fn get_epoch_and_slot_index(&self, slot: u64) -> (u64, u64) {
self.epoch_schedule.get_epoch_and_slot_index(slot)
}

pub fn is_votable(&self) -> bool {
let max_tick_height = (self.slot + 1) * self.ticks_per_slot - 1;
self.is_delta.load(Ordering::Relaxed) && self.tick_height() == max_tick_height
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pub function (called from another crate) needs tests and a doc comment. At first glance, it looks like this implementation can be replaced with:

pub is_votable(&self) -> bool {
   self.is_frozen() && !self.accounts.is_empty()
}

but no way to find out from within the solana_runtime crate. The test up in ReplayStage only tests one case, but this function alone has several edge cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those are not equivalent, is_delta can be false when !self.accounts.is_empty() is true (f your parent bank is nonempty, the child bank will always be nonempty as well, even if the child processes no transactions). We check self.tick_height() == max_tick_height instead of is_frozen because @rob-solana pointed out we may want to check if a bank is votable before freezing it.

Copy link
Contributor Author

@carllin carllin Mar 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, which edge cases were you seeing? The one I can think of is that a bank that processes only ticks has self.is_delta == false, and that processing a tx sets it to true

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@carllin there definitely should be local tests. cargo test --manifest-path runtime/Cargo.toml should cover everything in here

@garious this check is conservative and so shouldn't have edge cases. once tick_height() is max, no commit() will be called, and commit() is where we move to "is_delta".

the reason this check is down here is because otherwise, its logic would have to be duplicated in the TVU and in the TPU (which talk to the bank at different levels)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use HashQueue::hash_height()? I don't really want an answer as much as I want a test to answer on your behalf.

}
}

#[cfg(test)]
Expand Down Expand Up @@ -1598,5 +1609,4 @@ mod tests {
assert!(last_slots_in_epoch == slots_per_epoch);
}
}

}