Skip to content

Commit

Permalink
Merge branch 'master' into get-blocks-bench
Browse files Browse the repository at this point in the history
  • Loading branch information
steveluscher authored Dec 9, 2024
2 parents 0985099 + a826581 commit 1ce3917
Showing 1 changed file with 122 additions and 4 deletions.
126 changes: 122 additions & 4 deletions accounts-cluster-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use {
},
solana_cli_config::{ConfigInput, CONFIG_FILE},
solana_client::{
rpc_config::RpcBlockConfig, rpc_request::MAX_GET_CONFIRMED_BLOCKS_RANGE,
transaction_executor::TransactionExecutor,
rpc_client::SerializableTransaction, rpc_config::RpcBlockConfig,
rpc_request::MAX_GET_CONFIRMED_BLOCKS_RANGE, transaction_executor::TransactionExecutor,
},
solana_gossip::gossip_service::discover,
solana_inline_spl::token,
Expand All @@ -25,19 +25,22 @@ use {
message::Message,
program_pack::Pack,
pubkey::Pubkey,
signature::{read_keypair_file, Keypair, Signer},
signature::{read_keypair_file, Keypair, Signature, Signer},
system_instruction, system_program,
transaction::Transaction,
},
solana_streamer::socket::SocketAddrSpace,
solana_transaction_status::UiTransactionEncoding,
spl_token::state::Account,
std::{
cmp::min,
collections::VecDeque,
ops::Deref,
process::exit,
str::FromStr,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Barrier,
Arc, Barrier, RwLock,
},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
Expand Down Expand Up @@ -156,6 +159,35 @@ struct SeedTracker {
max_closed: Arc<AtomicU64>,
}

#[derive(Clone)]
struct TransactionSignatureTracker(Arc<RwLock<VecDeque<Signature>>>);

impl TransactionSignatureTracker {
fn get_random(&self) -> Option<Signature> {
let signatures = self.read().unwrap();
if signatures.is_empty() {
None
} else {
let random_index = thread_rng().gen_range(0..signatures.len());
let random_signature = signatures.get(random_index);
random_signature.cloned()
}
}
fn track_transactions(&self, transactions: &[Transaction]) {
let mut lock = self.write().unwrap();
for signature in transactions.iter().map(Transaction::get_signature) {
lock.push_back(*signature);
}
}
}

impl Deref for TransactionSignatureTracker {
type Target = Arc<RwLock<VecDeque<Signature>>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}

fn make_create_message(
keypair: &Keypair,
base_keypair: &Keypair,
Expand Down Expand Up @@ -283,6 +315,9 @@ pub enum RpcBench {
Block,
Blocks,
AccountInfo,
Transaction,
TransactionParsed,
FirstAvailableBlock,
}

#[derive(Debug)]
Expand All @@ -298,11 +333,14 @@ impl FromStr for RpcBench {
"account-info" => Ok(RpcBench::AccountInfo),
"block" => Ok(RpcBench::Block),
"blocks" => Ok(RpcBench::Blocks),
"first-available-block" => Ok(RpcBench::FirstAvailableBlock),
"slot" => Ok(RpcBench::Slot),
"supply" => Ok(RpcBench::Supply),
"multiple-accounts" => Ok(RpcBench::MultipleAccounts),
"token-accounts-by-delegate" => Ok(RpcBench::TokenAccountsByDelegate),
"token-accounts-by-owner" => Ok(RpcBench::TokenAccountsByOwner),
"transaction" => Ok(RpcBench::Transaction),
"transaction-parsed" => Ok(RpcBench::TransactionParsed),
"version" => Ok(RpcBench::Version),
_ => Err(RpcParseError::InvalidOption),
}
Expand Down Expand Up @@ -363,6 +401,37 @@ fn process_get_multiple_accounts(
}
}

fn process_get_transaction(
test_name: &'static str,
transaction_signature_tracker: &TransactionSignatureTracker,
client: &RpcClient,
stats: &mut RpcBenchStats,
last_error: &mut Instant,
encoding: UiTransactionEncoding,
) {
let Some(signature) = transaction_signature_tracker.get_random() else {
info!("transaction: No transactions have yet been made; skipping");
return;
};
let mut measure = Measure::start(test_name);
match client.get_transaction(&signature, encoding) {
Ok(_tx) => {
measure.stop();
stats.success += 1;
stats.total_success_time_us += measure.as_us();
}
Err(e) => {
measure.stop();
stats.errors += 1;
stats.total_errors_time_us += measure.as_us();
if last_error.elapsed().as_secs() > 2 {
info!("get_transaction error: {:?}", &e);
*last_error = Instant::now();
}
}
};
}

