Skip to content

Commit

Permalink
Don't vote for empty leader transmissions (solana-labs#3248)
Browse files Browse the repository at this point in the history
* Don't vote for empty leader transmissions

* Add is_delta flag to bank to detect empty leader transmissions

* Plumb new is_votable flag through replay stage

* Fix PohRecorder tests

* Change is_delta to AtomicBool to avoid making Bank references mutable

* Reset start slot in poh_recorder when working bank is cleared, so that connsecutive TPU's will start from the correct place

* Use proper max tick height calculation

* Test for not voting on empty transmission

* tests for is_votable
  • Loading branch information
carllin committed Mar 13, 2019
1 parent 28aff96 commit a437a16
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 49 deletions.
70 changes: 34 additions & 36 deletions core/src/poh_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,6 @@ impl PohRecorder {
// synchronize PoH with a bank
pub fn reset(&mut self, tick_height: u64, blockhash: Hash, start_slot: u64) {
self.clear_bank();
let existing = self.tick_cache.iter().any(|(entry, entry_tick_height)| {
if entry.hash == blockhash {
assert_eq!(*entry_tick_height, tick_height);
}
entry.hash == blockhash
});
if existing {
info!(
"reset skipped for: {},{}",
self.poh.hash, self.poh.tick_height
);
return;
}
let mut cache = vec![];
info!(
"reset poh from: {},{} to: {},{}",
Expand Down Expand Up @@ -159,6 +146,7 @@ impl PohRecorder {
"poh_record: max_tick_height reached, setting working bank {} to None",
working_bank.bank.slot()
);
self.start_slot = working_bank.max_tick_height / working_bank.bank.ticks_per_slot();
self.clear_bank();
}
if e.is_err() {
Expand Down Expand Up @@ -461,7 +449,7 @@ mod tests {
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 2);
poh_recorder.reset(poh_recorder.poh.tick_height, poh_recorder.poh.hash, 0);
assert_eq!(poh_recorder.tick_cache.len(), 2);
assert_eq!(poh_recorder.tick_cache.len(), 0);
}

#[test]
Expand All @@ -475,28 +463,7 @@ mod tests {
poh_recorder.tick_cache[0].0.hash,
0,
);
assert_eq!(poh_recorder.tick_cache.len(), 2);
poh_recorder.reset(
poh_recorder.tick_cache[1].1,
poh_recorder.tick_cache[1].0.hash,
0,
);
assert_eq!(poh_recorder.tick_cache.len(), 2);
}

#[test]
#[should_panic]
fn test_reset_with_cached_bad_height() {
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0);
poh_recorder.tick();
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 2);
//mixed up heights
poh_recorder.reset(
poh_recorder.tick_cache[0].1,
poh_recorder.tick_cache[1].0.hash,
0,
);
assert_eq!(poh_recorder.tick_cache.len(), 0);
}

#[test]
Expand Down Expand Up @@ -539,4 +506,35 @@ mod tests {
poh_recorder.clear_bank();
assert!(receiver.try_recv().is_ok());
}

#[test]
fn test_poh_recorder_reset_start_slot() {
let ticks_per_slot = 5;
let (mut genesis_block, _mint_keypair) = GenesisBlock::new(2);
genesis_block.ticks_per_slot = ticks_per_slot;
let bank = Arc::new(Bank::new(&genesis_block));

let prev_hash = bank.last_blockhash();
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash, 0);

let end_slot = 3;
let max_tick_height = (end_slot + 1) * ticks_per_slot - 1;
let working_bank = WorkingBank {
bank,
min_tick_height: 1,
max_tick_height,
};

poh_recorder.set_working_bank(working_bank);
for _ in 0..max_tick_height {
poh_recorder.tick();
}

let tx = test_tx();
let h1 = hash(b"hello world!");
assert!(poh_recorder.record(h1, vec![tx.clone()]).is_err());
assert!(poh_recorder.working_bank.is_none());
// Make sure the starting slot is updated
assert_eq!(poh_recorder.start_slot(), end_slot);
}
}
72 changes: 62 additions & 10 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use solana_sdk::timing::duration_as_ms;
use solana_vote_api::vote_transaction::VoteTransaction;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
Expand Down Expand Up @@ -108,15 +108,13 @@ impl ReplayStage {
}
let max_tick_height = (*bank_slot + 1) * bank.ticks_per_slot() - 1;
if bank.tick_height() == max_tick_height {
bank.freeze();
info!("bank frozen {}", bank.slot());
progress.remove(bank_slot);
if let Err(e) =
slot_full_sender.send((bank.slot(), bank.collector_id()))
{
info!("{} slot_full alert failed: {:?}", my_id, e);
}
votable.push(bank);
Self::process_completed_bank(
&my_id,
bank,
&mut progress,
&mut votable,
&slot_full_sender,
);
}
}

Expand Down Expand Up @@ -315,6 +313,24 @@ impl ReplayStage {
Ok(())
}

fn process_completed_bank(
my_id: &Pubkey,
bank: Arc<Bank>,
progress: &mut HashMap<u64, (Hash, usize)>,
votable: &mut Vec<Arc<Bank>>,
slot_full_sender: &Sender<(u64, Pubkey)>,
) {
bank.freeze();
info!("bank frozen {}", bank.slot());
progress.remove(&bank.slot());
if let Err(e) = slot_full_sender.send((bank.slot(), bank.collector_id())) {
info!("{} slot_full alert failed: {:?}", my_id, e);
}
if bank.is_votable() {
votable.push(bank);
}
}

