Cleanup banking bench #24851

Merged (2 commits), May 2, 2022
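
Summary of this change, as reflected in the diff below: the ad-hoc Config struct and bytes_as_usize helper are replaced by a PacketsPerIteration type that owns each iteration's transactions together with the PacketBatches serialized from them and re-signs both in one place (refresh_blockhash); the --iterations argument moves earlier in the CLI definition with clearer help text; account funding, the sanity checks, and the send loop now walk the per-iteration chunks instead of indexing flat transactions/verified vectors with start/chunk_len bookkeeping; a trace!("no bank") log and a stale transaction-count assert are dropped.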
banking-bench/src/main.rs: 194 changes (108 additions, 86 deletions)
@@ -14,7 +14,7 @@ use {
         leader_schedule_cache::LeaderScheduleCache,
     },
     solana_measure::measure::Measure,
-    solana_perf::packet::to_packet_batches,
+    solana_perf::packet::{to_packet_batches, PacketBatch},
     solana_poh::poh_recorder::{create_test_recorder, PohRecorder, WorkingBankEntry},
     solana_runtime::{
         accounts_background_service::AbsRequestSender, bank::Bank, bank_forks::BankForks,
@@ -55,7 +55,6 @@ fn check_txs(
             break;
         }
         if poh_recorder.lock().unwrap().bank().is_none() {
-            trace!("no bank");
             no_bank = true;
             break;
         }
@@ -121,18 +120,44 @@ fn make_accounts_txs(
         .collect()
 }
 
-struct Config {
+struct PacketsPerIteration {
+    packet_batches: Vec<PacketBatch>,
+    transactions: Vec<Transaction>,
     packets_per_batch: usize,
 }
 
-impl Config {
-    fn get_transactions_index(&self, chunk_index: usize) -> usize {
-        chunk_index * self.packets_per_batch
+impl PacketsPerIteration {
+    fn new(
+        packets_per_batch: usize,
+        batches_per_iteration: usize,
+        genesis_hash: Hash,
+        write_lock_contention: WriteLockContention,
+    ) -> Self {
+        let total_num_transactions = packets_per_batch * batches_per_iteration;
+        let transactions = make_accounts_txs(
+            total_num_transactions,
+            packets_per_batch,
+            genesis_hash,
+            write_lock_contention,
+        );
+
+        let packet_batches: Vec<PacketBatch> = to_packet_batches(&transactions, packets_per_batch);
+        assert_eq!(packet_batches.len(), batches_per_iteration);
+        Self {
+            packet_batches,
+            transactions,
+            packets_per_batch,
+        }
     }
-}
 
-fn bytes_as_usize(bytes: &[u8]) -> usize {
-    bytes[0] as usize | (bytes[1] as usize) << 8
+    fn refresh_blockhash(&mut self, new_blockhash: Hash) {
+        for tx in self.transactions.iter_mut() {
+            tx.message.recent_blockhash = new_blockhash;
+            let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen::<u8>()).collect();
+            tx.signatures[0] = Signature::new(&sig[0..64]);
+        }
+        self.packet_batches = to_packet_batches(&self.transactions, self.packets_per_batch);
+    }
 }
 
 #[allow(clippy::cognitive_complexity)]
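
The struct above pairs each iteration's transactions with the PacketBatches serialized from them, so whenever the transactions change the derived batches are rebuilt in exactly one method. A self-contained sketch of that ownership pattern follows; every name in it is an illustrative stand-in, not code from this PR:

struct WorkChunk {
    items: Vec<u64>,     // stands in for `transactions`
    serialized: Vec<u8>, // stands in for `packet_batches`
}

impl WorkChunk {
    fn new(n: u64) -> Self {
        let items: Vec<u64> = (0..n).collect();
        let serialized = items.iter().flat_map(|x| x.to_le_bytes()).collect();
        Self { items, serialized }
    }

    // Mirrors refresh_blockhash: mutate the source data, then rebuild the
    // derived form so the two representations cannot drift apart.
    fn refresh(&mut self, bump: u64) {
        for x in self.items.iter_mut() {
            *x += bump;
        }
        self.serialized = self.items.iter().flat_map(|x| x.to_le_bytes()).collect();
    }
}

fn main() {
    let mut chunks: Vec<WorkChunk> = (0..3).map(|_| WorkChunk::new(4)).collect();
    for iteration in 0..6usize {
        let chunk = &chunks[iteration % chunks.len()];
        // The serialized form always matches the current items.
        assert_eq!(chunk.serialized.len(), chunk.items.len() * 8);
        if iteration % 2 == 0 {
            chunks.iter_mut().for_each(|c| c.refresh(1));
        }
    }
}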
@@ -142,6 +167,12 @@ fn main() {
     let matches = Command::new(crate_name!())
         .about(crate_description!())
         .version(solana_version::version!())
+        .arg(
+            Arg::new("iterations")
+                .long("iterations")
+                .takes_value(true)
+                .help("Number of test iterations"),
+        )
         .arg(
             Arg::new("num_chunks")
                 .long("num-chunks")
@@ -169,12 +200,6 @@ fn main() {
                 .possible_values(WriteLockContention::possible_values())
                 .help("Accounts that test transactions write lock"),
         )
-        .arg(
-            Arg::new("iterations")
-                .long("iterations")
-                .takes_value(true)
-                .help("Number of iterations"),
-        )
         .arg(
             Arg::new("batches_per_iteration")
                 .long("batches-per-iteration")
@@ -205,7 +230,6 @@ fn main() {
         .value_of_t::<WriteLockContention>("write_lock_contention")
         .unwrap_or(WriteLockContention::None);
 
-    let total_num_transactions = num_chunks * packets_per_batch * batches_per_iteration;
     let mint_total = 1_000_000_000_000;
     let GenesisConfigInfo {
         genesis_config,
@@ -226,55 +250,72 @@ fn main() {
         .unwrap()
         .set_limits(std::u64::MAX, std::u64::MAX, std::u64::MAX);
 
+    let mut all_packets: Vec<PacketsPerIteration> = std::iter::from_fn(|| {
+        Some(PacketsPerIteration::new(
+            packets_per_batch,
+            batches_per_iteration,
+            genesis_config.hash(),
+            write_lock_contention,
+        ))
+    })
+    .take(num_chunks)
+    .collect();
+
+    // fund all the accounts
+    let total_num_transactions: u64 = all_packets
+        .iter()
+        .map(|packets_for_single_iteration| packets_for_single_iteration.transactions.len() as u64)
+        .sum();
     info!(
         "threads: {} txs: {}",
         num_banking_threads, total_num_transactions
     );
 
-    let mut transactions = make_accounts_txs(
-        total_num_transactions,
-        packets_per_batch,
-        genesis_config.hash(),
-        write_lock_contention,
-    );
-
-    // fund all the accounts
-    transactions.iter().for_each(|tx| {
-        let mut fund = system_transaction::transfer(
-            &mint_keypair,
-            &tx.message.account_keys[0],
-            mint_total / total_num_transactions as u64,
-            genesis_config.hash(),
-        );
-        // Ignore any pesky duplicate signature errors in the case we are using single-payer
-        let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen::<u8>()).collect();
-        fund.signatures = vec![Signature::new(&sig[0..64])];
-        let x = bank.process_transaction(&fund);
-        x.unwrap();
+    all_packets.iter().for_each(|packets_for_single_iteration| {
+        packets_for_single_iteration
+            .transactions
+            .iter()
+            .for_each(|tx| {
+                let mut fund = system_transaction::transfer(
+                    &mint_keypair,
+                    &tx.message.account_keys[0],
+                    mint_total / total_num_transactions,
+                    genesis_config.hash(),
+                );
+                // Ignore any pesky duplicate signature errors in the case we are using single-payer
+                let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen::<u8>()).collect();
+                fund.signatures = vec![Signature::new(&sig[0..64])];
+                bank.process_transaction(&fund).unwrap();
+            });
     });
 
     let skip_sanity = matches.is_present("skip_sanity");
     if !skip_sanity {
-        //sanity check, make sure all the transactions can execute sequentially
-        transactions.iter().for_each(|tx| {
-            let res = bank.process_transaction(tx);
-            assert!(res.is_ok(), "sanity test transactions error: {:?}", res);
+        all_packets.iter().for_each(|packets_for_single_iteration| {
+            //sanity check, make sure all the transactions can execute sequentially
+            packets_for_single_iteration
+                .transactions
+                .iter()
+                .for_each(|tx| {
+                    let res = bank.process_transaction(tx);
+                    assert!(res.is_ok(), "sanity test transactions error: {:?}", res);
+                });
         });
         bank.clear_signatures();
 
         if write_lock_contention == WriteLockContention::None {
-            //sanity check, make sure all the transactions can execute in parallel
-            let res = bank.process_transactions(transactions.iter());
-            for r in res {
-                assert!(r.is_ok(), "sanity parallel execution error: {:?}", r);
-            }
-            bank.clear_signatures();
+            all_packets.iter().for_each(|packets_for_single_iteration| {
+                //sanity check, make sure all the transactions can execute in parallel
+                let res =
+                    bank.process_transactions(packets_for_single_iteration.transactions.iter());
+                for r in res {
+                    assert!(r.is_ok(), "sanity parallel execution error: {:?}", r);
+                }
+                bank.clear_signatures();
+            });
         }
     }
 
-    let mut verified: Vec<_> = to_packet_batches(&transactions, packets_per_batch);
-    assert_eq!(verified.len(), num_chunks * batches_per_iteration);
-
     let ledger_path = get_tmp_ledger_path!();
     {
         let blockstore = Arc::new(
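
A note on the all_packets construction above: std::iter::from_fn turns a closure into an iterator, and .take(num_chunks) bounds the otherwise endless stream before .collect() materializes it. A minimal standalone example of the same construction:

fn main() {
    let mut counter = 0u32;
    // from_fn keeps yielding while the closure returns Some; take(3) stops
    // it after three items, just as .take(num_chunks) does in the bench.
    let squares: Vec<u32> = std::iter::from_fn(|| {
        counter += 1;
        Some(counter * counter)
    })
    .take(3)
    .collect();
    assert_eq!(squares, vec![1, 4, 9]);
}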
@@ -306,9 +347,6 @@ fn main() {
         );
         poh_recorder.lock().unwrap().set_bank(&bank);
 
-        let chunk_len = batches_per_iteration;
-        let mut start = 0;
-
         // This is so that the signal_receiver does not go out of scope after the closure.
         // If it is dropped before poh_service, then poh_service will error when
         // calling send() on the channel.
@@ -319,36 +357,26 @@ fn main() {
         let mut txs_processed = 0;
         let mut root = 1;
         let collector = solana_sdk::pubkey::new_rand();
-        let config = Config { packets_per_batch };
         let mut total_sent = 0;
-        for _ in 0..iterations {
+        for current_iteration_index in 0..iterations {
+            trace!("RUNNING ITERATION {}", current_iteration_index);
             let now = Instant::now();
             let mut sent = 0;
 
-            for (i, v) in verified[start..start + chunk_len].chunks(1).enumerate() {
-                let mut byte = 0;
-                let index = config.get_transactions_index(start + i);
-                if index < transactions.len() {
-                    byte = bytes_as_usize(transactions[index].signatures[0].as_ref());
-                }
+            let packets_for_this_iteration = &all_packets[current_iteration_index % num_chunks];
+            for (packet_batch_index, packet_batch) in
+                packets_for_this_iteration.packet_batches.iter().enumerate()
+            {
+                sent += packet_batch.packets.len();
                 trace!(
-                    "sending... {}..{} {} v.len: {} sig: {} transactions.len: {} index: {}",
-                    start + i,
-                    start + chunk_len,
+                    "Sending PacketBatch index {}, {}",
+                    packet_batch_index,
                     timestamp(),
-                    v.len(),
-                    byte,
-                    transactions.len(),
-                    index,
                 );
-                for xv in v {
-                    sent += xv.packets.len();
-                }
-                verified_sender.send(v.to_vec()).unwrap();
+                verified_sender.send(vec![packet_batch.clone()]).unwrap();
             }
-            let start_tx_index = config.get_transactions_index(start);
-            let end_tx_index = config.get_transactions_index(start + chunk_len);
-            for tx in &transactions[start_tx_index..end_tx_index] {
+
+            for tx in &packets_for_this_iteration.transactions {
                 loop {
                     if bank.get_signature_status(&tx.signatures[0]).is_some() {
                         break;
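
Worth calling out in the send loop above: instead of slicing a shared verified vector (verified[start..start + chunk_len].chunks(1)) and sending v.to_vec(), each iteration now picks its chunk by index (current_iteration_index % num_chunks) and sends one cloned PacketBatch per channel send, tallying sent before the send. A standalone sketch of that send shape; std::sync::mpsc stands in for the bench's channel type, which is an assumption here:

use std::sync::mpsc::channel;

fn main() {
    // One batch per send, wrapped in a one-element Vec, with the sent
    // counter updated up front, like `sent += packet_batch.packets.len()`.
    let (sender, receiver) = channel::<Vec<Vec<u8>>>();
    let batches: Vec<Vec<u8>> = vec![vec![1, 2], vec![3, 4, 5]];
    let mut sent = 0;
    for batch in &batches {
        sent += batch.len();
        sender.send(vec![batch.clone()]).unwrap();
    }
    drop(sender); // close the channel so the receiver's iterator terminates
    assert_eq!(receiver.iter().count(), 2);
    assert_eq!(sent, 5);
}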
@@ -361,7 +389,7 @@ fn main() {
             }
             if check_txs(
                 &signal_receiver,
-                total_num_transactions / num_chunks,
+                packets_for_this_iteration.transactions.len(),
                 &poh_recorder,
             ) {
                 debug!(
@@ -370,7 +398,6 @@ fn main() {
                     bank.transaction_count(),
                     txs_processed
                 );
-                assert!(txs_processed < bank.transaction_count());
                 txs_processed = bank.transaction_count();
                 tx_total_us += duration_as_us(&now.elapsed());
 
@@ -422,22 +449,17 @@ fn main() {
             debug!(
                 "time: {} us checked: {} sent: {}",
                 duration_as_us(&now.elapsed()),
-                total_num_transactions / num_chunks,
+                total_num_transactions / num_chunks as u64,
                 sent,
             );
             total_sent += sent;
 
-            if bank.slot() > 0 && bank.slot() % 16 == 0 {
-                for tx in transactions.iter_mut() {
-                    tx.message.recent_blockhash = bank.last_blockhash();
-                    let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen::<u8>()).collect();
-                    tx.signatures[0] = Signature::new(&sig[0..64]);
+            if current_iteration_index % 16 == 0 {
+                let last_blockhash = bank.last_blockhash();
+                for packets_for_single_iteration in all_packets.iter_mut() {
+                    packets_for_single_iteration.refresh_blockhash(last_blockhash);
                 }
-                verified = to_packet_batches(&transactions.clone(), packets_per_batch);
             }
-
-            start += chunk_len;
-            start %= verified.len();
         }
         let txs_processed = bank_forks.working_bank().transaction_count();
         debug!("processed: {} base: {}", txs_processed, base_tx_count);