diff --git a/crates/aptos-rosetta/src/test/mod.rs b/crates/aptos-rosetta/src/test/mod.rs index ac5c9428b095c..2f02eeb03c765 100644 --- a/crates/aptos-rosetta/src/test/mod.rs +++ b/crates/aptos-rosetta/src/test/mod.rs @@ -16,7 +16,7 @@ use aptos_crypto::{ }; use aptos_rest_client::aptos_api_types::{ResourceGroup, TransactionOnChainData}; use aptos_types::{ - account_config::fungible_store::FungibleStoreResource, + account_config::{fungible_store::FungibleStoreResource, DepositFAEvent, MoveEventV2, WithdrawFAEvent}, chain_id::ChainId, contract_event::ContractEvent, event::{EventHandle, EventKey}, @@ -126,35 +126,23 @@ impl FaData { }; let (new_balance, contract_event) = if self.deposit { - let type_tag = TypeTag::Struct(Box::new(StructTag { - address: AccountAddress::ONE, - module: ident_str!(FUNGIBLE_ASSET_MODULE).into(), - name: ident_str!("Deposit").into(), - type_args: vec![], - })); - let event = FungibleAssetChangeEvent { + let event = DepositFAEvent { store: self.store_address, amount: self.amount, }; ( self.previous_balance + self.amount, - ContractEvent::new_v2(type_tag, bcs::to_bytes(&event).unwrap()), + event.create_event_v2(), ) } else { - let event = FungibleAssetChangeEvent { + let event = WithdrawFAEvent { store: self.store_address, amount: self.amount, }; - let type_tag = TypeTag::Struct(Box::new(StructTag { - address: AccountAddress::ONE, - module: ident_str!(FUNGIBLE_ASSET_MODULE).into(), - name: ident_str!("Withdraw").into(), - type_args: vec![], - })); ( self.previous_balance - self.amount, - ContractEvent::new_v2(type_tag, bcs::to_bytes(&event).unwrap()), + event.create_event_v2(), ) }; diff --git a/crates/aptos-rosetta/src/types/move_types.rs b/crates/aptos-rosetta/src/types/move_types.rs index 286296833ca46..569dbc688eecd 100644 --- a/crates/aptos-rosetta/src/types/move_types.rs +++ b/crates/aptos-rosetta/src/types/move_types.rs @@ -256,12 +256,6 @@ pub struct WithdrawUndelegatedEvent { pub amount_withdrawn: u64, } -#[derive(Debug, Serialize, Deserialize)] -pub struct FungibleAssetChangeEvent { - pub store: AccountAddress, - pub amount: u64, -} - #[derive(Debug, Serialize, Deserialize)] pub struct ObjectCore { pub guid_creation_num: u64, diff --git a/execution/executor-benchmark/src/block_preparation.rs b/execution/executor-benchmark/src/block_preparation.rs index 773a6c9f31b79..7b3ad8a5b0d3a 100644 --- a/execution/executor-benchmark/src/block_preparation.rs +++ b/execution/executor-benchmark/src/block_preparation.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::{metrics::TIMER, pipeline::ExecuteBlockMessage}; +use crate::{metrics::{NUM_TXNS, TIMER}, pipeline::ExecuteBlockMessage}; use aptos_block_partitioner::{BlockPartitioner, PartitionerConfig}; use aptos_crypto::HashValue; use aptos_experimental_runtimes::thread_manager::optimal_min_len; @@ -10,28 +10,18 @@ use aptos_types::{ block_executor::partitioner::{ExecutableBlock, ExecutableTransactions}, transaction::{signature_verified_transaction::SignatureVerifiedTransaction, Transaction}, }; -use once_cell::sync::Lazy; use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; -use std::{sync::Arc, time::Instant}; - -pub static SIG_VERIFY_POOL: Lazy> = Lazy::new(|| { - Arc::new( - rayon::ThreadPoolBuilder::new() - .num_threads(8) // More than 8 threads doesn't seem to help much - .thread_name(|index| format!("signature-checker-{}", index)) - .build() - .unwrap(), - ) -}); +use std::time::Instant; pub(crate) struct BlockPreparationStage { num_executor_shards: usize, num_blocks_processed: usize, maybe_partitioner: Option>, + sig_verify_pool: rayon::ThreadPool, } impl BlockPreparationStage { - pub fn new(num_shards: usize, partitioner_config: &dyn PartitionerConfig) -> Self { + pub fn new(sig_verify_num_threads: usize, num_shards: usize, partitioner_config: &dyn PartitionerConfig) -> Self { let maybe_partitioner = if num_shards == 0 { None } else { @@ -39,10 +29,16 @@ impl BlockPreparationStage { Some(partitioner) }; + let sig_verify_pool = rayon::ThreadPoolBuilder::new() + .num_threads(sig_verify_num_threads) // More than 8 threads doesn't seem to help much + .thread_name(|index| format!("signature-checker-{}", index)) + .build() + .unwrap(); Self { num_executor_shards: num_shards, num_blocks_processed: 0, maybe_partitioner, + sig_verify_pool, } } @@ -54,8 +50,14 @@ impl BlockPreparationStage { txns.len() ); let block_id = HashValue::random(); - let sig_verified_txns: Vec = SIG_VERIFY_POOL.install(|| { + let sig_verified_txns: Vec = self.sig_verify_pool.install(|| { + let _timer = TIMER.with_label_values(&["sig_verify"]).start_timer(); + let num_txns = txns.len(); + NUM_TXNS + .with_label_values(&["sig_verify"]) + .inc_by(num_txns as u64); + txns.into_par_iter() .with_min_len(optimal_min_len(num_txns, 32)) .map(|t| t.into()) @@ -64,6 +66,9 @@ impl BlockPreparationStage { let block: ExecutableBlock = match &self.maybe_partitioner { None => (block_id, sig_verified_txns).into(), Some(partitioner) => { + NUM_TXNS + .with_label_values(&["partition"]) + .inc_by(sig_verified_txns.len() as u64); let analyzed_transactions = sig_verified_txns.into_iter().map(|t| t.into()).collect(); let timer = TIMER.with_label_values(&["partition"]).start_timer(); diff --git a/execution/executor-benchmark/src/db_access.rs b/execution/executor-benchmark/src/db_access.rs index 98eebae63b4ca..73ba9a0a0706e 100644 --- a/execution/executor-benchmark/src/db_access.rs +++ b/execution/executor-benchmark/src/db_access.rs @@ -4,16 +4,14 @@ use anyhow::Result; use aptos_storage_interface::state_view::DbStateView; use aptos_types::{ - account_address::AccountAddress, - state_store::{state_key::StateKey, StateView}, - write_set::TOTAL_SUPPLY_STATE_KEY, + account_address::AccountAddress, account_config::{CoinStoreResource, FungibleStoreResource, ObjectGroupResource}, state_store::{state_key::StateKey, StateView}, write_set::TOTAL_SUPPLY_STATE_KEY, AptosCoinType }; use move_core_types::{ identifier::Identifier, - language_storage::{StructTag, TypeTag}, + language_storage::{StructTag, TypeTag}, move_resource::MoveStructType, }; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::str::FromStr; +use std::{collections::BTreeMap, str::FromStr}; #[derive(Debug, Default, Deserialize, Serialize)] pub struct CoinStore { @@ -88,15 +86,12 @@ impl DbAccessUtil { Self::new_state_key(address, AccountAddress::ONE, "account", "Account", vec![]) } - pub fn new_state_key_aptos_coin(address: AccountAddress) -> StateKey { - Self::new_state_key(address, AccountAddress::ONE, "coin", "CoinStore", vec![ - TypeTag::Struct(Box::new(Self::new_struct_tag( - AccountAddress::ONE, - "aptos_coin", - "AptosCoin", - vec![], - ))), - ]) + pub fn new_state_key_aptos_coin(address: &AccountAddress) -> StateKey { + StateKey::resource_typed::>(address).unwrap() + } + + pub fn new_state_key_object_resource_group(address: &AccountAddress) -> StateKey { + StateKey::resource_group(address, &ObjectGroupResource::struct_tag()) } pub fn get_account( @@ -106,6 +101,13 @@ impl DbAccessUtil { Self::get_value(account_key, state_view) } + pub fn get_fa_store( + store_key: &StateKey, + state_view: &impl StateView, + ) -> Result> { + Self::get_value(store_key, state_view) + } + pub fn get_coin_store( coin_store_key: &StateKey, state_view: &impl StateView, @@ -123,6 +125,13 @@ impl DbAccessUtil { value.transpose().map_err(anyhow::Error::msg) } + pub fn get_resource_group( + state_key: &StateKey, + state_view: &impl StateView, + ) -> Result>>> { + Self::get_value(state_key, state_view) + } + pub fn get_db_value( state_key: &StateKey, state_view: &DbStateView, diff --git a/execution/executor-benchmark/src/db_generator.rs b/execution/executor-benchmark/src/db_generator.rs index 626b07687205d..23fb2375cc525 100644 --- a/execution/executor-benchmark/src/db_generator.rs +++ b/execution/executor-benchmark/src/db_generator.rs @@ -62,7 +62,7 @@ pub fn create_db_with_accounts( ); } -fn bootstrap_with_genesis( +pub(crate) fn bootstrap_with_genesis( db_dir: impl AsRef, enable_storage_sharding: bool, init_features: Features, diff --git a/execution/executor-benchmark/src/db_reliable_submitter.rs b/execution/executor-benchmark/src/db_reliable_submitter.rs index d95b8f37e70fa..587c734807eb5 100644 --- a/execution/executor-benchmark/src/db_reliable_submitter.rs +++ b/execution/executor-benchmark/src/db_reliable_submitter.rs @@ -28,7 +28,7 @@ pub struct DbReliableTransactionSubmitter { impl ReliableTransactionSubmitter for DbReliableTransactionSubmitter { async fn get_account_balance(&self, account_address: AccountAddress) -> Result { let db_state_view = self.db.reader.latest_state_checkpoint_view().unwrap(); - let sender_coin_store_key = DbAccessUtil::new_state_key_aptos_coin(account_address); + let sender_coin_store_key = DbAccessUtil::new_state_key_aptos_coin(&account_address); let sender_coin_store = DbAccessUtil::get_db_value::(&sender_coin_store_key, &db_state_view)? .unwrap(); diff --git a/execution/executor-benchmark/src/lib.rs b/execution/executor-benchmark/src/lib.rs index d34ef9a7c7041..0333632fb9918 100644 --- a/execution/executor-benchmark/src/lib.rs +++ b/execution/executor-benchmark/src/lib.rs @@ -9,7 +9,7 @@ pub mod db_generator; mod db_reliable_submitter; mod ledger_update_stage; mod metrics; -pub mod native_executor; +pub mod native_loose_block_executor; pub mod pipeline; pub mod transaction_committer; pub mod transaction_executor; @@ -20,7 +20,6 @@ use crate::{ transaction_executor::TransactionExecutor, transaction_generator::TransactionGenerator, }; use aptos_block_executor::counters::{self as block_executor_counters, GasType}; -use aptos_block_partitioner::v2::counters::BLOCK_PARTITIONING_SECONDS; use aptos_config::config::{NodeConfig, PrunerConfig}; use aptos_db::AptosDB; use aptos_executor::{ @@ -43,6 +42,7 @@ use aptos_transaction_generator_lib::{ }; use aptos_types::on_chain_config::Features; use db_reliable_submitter::DbReliableTransactionSubmitter; +use metrics::TIMER; use pipeline::PipelineConfig; use std::{ collections::HashMap, @@ -91,16 +91,22 @@ fn create_checkpoint( .expect("db checkpoint creation fails."); } +pub enum BenchmarkWorkload { + TransactionMix(Vec<(TransactionType, usize)>), + Transfer { + connected_tx_grps: usize, + shuffle_connected_txns: bool, + hotspot_probability: Option, + } +} + /// Runs the benchmark with given parameters. #[allow(clippy::too_many_arguments)] pub fn run_benchmark( block_size: usize, num_blocks: usize, - transaction_mix: Option>, + workload: BenchmarkWorkload, mut transactions_per_sender: usize, - connected_tx_grps: usize, - shuffle_connected_txns: bool, - hotspot_probability: Option, num_main_signer_accounts: usize, num_additional_dst_pool_accounts: usize, source_dir: impl AsRef, @@ -126,7 +132,8 @@ pub fn run_benchmark( let (db, executor) = init_db_and_executor::(&config); let root_account = TransactionGenerator::read_root_account(genesis_key, &db); let root_account = Arc::new(root_account); - let transaction_generators = transaction_mix.clone().map(|transaction_mix| { + + let transaction_generators = if let BenchmarkWorkload::TransactionMix(transaction_mix) = &workload { let num_existing_accounts = TransactionGenerator::read_meta(&source_dir); let num_accounts_to_be_loaded = std::cmp::min( num_existing_accounts, @@ -134,7 +141,7 @@ pub fn run_benchmark( ); let mut num_accounts_to_skip = 0; - for (transaction_type, _) in &transaction_mix { + for (transaction_type, _) in transaction_mix { if matches!(transaction_type, CoinTransfer { non_conflicting, .. } if *non_conflicting) { // In case of random non-conflicting coin transfer using `P2PTransactionGenerator`, // `3*block_size` addresses is required: @@ -152,7 +159,7 @@ pub fn run_benchmark( accounts_cache.split(num_main_signer_accounts); let (transaction_generator_creator, phase) = init_workload::( - transaction_mix, + transaction_mix.clone(), root_account.clone(), main_signer_accounts, burner_accounts, @@ -162,8 +169,10 @@ pub fn run_benchmark( &PipelineConfig::default(), ); // need to initialize all workers and finish with all transactions before we start the timer: - ((0..pipeline_config.num_generator_workers).map(|_| transaction_generator_creator.create_transaction_generator()).collect::>(), phase) - }); + Some(((0..pipeline_config.num_generator_workers).map(|_| transaction_generator_creator.create_transaction_generator()).collect::>(), phase)) + } else { + None + }; let version = db.reader.expect_synced_version(); @@ -171,7 +180,8 @@ pub fn run_benchmark( Pipeline::new(executor, version, &pipeline_config, Some(num_blocks)); let mut num_accounts_to_load = num_main_signer_accounts; - if let Some(mix) = &transaction_mix { + + if let BenchmarkWorkload::TransactionMix(mix) = &workload { for (transaction_type, _) in mix { if matches!(transaction_type, CoinTransfer { non_conflicting, .. } if *non_conflicting) { @@ -200,39 +210,40 @@ pub fn run_benchmark( let mut overall_measuring = OverallMeasuring::start(); - let num_blocks_created = if let Some((transaction_generators, phase)) = transaction_generators { - generator.run_workload( - block_size, - num_blocks, - transaction_generators, - phase, - transactions_per_sender, - ) - } else { - generator.run_transfer( - block_size, - num_blocks, - transactions_per_sender, - connected_tx_grps, - shuffle_connected_txns, - hotspot_probability, - ) + let (num_blocks_created, workload_name) = match workload { + BenchmarkWorkload::TransactionMix(mix) => { + let (transaction_generators, phase) = transaction_generators.unwrap(); + let num_blocks_created = generator.run_workload( + block_size, + num_blocks, + transaction_generators, + phase, + transactions_per_sender, + ); + (num_blocks_created, format!("{:?} via txn generator", mix)) + }, + BenchmarkWorkload::Transfer { connected_tx_grps, shuffle_connected_txns, hotspot_probability } => { + let num_blocks_created = generator.run_transfer( + block_size, + num_blocks, + transactions_per_sender, + connected_tx_grps, + shuffle_connected_txns, + hotspot_probability, + ); + (num_blocks_created, "raw transfer".to_string()) + } }; - if pipeline_config.delay_execution_start { + if pipeline_config.delay_pipeline_start { overall_measuring.start_time = Instant::now(); } - pipeline.start_execution(); generator.drop_sender(); + info!("Done creating workload"); + pipeline.start_pipeline_processing(); + info!("Waiting for pipeline to finish"); pipeline.join(); - info!( - "Executed workload {}", - if let Some(mix) = transaction_mix { - format!("{:?} via txn generator", mix) - } else { - "raw transfer".to_string() - } - ); + info!("Executed workload {}", workload_name); if !pipeline_config.skip_commit { let num_txns = db.reader.expect_synced_version() - version - num_blocks_created as u64; @@ -277,7 +288,7 @@ where block_sender, }; - create_txn_generator_creator( + let result = create_txn_generator_creator( &[transaction_mix], AlwaysApproveRootAccountHandle { root_account }, &mut main_signer_accounts, @@ -287,9 +298,14 @@ where &transaction_factory, phase_clone, ) - .await + .await; + + drop(db_gen_init_transaction_executor); + + result }); + info!("Waiting for init to finish"); pipeline.join(); (txn_generator_creator, phase) @@ -376,8 +392,8 @@ fn add_accounts_impl( init_account_balance, block_size, ); - pipeline.start_execution(); generator.drop_sender(); + pipeline.start_pipeline_processing(); pipeline.join(); let elapsed = start_time.elapsed().as_secs_f32(); @@ -389,12 +405,12 @@ fn add_accounts_impl( ); if verify_sequence_numbers { - println!("Verifying sequence numbers..."); + info!("Verifying sequence numbers..."); // Do a sanity check on the sequence number to make sure all transactions are committed. generator.verify_sequence_numbers(db.reader.clone()); } - println!( + info!( "Created {} new accounts. Now at version {}, total # of accounts {}.", num_new_accounts, now_version, @@ -523,14 +539,13 @@ static OTHER_LABELS: &[(&str, bool, &str)] = &[ struct ExecutionTimeMeasurement { output_size: f64, - partitioning_total: f64, - execution_total: f64, - vm_only: f64, + sig_verify_total_time: f64, + partitioning_total_time: f64, + execution_total_time: f64, + vm_total_time: f64, by_other: HashMap<&'static str, f64>, ledger_update_total: f64, - commit_total: f64, - - vm_time: f64, + commit_total_time: f64, } impl ExecutionTimeMeasurement { @@ -539,9 +554,10 @@ impl ExecutionTimeMeasurement { .with_label_values(&["execution"]) .get_sample_sum(); - let partitioning_total = BLOCK_PARTITIONING_SECONDS.get_sample_sum(); + let sig_verify_total = TIMER.with_label_values(&["sig_verify"]).get_sample_sum(); + let partitioning_total = TIMER.with_label_values(&["partition"]).get_sample_sum(); let execution_total = EXECUTE_BLOCK.get_sample_sum(); - let vm_only = VM_EXECUTE_BLOCK.get_sample_sum(); + let vm_total = VM_EXECUTE_BLOCK.get_sample_sum(); let by_other = OTHER_LABELS .iter() @@ -557,17 +573,15 @@ impl ExecutionTimeMeasurement { let ledger_update_total = UPDATE_LEDGER.get_sample_sum(); let commit_total = COMMIT_BLOCKS.get_sample_sum(); - let vm_time = VM_EXECUTE_BLOCK.get_sample_sum(); - Self { output_size, - partitioning_total, - execution_total, - vm_only, + sig_verify_total_time: sig_verify_total, + partitioning_total_time: partitioning_total, + execution_total_time: execution_total, + vm_total_time: vm_total, by_other, ledger_update_total, - commit_total, - vm_time, + commit_total_time: commit_total, } } @@ -576,17 +590,17 @@ impl ExecutionTimeMeasurement { Self { output_size: end.output_size - self.output_size, - partitioning_total: end.partitioning_total - self.partitioning_total, - execution_total: end.execution_total - self.execution_total, - vm_only: end.vm_only - self.vm_only, + sig_verify_total_time: end.sig_verify_total_time - self.sig_verify_total_time, + partitioning_total_time: end.partitioning_total_time - self.partitioning_total_time, + execution_total_time: end.execution_total_time - self.execution_total_time, + vm_total_time: end.vm_total_time - self.vm_total_time, by_other: end .by_other .into_iter() .map(|(k, v)| (k, v - self.by_other.get(&k).unwrap())) .collect::>(), ledger_update_total: end.ledger_update_total - self.ledger_update_total, - commit_total: end.commit_total - self.commit_total, - vm_time: end.vm_time - self.vm_time, + commit_total_time: end.commit_total_time - self.commit_total_time, } } } @@ -620,13 +634,6 @@ impl OverallMeasuring { num_txns, elapsed ); - info!( - "{} VM execution TPS {} txn/s; ({} / {})", - prefix, - (num_txns / delta_execution.vm_time) as usize, - num_txns, - delta_execution.vm_time - ); info!("{} GPS: {} gas/s", prefix, delta_gas.gas / elapsed); info!( "{} effectiveGPS: {} gas/s ({} effective block gas, in {} s)", @@ -670,31 +677,36 @@ impl OverallMeasuring { ); info!( - "{} fraction of total: {:.3} in partitioning (component TPS: {})", + "{} fraction of total: {:.4} in signature verification (component TPS: {:.1})", prefix, - delta_execution.partitioning_total / elapsed, - num_txns / delta_execution.partitioning_total + delta_execution.sig_verify_total_time / elapsed, + num_txns / delta_execution.sig_verify_total_time ); - info!( - "{} fraction of total: {:.3} in execution (component TPS: {})", + "{} fraction of total: {:.4} in partitioning (component TPS: {:.1})", prefix, - delta_execution.execution_total / elapsed, - num_txns / delta_execution.execution_total + delta_execution.partitioning_total_time / elapsed, + num_txns / delta_execution.partitioning_total_time ); info!( - "{} fraction of execution {:.3} in VM (component TPS: {})", + "{} fraction of total: {:.4} in execution (component TPS: {:.1})", prefix, - delta_execution.vm_only / delta_execution.execution_total, - num_txns / delta_execution.vm_only + delta_execution.execution_total_time / elapsed, + num_txns / delta_execution.execution_total_time + ); + info!( + "{} fraction of execution {:.4} in VM (component TPS: {:.1})", + prefix, + delta_execution.vm_total_time / delta_execution.execution_total_time, + num_txns / delta_execution.vm_total_time ); for (prefix, top_level, other_label) in OTHER_LABELS { let time_in_label = delta_execution.by_other.get(other_label).unwrap(); - if *top_level || time_in_label / delta_execution.execution_total > 0.01 { + if *top_level || time_in_label / delta_execution.execution_total_time > 0.01 { info!( - "{} fraction of execution {:.3} in {} {} (component TPS: {})", + "{} fraction of execution {:.4} in {} {} (component TPS: {:.1})", prefix, - time_in_label / delta_execution.execution_total, + time_in_label / delta_execution.execution_total_time, prefix, other_label, num_txns / time_in_label @@ -703,17 +715,17 @@ impl OverallMeasuring { } info!( - "{} fraction of total: {:.3} in ledger update (component TPS: {})", + "{} fraction of total: {:.4} in ledger update (component TPS: {:.1})", prefix, delta_execution.ledger_update_total / elapsed, num_txns / delta_execution.ledger_update_total ); info!( - "{} fraction of total: {:.4} in commit (component TPS: {})", + "{} fraction of total: {:.4} in commit (component TPS: {:.1})", prefix, - delta_execution.commit_total / elapsed, - num_txns / delta_execution.commit_total + delta_execution.commit_total_time / elapsed, + num_txns / delta_execution.commit_total_time ); } } @@ -726,13 +738,105 @@ fn log_total_supply(db_reader: &Arc) { #[cfg(test)] mod tests { - use crate::{native_executor::NativeExecutor, pipeline::PipelineConfig}; + use std::fs; + + use crate::{db_generator::bootstrap_with_genesis, init_db_and_executor, native_loose_block_executor::NativeLooseBlockExecutor, pipeline::PipelineConfig, transaction_executor::BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG, transaction_generator::TransactionGenerator, BenchmarkWorkload}; use aptos_config::config::NO_OP_STORAGE_PRUNER_CONFIG; + use aptos_crypto::HashValue; use aptos_executor::block_executor::TransactionBlockExecutor; + use aptos_sdk::{transaction_builder::{aptos_stdlib, TransactionFactory}, types::LocalAccount}; use aptos_temppath::TempPath; use aptos_transaction_generator_lib::{args::TransactionTypeArg, WorkflowProgress}; - use aptos_types::on_chain_config::Features; + use aptos_types::{block_executor::partitioner::ExecutableBlock, on_chain_config::{FeatureFlag, Features}, transaction::Transaction}; use aptos_vm::AptosVM; + use rand::thread_rng; + use aptos_executor_types::BlockExecutorTrait; + + #[test] + fn test_compare_vm_and_native() { + aptos_logger::Logger::new().init(); + + let db_dir = TempPath::new(); + + fs::create_dir_all(db_dir.as_ref()).unwrap(); + + let mut init_features = Features::default(); + init_features.enable(FeatureFlag::NEW_ACCOUNTS_DEFAULT_TO_FA_APT_STORE); + init_features.enable(FeatureFlag::OPERATIONS_DEFAULT_TO_FA_APT_STORE); + + bootstrap_with_genesis(&db_dir, false, init_features.clone()); + + let (mut config, genesis_key) = + aptos_genesis::test_utils::test_config_with_custom_features(init_features); + config.storage.dir = db_dir.as_ref().to_path_buf(); + config.storage.storage_pruner_config = NO_OP_STORAGE_PRUNER_CONFIG; + config.storage.rocksdb_configs.enable_storage_sharding = false; + + let (txn, vm_result) = { + let (vm_db, vm_executor) = init_db_and_executor::(&config); + let root_account = TransactionGenerator::read_root_account(genesis_key, &vm_db); + let dst = LocalAccount::generate(&mut thread_rng()); + let num_coins = 1000; + + let txn_factory = TransactionGenerator::create_transaction_factory(); + let txn = Transaction::UserTransaction(root_account.sign_with_transaction_builder(txn_factory.payload(aptos_stdlib::aptos_account_fungible_transfer_only( + dst.address(), num_coins, + )))); + let vm_result = vm_executor + .execute_and_state_checkpoint( + (HashValue::zero(), vec![txn.clone()]).into(), + vm_executor.committed_block_id(), + BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG, + ) + .unwrap(); + + (txn, vm_result) + }; + + let (native_db, native_executor) = init_db_and_executor::(&config); + let native_result = native_executor + .execute_and_state_checkpoint( + (HashValue::zero(), vec![txn]).into(), + native_executor.committed_block_id(), + BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG, + ) + .unwrap(); + + let ( + vm_txns, + _vm_state_updates_vec, + _vm_state_checkpoint_hashes, + _vm_state_updates_before_last_checkpoint, + _vm_sharded_state_cache, + _vm_block_end_info, + ) = vm_result.into_inner(); + let (vm_statuses_for_input_txns, + vm_to_commit, + vm_to_discard, + vm_to_retry, + ) = vm_txns.into_inner(); + + let ( + native_txns, + _native_state_updates_vec, + _native_state_checkpoint_hashes, + _native_state_updates_before_last_checkpoint, + _native_sharded_state_cache, + _native_block_end_info, + ) = native_result.into_inner(); + let (native_statuses_for_input_txns, + native_to_commit, + native_to_discard, + native_to_retry, + ) = native_txns.into_inner(); + + println!("{:?}", vm_to_commit.parsed_outputs()); + assert_eq!(vm_statuses_for_input_txns, native_statuses_for_input_txns); + assert_eq!(vm_to_commit, native_to_commit); + assert_eq!(vm_to_discard, native_to_discard); + assert_eq!(vm_to_retry, native_to_retry); + + } fn test_generic_benchmark( transaction_type: Option, @@ -765,12 +869,11 @@ mod tests { super::run_benchmark::( 10, /* block_size */ 30, /* num_blocks */ - transaction_type - .map(|t| vec![(t.materialize(1, true, WorkflowProgress::MoveByPhases), 1)]), + transaction_type.map_or_else( + || BenchmarkWorkload::Transfer { connected_tx_grps: 0, shuffle_connected_txns: false, hotspot_probability: None }, + |t| BenchmarkWorkload::TransactionMix(vec![(t.materialize(1, true, WorkflowProgress::MoveByPhases), 1)]) + ), 2, /* transactions per sender */ - 0, /* connected txn groups in a block */ - false, /* shuffle the connected txns in a block */ - None, /* maybe_hotspot_probability */ 25, /* num_main_signer_accounts */ 30, /* num_dst_pool_accounts */ storage_dir.as_ref(), @@ -793,7 +896,7 @@ mod tests { AptosVM::set_num_shards_once(1); AptosVM::set_concurrency_level_once(4); AptosVM::set_processed_transactions_detailed_counters(); - NativeExecutor::set_concurrency_level_once(4); + NativeLooseBlockExecutor::set_concurrency_level_once(4); test_generic_benchmark::( Some(TransactionTypeArg::ModifyGlobalMilestoneAggV2), true, @@ -803,6 +906,6 @@ mod tests { #[test] fn test_native_benchmark() { // correct execution not yet implemented, so cannot be checked for validity - test_generic_benchmark::(None, false); + test_generic_benchmark::(Some(TransactionTypeArg::AptFaTransfer), false); } } diff --git a/execution/executor-benchmark/src/main.rs b/execution/executor-benchmark/src/main.rs index f904cacc9bf08..93254658d4500 100644 --- a/execution/executor-benchmark/src/main.rs +++ b/execution/executor-benchmark/src/main.rs @@ -14,7 +14,7 @@ use aptos_config::config::{ EpochSnapshotPrunerConfig, LedgerPrunerConfig, PrunerConfig, StateMerklePrunerConfig, }; use aptos_executor::block_executor::TransactionBlockExecutor; -use aptos_executor_benchmark::{native_executor::NativeExecutor, pipeline::PipelineConfig}; +use aptos_executor_benchmark::{native_loose_block_executor::NativeLooseBlockExecutor, pipeline::PipelineConfig, BenchmarkWorkload}; use aptos_executor_service::remote_executor_client; use aptos_experimental_ptx_executor::PtxBlockExecutor; #[cfg(target_os = "linux")] @@ -114,6 +114,8 @@ pub struct PipelineOpt { allow_retries: bool, #[clap(long, default_value = "4")] num_generator_workers: usize, + #[clap(long, default_value = "8")] + sig_verify_num_threads: usize, #[clap(flatten)] sharding_opt: ShardingOpt, } @@ -121,16 +123,16 @@ pub struct PipelineOpt { impl PipelineOpt { fn pipeline_config(&self) -> PipelineConfig { PipelineConfig { - delay_execution_start: self.generate_then_execute, + delay_pipeline_start: self.generate_then_execute, split_stages: self.split_stages, skip_commit: self.skip_commit, allow_aborts: self.allow_aborts, allow_discards: self.allow_discards, allow_retries: self.allow_retries, num_executor_shards: self.sharding_opt.num_executor_shards, - use_global_executor: self.sharding_opt.use_global_executor, num_generator_workers: self.num_generator_workers, partitioner_config: self.sharding_opt.partitioner_config(), + sig_verify_num_threads: self.sig_verify_num_threads, } } } @@ -207,11 +209,11 @@ struct ProfilerOpt { #[derive(Parser, Debug)] #[clap(group( ArgGroup::new("vm_selection") - .args(&["use_native_executor", "use_ptx_executor"]), + .args(&["use_native_loose_block_executor", "use_ptx_executor"]), ))] pub struct VmSelectionOpt { #[clap(long)] - use_native_executor: bool, + use_native_loose_block_executor: bool, #[clap(long)] use_ptx_executor: bool, @@ -438,8 +440,8 @@ where // disable_feature, // ); - let transaction_mix = if transaction_type.is_empty() { - None + let workload = if transaction_type.is_empty() { + BenchmarkWorkload::Transfer { connected_tx_grps: opt.connected_tx_grps, shuffle_connected_txns: opt.shuffle_connected_txns, hotspot_probability: opt.hotspot_probability } } else { let mix_per_phase = TransactionTypeArg::args_to_transaction_mix_per_phase( &transaction_type, @@ -450,7 +452,7 @@ where WorkflowProgress::MoveByPhases, ); assert!(mix_per_phase.len() == 1); - Some(mix_per_phase[0].clone()) + BenchmarkWorkload::TransactionMix(mix_per_phase[0].clone()) }; if let Some(hotspot_probability) = opt.hotspot_probability { @@ -464,11 +466,8 @@ where aptos_executor_benchmark::run_benchmark::( opt.block_size, blocks, - transaction_mix, + workload, opt.transactions_per_sender, - opt.connected_tx_grps, - opt.shuffle_connected_txns, - opt.hotspot_probability, main_signer_accounts, additional_dst_pool_accounts, data_dir, @@ -565,7 +564,7 @@ fn main() { } AptosVM::set_num_shards_once(execution_shards); AptosVM::set_concurrency_level_once(execution_threads_per_shard); - NativeExecutor::set_concurrency_level_once(execution_threads_per_shard); + NativeLooseBlockExecutor::set_concurrency_level_once(execution_threads_per_shard); AptosVM::set_processed_transactions_detailed_counters(); let config = ProfilerConfig::new_with_defaults(); @@ -584,8 +583,8 @@ fn main() { let _mem_start = memory_profiler.start_profiling(); } - if opt.vm_selection_opt.use_native_executor { - run::(opt); + if opt.vm_selection_opt.use_native_loose_block_executor { + run::(opt); } else if opt.vm_selection_opt.use_ptx_executor { #[cfg(target_os = "linux")] ThreadManagerBuilder::set_thread_config_strategy(ThreadConfigStrategy::ThreadsPriority(48)); diff --git a/execution/executor-benchmark/src/native_executor.rs b/execution/executor-benchmark/src/native_loose_block_executor.rs similarity index 64% rename from execution/executor-benchmark/src/native_executor.rs rename to execution/executor-benchmark/src/native_loose_block_executor.rs index 2eea6d915e11a..eebc922dab321 100644 --- a/execution/executor-benchmark/src/native_executor.rs +++ b/execution/executor-benchmark/src/native_loose_block_executor.rs @@ -11,7 +11,7 @@ use aptos_executor_types::execution_output::ExecutionOutput; use aptos_storage_interface::cached_state_view::CachedStateView; use aptos_types::{ account_address::AccountAddress, - account_config::{deposit::DepositEvent, withdraw::WithdrawEvent}, + account_config::{deposit::DepositEvent, primary_apt_store, withdraw::WithdrawEvent, DepositFAEvent, FungibleStoreResource, MoveEventV2, WithdrawFAEvent}, block_executor::{config::BlockExecutorConfigFromOnchain, partitioner::ExecutableTransactions}, contract_event::ContractEvent, event::EventKey, @@ -30,7 +30,7 @@ use move_core_types::{ }; use once_cell::sync::{Lazy, OnceCell}; use rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; struct IncrementalOutput { write_set: Vec<(StateKey, WriteOp)>, @@ -64,18 +64,24 @@ impl IncrementalOutput { } } -pub struct NativeExecutor {} +/// Native block executor (replacing both BlockSTM and AptosVM), that is +/// "loose", i.e. doesn't compute outputs correctly. +/// It's loose in multiple ways: +/// - it ignores conflicts. All transactions see the state at the start of the block! +/// - it doesn't put everything in the writeset that should be there +/// - it doesn't compute gas +pub struct NativeLooseBlockExecutor {} static NATIVE_EXECUTOR_CONCURRENCY_LEVEL: OnceCell = OnceCell::new(); static NATIVE_EXECUTOR_POOL: Lazy = Lazy::new(|| { ThreadPoolBuilder::new() - .num_threads(NativeExecutor::get_concurrency_level()) + .num_threads(NativeLooseBlockExecutor::get_concurrency_level()) .thread_name(|index| format!("native_exe_{}", index)) .build() .unwrap() }); -impl NativeExecutor { +impl NativeLooseBlockExecutor { pub fn set_concurrency_level_once(concurrency_level: usize) { NATIVE_EXECUTOR_CONCURRENCY_LEVEL .set(concurrency_level) @@ -89,10 +95,155 @@ impl NativeExecutor { } } + + fn withdraw_fa_apt_from_signer( + sender_address: AccountAddress, + transfer_amount: u64, + state_view: &CachedStateView, + gas: u64, + ) -> Result> { + let sender_account_key = DbAccessUtil::new_state_key_account(sender_address); + let mut sender_account = { + let _timer = TIMER + .with_label_values(&["read_sender_account"]) + .start_timer(); + DbAccessUtil::get_account(&sender_account_key, state_view)?.unwrap() + }; + let sender_store_address = primary_apt_store(sender_address); + + let sender_fa_store_object_key = DbAccessUtil::new_state_key_object_resource_group(&sender_store_address); + let mut sender_fa_store_object = { + let _timer = TIMER + .with_label_values(&["read_sender_fa_store"]) + .start_timer(); + match DbAccessUtil::get_resource_group( &sender_fa_store_object_key, state_view)? { + Some(sender_fa_store_object) => sender_fa_store_object, + None => panic!("couldn't find {:?}", sender_fa_store_object_key) + + // return Ok(Ok(IncrementalOutput { write_set: vec![], events: vec![] })) + + // Ok(Err(TransactionStatus::Keep(ExecutionStatus::MoveAbort { + // location: AbortLocation::Module(ModuleId::new( + // AccountAddress::ONE, + // ident_str!("fungible_asset").into(), + // )), + // code: 7, + // info: None, + // }))) + } + }; + + let fungible_store_rg_tag = FungibleStoreResource::struct_tag(); + let mut sender_fa_store = bcs::from_bytes::(&sender_fa_store_object.remove(&fungible_store_rg_tag).unwrap())?; + + // Note: numbers below may not be real. When runninng in parallel there might be conflicts. + sender_fa_store.balance -= transfer_amount + gas; + + sender_fa_store_object.insert(fungible_store_rg_tag, bcs::to_bytes(&sender_fa_store)?); + + sender_account.sequence_number += 1; + + // add total supply via aggregators? + // let mut total_supply: u128 = + // DbAccessUtil::get_value(&TOTAL_SUPPLY_STATE_KEY, state_view)?.unwrap(); + // total_supply -= gas as u128; + + // TODO(grao): Add other reads to match the read set of the real transaction. + let write_set = vec![ + ( + sender_account_key, + WriteOp::legacy_modification(bcs::to_bytes(&sender_account)?.into()), + ), + ( + sender_fa_store_object_key, + WriteOp::legacy_modification(bcs::to_bytes(&sender_fa_store_object)?.into()), + ), + // ( + // TOTAL_SUPPLY_STATE_KEY.clone(), + // WriteOp::legacy_modification(bcs::to_bytes(&total_supply)?), + // ), + ]; + + let event = WithdrawFAEvent { + store: sender_store_address, + amount: transfer_amount, + }; + + let events = vec![ + event.create_event_v2() + ]; + Ok(Ok(IncrementalOutput { write_set, events })) + } + + fn deposit_fa_apt( + recipient_address: AccountAddress, + transfer_amount: u64, + state_view: &CachedStateView, + ) -> Result> { + let recipient_store_address = primary_apt_store(recipient_address); + let recipient_fa_store_object_key = DbAccessUtil::new_state_key_object_resource_group(&recipient_store_address); + let fungible_store_rg_tag = FungibleStoreResource::struct_tag(); + + let (recipient_fa_store, mut recipient_fa_store_object, recipient_fa_store_existed) = { + let _timer = TIMER + .with_label_values(&["read_recipient_fa_store"]) + .start_timer(); + match DbAccessUtil::get_resource_group(&recipient_fa_store_object_key, state_view)? { + Some(mut recipient_fa_store_object) => { + let mut recipient_fa_store = bcs::from_bytes::(&recipient_fa_store_object.remove(&fungible_store_rg_tag).unwrap())?; + recipient_fa_store.balance += transfer_amount; + (recipient_fa_store, recipient_fa_store_object, true) + }, + None => { + let receipeint_fa_store = FungibleStoreResource::new(AccountAddress::TEN, transfer_amount, false); + let receipeint_fa_store_object = BTreeMap::new(); + (receipeint_fa_store, receipeint_fa_store_object, false) + }, + } + }; + + recipient_fa_store_object.insert(fungible_store_rg_tag, bcs::to_bytes(&recipient_fa_store)?); + + + // Note: numbers below may not be real. When runninng in parallel there might be conflicts. + + // add total supply via aggregators? + // let mut total_supply: u128 = + // DbAccessUtil::get_value(&TOTAL_SUPPLY_STATE_KEY, state_view)?.unwrap(); + // total_supply -= gas as u128; + + // TODO(grao): Add other reads to match the read set of the real transaction. + let write_set = vec![ + ( + recipient_fa_store_object_key, + if recipient_fa_store_existed { + WriteOp::legacy_modification(bcs::to_bytes(&recipient_fa_store_object)?.into()) + } else { + WriteOp::legacy_creation(bcs::to_bytes(&recipient_fa_store_object)?.into()) + }, + ), + // ( + // TOTAL_SUPPLY_STATE_KEY.clone(), + // WriteOp::legacy_modification(bcs::to_bytes(&total_supply)?), + // ), + ]; + + let event = DepositFAEvent { + store: recipient_store_address, + amount: transfer_amount, + }; + + let events = vec![ + event.create_event_v2() + ]; + Ok(Ok(IncrementalOutput { write_set, events })) + } + fn withdraw_from_signer( sender_address: AccountAddress, transfer_amount: u64, state_view: &CachedStateView, + gas: u64, ) -> Result> { let sender_account_key = DbAccessUtil::new_state_key_account(sender_address); let mut sender_account = { @@ -101,7 +252,7 @@ impl NativeExecutor { .start_timer(); DbAccessUtil::get_account(&sender_account_key, state_view)?.unwrap() }; - let sender_coin_store_key = DbAccessUtil::new_state_key_aptos_coin(sender_address); + let sender_coin_store_key = DbAccessUtil::new_state_key_aptos_coin(&sender_address); let mut sender_coin_store = { let _timer = TIMER .with_label_values(&["read_sender_coin_store"]) @@ -112,7 +263,6 @@ impl NativeExecutor { // Note: numbers below may not be real. When runninng in parallel there might be conflicts. sender_coin_store.coin -= transfer_amount; - let gas = 1; sender_coin_store.coin -= gas; sender_account.sequence_number += 1; @@ -156,7 +306,7 @@ impl NativeExecutor { fail_on_missing: bool, ) -> Result> { let recipient_account_key = DbAccessUtil::new_state_key_account(recipient_address); - let recipient_coin_store_key = DbAccessUtil::new_state_key_aptos_coin(recipient_address); + let recipient_coin_store_key = DbAccessUtil::new_state_key_aptos_coin(&recipient_address); let recipient_account = { let _timer = TIMER.with_label_values(&["read_new_account"]).start_timer(); @@ -243,6 +393,39 @@ impl NativeExecutor { Ok(Ok(IncrementalOutput { write_set, events })) } + fn handle_fa_transfer( + sender_address: AccountAddress, + recipient_address: AccountAddress, + transfer_amount: u64, + state_view: &CachedStateView, + ) -> Result { + let _timer = TIMER.with_label_values(&["fa_transfer"]).start_timer(); + + let gas = 500; // hardcode gas consumed. + + let mut output = { + let output = Self::withdraw_fa_apt_from_signer(sender_address, transfer_amount, state_view, gas)?; + match output { + Ok(output) => output, + Err(status) => return Ok(IncrementalOutput::to_abort(status)), + } + }; + + let deposit_output = Self::deposit_fa_apt( + recipient_address, + transfer_amount, + state_view, + )?; + + match deposit_output { + Ok(deposit_output) => { + output.append(deposit_output); + output.into_success_output() + }, + Err(status) => Ok(IncrementalOutput::to_abort(status)), + } + } + fn handle_account_creation_and_transfer( sender_address: AccountAddress, recipient_address: AccountAddress, @@ -253,8 +436,10 @@ impl NativeExecutor { ) -> Result { let _timer = TIMER.with_label_values(&["account_creation"]).start_timer(); + let gas = 500; // hardcode gas consumed. + let mut output = { - let output = Self::withdraw_from_signer(sender_address, transfer_amount, state_view)?; + let output = Self::withdraw_from_signer(sender_address, transfer_amount, state_view, gas)?; match output { Ok(output) => output, Err(status) => return Ok(IncrementalOutput::to_abort(status)), @@ -287,6 +472,9 @@ impl NativeExecutor { fail_on_missing: bool, ) -> Result { let mut deltas = HashMap::new(); + + let gas = 5000; // hardcode gas consumed. + for (recipient, amount) in recipient_addresses .into_iter() .zip(transfer_amounts.into_iter()) @@ -307,7 +495,7 @@ impl NativeExecutor { assert!(amount_to_sender >= 0); let mut output = { let output = - Self::withdraw_from_signer(sender_address, amount_to_sender as u64, state_view)?; + Self::withdraw_from_signer(sender_address, amount_to_sender as u64, state_view, gas)?; match output { Ok(output) => output, Err(status) => return Ok(IncrementalOutput::to_abort(status)), @@ -345,7 +533,7 @@ impl NativeExecutor { } } -impl TransactionBlockExecutor for NativeExecutor { +impl TransactionBlockExecutor for NativeLooseBlockExecutor { fn execute_transaction_block( transactions: ExecutableTransactions, state_view: CachedStateView, @@ -367,6 +555,14 @@ impl TransactionBlockExecutor for NativeExecutor { f.module().name().as_str(), f.function().as_str(), ) { + (AccountAddress::ONE, "aptos_account", "fungible_transfer_only") => { + Self::handle_fa_transfer( + user_txn.sender(), + bcs::from_bytes(&f.args()[0]).unwrap(), + bcs::from_bytes(&f.args()[1]).unwrap(), + &state_view, + ) + }, (AccountAddress::ONE, "coin", "transfer") => { Self::handle_account_creation_and_transfer( user_txn.sender(), diff --git a/execution/executor-benchmark/src/pipeline.rs b/execution/executor-benchmark/src/pipeline.rs index 8d4efaf380d70..3cd8d2feb45c1 100644 --- a/execution/executor-benchmark/src/pipeline.rs +++ b/execution/executor-benchmark/src/pipeline.rs @@ -20,8 +20,7 @@ use derivative::Derivative; use std::{ marker::PhantomData, sync::{ - mpsc::{self, SyncSender}, - Arc, + mpsc::{self, SyncSender}, Arc }, thread::JoinHandle, time::{Duration, Instant}, @@ -30,7 +29,7 @@ use std::{ #[derive(Debug, Derivative)] #[derivative(Default)] pub struct PipelineConfig { - pub delay_execution_start: bool, + pub delay_pipeline_start: bool, pub split_stages: bool, pub skip_commit: bool, pub allow_aborts: bool, @@ -38,16 +37,16 @@ pub struct PipelineConfig { pub allow_retries: bool, #[derivative(Default(value = "0"))] pub num_executor_shards: usize, - pub use_global_executor: bool, #[derivative(Default(value = "4"))] pub num_generator_workers: usize, pub partitioner_config: PartitionerV2Config, + pub sig_verify_num_threads: usize, } pub struct Pipeline { join_handles: Vec>, phantom: PhantomData, - start_execution_tx: Option>, + start_pipeline_tx: Option>, } impl Pipeline @@ -67,15 +66,20 @@ where let executor_3 = executor_1.clone(); let (raw_block_sender, raw_block_receiver) = mpsc::sync_channel::>( - if config.delay_execution_start { + if config.delay_pipeline_start { (num_blocks.unwrap() + 1).max(50) } else { 10 }, /* bound */ ); - // Assume the distributed executor and the distributed partitioner share the same worker set. - let num_partitioner_shards = config.num_executor_shards; + let (executable_block_sender, executable_block_receiver) = mpsc::sync_channel::( + if config.split_stages { + (num_blocks.unwrap() + 1).max(50) + } else { + 10 + }, /* bound */ + ); let (ledger_update_sender, ledger_update_receiver) = mpsc::sync_channel::( @@ -94,24 +98,21 @@ where }, /* bound */ ); - let (start_execution_tx, start_execution_rx) = if config.delay_execution_start { - let (start_execution_tx, start_execution_rx) = mpsc::sync_channel::<()>(1); - (Some(start_execution_tx), Some(start_execution_rx)) - } else { - (None, None) - }; + let (start_pipeline_tx, start_pipeline_rx) = create_start_tx_rx(config.delay_pipeline_start); + let (start_execution_tx, start_execution_rx) = create_start_tx_rx(config.split_stages); + let (start_ledger_update_tx, start_ledger_update_rx) = create_start_tx_rx(config.split_stages); + let (start_commit_tx, start_commit_rx) = create_start_tx_rx(config.split_stages); - let (start_commit_tx, start_commit_rx) = if config.split_stages { - let (start_commit_tx, start_commit_rx) = mpsc::sync_channel::<()>(1); - (Some(start_commit_tx), Some(start_commit_rx)) - } else { - (None, None) - }; let mut join_handles = vec![]; - let mut partitioning_stage = - BlockPreparationStage::new(num_partitioner_shards, &config.partitioner_config); + // signature verification and partitioning + let mut preparation_stage = BlockPreparationStage::new( + config.sig_verify_num_threads, + // Assume the distributed executor and the distributed partitioner share the same worker set. + config.num_executor_shards, + &config.partitioner_config + ); let mut exe = TransactionExecutor::new( executor_1, @@ -130,22 +131,19 @@ where let mut ledger_update_stage = LedgerUpdateStage::new(executor_2, commit_processing, version); - let (executable_block_sender, executable_block_receiver) = - mpsc::sync_channel::(3); - - let partitioning_thread = std::thread::Builder::new() - .name("block_partitioning".to_string()) + let preparation_thread = std::thread::Builder::new() + .name("block_preparation".to_string()) .spawn(move || { + start_pipeline_rx.map(|rx| rx.recv()); while let Ok(txns) = raw_block_receiver.recv() { - NUM_TXNS - .with_label_values(&["partition"]) - .inc_by(txns.len() as u64); - let exe_block_msg = partitioning_stage.process(txns); + let exe_block_msg = preparation_stage.process(txns); executable_block_sender.send(exe_block_msg).unwrap(); } + info!("Done preparation"); + start_execution_tx.map(|tx| tx.send(())); }) .expect("Failed to spawn block partitioner thread."); - join_handles.push(partitioning_thread); + join_handles.push(preparation_thread); let exe_thread = std::thread::Builder::new() .name("txn_executor".to_string()) @@ -201,7 +199,7 @@ where if num_blocks.is_some() { overall_measuring.print_end("Overall execution", executed); } - start_commit_tx.map(|tx| tx.send(())); + start_ledger_update_tx.map(|tx| tx.send(())); }) .expect("Failed to spawn transaction executor thread."); join_handles.push(exe_thread); @@ -209,6 +207,8 @@ where let ledger_update_thread = std::thread::Builder::new() .name("ledger_update".to_string()) .spawn(move || { + start_ledger_update_rx.map(|rx| rx.recv()); + while let Ok(ledger_update_msg) = ledger_update_receiver.recv() { let input_block_size = ledger_update_msg.state_checkpoint_output.input_txns_len(); @@ -217,6 +217,7 @@ where .inc_by(input_block_size as u64); ledger_update_stage.ledger_update(ledger_update_msg); } + start_commit_tx.map(|tx| tx.send(())); }) .expect("Failed to spawn ledger update thread."); join_handles.push(ledger_update_thread); @@ -239,14 +240,14 @@ where Self { join_handles, phantom: PhantomData, - start_execution_tx, + start_pipeline_tx, }, raw_block_sender, ) } - pub fn start_execution(&self) { - self.start_execution_tx.as_ref().map(|tx| tx.send(())); + pub fn start_pipeline_processing(&self) { + self.start_pipeline_tx.as_ref().map(|tx| tx.send(())); } pub fn join(self) { @@ -256,6 +257,16 @@ where } } +fn create_start_tx_rx(should_wait: bool) -> (Option>, Option>) { + let (start_tx, start_rx) = if should_wait { + let (start_tx, start_rx) = mpsc::sync_channel::<()>(1); + (Some(start_tx), Some(start_rx)) + } else { + (None, None) + }; + (start_tx, start_rx) +} + /// Message from partitioning stage to execution stage. pub struct ExecuteBlockMessage { pub current_block_start_time: Instant, diff --git a/execution/executor-types/src/parsed_transaction_output.rs b/execution/executor-types/src/parsed_transaction_output.rs index ddf81e5a2873f..d4306119cb34d 100644 --- a/execution/executor-types/src/parsed_transaction_output.rs +++ b/execution/executor-types/src/parsed_transaction_output.rs @@ -17,7 +17,7 @@ use std::ops::Deref; pub static NEW_EPOCH_EVENT_KEY: Lazy = Lazy::new(on_chain_config::new_epoch_event_key); -#[derive(Clone)] +#[derive(Clone, Debug, Eq, PartialEq)] pub struct ParsedTransactionOutput { output: TransactionOutput, reconfig_events: Vec, @@ -89,7 +89,7 @@ impl ParsedTransactionOutput { } } -#[derive(Default)] +#[derive(Default, Debug, Eq, PartialEq)] pub struct TransactionsWithParsedOutput { transactions: Vec, parsed_output: Vec, diff --git a/testsuite/single_node_performance.py b/testsuite/single_node_performance.py index 0032151853eb6..82e2899829fbf 100755 --- a/testsuite/single_node_performance.py +++ b/testsuite/single_node_performance.py @@ -8,7 +8,7 @@ import tempfile import json import itertools -from typing import Callable, Optional, Tuple, Mapping, Sequence, Any +from typing import Callable, Optional, Tuple, Mapping, Sequence, Any, List from tabulate import tabulate from subprocess import Popen, PIPE, CalledProcessError from dataclasses import dataclass, field @@ -50,7 +50,7 @@ class Flow(Flag): DEFAULT_NUM_INIT_ACCOUNTS = ( "100000000" if SELECTED_FLOW == Flow.MAINNET_LARGE_DB else "2000000" ) -DEFAULT_MAX_BLOCK_SIZE = "10000" +DEFAULT_MAX_BLOCK_SIZE = "20000" MAX_BLOCK_SIZE = int(os.environ.get("MAX_BLOCK_SIZE", default=DEFAULT_MAX_BLOCK_SIZE)) NUM_BLOCKS = int(os.environ.get("NUM_BLOCKS_PER_TEST", default=15)) @@ -101,10 +101,8 @@ class Flow(Flag): if os.environ.get("DISABLE_FA_APT"): FEATURE_FLAGS = "" - SKIP_NATIVE = False else: FEATURE_FLAGS = "--enable-feature NEW_ACCOUNTS_DEFAULT_TO_FA_APT_STORE --enable-feature OPERATIONS_DEFAULT_TO_FA_APT_STORE" - SKIP_NATIVE = True if os.environ.get("ENABLE_PRUNER"): DB_PRUNER_FLAGS = "--enable-state-pruner --enable-ledger-pruner --enable-epoch-snapshot-pruner --ledger-pruning-batch-size 10000 --state-prune-window 3000000 --epoch-snapshot-prune-window 3000000 --ledger-prune-window 3000000" @@ -127,6 +125,9 @@ class RunGroupKeyExtra: transaction_type_override: Optional[str] = field(default=None) transaction_weights_override: Optional[str] = field(default=None) sharding_traffic_flags: Optional[str] = field(default=None) + sig_verify_num_threads_override: Optional[int] = field(default=None) + execution_num_threads_override: Optional[int] = field(default=None) + split_stages_override: bool = field(default=False) @dataclass @@ -162,6 +163,11 @@ class RunGroupConfig: no-op 1 VM 57 0.758 1.079 40390.5 no-op 1000 VM 57 0.740 1.040 22473.1 apt-fa-transfer 1 VM 57 0.762 1.070 28769.8 +apt-fa-transfer 1 native 57 0.762 1.070 28769.8 +apt_fa_transfer_by_stages 1 VM 57 0.762 1.070 28769.8 +apt_fa_transfer_by_stages 1 native 57 0.762 1.070 28769.8 +apt_fa_transfer_sequential_by_stages 1 VM 57 0.762 1.070 10000. +apt_fa_transfer_sequential_by_stages 1 native 57 0.762 1.070 10000. account-generation 1 VM 57 0.774 1.055 23332.3 account-resource32-b 1 VM 57 0.799 1.084 35822.6 modify-global-resource 1 VM 57 0.810 1.022 2789.1 @@ -207,92 +213,116 @@ class RunGroupConfig: DEFAULT_MODULE_WORKING_SET_SIZE = 100 TESTS = [ - RunGroupConfig(key=RunGroupKey("no-op"), included_in=LAND_BLOCKING_AND_C), - RunGroupConfig(key=RunGroupKey("no-op", module_working_set_size=1000), included_in=LAND_BLOCKING_AND_C), + # RunGroupConfig(key=RunGroupKey("no-op"), included_in=LAND_BLOCKING_AND_C), + # RunGroupConfig(key=RunGroupKey("no-op", module_working_set_size=1000), included_in=LAND_BLOCKING_AND_C), RunGroupConfig(key=RunGroupKey("apt-fa-transfer"), included_in=LAND_BLOCKING_AND_C | Flow.REPRESENTATIVE | Flow.MAINNET), RunGroupConfig(key=RunGroupKey("apt-fa-transfer", executor_type="native"), included_in=LAND_BLOCKING_AND_C), - RunGroupConfig(key=RunGroupKey("account-generation"), included_in=LAND_BLOCKING_AND_C | Flow.REPRESENTATIVE | Flow.MAINNET), - RunGroupConfig(key=RunGroupKey("account-generation", executor_type="native"), included_in=Flow.CONTINUOUS), - RunGroupConfig(key=RunGroupKey("account-resource32-b"), included_in=Flow.CONTINUOUS), - RunGroupConfig(key=RunGroupKey("modify-global-resource"), included_in=LAND_BLOCKING_AND_C | Flow.REPRESENTATIVE), - RunGroupConfig(key=RunGroupKey("modify-global-resource", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), - RunGroupConfig(key=RunGroupKey("publish-package"), included_in=LAND_BLOCKING_AND_C | Flow.REPRESENTATIVE | Flow.MAINNET), - RunGroupConfig(key=RunGroupKey("mix_publish_transfer"), key_extra=RunGroupKeyExtra( - transaction_type_override="publish-package apt-fa-transfer", - transaction_weights_override="1 500", + + RunGroupConfig(key=RunGroupKey("apt_fa_transfer_by_stages"), key_extra=RunGroupKeyExtra( + transaction_type_override="apt-fa-transfer", + split_stages_override=True, + sig_verify_num_threads_override=NUMBER_OF_EXECUTION_THREADS, + ), included_in=LAND_BLOCKING_AND_C), + RunGroupConfig(key=RunGroupKey("apt_fa_transfer_by_stages", executor_type="native"), key_extra=RunGroupKeyExtra( + transaction_type_override="apt-fa-transfer", + split_stages_override=True, + sig_verify_num_threads_override=NUMBER_OF_EXECUTION_THREADS, + ), included_in=LAND_BLOCKING_AND_C), + RunGroupConfig(key=RunGroupKey("apt_fa_transfer_sequential_by_stages"), key_extra=RunGroupKeyExtra( + transaction_type_override="apt-fa-transfer", + sig_verify_num_threads_override=1, + execution_num_threads_override=1, + split_stages_override=True, ), included_in=LAND_BLOCKING_AND_C), - RunGroupConfig(key=RunGroupKey("batch100-transfer"), included_in=LAND_BLOCKING_AND_C), - RunGroupConfig(key=RunGroupKey("batch100-transfer", executor_type="native"), included_in=Flow.CONTINUOUS), - - RunGroupConfig(expected_tps=100, key=RunGroupKey("vector-picture40"), included_in=Flow(0), waived=True), - RunGroupConfig(expected_tps=1000, key=RunGroupKey("vector-picture40", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow(0), waived=True), - RunGroupConfig(key=RunGroupKey("vector-picture30k"), included_in=LAND_BLOCKING_AND_C), - RunGroupConfig(key=RunGroupKey("vector-picture30k", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), - RunGroupConfig(key=RunGroupKey("smart-table-picture30-k-with200-change"), included_in=LAND_BLOCKING_AND_C), - RunGroupConfig(key=RunGroupKey("smart-table-picture30-k-with200-change", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), - # RunGroupConfig(expected_tps=10, key=RunGroupKey("smart-table-picture1-m-with256-change"), included_in=LAND_BLOCKING_AND_C), - # RunGroupConfig(expected_tps=40, key=RunGroupKey("smart-table-picture1-m-with256-change", module_working_set_size=20), included_in=Flow.CONTINUOUS), - - RunGroupConfig(key=RunGroupKey("modify-global-resource-agg-v2"), included_in=Flow.AGG_V2 | LAND_BLOCKING_AND_C), - RunGroupConfig(expected_tps=10000, key=RunGroupKey("modify-global-resource-agg-v2", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.AGG_V2, waived=True), - RunGroupConfig(key=RunGroupKey("modify-global-flag-agg-v2"), included_in=Flow.AGG_V2 | Flow.CONTINUOUS), - RunGroupConfig(expected_tps=10000, key=RunGroupKey("modify-global-flag-agg-v2", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.AGG_V2, waived=True), - RunGroupConfig(key=RunGroupKey("modify-global-bounded-agg-v2"), included_in=Flow.AGG_V2 | Flow.CONTINUOUS), - RunGroupConfig(expected_tps=10000, key=RunGroupKey("modify-global-bounded-agg-v2", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.AGG_V2, waived=True), - RunGroupConfig(key=RunGroupKey("modify-global-milestone-agg-v2"), included_in=Flow.AGG_V2 | Flow.CONTINUOUS), - - RunGroupConfig(key=RunGroupKey("resource-groups-global-write-tag1-kb"), included_in=LAND_BLOCKING_AND_C | Flow.RESOURCE_GROUPS), - RunGroupConfig(expected_tps=8000, key=RunGroupKey("resource-groups-global-write-tag1-kb", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.RESOURCE_GROUPS, waived=True), - RunGroupConfig(key=RunGroupKey("resource-groups-global-write-and-read-tag1-kb"), included_in=Flow.CONTINUOUS | Flow.RESOURCE_GROUPS), - RunGroupConfig(expected_tps=8000, key=RunGroupKey("resource-groups-global-write-and-read-tag1-kb", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.RESOURCE_GROUPS, waived=True), - RunGroupConfig(key=RunGroupKey("resource-groups-sender-write-tag1-kb"), included_in=Flow.CONTINUOUS | Flow.RESOURCE_GROUPS), - RunGroupConfig(expected_tps=8000, key=RunGroupKey("resource-groups-sender-write-tag1-kb", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.RESOURCE_GROUPS, waived=True), - RunGroupConfig(key=RunGroupKey("resource-groups-sender-multi-change1-kb"), included_in=LAND_BLOCKING_AND_C | Flow.RESOURCE_GROUPS), - RunGroupConfig(expected_tps=8000, key=RunGroupKey("resource-groups-sender-multi-change1-kb", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.RESOURCE_GROUPS, waived=True), - - RunGroupConfig(key=RunGroupKey("token-v1ft-mint-and-transfer"), included_in=Flow.CONTINUOUS), - RunGroupConfig(key=RunGroupKey("token-v1ft-mint-and-transfer", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), - RunGroupConfig(key=RunGroupKey("token-v1nft-mint-and-transfer-sequential"), included_in=Flow.CONTINUOUS), - RunGroupConfig(key=RunGroupKey("token-v1nft-mint-and-transfer-sequential", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), - RunGroupConfig(expected_tps=1300, key=RunGroupKey("token-v1nft-mint-and-transfer-parallel"), included_in=Flow(0), waived=True), - RunGroupConfig(expected_tps=5300, key=RunGroupKey("token-v1nft-mint-and-transfer-parallel", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow(0), waived=True), - - RunGroupConfig(key=RunGroupKey("coin-init-and-mint", module_working_set_size=1), included_in=Flow.CONTINUOUS), - RunGroupConfig(key=RunGroupKey("coin-init-and-mint", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), - RunGroupConfig(key=RunGroupKey("fungible-asset-mint", module_working_set_size=1), included_in=LAND_BLOCKING_AND_C), - RunGroupConfig(key=RunGroupKey("fungible-asset-mint", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), - - # RunGroupConfig(expected_tps=1000, key=RunGroupKey("token-v1ft-mint-and-store"), included_in=Flow(0)), - # RunGroupConfig(expected_tps=1000, key=RunGroupKey("token-v1nft-mint-and-store-sequential"), included_in=Flow(0)), - # RunGroupConfig(expected_tps=1000, key=RunGroupKey("token-v1nft-mint-and-transfer-parallel"), included_in=Flow(0)), - - RunGroupConfig(key=RunGroupKey("no-op5-signers"), included_in=Flow.CONTINUOUS), - - RunGroupConfig(key=RunGroupKey("token-v2-ambassador-mint"), included_in=LAND_BLOCKING_AND_C | Flow.REPRESENTATIVE | Flow.MAINNET), - RunGroupConfig(key=RunGroupKey("token-v2-ambassador-mint", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), - - RunGroupConfig(key=RunGroupKey("liquidity-pool-swap"), included_in=LAND_BLOCKING_AND_C | Flow.REPRESENTATIVE), - RunGroupConfig(key=RunGroupKey("liquidity-pool-swap", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), - - RunGroupConfig(key=RunGroupKey("liquidity-pool-swap-stable"), included_in=Flow.CONTINUOUS), - RunGroupConfig(key=RunGroupKey("liquidity-pool-swap-stable", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), - - RunGroupConfig(key=RunGroupKey("deserialize-u256"), included_in=Flow.CONTINUOUS), - - # fee payer sequentializes transactions today. in these tests module publisher is the fee payer, so larger number of modules tests throughput with multiple fee payers - RunGroupConfig(key=RunGroupKey("no-op-fee-payer"), included_in=LAND_BLOCKING_AND_C), - RunGroupConfig(key=RunGroupKey("no-op-fee-payer", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), - - RunGroupConfig(expected_tps=50000, key=RunGroupKey("coin_transfer_connected_components", executor_type="sharded"), key_extra=RunGroupKeyExtra(sharding_traffic_flags="--connected-tx-grps 5000", transaction_type_override=""), included_in=Flow.REPRESENTATIVE, waived=True), - RunGroupConfig(expected_tps=50000, key=RunGroupKey("coin_transfer_hotspot", executor_type="sharded"), key_extra=RunGroupKeyExtra(sharding_traffic_flags="--hotspot-probability 0.8", transaction_type_override=""), included_in=Flow.REPRESENTATIVE, waived=True), - - # setting separately for previewnet, as we run on a different number of cores. - RunGroupConfig(expected_tps=20000, key=RunGroupKey("apt-fa-transfer"), included_in=Flow.MAINNET_LARGE_DB), - RunGroupConfig(expected_tps=15000, key=RunGroupKey("account-generation"), included_in=Flow.MAINNET_LARGE_DB), - RunGroupConfig(expected_tps=60, key=RunGroupKey("publish-package"), included_in=Flow.MAINNET_LARGE_DB), - RunGroupConfig(expected_tps=6800, key=RunGroupKey("token-v2-ambassador-mint"), included_in=Flow.MAINNET_LARGE_DB), - # RunGroupConfig(expected_tps=17000 if NUM_ACCOUNTS < 5000000 else 28000, key=RunGroupKey("coin_transfer_connected_components", executor_type="sharded"), key_extra=RunGroupKeyExtra(sharding_traffic_flags="--connected-tx-grps 5000", transaction_type_override=""), included_in=Flow.MAINNET | Flow.MAINNET_LARGE_DB, waived=True), - # RunGroupConfig(expected_tps=27000 if NUM_ACCOUNTS < 5000000 else 23000, key=RunGroupKey("coin_transfer_hotspot", executor_type="sharded"), key_extra=RunGroupKeyExtra(sharding_traffic_flags="--hotspot-probability 0.8", transaction_type_override=""), included_in=Flow.MAINNET | Flow.MAINNET_LARGE_DB, waived=True), + RunGroupConfig(key=RunGroupKey("apt_fa_transfer_sequential_by_stages", executor_type="native"), key_extra=RunGroupKeyExtra( + transaction_type_override="apt-fa-transfer", + sig_verify_num_threads_override=1, + execution_num_threads_override=1, + split_stages_override=True, + ), included_in=LAND_BLOCKING_AND_C), + + # RunGroupConfig(key=RunGroupKey("account-generation"), included_in=LAND_BLOCKING_AND_C | Flow.REPRESENTATIVE | Flow.MAINNET), + # RunGroupConfig(key=RunGroupKey("account-generation", executor_type="native"), included_in=Flow.CONTINUOUS), + # RunGroupConfig(key=RunGroupKey("account-resource32-b"), included_in=Flow.CONTINUOUS), + # RunGroupConfig(key=RunGroupKey("modify-global-resource"), included_in=LAND_BLOCKING_AND_C | Flow.REPRESENTATIVE), + # RunGroupConfig(key=RunGroupKey("modify-global-resource", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), + # RunGroupConfig(key=RunGroupKey("publish-package"), included_in=LAND_BLOCKING_AND_C | Flow.REPRESENTATIVE | Flow.MAINNET), + # RunGroupConfig(key=RunGroupKey("mix_publish_transfer"), key_extra=RunGroupKeyExtra( + # transaction_type_override="publish-package apt-fa-transfer", + # transaction_weights_override="1 500", + # ), included_in=LAND_BLOCKING_AND_C), + # RunGroupConfig(key=RunGroupKey("batch100-transfer"), included_in=LAND_BLOCKING_AND_C), + # RunGroupConfig(key=RunGroupKey("batch100-transfer", executor_type="native"), included_in=Flow.CONTINUOUS), + + # RunGroupConfig(expected_tps=100, key=RunGroupKey("vector-picture40"), included_in=Flow(0), waived=True), + # RunGroupConfig(expected_tps=1000, key=RunGroupKey("vector-picture40", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow(0), waived=True), + # RunGroupConfig(key=RunGroupKey("vector-picture30k"), included_in=LAND_BLOCKING_AND_C), + # RunGroupConfig(key=RunGroupKey("vector-picture30k", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), + # RunGroupConfig(key=RunGroupKey("smart-table-picture30-k-with200-change"), included_in=LAND_BLOCKING_AND_C), + # RunGroupConfig(key=RunGroupKey("smart-table-picture30-k-with200-change", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), + # # RunGroupConfig(expected_tps=10, key=RunGroupKey("smart-table-picture1-m-with256-change"), included_in=LAND_BLOCKING_AND_C), + # # RunGroupConfig(expected_tps=40, key=RunGroupKey("smart-table-picture1-m-with256-change", module_working_set_size=20), included_in=Flow.CONTINUOUS), + + # RunGroupConfig(key=RunGroupKey("modify-global-resource-agg-v2"), included_in=Flow.AGG_V2 | LAND_BLOCKING_AND_C), + # RunGroupConfig(expected_tps=10000, key=RunGroupKey("modify-global-resource-agg-v2", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.AGG_V2, waived=True), + # RunGroupConfig(key=RunGroupKey("modify-global-flag-agg-v2"), included_in=Flow.AGG_V2 | Flow.CONTINUOUS), + # RunGroupConfig(expected_tps=10000, key=RunGroupKey("modify-global-flag-agg-v2", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.AGG_V2, waived=True), + # RunGroupConfig(key=RunGroupKey("modify-global-bounded-agg-v2"), included_in=Flow.AGG_V2 | Flow.CONTINUOUS), + # RunGroupConfig(expected_tps=10000, key=RunGroupKey("modify-global-bounded-agg-v2", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.AGG_V2, waived=True), + # RunGroupConfig(key=RunGroupKey("modify-global-milestone-agg-v2"), included_in=Flow.AGG_V2 | Flow.CONTINUOUS), + + # RunGroupConfig(key=RunGroupKey("resource-groups-global-write-tag1-kb"), included_in=LAND_BLOCKING_AND_C | Flow.RESOURCE_GROUPS), + # RunGroupConfig(expected_tps=8000, key=RunGroupKey("resource-groups-global-write-tag1-kb", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.RESOURCE_GROUPS, waived=True), + # RunGroupConfig(key=RunGroupKey("resource-groups-global-write-and-read-tag1-kb"), included_in=Flow.CONTINUOUS | Flow.RESOURCE_GROUPS), + # RunGroupConfig(expected_tps=8000, key=RunGroupKey("resource-groups-global-write-and-read-tag1-kb", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.RESOURCE_GROUPS, waived=True), + # RunGroupConfig(key=RunGroupKey("resource-groups-sender-write-tag1-kb"), included_in=Flow.CONTINUOUS | Flow.RESOURCE_GROUPS), + # RunGroupConfig(expected_tps=8000, key=RunGroupKey("resource-groups-sender-write-tag1-kb", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.RESOURCE_GROUPS, waived=True), + # RunGroupConfig(key=RunGroupKey("resource-groups-sender-multi-change1-kb"), included_in=LAND_BLOCKING_AND_C | Flow.RESOURCE_GROUPS), + # RunGroupConfig(expected_tps=8000, key=RunGroupKey("resource-groups-sender-multi-change1-kb", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.RESOURCE_GROUPS, waived=True), + + # RunGroupConfig(key=RunGroupKey("token-v1ft-mint-and-transfer"), included_in=Flow.CONTINUOUS), + # RunGroupConfig(key=RunGroupKey("token-v1ft-mint-and-transfer", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), + # RunGroupConfig(key=RunGroupKey("token-v1nft-mint-and-transfer-sequential"), included_in=Flow.CONTINUOUS), + # RunGroupConfig(key=RunGroupKey("token-v1nft-mint-and-transfer-sequential", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), + # RunGroupConfig(expected_tps=1300, key=RunGroupKey("token-v1nft-mint-and-transfer-parallel"), included_in=Flow(0), waived=True), + # RunGroupConfig(expected_tps=5300, key=RunGroupKey("token-v1nft-mint-and-transfer-parallel", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow(0), waived=True), + + # RunGroupConfig(key=RunGroupKey("coin-init-and-mint", module_working_set_size=1), included_in=Flow.CONTINUOUS), + # RunGroupConfig(key=RunGroupKey("coin-init-and-mint", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), + # RunGroupConfig(key=RunGroupKey("fungible-asset-mint", module_working_set_size=1), included_in=LAND_BLOCKING_AND_C), + # RunGroupConfig(key=RunGroupKey("fungible-asset-mint", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), + + # # RunGroupConfig(expected_tps=1000, key=RunGroupKey("token-v1ft-mint-and-store"), included_in=Flow(0)), + # # RunGroupConfig(expected_tps=1000, key=RunGroupKey("token-v1nft-mint-and-store-sequential"), included_in=Flow(0)), + # # RunGroupConfig(expected_tps=1000, key=RunGroupKey("token-v1nft-mint-and-transfer-parallel"), included_in=Flow(0)), + + # RunGroupConfig(key=RunGroupKey("no-op5-signers"), included_in=Flow.CONTINUOUS), + + # RunGroupConfig(key=RunGroupKey("token-v2-ambassador-mint"), included_in=LAND_BLOCKING_AND_C | Flow.REPRESENTATIVE | Flow.MAINNET), + # RunGroupConfig(key=RunGroupKey("token-v2-ambassador-mint", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), + + # RunGroupConfig(key=RunGroupKey("liquidity-pool-swap"), included_in=LAND_BLOCKING_AND_C | Flow.REPRESENTATIVE), + # RunGroupConfig(key=RunGroupKey("liquidity-pool-swap", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), + + # RunGroupConfig(key=RunGroupKey("liquidity-pool-swap-stable"), included_in=Flow.CONTINUOUS), + # RunGroupConfig(key=RunGroupKey("liquidity-pool-swap-stable", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), + + # RunGroupConfig(key=RunGroupKey("deserialize-u256"), included_in=Flow.CONTINUOUS), + + # # fee payer sequentializes transactions today. in these tests module publisher is the fee payer, so larger number of modules tests throughput with multiple fee payers + # RunGroupConfig(key=RunGroupKey("no-op-fee-payer"), included_in=LAND_BLOCKING_AND_C), + # RunGroupConfig(key=RunGroupKey("no-op-fee-payer", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), + + # RunGroupConfig(expected_tps=50000, key=RunGroupKey("coin_transfer_connected_components", executor_type="sharded"), key_extra=RunGroupKeyExtra(sharding_traffic_flags="--connected-tx-grps 5000", transaction_type_override=""), included_in=Flow.REPRESENTATIVE, waived=True), + # RunGroupConfig(expected_tps=50000, key=RunGroupKey("coin_transfer_hotspot", executor_type="sharded"), key_extra=RunGroupKeyExtra(sharding_traffic_flags="--hotspot-probability 0.8", transaction_type_override=""), included_in=Flow.REPRESENTATIVE, waived=True), + + # # setting separately for previewnet, as we run on a different number of cores. + # RunGroupConfig(expected_tps=20000, key=RunGroupKey("apt-fa-transfer"), included_in=Flow.MAINNET_LARGE_DB), + # RunGroupConfig(expected_tps=15000, key=RunGroupKey("account-generation"), included_in=Flow.MAINNET_LARGE_DB), + # RunGroupConfig(expected_tps=60, key=RunGroupKey("publish-package"), included_in=Flow.MAINNET_LARGE_DB), + # RunGroupConfig(expected_tps=6800, key=RunGroupKey("token-v2-ambassador-mint"), included_in=Flow.MAINNET_LARGE_DB), + # # RunGroupConfig(expected_tps=17000 if NUM_ACCOUNTS < 5000000 else 28000, key=RunGroupKey("coin_transfer_connected_components", executor_type="sharded"), key_extra=RunGroupKeyExtra(sharding_traffic_flags="--connected-tx-grps 5000", transaction_type_override=""), included_in=Flow.MAINNET | Flow.MAINNET_LARGE_DB, waived=True), + # # RunGroupConfig(expected_tps=27000 if NUM_ACCOUNTS < 5000000 else 23000, key=RunGroupKey("coin_transfer_hotspot", executor_type="sharded"), key_extra=RunGroupKeyExtra(sharding_traffic_flags="--hotspot-probability 0.8", transaction_type_override=""), included_in=Flow.MAINNET | Flow.MAINNET_LARGE_DB, waived=True), ] # fmt: on @@ -352,8 +382,10 @@ class RunResults: gpt: float storage_fee_pt: float output_bps: float + fraction_in_sig_verify: float fraction_in_execution: float fraction_of_execution_in_vm: float + fraction_in_ledger_update: float fraction_in_commit: float @@ -407,8 +439,10 @@ def extract_run_results( gpt = 0 storage_fee_pt = 0 output_bps = 0 + fraction_in_sig_verify = 0 fraction_in_execution = 0 fraction_of_execution_in_vm = 0 + fraction_in_ledger_update = 0 fraction_in_commit = 0 else: tps = float(get_only(re.findall(prefix + r" TPS: (\d+\.?\d*) txn/s", output))) @@ -432,6 +466,12 @@ def extract_run_results( output_bps = float( get_only(re.findall(prefix + r" output: (\d+\.?\d*) bytes/s", output)) ) + fraction_in_sig_verify = float( + re.findall( + prefix + r" fraction of total: (\d+\.?\d*) in signature verification", + output, + )[-1] + ) fraction_in_execution = float( re.findall( prefix + r" fraction of total: (\d+\.?\d*) in execution", output @@ -440,6 +480,11 @@ def extract_run_results( fraction_of_execution_in_vm = float( re.findall(prefix + r" fraction of execution (\d+\.?\d*) in VM", output)[-1] ) + fraction_in_ledger_update = float( + re.findall(prefix + r" fraction of total: (\d+\.?\d*) in ledger update", output)[ + -1 + ] + ) fraction_in_commit = float( re.findall(prefix + r" fraction of total: (\d+\.?\d*) in commit", output)[ -1 @@ -455,8 +500,10 @@ def extract_run_results( gpt=gpt, storage_fee_pt=storage_fee_pt, output_bps=output_bps, + fraction_in_sig_verify=fraction_in_sig_verify, fraction_in_execution=fraction_in_execution, fraction_of_execution_in_vm=fraction_of_execution_in_vm, + fraction_in_ledger_update=fraction_in_ledger_update, fraction_in_commit=fraction_in_commit, ) @@ -464,29 +511,37 @@ def extract_run_results( def print_table( results: Sequence[RunGroupInstance], by_levels: bool, - single_field: Optional[Tuple[str, Callable[[RunResults], Any]]], + only_fields: List[Tuple[str, Callable[[RunGroupInstance], Any]]], number_of_execution_threads=EXECUTION_ONLY_NUMBER_OF_THREADS, ): headers = [ "transaction_type", "module_working_set", "executor", - "block_size", - "expected t/s", ] + + if not only_fields: + headers.extend( + [ + "block_size", + "expected t/s", + ] + ) + if by_levels: headers.extend( [f"exe_only {num_threads}" for num_threads in number_of_execution_threads] ) - assert single_field is not None + assert only_fields - if single_field is not None: - field_name, _ = single_field - headers.append(field_name) + if only_fields: + for field_name, _ in only_fields: + headers.append(field_name) else: headers.extend( [ "t/s", + "sigver/total", "exe/total", "vm/exe", "commit/total", @@ -509,9 +564,15 @@ def print_table( result.block_size, result.expected_tps, ] + if not only_fields: + row.extend( + [ + result.block_size, + result.expected_tps, + ] + ) if by_levels: - if single_field is not None: - _, field_getter = single_field + for _, field_getter in only_fields: for num_threads in number_of_execution_threads: if num_threads in result.number_of_threads_results: row.append( @@ -520,11 +581,12 @@ def print_table( else: row.append("-") - if single_field is not None: - _, field_getter = single_field - row.append(field_getter(result.single_node_result)) + if only_fields: + for _, field_getter in only_fields: + row.append(field_getter(result)) else: row.append(int(round(result.single_node_result.tps))) + row.append(round(result.single_node_result.fraction_in_sig_verify, 3)) row.append(round(result.single_node_result.fraction_in_execution, 3)) row.append(round(result.single_node_result.fraction_of_execution_in_vm, 3)) row.append(round(result.single_node_result.fraction_in_commit, 3)) @@ -602,9 +664,6 @@ def print_table( if SELECTED_FLOW not in test.included_in: continue - if SKIP_NATIVE and test.key.executor_type == "native": - continue - if test.expected_tps is not None: print(f"WARNING: using uncalibrated TPS for {test.key}") criteria = Criteria( @@ -653,21 +712,39 @@ def print_table( ) workload_args_str = f"--transaction-type {transaction_type_list} --transaction-weights {transaction_weights_list}" + pipeline_extra_args = [] + + number_of_execution_threads = NUMBER_OF_EXECUTION_THREADS + if test.key_extra.execution_num_threads_override: + number_of_execution_threads = test.key_extra.execution_num_threads_override + + if test.key_extra.sig_verify_num_threads_override: + pipeline_extra_args.extend(["--sig-verify-num-threads", str(test.key_extra.sig_verify_num_threads_override)]) + + if test.key_extra.split_stages_override: + pipeline_extra_args.append("--split-stages") + sharding_traffic_flags = test.key_extra.sharding_traffic_flags or "" if test.key.executor_type == "VM": executor_type_str = "--transactions-per-sender 1" elif test.key.executor_type == "native": - executor_type_str = "--use-native-executor --transactions-per-sender 1" + executor_type_str = ( + "--use-native-loose-block-executor --transactions-per-sender 1" + ) elif test.key.executor_type == "sharded": - executor_type_str = f"--num-executor-shards {NUMBER_OF_EXECUTION_THREADS} {sharding_traffic_flags}" + executor_type_str = f"--num-executor-shards {number_of_execution_threads} {sharding_traffic_flags}" else: raise Exception(f"executor type not supported {test.key.executor_type}") - txn_emitter_prefix_str = "" if NUM_BLOCKS > 200 else " --generate-then-execute" + + if NUM_BLOCKS < 200: + pipeline_extra_args.append("--generate-then-execute") + + pipeline_extra_args_str = " ".join(pipeline_extra_args) ADDITIONAL_DST_POOL_ACCOUNTS = 2 * MAX_BLOCK_SIZE * NUM_BLOCKS - common_command_suffix = f"{executor_type_str} {txn_emitter_prefix_str} --block-size {cur_block_size} {DB_CONFIG_FLAGS} {DB_PRUNER_FLAGS} run-executor {FEATURE_FLAGS} {workload_args_str} --module-working-set-size {test.key.module_working_set_size} --main-signer-accounts {MAIN_SIGNER_ACCOUNTS} --additional-dst-pool-accounts {ADDITIONAL_DST_POOL_ACCOUNTS} --data-dir {tmpdirname}/db --checkpoint-dir {tmpdirname}/cp" + common_command_suffix = f"{executor_type_str} {pipeline_extra_args_str} --block-size {cur_block_size} {DB_CONFIG_FLAGS} {DB_PRUNER_FLAGS} run-executor {FEATURE_FLAGS} {workload_args_str} --module-working-set-size {test.key.module_working_set_size} --main-signer-accounts {MAIN_SIGNER_ACCOUNTS} --additional-dst-pool-accounts {ADDITIONAL_DST_POOL_ACCOUNTS} --data-dir {tmpdirname}/db --checkpoint-dir {tmpdirname}/cp" number_of_threads_results = {} @@ -679,7 +756,7 @@ def print_table( output, "Overall execution" ) - test_db_command = f"RUST_BACKTRACE=1 {BUILD_FOLDER}/aptos-executor-benchmark --execution-threads {NUMBER_OF_EXECUTION_THREADS} {common_command_suffix} --blocks {NUM_BLOCKS}" + test_db_command = f"RUST_BACKTRACE=1 {BUILD_FOLDER}/aptos-executor-benchmark --execution-threads {number_of_execution_threads} {common_command_suffix} --blocks {NUM_BLOCKS}" output = execute_command(test_db_command) single_node_result = extract_run_results(output, "Overall") @@ -729,7 +806,7 @@ def print_table( "module_working_set_size": test.key.module_working_set_size, "executor_type": test.key.executor_type, "block_size": cur_block_size, - "execution_threads": NUMBER_OF_EXECUTION_THREADS, + "execution_threads": number_of_execution_threads, "warmup_num_accounts": NUM_ACCOUNTS, "expected_tps": criteria.expected_tps, "expected_min_tps": criteria.min_tps, @@ -738,6 +815,11 @@ def print_table( "tps": single_node_result.tps, "gps": single_node_result.gps, "gpt": single_node_result.gpt, + "fraction_in_sig_verify": single_node_result.fraction_in_sig_verify, + "fraction_in_execution": single_node_result.fraction_in_execution, + "fraction_of_execution_in_vm": single_node_result.fraction_of_execution_in_vm, + "fraction_in_ledger_update": single_node_result.fraction_in_ledger_update, + "fraction_in_commit": single_node_result.fraction_in_commit, "code_perf_version": CODE_PERF_VERSION, "flow": str(SELECTED_FLOW), "test_index": test_index, @@ -749,40 +831,100 @@ def print_table( print_table( results, by_levels=True, - single_field=("t/s", lambda r: int(round(r.tps))), + only_fields=[ + ("block_size", lambda r: r.block_size), + ("expected t/s", lambda r: r.expected_tps), + ("t/s", lambda r: int(round(r.single_node_result.tps))), + ], ) print_table( - results, + results[1:], by_levels=True, - single_field=("g/s", lambda r: int(round(r.gps))), - ) - print_table( - results, - by_levels=False, - single_field=("gas/txn", lambda r: int(round(r.gpt))), - ) - print_table( - results, - by_levels=False, - single_field=( - "storage fee/txn", - lambda r: int(round(r.storage_fee_pt)), - ), + only_fields=[ + ("g/s", lambda r: int(round(r.single_node_result.gps))), + ("gas/txn", lambda r: int(round(r.single_node_result.gpt))), + ( + "storage fee/txn", + lambda r: int(round(r.single_node_result.storage_fee_pt)), + ), + ], ) print_table( - results, + results[1:], by_levels=True, - single_field=("exe/total", lambda r: round(r.fraction_in_execution, 3)), + only_fields=[ + ( + "sigver/total", + lambda r: round(r.single_node_result.fraction_in_sig_verify, 3), + ), + ( + "exe/total", + lambda r: round(r.single_node_result.fraction_in_execution, 3), + ), + ( + "vm/exe", + lambda r: round( + r.single_node_result.fraction_of_execution_in_vm, 3 + ), + ), + ( + "ledger/total", + lambda r: round(r.single_node_result.fraction_in_ledger_update, 3), + ), + ( + "commit/total", + lambda r: round(r.single_node_result.fraction_in_commit, 3), + ), + ], ) print_table( - results, + results[1:], by_levels=True, - single_field=( - "vm/exe", - lambda r: round(r.fraction_of_execution_in_vm, 3), - ), + only_fields=[ + ( + "sigver tps", + lambda r: round( + r.single_node_result.tps + / max(r.single_node_result.fraction_in_sig_verify, 0.001), + 1, + ), + ), + ( + "exe tps", + lambda r: round( + r.single_node_result.tps + / max(r.single_node_result.fraction_in_execution, 0.001), + 1, + ), + ), + ( + "vm tps", + lambda r: round( + r.single_node_result.tps + / max(r.single_node_result.fraction_in_execution, 0.001) + / max(r.single_node_result.fraction_of_execution_in_vm, 0.001), + 1, + ), + ), + ( + "ledger tps", + lambda r: round( + r.single_node_result.tps + / max(r.single_node_result.fraction_in_ledger_update, 0.001), + 1, + ), + ), + ( + "commit tps", + lambda r: round( + r.single_node_result.tps + / max(r.single_node_result.fraction_in_commit, 0.001), + 1, + ), + ), + ], ) - print_table(results, by_levels=False, single_field=None) + print_table(results, by_levels=False, only_fields=None) if single_node_result.tps < criteria.min_tps: text = f"regression detected {single_node_result.tps}, expected median {criteria.expected_tps}, threshold: {criteria.min_tps}), {test.key} didn't meet TPS requirements" diff --git a/types/src/account_config/resources/fungible_store.rs b/types/src/account_config/resources/fungible_store.rs index e8066ca712c00..cd58a4bd68ca2 100644 --- a/types/src/account_config/resources/fungible_store.rs +++ b/types/src/account_config/resources/fungible_store.rs @@ -2,12 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 use super::aggregator::AggregatorResource; -use crate::account_address::create_derived_object_address; +use crate::{account_address::create_derived_object_address, contract_event::ContractEvent}; use move_core_types::{ - account_address::AccountAddress, - ident_str, - identifier::IdentStr, - move_resource::{MoveResource, MoveStructType}, + account_address::AccountAddress, ident_str, identifier::IdentStr, language_storage::TypeTag, move_resource::{MoveResource, MoveStructType} }; #[cfg(any(test, feature = "fuzzing"))] use proptest_derive::Arbitrary; @@ -94,3 +91,40 @@ impl MoveStructType for MigrationFlag { } impl MoveResource for MigrationFlag {} + +pub trait MoveEventV2: MoveStructType + Serialize { + fn create_event_v2(&self) -> ContractEvent { + ContractEvent::new_v2( + TypeTag::Struct(Box::new(Self::struct_tag())), + bcs::to_bytes(self).unwrap() + ) + } +} + +/// Struct that represents a Withdraw event. +#[derive(Debug, Serialize, Deserialize)] +pub struct WithdrawFAEvent { + pub store: AccountAddress, + pub amount: u64, +} + +impl MoveEventV2 for WithdrawFAEvent {} + +impl MoveStructType for WithdrawFAEvent { + const MODULE_NAME: &'static IdentStr = ident_str!("fungble_asset"); + const STRUCT_NAME: &'static IdentStr = ident_str!("Withdraw"); +} + +/// Struct that represents a Deposit event. +#[derive(Debug, Serialize, Deserialize)] +pub struct DepositFAEvent { + pub store: AccountAddress, + pub amount: u64, +} + +impl MoveEventV2 for DepositFAEvent {} + +impl MoveStructType for DepositFAEvent { + const MODULE_NAME: &'static IdentStr = ident_str!("fungble_asset"); + const STRUCT_NAME: &'static IdentStr = ident_str!("Deposit"); +}