#[derive(Default)]
struct RpcBenchStats {
errors: u64,
Expand All @@ -383,6 +452,7 @@ fn run_rpc_bench_loop(
max_created: &AtomicU64,
slot_height: &AtomicU64,
mint: &Option<Pubkey>,
transaction_signature_tracker: &TransactionSignatureTracker,
) {
let mut stats = RpcBenchStats::default();
let mut iters = 0;
Expand Down Expand Up @@ -501,6 +571,25 @@ fn run_rpc_bench_loop(
}
}
}
RpcBench::FirstAvailableBlock => {
let mut rpc_time = Measure::start("rpc-get-first-available-block");
match client.get_first_available_block() {
Ok(_slot) => {
rpc_time.stop();
stats.success += 1;
stats.total_success_time_us += rpc_time.as_us();
}
Err(e) => {
rpc_time.stop();
stats.total_errors_time_us += rpc_time.as_us();
stats.errors += 1;
if last_error.elapsed().as_secs() > 2 {
info!("get_first_available_block error: {:?}", e);
last_error = Instant::now();
}
}
}
}
RpcBench::Slot => {
let mut rpc_time = Measure::start("rpc-get-slot");
match client.get_slot() {
Expand Down Expand Up @@ -612,6 +701,26 @@ fn run_rpc_bench_loop(
}
}
}
RpcBench::Transaction => {
process_get_transaction(
"rpc-get-transaction-base64",
transaction_signature_tracker,
client,
&mut stats,
&mut last_error,
UiTransactionEncoding::Base64,
);
}
RpcBench::TransactionParsed => {
process_get_transaction(
"rpc-get-transaction-parsed",
transaction_signature_tracker,
client,
&mut stats,
&mut last_error,
UiTransactionEncoding::JsonParsed,
);
}
RpcBench::Version => {
let mut rpc_time = Measure::start("rpc-get-version");
match client.get_version() {
Expand Down Expand Up @@ -647,6 +756,7 @@ fn make_rpc_bench_threads(
slot_height: &Arc<AtomicU64>,
base_keypair_pubkey: Pubkey,
num_rpc_bench_threads: usize,
transaction_signature_tracker: &TransactionSignatureTracker,
) -> Vec<JoinHandle<()>> {
let program_id = if mint.is_some() {
token::id()
Expand All @@ -663,6 +773,7 @@ fn make_rpc_bench_threads(
let max_closed = seed_tracker.max_closed.clone();
let max_created = seed_tracker.max_created.clone();
let slot_height = slot_height.clone();
let transaction_signature_tracker = transaction_signature_tracker.clone();
let mint = *mint;
Builder::new()
.name(format!("rpc-bench-{}", thread))
Expand All @@ -679,6 +790,7 @@ fn make_rpc_bench_threads(
&max_created,
&slot_height,
&mint,
&transaction_signature_tracker,
)
})
.unwrap()
Expand Down Expand Up @@ -733,6 +845,8 @@ fn run_accounts_bench(
max_created: Arc::new(AtomicU64::default()),
max_closed: Arc::new(AtomicU64::default()),
};
let transaction_signature_tracker =
TransactionSignatureTracker(Arc::new(RwLock::new(VecDeque::with_capacity(5000))));

info!("Starting balance(s): {:?}", balances);

Expand Down Expand Up @@ -773,6 +887,7 @@ fn run_accounts_bench(
&slot_height,
base_keypair_pubkey,
num_rpc_bench_threads,
&transaction_signature_tracker,
)
} else {
Vec::new()
Expand Down Expand Up @@ -836,6 +951,7 @@ fn run_accounts_bench(
.collect();
balances[i] = balances[i].saturating_sub(lamports * txs.len() as u64);
info!("txs: {}", txs.len());
transaction_signature_tracker.track_transactions(&txs);
let new_ids = executor.push_transactions(txs);
info!("ids: {}", new_ids.len());
tx_sent_count += new_ids.len();
Expand Down Expand Up @@ -869,6 +985,7 @@ fn run_accounts_bench(
.collect();
balances[0] = balances[0].saturating_sub(fee * txs.len() as u64);
info!("close txs: {}", txs.len());
transaction_signature_tracker.track_transactions(&txs);
let new_ids = executor.push_transactions(txs);
info!("close ids: {}", new_ids.len());
tx_sent_count += new_ids.len();
Expand Down Expand Up @@ -965,6 +1082,7 @@ fn run_accounts_bench(
.collect();
balances[i] = balances[i].saturating_sub(fee * txs.len() as u64);
info!("close txs: {}", txs.len());
transaction_signature_tracker.track_transactions(&txs);
let new_ids = executor.push_transactions(txs);
info!("close ids: {}", new_ids.len());
tx_sent_count += new_ids.len();
Expand Down

0 comments on commit 1ce3917

Please sign in to comment.