Skip to content

Commit

Permalink
Wire unified scheduler into banking experimentally
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Dec 6, 2024
1 parent 744db01 commit ca24a20
Show file tree
Hide file tree
Showing 48 changed files with 2,285 additions and 649 deletions.
71 changes: 68 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,13 @@ curve25519-dalek = { version = "4.1.3", features = ["digest", "rand_core"] }
dashmap = "5.5.3"
derivation-path = { version = "0.2.0", default-features = false }
derive-where = "1.2.7"
derive_more = { version = "1.0.0", features = ["full"] }
dialoguer = "0.10.4"
digest = "0.10.7"
dir-diff = "0.3.3"
dirs-next = "2.0.0"
dlopen2 = "0.5.0"
dyn-clone = "1.0.17"
eager = "0.1.0"
ed25519-dalek = "=1.0.1"
ed25519-dalek-bip32 = "0.2.0"
Expand Down Expand Up @@ -629,6 +631,7 @@ tokio-util = "0.7"
toml = "0.8.12"
tonic = "0.9.2"
tonic-build = "0.9.2"
trait-set = "0.3.0"
trees = "0.4.2"
tungstenite = "0.20.1"
uriparse = "0.6.4"
Expand Down
4 changes: 3 additions & 1 deletion banking-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ license = { workspace = true }
edition = { workspace = true }

[dependencies]
assert_matches = { workspace = true }
clap = { version = "3.1.8", features = ["derive", "cargo"] }
crossbeam-channel = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
solana-client = { workspace = true }
solana-core = { workspace = true }
solana-core = { workspace = true, features = ["dev-context-only-utils"] }
solana-gossip = { workspace = true }
solana-ledger = { workspace = true }
solana-logger = { workspace = true }
Expand All @@ -26,6 +27,7 @@ solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
solana-tpu-client = { workspace = true }
solana-unified-scheduler-pool = { workspace = true }
solana-version = { workspace = true }

[features]
Expand Down
89 changes: 66 additions & 23 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
#![allow(clippy::arithmetic_side_effects)]
use {
assert_matches::assert_matches,
clap::{crate_description, crate_name, Arg, ArgEnum, Command},
crossbeam_channel::{unbounded, Receiver},
log::*,
rand::{thread_rng, Rng},
rayon::prelude::*,
solana_client::connection_cache::ConnectionCache,
solana_core::{
banking_stage::BankingStage,
banking_trace::{BankingPacketBatch, BankingTracer, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT},
banking_stage::{update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage},
banking_trace::{
BankingPacketBatch, BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
},
validator::BlockProductionMethod,
},
solana_gossip::cluster_info::{ClusterInfo, Node},
Expand All @@ -29,13 +32,15 @@ use {
hash::Hash,
message::Message,
pubkey::{self, Pubkey},
scheduling::SchedulingMode,
signature::{Keypair, Signature, Signer},
system_instruction, system_transaction,
timing::timestamp,
transaction::Transaction,
},
solana_streamer::socket::SocketAddrSpace,
solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE,
solana_unified_scheduler_pool::{DefaultSchedulerPool, SupportedSchedulingMode},
std::{
sync::{atomic::Ordering, Arc, RwLock},
thread::sleep,
Expand Down Expand Up @@ -347,7 +352,7 @@ fn main() {
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let bank0 = Bank::new_for_benches(&genesis_config);
let bank_forks = BankForks::new_rw_arc(bank0);
let mut bank = bank_forks.read().unwrap().working_bank();
let mut bank = bank_forks.read().unwrap().working_bank_with_scheduler();

// set cost tracker limits to MAX so it will not filter out TXs
bank.write_cost_tracker()
Expand Down Expand Up @@ -440,9 +445,36 @@ fn main() {
BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
)))
.unwrap();
let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote();
let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote();
let (gossip_vote_sender, gossip_vote_receiver) = banking_tracer.create_channel_gossip_vote();
let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let scheduler_pool = if matches!(
block_production_method,
BlockProductionMethod::UnifiedScheduler
) {
let pool = DefaultSchedulerPool::new(
SupportedSchedulingMode::Either(SchedulingMode::BlockProduction),
None,
None,
None,
Some(replay_vote_sender.clone()),
prioritization_fee_cache.clone(),
poh_recorder.read().unwrap().new_recorder(),
);
bank_forks
.write()
.unwrap()
.install_scheduler_pool(pool.clone());
Some(pool)
} else {
None
};
let Channels {
non_vote_sender,
non_vote_receiver,
tpu_vote_sender,
tpu_vote_receiver,
gossip_vote_sender,
gossip_vote_receiver,
} = banking_tracer.create_channels(scheduler_pool.as_ref());
let cluster_info = {
let keypair = Arc::new(Keypair::new());
let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
Expand All @@ -461,7 +493,7 @@ fn main() {
),
};
let banking_stage = BankingStage::new_num_threads(
block_production_method,
block_production_method.clone(),
&cluster_info,
&poh_recorder,
non_vote_receiver,
Expand All @@ -473,10 +505,23 @@ fn main() {
None,
Arc::new(connection_cache),
bank_forks.clone(),
&Arc::new(PrioritizationFeeCache::new(0u64)),
&prioritization_fee_cache,
false,
scheduler_pool,
);

// This bench processes transactions, starting from the very first bank, so special-casing is
// needed for unified scheduler.
if matches!(
block_production_method,
BlockProductionMethod::UnifiedScheduler
) {
bank = bank_forks
.write()
.unwrap()
.reinstall_block_production_scheduler_into_working_genesis_bank();
}

// This is so that the signal_receiver does not go out of scope after the closure.
// If it is dropped before poh_service, then poh_service will error when
// calling send() on the channel.
Expand Down Expand Up @@ -537,33 +582,31 @@ fn main() {
tx_total_us += now.elapsed().as_micros() as u64;

let mut poh_time = Measure::start("poh_time");
poh_recorder
let cleared_bank = poh_recorder
.write()
.unwrap()
.reset(bank.clone(), Some((bank.slot(), bank.slot() + 1)));
assert_matches!(cleared_bank, None);
poh_time.stop();

let mut new_bank_time = Measure::start("new_bank");
if let Some((result, _timings)) = bank.wait_for_completed_scheduler() {
assert_matches!(result, Ok(_));
}
let new_slot = bank.slot() + 1;
let new_bank = Bank::new_from_parent(bank, &collector, new_slot);
let new_bank = Bank::new_from_parent(bank.clone(), &collector, new_slot);
new_bank_time.stop();

let mut insert_time = Measure::start("insert_time");
bank_forks.write().unwrap().insert(new_bank);
bank = bank_forks.read().unwrap().working_bank();
update_bank_forks_and_poh_recorder_for_new_tpu_bank(
&bank_forks,
&poh_recorder,
new_bank,
false,
);
bank = bank_forks.read().unwrap().working_bank_with_scheduler();
insert_time.stop();

// set cost tracker limits to MAX so it will not filter out TXs
bank.write_cost_tracker()
.unwrap()
.set_limits(u64::MAX, u64::MAX, u64::MAX);

assert!(poh_recorder.read().unwrap().bank().is_none());
poh_recorder
.write()
.unwrap()
.set_bank_for_test(bank.clone());
assert!(poh_recorder.read().unwrap().bank().is_some());
debug!(
"new_bank_time: {}us insert_time: {}us poh_time: {}us",
new_bank_time.as_us(),
Expand Down
Loading

0 comments on commit ca24a20

Please sign in to comment.