diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 3683e257ed10a8..015ec5360448f9 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -51,6 +51,7 @@ use { solana_measure::measure::Measure, solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, solana_program_runtime::timings::ExecuteTimings, + solana_rayon_threadlimit::get_max_thread_count, solana_rpc::{ optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig}, rpc_subscriptions::RpcSubscriptions, @@ -652,16 +653,23 @@ impl ReplayStage { r_bank_forks.get_vote_only_mode_signal(), ) }; + // Thread pool to (maybe) replay multiple threads in parallel let replay_mode = if replay_slots_concurrently { ForkReplayMode::Serial } else { let pool = rayon::ThreadPoolBuilder::new() .num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY) - .thread_name(|i| format!("solReplay{i:02}")) + .thread_name(|i| format!("solReplayFork{i:02}")) .build() .expect("new rayon threadpool"); ForkReplayMode::Parallel(pool) }; + // Thread pool to replay multiple transactions within one block in parallel + let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new() + .num_threads(get_max_thread_count()) + .thread_name(|i| format!("solReplayTx{i:02}")) + .build() + .expect("new rayon threadpool"); Self::reset_poh_recorder( &my_pubkey, @@ -724,6 +732,7 @@ impl ReplayStage { &mut replay_timing, log_messages_bytes_limit, &replay_mode, + &replay_tx_thread_pool, &prioritization_fee_cache, &mut purge_repair_slot_counter, ); @@ -2136,6 +2145,7 @@ impl ReplayStage { fn replay_blockstore_into_bank( bank: &BankWithScheduler, blockstore: &Blockstore, + replay_tx_thread_pool: &ThreadPool, replay_stats: &RwLock, replay_progress: &RwLock, transaction_status_sender: Option<&TransactionStatusSender>, @@ -2154,6 +2164,7 @@ impl ReplayStage { blockstore_processor::confirm_slot( blockstore, bank, + replay_tx_thread_pool, &mut w_replay_stats, &mut w_replay_progress, false, @@ -2712,7 +2723,8 @@ impl ReplayStage { fn replay_active_banks_concurrently( blockstore: &Blockstore, bank_forks: &RwLock, - thread_pool: &ThreadPool, + fork_thread_pool: &ThreadPool, + replay_tx_thread_pool: &ThreadPool, my_pubkey: &Pubkey, vote_account: &Pubkey, progress: &mut ProgressMap, @@ -2730,7 +2742,7 @@ impl ReplayStage { let longest_replay_time_us = AtomicU64::new(0); // Allow for concurrent replaying of slots from different forks. - let replay_result_vec: Vec = thread_pool.install(|| { + let replay_result_vec: Vec = fork_thread_pool.install(|| { active_bank_slots .into_par_iter() .map(|bank_slot| { @@ -2744,7 +2756,7 @@ impl ReplayStage { trace!( "Replay active bank: slot {}, thread_idx {}", bank_slot, - thread_pool.current_thread_index().unwrap_or_default() + fork_thread_pool.current_thread_index().unwrap_or_default() ); let mut progress_lock = progress.write().unwrap(); if progress_lock @@ -2797,6 +2809,7 @@ impl ReplayStage { let blockstore_result = Self::replay_blockstore_into_bank( &bank, blockstore, + replay_tx_thread_pool, &replay_stats, &replay_progress, transaction_status_sender, @@ -2826,6 +2839,7 @@ impl ReplayStage { fn replay_active_bank( blockstore: &Blockstore, bank_forks: &RwLock, + replay_tx_thread_pool: &ThreadPool, my_pubkey: &Pubkey, vote_account: &Pubkey, progress: &mut ProgressMap, @@ -2884,6 +2898,7 @@ impl ReplayStage { let blockstore_result = Self::replay_blockstore_into_bank( &bank, blockstore, + replay_tx_thread_pool, &bank_progress.replay_stats, &bank_progress.replay_progress, transaction_status_sender, @@ -3183,6 +3198,7 @@ impl ReplayStage { replay_timing: &mut ReplayLoopTiming, log_messages_bytes_limit: Option, replay_mode: &ForkReplayMode, + replay_tx_thread_pool: &ThreadPool, prioritization_fee_cache: &PrioritizationFeeCache, purge_repair_slot_counter: &mut PurgeRepairSlotCounter, ) -> bool /* completed a bank */ { @@ -3199,11 +3215,12 @@ impl ReplayStage { let replay_result_vec = match replay_mode { // Skip the overhead of the threadpool if there is only one bank to play - ForkReplayMode::Parallel(thread_pool) if num_active_banks > 1 => { + ForkReplayMode::Parallel(fork_thread_pool) if num_active_banks > 1 => { Self::replay_active_banks_concurrently( blockstore, bank_forks, - thread_pool, + fork_thread_pool, + replay_tx_thread_pool, my_pubkey, vote_account, progress, @@ -3223,6 +3240,7 @@ impl ReplayStage { Self::replay_active_bank( blockstore, bank_forks, + replay_tx_thread_pool, my_pubkey, vote_account, progress, @@ -5034,9 +5052,15 @@ pub(crate) mod tests { blockstore.insert_shreds(shreds, None, false).unwrap(); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let exit = Arc::new(AtomicBool::new(false)); + let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new() + .num_threads(1) + .thread_name(|i| format!("solReplayTest{i:02}")) + .build() + .expect("new rayon threadpool"); let res = ReplayStage::replay_blockstore_into_bank( &bank1, &blockstore, + &replay_tx_thread_pool, &bank1_progress.replay_stats, &bank1_progress.replay_progress, None, diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index e4ae5f368b2afd..a76387f7cb2054 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -89,16 +89,6 @@ struct ReplayEntry { starting_index: usize, } -// get_max_thread_count to match number of threads in the old code. -// see: https://github.com/solana-labs/solana/pull/24853 -lazy_static! { - static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new() - .num_threads(get_max_thread_count()) - .thread_name(|i| format!("solBstoreProc{i:02}")) - .build() - .unwrap(); -} - fn first_err(results: &[Result<()>]) -> Result<()> { for r in results { if r.is_err() { @@ -139,6 +129,14 @@ fn get_first_error( first_err } +fn create_thread_pool(num_threads: usize) -> ThreadPool { + rayon::ThreadPoolBuilder::new() + .num_threads(num_threads) + .thread_name(|i| format!("solReplayTx{i:02}")) + .build() + .expect("new rayon threadpool") +} + pub fn execute_batch( batch: &TransactionBatchWithIndexes, bank: &Arc, @@ -242,6 +240,7 @@ impl ExecuteBatchesInternalMetrics { fn execute_batches_internal( bank: &Arc, + replay_tx_thread_pool: &ThreadPool, batches: &[TransactionBatchWithIndexes], transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, @@ -253,7 +252,7 @@ fn execute_batches_internal( Mutex::new(HashMap::new()); let mut execute_batches_elapsed = Measure::start("execute_batches_elapsed"); - let results: Vec> = PAR_THREAD_POOL.install(|| { + let results: Vec> = replay_tx_thread_pool.install(|| { batches .into_par_iter() .map(|transaction_batch| { @@ -275,7 +274,7 @@ fn execute_batches_internal( "execute_batch", ); - let thread_index = PAR_THREAD_POOL.current_thread_index().unwrap(); + let thread_index = replay_tx_thread_pool.current_thread_index().unwrap(); execution_timings_per_thread .lock() .unwrap() @@ -324,6 +323,7 @@ fn execute_batches_internal( // invocation). fn process_batches( bank: &BankWithScheduler, + replay_tx_thread_pool: &ThreadPool, batches: &[TransactionBatchWithIndexes], transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, @@ -348,6 +348,7 @@ fn process_batches( ); rebatch_and_execute_batches( bank, + replay_tx_thread_pool, batches, transaction_status_sender, replay_vote_sender, @@ -398,6 +399,7 @@ fn rebatch_transactions<'a>( fn rebatch_and_execute_batches( bank: &Arc, + replay_tx_thread_pool: &ThreadPool, batches: &[TransactionBatchWithIndexes], transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, @@ -481,6 +483,7 @@ fn rebatch_and_execute_batches( let execute_batches_internal_metrics = execute_batches_internal( bank, + replay_tx_thread_pool, rebatched_txs, transaction_status_sender, replay_vote_sender, @@ -506,6 +509,7 @@ pub fn process_entries_for_tests( transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, ) -> Result<()> { + let replay_tx_thread_pool = create_thread_pool(1); let verify_transaction = { let bank = bank.clone_with_scheduler(); move |versioned_tx: VersionedTransaction| -> Result { @@ -533,6 +537,7 @@ pub fn process_entries_for_tests( let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64); let result = process_entries( bank, + &replay_tx_thread_pool, &mut replay_entries, transaction_status_sender, replay_vote_sender, @@ -547,6 +552,7 @@ pub fn process_entries_for_tests( fn process_entries( bank: &BankWithScheduler, + replay_tx_thread_pool: &ThreadPool, entries: &mut [ReplayEntry], transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, @@ -572,6 +578,7 @@ fn process_entries( // execute the group and register the tick process_batches( bank, + replay_tx_thread_pool, &batches, transaction_status_sender, replay_vote_sender, @@ -625,6 +632,7 @@ fn process_entries( // execute the current queue and try to process this entry again process_batches( bank, + replay_tx_thread_pool, &batches, transaction_status_sender, replay_vote_sender, @@ -640,6 +648,7 @@ fn process_entries( } process_batches( bank, + replay_tx_thread_pool, &batches, transaction_status_sender, replay_vote_sender, @@ -805,6 +814,7 @@ pub(crate) fn process_blockstore_for_bank_0( let bank_forks = BankForks::new_rw_arc(bank0); info!("Processing ledger for slot 0..."); + let replay_tx_thread_pool = create_thread_pool(get_max_thread_count()); process_bank_0( &bank_forks .read() @@ -812,6 +822,7 @@ pub(crate) fn process_blockstore_for_bank_0( .get_with_scheduler(bank0_slot) .unwrap(), blockstore, + &replay_tx_thread_pool, opts, &VerifyRecyclers::default(), cache_block_meta_sender, @@ -871,10 +882,12 @@ pub fn process_blockstore_from_root( .meta(start_slot) .unwrap_or_else(|_| panic!("Failed to get meta for slot {start_slot}")) { + let replay_tx_thread_pool = create_thread_pool(get_max_thread_count()); load_frozen_forks( bank_forks, &start_slot_meta, blockstore, + &replay_tx_thread_pool, leader_schedule_cache, opts, transaction_status_sender, @@ -978,9 +991,11 @@ fn verify_ticks( Ok(()) } +#[allow(clippy::too_many_arguments)] fn confirm_full_slot( blockstore: &Blockstore, bank: &BankWithScheduler, + replay_tx_thread_pool: &ThreadPool, opts: &ProcessOptions, recyclers: &VerifyRecyclers, progress: &mut ConfirmationProgress, @@ -996,6 +1011,7 @@ fn confirm_full_slot( confirm_slot( blockstore, bank, + replay_tx_thread_pool, &mut confirmation_timing, progress, skip_verification, @@ -1142,6 +1158,7 @@ impl ConfirmationProgress { pub fn confirm_slot( blockstore: &Blockstore, bank: &BankWithScheduler, + replay_tx_thread_pool: &ThreadPool, timing: &mut ConfirmationTiming, progress: &mut ConfirmationProgress, skip_verification: bool, @@ -1171,6 +1188,7 @@ pub fn confirm_slot( confirm_slot_entries( bank, + replay_tx_thread_pool, slot_entries_load_result, timing, progress, @@ -1187,6 +1205,7 @@ pub fn confirm_slot( #[allow(clippy::too_many_arguments)] fn confirm_slot_entries( bank: &BankWithScheduler, + replay_tx_thread_pool: &ThreadPool, slot_entries_load_result: (Vec, u64, bool), timing: &mut ConfirmationTiming, progress: &mut ConfirmationProgress, @@ -1328,6 +1347,7 @@ fn confirm_slot_entries( .collect(); let process_result = process_entries( bank, + replay_tx_thread_pool, &mut replay_entries, transaction_status_sender, replay_vote_sender, @@ -1385,6 +1405,7 @@ fn confirm_slot_entries( fn process_bank_0( bank0: &BankWithScheduler, blockstore: &Blockstore, + replay_tx_thread_pool: &ThreadPool, opts: &ProcessOptions, recyclers: &VerifyRecyclers, cache_block_meta_sender: Option<&CacheBlockMetaSender>, @@ -1395,6 +1416,7 @@ fn process_bank_0( confirm_full_slot( blockstore, bank0, + replay_tx_thread_pool, opts, recyclers, &mut progress, @@ -1479,6 +1501,7 @@ fn load_frozen_forks( bank_forks: &RwLock, start_slot_meta: &SlotMeta, blockstore: &Blockstore, + replay_tx_thread_pool: &ThreadPool, leader_schedule_cache: &LeaderScheduleCache, opts: &ProcessOptions, transaction_status_sender: Option<&TransactionStatusSender>, @@ -1566,6 +1589,7 @@ fn load_frozen_forks( if process_single_slot( blockstore, &bank, + replay_tx_thread_pool, opts, &recyclers, &mut progress, @@ -1771,6 +1795,7 @@ fn supermajority_root_from_vote_accounts( fn process_single_slot( blockstore: &Blockstore, bank: &BankWithScheduler, + replay_tx_thread_pool: &ThreadPool, opts: &ProcessOptions, recyclers: &VerifyRecyclers, progress: &mut ConfirmationProgress, @@ -1785,6 +1810,7 @@ fn process_single_slot( confirm_full_slot( blockstore, bank, + replay_tx_thread_pool, opts, recyclers, progress, @@ -3692,7 +3718,16 @@ pub mod tests { ..ProcessOptions::default() }; let recyclers = VerifyRecyclers::default(); - process_bank_0(&bank0, &blockstore, &opts, &recyclers, None, None); + let replay_tx_thread_pool = create_thread_pool(1); + process_bank_0( + &bank0, + &blockstore, + &replay_tx_thread_pool, + &opts, + &recyclers, + None, + None, + ); let bank0_last_blockhash = bank0.last_blockhash(); let bank1 = bank_forks.write().unwrap().insert(Bank::new_from_parent( bank0.clone_without_scheduler(), @@ -3702,6 +3737,7 @@ pub mod tests { confirm_full_slot( &blockstore, &bank1, + &replay_tx_thread_pool, &opts, &recyclers, &mut ConfirmationProgress::new(bank0_last_blockhash), @@ -4342,8 +4378,10 @@ pub mod tests { slot_full: bool, prev_entry_hash: Hash, ) -> result::Result<(), BlockstoreProcessorError> { + let replay_tx_thread_pool = create_thread_pool(1); confirm_slot_entries( &BankWithScheduler::new_without_scheduler(bank.clone()), + &replay_tx_thread_pool, (slot_entries, 0, slot_full), &mut ConfirmationTiming::default(), &mut ConfirmationProgress::new(prev_entry_hash), @@ -4400,6 +4438,7 @@ pub mod tests { let bank = BankWithScheduler::new_without_scheduler( Bank::new_with_bank_forks_for_tests(&genesis_config).0, ); + let replay_tx_thread_pool = create_thread_pool(1); let mut timing = ConfirmationTiming::default(); let mut progress = ConfirmationProgress::new(genesis_hash); let amount = genesis_config.rent.minimum_balance(0); @@ -4436,6 +4475,7 @@ pub mod tests { confirm_slot_entries( &bank, + &replay_tx_thread_pool, (vec![entry], 0, false), &mut timing, &mut progress, @@ -4480,6 +4520,7 @@ pub mod tests { confirm_slot_entries( &bank, + &replay_tx_thread_pool, (vec![entry], 0, false), &mut timing, &mut progress, @@ -4592,10 +4633,12 @@ pub mod tests { transaction_indexes: (0..txs.len()).collect(), }; + let replay_tx_thread_pool = create_thread_pool(1); let mut batch_execution_timing = BatchExecutionTiming::default(); let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64); assert!(process_batches( &bank, + &replay_tx_thread_pool, &[batch_with_indexes], None, None, diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index 10dd5182717841..5f577e3c938aaf 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -39,8 +39,5 @@ extern crate solana_metrics; #[macro_use] extern crate log; -#[macro_use] -extern crate lazy_static; - #[macro_use] extern crate solana_frozen_abi_macro;