fn generate_new_bank_forks(blocktree: &Blocktree, forks: &mut BankForks) {
// Find the next slot that chains to the old slot
let frozen_banks = forks.frozen_banks();
Expand Down Expand Up @@ -439,6 +455,42 @@ mod test {
let _ignored = remove_dir_all(&my_ledger_path);
}

#[test]
fn test_no_vote_empty_transmission() {
let genesis_block = GenesisBlock::new(10_000).0;
let bank = Arc::new(Bank::new(&genesis_block));
let mut blockhash = bank.last_blockhash();
let mut entries = Vec::new();
for _ in 0..genesis_block.ticks_per_slot {
let entry = next_entry_mut(&mut blockhash, 1, vec![]); //just ticks
entries.push(entry);
}
let (sender, _receiver) = channel();

let mut progress = HashMap::new();
let (forward_entry_sender, _forward_entry_receiver) = channel();
ReplayStage::replay_entries_into_bank(
&bank,
entries.clone(),
&mut progress,
&forward_entry_sender,
0,
)
.unwrap();

let mut votable = vec![];
ReplayStage::process_completed_bank(
&Pubkey::default(),
bank,
&mut progress,
&mut votable,
&sender,
);
assert!(progress.is_empty());
// Don't vote on slot that only contained ticks
assert!(votable.is_empty());
}

#[test]
fn test_replay_stage_poh_ok_entry_receiver() {
let (forward_entry_sender, forward_entry_receiver) = channel();
Expand Down
48 changes: 45 additions & 3 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use solana_sdk::transaction::Transaction;
use solana_vote_api::vote_instruction::Vote;
use solana_vote_api::vote_state::{Lockout, VoteState};
use std::result;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Instant;

Expand Down Expand Up @@ -194,6 +194,10 @@ pub struct Bank {
/// staked nodes on epoch boundaries, saved off when a bank.slot() is at
/// a leader schedule boundary
epoch_vote_accounts: HashMap<u64, HashMap<Pubkey, Account>>,

/// A boolean reflecting whether any entries were recorded into the PoH
/// stream for the slot == self.slot
is_delta: AtomicBool,
}

impl Default for HashQueue {
Expand Down Expand Up @@ -226,7 +230,6 @@ impl Bank {
/// Create a new bank that points to an immutable checkpoint of another bank.
pub fn new_from_parent(parent: &Arc<Bank>, collector_id: &Pubkey, slot: u64) -> Self {
parent.freeze();

assert_ne!(slot, parent.slot());

let mut bank = Self::default();
Expand Down Expand Up @@ -685,6 +688,9 @@ impl Bank {
if self.is_frozen() {
warn!("=========== FIXME: commit_transactions() working on a frozen bank! ================");
}

self.is_delta.store(true, Ordering::Relaxed);

// TODO: put this assert back in
// assert!(!self.is_frozen());
let now = Instant::now();
Expand Down Expand Up @@ -850,7 +856,6 @@ impl Bank {
// tick_height is using an AtomicUSize because AtomicU64 is not yet a stable API.
// Until we can switch to AtomicU64, fail if usize is not the same as u64
assert_eq!(std::usize::MAX, 0xFFFF_FFFF_FFFF_FFFF);

self.tick_height.load(Ordering::SeqCst) as u64
}

Expand Down Expand Up @@ -883,6 +888,11 @@ impl Bank {
pub fn get_epoch_and_slot_index(&self, slot: u64) -> (u64, u64) {
self.epoch_schedule.get_epoch_and_slot_index(slot)
}

pub fn is_votable(&self) -> bool {
let max_tick_height = (self.slot + 1) * self.ticks_per_slot - 1;
self.is_delta.load(Ordering::Relaxed) && self.tick_height() == max_tick_height
}
}

#[cfg(test)]
Expand All @@ -891,6 +901,7 @@ mod tests {
use bincode::serialize;
use hashbrown::HashSet;
use solana_sdk::genesis_block::{GenesisBlock, BOOTSTRAP_LEADER_LAMPORTS};
use solana_sdk::hash;
use solana_sdk::native_program::ProgramError;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_instruction::SystemInstruction;
Expand Down Expand Up @@ -1666,4 +1677,35 @@ mod tests {
}
}

#[test]
fn test_is_delta_true() {
let (genesis_block, mint_keypair) = GenesisBlock::new(500);
let bank = Arc::new(Bank::new(&genesis_block));
let key1 = Keypair::new();
let tx_move_mint_to_1 =
SystemTransaction::new_move(&mint_keypair, &key1.pubkey(), 1, genesis_block.hash(), 0);
assert_eq!(bank.process_transaction(&tx_move_mint_to_1), Ok(()));
assert_eq!(bank.is_delta.load(Ordering::Relaxed), true);
}

#[test]
fn test_is_votable() {
let (genesis_block, mint_keypair) = GenesisBlock::new(500);
let bank = Arc::new(Bank::new(&genesis_block));
let key1 = Keypair::new();
assert_eq!(bank.is_votable(), false);

// Set is_delta to true
let tx_move_mint_to_1 =
SystemTransaction::new_move(&mint_keypair, &key1.pubkey(), 1, genesis_block.hash(), 0);
assert_eq!(bank.process_transaction(&tx_move_mint_to_1), Ok(()));
assert_eq!(bank.is_votable(), false);

// Register enough ticks to hit max tick height
for i in 0..genesis_block.ticks_per_slot - 1 {
bank.register_tick(&hash::hash(format!("hello world {}", i).as_bytes()));
}

assert_eq!(bank.is_votable(), true);
}
}

0 comments on commit a437a16

Please sign in to comment.