diff --git a/.gitlab/pipeline/test.yml b/.gitlab/pipeline/test.yml index d244316000aa..476ac6333f58 100644 --- a/.gitlab/pipeline/test.yml +++ b/.gitlab/pipeline/test.yml @@ -503,6 +503,7 @@ subsystem-regression-tests: - .run-immediately script: - cargo bench --profile=testnet -p polkadot-availability-recovery --bench availability-recovery-regression-bench --features subsystem-benchmarks + - cargo bench --profile=testnet -p polkadot-availability-distribution --bench availability-distribution-regression-bench --features subsystem-benchmarks tags: - benchmark allow_failure: true diff --git a/polkadot/node/network/availability-distribution/Cargo.toml b/polkadot/node/network/availability-distribution/Cargo.toml index 182d92cb1631..ac606bd377f7 100644 --- a/polkadot/node/network/availability-distribution/Cargo.toml +++ b/polkadot/node/network/availability-distribution/Cargo.toml @@ -39,9 +39,9 @@ polkadot-primitives-test-helpers = { path = "../../../primitives/test-helpers" } polkadot-subsystem-bench = { path = "../../subsystem-bench" } -[[test]] +[[bench]] name = "availability-distribution-regression-bench" -path = "tests/availability-distribution-regression-bench.rs" +path = "benches/availability-distribution-regression-bench.rs" harness = false required-features = ["subsystem-benchmarks"] diff --git a/polkadot/node/network/availability-distribution/tests/availability-distribution-regression-bench.rs b/polkadot/node/network/availability-distribution/benches/availability-distribution-regression-bench.rs similarity index 57% rename from polkadot/node/network/availability-distribution/tests/availability-distribution-regression-bench.rs rename to polkadot/node/network/availability-distribution/benches/availability-distribution-regression-bench.rs index bdab11298d5c..019eb1222082 100644 --- a/polkadot/node/network/availability-distribution/tests/availability-distribution-regression-bench.rs +++ b/polkadot/node/network/availability-distribution/benches/availability-distribution-regression-bench.rs @@ -24,46 +24,55 @@ //! - availability-store use polkadot_subsystem_bench::{ - availability::{benchmark_availability_write, prepare_test, TestDataAvailability, TestState}, + availability::{benchmark_availability_write, prepare_test, TestState}, configuration::TestConfiguration, - utils::{warm_up_and_benchmark, WarmUpOptions}, + usage::BenchmarkUsage, }; +use std::io::Write; + +const BENCH_COUNT: usize = 50; fn main() -> Result<(), String> { let mut messages = vec![]; let mut config = TestConfiguration::default(); - // A single node effort roughly n_cores * needed_approvals / n_validators = 60 * 30 / 300 - config.n_cores = 6; + // A single node effort roughly + config.n_cores = 10; + config.n_validators = 500; config.num_blocks = 3; config.generate_pov_sizes(); + let state = TestState::new(&config); - let usage = warm_up_and_benchmark( - WarmUpOptions::new(&[ - "availability-distribution", - "bitfield-distribution", - "availability-store", - ]), - || { - let mut state = TestState::new(&config); - let (mut env, _protocol_config) = - prepare_test(config.clone(), &mut state, TestDataAvailability::Write, false); + println!("Benchmarking..."); + let usages: Vec = (0..BENCH_COUNT) + .map(|n| { + print!("\r[{}{}]", "#".repeat(n), "_".repeat(BENCH_COUNT - n)); + std::io::stdout().flush().unwrap(); + let (mut env, _cfgs) = prepare_test( + &state, + polkadot_subsystem_bench::availability::TestDataAvailability::Write, + false, + ); env.runtime().block_on(benchmark_availability_write( "data_availability_write", &mut env, - state, + &state, )) - }, - )?; - println!("{}", usage); + }) + .collect(); + println!("\rDone!{}", " ".repeat(BENCH_COUNT)); + let average_usage = BenchmarkUsage::average(&usages); + println!("{}", average_usage); - messages.extend(usage.check_network_usage(&[ - ("Received from peers", 443.333, 0.05), - ("Sent to peers", 21818.555, 0.05), + // We expect no variance for received and sent + // but use 0.001 because we operate with floats + messages.extend(average_usage.check_network_usage(&[ + ("Received from peers", 433.3, 0.001), + ("Sent to peers", 18480.0, 0.001), ])); - messages.extend(usage.check_cpu_usage(&[ - ("availability-distribution", 0.011, 0.05), - ("bitfield-distribution", 0.029, 0.05), - ("availability-store", 0.232, 0.05), + messages.extend(average_usage.check_cpu_usage(&[ + ("availability-distribution", 0.012, 0.05), + ("availability-store", 0.153, 0.05), + ("bitfield-distribution", 0.026, 0.05), ])); if messages.is_empty() { diff --git a/polkadot/node/network/availability-recovery/benches/availability-recovery-regression-bench.rs b/polkadot/node/network/availability-recovery/benches/availability-recovery-regression-bench.rs index 42b1787e0450..5e8b81be82dd 100644 --- a/polkadot/node/network/availability-recovery/benches/availability-recovery-regression-bench.rs +++ b/polkadot/node/network/availability-recovery/benches/availability-recovery-regression-bench.rs @@ -14,9 +14,9 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -//! availability-write regression tests +//! availability-read regression tests //! -//! Availability write benchmark based on Kusama parameters and scale. +//! Availability read benchmark based on Kusama parameters and scale. //! //! Subsystems involved: //! - availability-recovery @@ -27,8 +27,11 @@ use polkadot_subsystem_bench::{ TestDataAvailability, TestState, }, configuration::TestConfiguration, - utils::{warm_up_and_benchmark, WarmUpOptions}, + usage::BenchmarkUsage, }; +use std::io::Write; + +const BENCH_COUNT: usize = 50; fn main() -> Result<(), String> { let mut messages = vec![]; @@ -38,27 +41,33 @@ fn main() -> Result<(), String> { config.num_blocks = 3; config.generate_pov_sizes(); - let usage = warm_up_and_benchmark(WarmUpOptions::new(&["availability-recovery"]), || { - let mut state = TestState::new(&config); - let (mut env, _protocol_config) = prepare_test( - config.clone(), - &mut state, - TestDataAvailability::Read(options.clone()), - false, - ); - env.runtime().block_on(benchmark_availability_read( - "data_availability_read", - &mut env, - state, - )) - })?; - println!("{}", usage); + let state = TestState::new(&config); + + println!("Benchmarking..."); + let usages: Vec = (0..BENCH_COUNT) + .map(|n| { + print!("\r[{}{}]", "#".repeat(n), "_".repeat(BENCH_COUNT - n)); + std::io::stdout().flush().unwrap(); + let (mut env, _cfgs) = + prepare_test(&state, TestDataAvailability::Read(options.clone()), false); + env.runtime().block_on(benchmark_availability_read( + "data_availability_read", + &mut env, + &state, + )) + }) + .collect(); + println!("\rDone!{}", " ".repeat(BENCH_COUNT)); + let average_usage = BenchmarkUsage::average(&usages); + println!("{}", average_usage); - messages.extend(usage.check_network_usage(&[ - ("Received from peers", 307200.000, 0.05), - ("Sent to peers", 1.667, 0.05), + // We expect no variance for received and sent + // but use 0.001 because we operate with floats + messages.extend(average_usage.check_network_usage(&[ + ("Received from peers", 307200.000, 0.001), + ("Sent to peers", 1.667, 0.001), ])); - messages.extend(usage.check_cpu_usage(&[("availability-recovery", 11.500, 0.05)])); + messages.extend(average_usage.check_cpu_usage(&[("availability-recovery", 11.500, 0.05)])); if messages.is_empty() { Ok(()) diff --git a/polkadot/node/subsystem-bench/Cargo.toml b/polkadot/node/subsystem-bench/Cargo.toml index 71711ad0fbd0..2570fe9cfa23 100644 --- a/polkadot/node/subsystem-bench/Cargo.toml +++ b/polkadot/node/subsystem-bench/Cargo.toml @@ -56,7 +56,7 @@ bitvec = "1.0.1" kvdb-memorydb = "0.13.0" parity-scale-codec = { version = "3.6.1", features = ["derive", "std"] } -tokio = "1.24.2" +tokio = { version = "1.24.2", features = ["rt-multi-thread", "parking_lot"] } clap-num = "1.0.2" polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" } sp-keyring = { path = "../../../substrate/primitives/keyring" } diff --git a/polkadot/node/subsystem-bench/src/cli/subsystem-bench.rs b/polkadot/node/subsystem-bench/src/cli/subsystem-bench.rs index deb351360d74..10953b6c7839 100644 --- a/polkadot/node/subsystem-bench/src/cli/subsystem-bench.rs +++ b/polkadot/node/subsystem-bench/src/cli/subsystem-bench.rs @@ -136,31 +136,29 @@ impl BenchCli { let usage = match objective { TestObjective::DataAvailabilityRead(opts) => { - let mut state = availability::TestState::new(&test_config); + let state = availability::TestState::new(&test_config); let (mut env, _protocol_config) = availability::prepare_test( - test_config, - &mut state, + &state, availability::TestDataAvailability::Read(opts), true, ); env.runtime().block_on(availability::benchmark_availability_read( &benchmark_name, &mut env, - state, + &state, )) }, TestObjective::DataAvailabilityWrite => { - let mut state = availability::TestState::new(&test_config); + let state = availability::TestState::new(&test_config); let (mut env, _protocol_config) = availability::prepare_test( - test_config, - &mut state, + &state, availability::TestDataAvailability::Write, true, ); env.runtime().block_on(availability::benchmark_availability_write( &benchmark_name, &mut env, - state, + &state, )) }, TestObjective::ApprovalVoting(ref options) => { diff --git a/polkadot/node/subsystem-bench/src/lib/availability/mod.rs b/polkadot/node/subsystem-bench/src/lib/availability/mod.rs index dc4e1e403102..765afdd5912b 100644 --- a/polkadot/node/subsystem-bench/src/lib/availability/mod.rs +++ b/polkadot/node/subsystem-bench/src/lib/availability/mod.rs @@ -15,11 +15,11 @@ // along with Polkadot. If not, see . use crate::{ - configuration::TestConfiguration, + availability::av_store_helpers::new_av_store, dummy_builder, environment::{TestEnvironment, TestEnvironmentDependencies, GENESIS_HASH}, mock::{ - av_store::{self, MockAvailabilityStore}, + av_store::{self, MockAvailabilityStore, NetworkAvailabilityState}, chain_api::{ChainApiState, MockChainApi}, network_bridge::{self, MockNetworkBridgeRx, MockNetworkBridgeTx}, runtime_api::{self, MockRuntimeApi}, @@ -28,12 +28,8 @@ use crate::{ network::new_network, usage::BenchmarkUsage, }; -use av_store::NetworkAvailabilityState; -use av_store_helpers::new_av_store; -use bitvec::bitvec; use colored::Colorize; use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt}; -use itertools::Itertools; use parity_scale_codec::Encode; use polkadot_availability_bitfield_distribution::BitfieldDistribution; use polkadot_availability_distribution::{ @@ -43,37 +39,27 @@ use polkadot_availability_recovery::AvailabilityRecoverySubsystem; use polkadot_node_core_av_store::AvailabilityStoreSubsystem; use polkadot_node_metrics::metrics::Metrics; use polkadot_node_network_protocol::{ - request_response::{v1::ChunkFetchingRequest, IncomingRequest, ReqProtocolNames}, - OurView, Versioned, VersionedValidationProtocol, + request_response::{IncomingRequest, ReqProtocolNames}, + OurView, }; -use polkadot_node_primitives::{AvailableData, BlockData, ErasureChunk, PoV}; use polkadot_node_subsystem::{ messages::{AllMessages, AvailabilityRecoveryMessage}, Overseer, OverseerConnector, SpawnGlue, }; -use polkadot_node_subsystem_test_helpers::{ - derive_erasure_chunks_with_proofs_and_root, mock::new_block_import_info, -}; use polkadot_node_subsystem_types::{ messages::{AvailabilityStoreMessage, NetworkBridgeEvent}, Span, }; use polkadot_overseer::{metrics::Metrics as OverseerMetrics, Handle as OverseerHandle}; -use polkadot_primitives::{ - AvailabilityBitfield, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex, Hash, HeadData, - Header, PersistedValidationData, Signed, SigningContext, ValidatorIndex, -}; -use polkadot_primitives_test_helpers::{dummy_candidate_receipt, dummy_hash}; -use sc_network::{ - request_responses::{IncomingRequest as RawIncomingRequest, ProtocolConfig}, - PeerId, -}; +use polkadot_primitives::GroupIndex; +use sc_network::request_responses::{IncomingRequest as RawIncomingRequest, ProtocolConfig}; use sc_service::SpawnTaskHandle; use serde::{Deserialize, Serialize}; -use sp_core::H256; -use std::{collections::HashMap, iter::Cycle, ops::Sub, sync::Arc, time::Instant}; +use std::{ops::Sub, sync::Arc, time::Instant}; +pub use test_state::TestState; mod av_store_helpers; +mod test_state; const LOG_TARGET: &str = "subsystem-bench::availability"; @@ -149,94 +135,48 @@ fn build_overseer_for_availability_write( (overseer, OverseerHandle::new(raw_handle)) } -/// Takes a test configuration and uses it to create the `TestEnvironment`. pub fn prepare_test( - config: TestConfiguration, - state: &mut TestState, - mode: TestDataAvailability, - with_prometheus_endpoint: bool, -) -> (TestEnvironment, Vec) { - prepare_test_inner( - config, - state, - mode, - TestEnvironmentDependencies::default(), - with_prometheus_endpoint, - ) -} - -fn prepare_test_inner( - config: TestConfiguration, - state: &mut TestState, + state: &TestState, mode: TestDataAvailability, - dependencies: TestEnvironmentDependencies, with_prometheus_endpoint: bool, ) -> (TestEnvironment, Vec) { - // Generate test authorities. - let test_authorities = config.generate_authorities(); - - let mut candidate_hashes: HashMap> = HashMap::new(); - - // Prepare per block candidates. - // Genesis block is always finalized, so we start at 1. - for block_num in 1..=config.num_blocks { - for _ in 0..config.n_cores { - candidate_hashes - .entry(Hash::repeat_byte(block_num as u8)) - .or_default() - .push(state.next_candidate().expect("Cycle iterator")) - } - - // First candidate is our backed candidate. - state.backed_candidates.push( - candidate_hashes - .get(&Hash::repeat_byte(block_num as u8)) - .expect("just inserted above") - .first() - .expect("just inserted above") - .clone(), - ); - } - - let runtime_api = runtime_api::MockRuntimeApi::new( - config.clone(), - test_authorities.clone(), - candidate_hashes, - Default::default(), - Default::default(), - 0, - ); - - let availability_state = NetworkAvailabilityState { - candidate_hashes: state.candidate_hashes.clone(), - available_data: state.available_data.clone(), - chunks: state.chunks.clone(), - }; - - let mut req_cfgs = Vec::new(); - let (collation_req_receiver, collation_req_cfg) = IncomingRequest::get_config_receiver(&ReqProtocolNames::new(GENESIS_HASH, None)); - req_cfgs.push(collation_req_cfg); - let (pov_req_receiver, pov_req_cfg) = IncomingRequest::get_config_receiver(&ReqProtocolNames::new(GENESIS_HASH, None)); - let (chunk_req_receiver, chunk_req_cfg) = IncomingRequest::get_config_receiver(&ReqProtocolNames::new(GENESIS_HASH, None)); - req_cfgs.push(pov_req_cfg); + let req_cfgs = vec![collation_req_cfg, pov_req_cfg]; - let (network, network_interface, network_receiver) = - new_network(&config, &dependencies, &test_authorities, vec![Arc::new(availability_state)]); + let dependencies = TestEnvironmentDependencies::default(); + let availability_state = NetworkAvailabilityState { + candidate_hashes: state.candidate_hashes.clone(), + available_data: state.available_data.clone(), + chunks: state.chunks.clone(), + }; + let (network, network_interface, network_receiver) = new_network( + &state.config, + &dependencies, + &state.test_authorities, + vec![Arc::new(availability_state.clone())], + ); let network_bridge_tx = network_bridge::MockNetworkBridgeTx::new( network.clone(), network_interface.subsystem_sender(), - test_authorities.clone(), + state.test_authorities.clone(), ); - let network_bridge_rx = - network_bridge::MockNetworkBridgeRx::new(network_receiver, Some(chunk_req_cfg.clone())); + network_bridge::MockNetworkBridgeRx::new(network_receiver, Some(chunk_req_cfg)); + + let runtime_api = runtime_api::MockRuntimeApi::new( + state.config.clone(), + state.test_authorities.clone(), + state.candidate_receipts.clone(), + Default::default(), + Default::default(), + 0, + ); let (overseer, overseer_handle) = match &mode { TestDataAvailability::Read(options) => { @@ -271,27 +211,12 @@ fn prepare_test_inner( }, TestDataAvailability::Write => { let availability_distribution = AvailabilityDistributionSubsystem::new( - test_authorities.keyring.keystore(), + state.test_authorities.keyring.keystore(), IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver }, Metrics::try_register(&dependencies.registry).unwrap(), ); - let block_headers = (1..=config.num_blocks) - .map(|block_number| { - ( - Hash::repeat_byte(block_number as u8), - Header { - digest: Default::default(), - number: block_number as BlockNumber, - parent_hash: Default::default(), - extrinsics_root: Default::default(), - state_root: Default::default(), - }, - ) - }) - .collect::>(); - - let chain_api_state = ChainApiState { block_headers }; + let chain_api_state = ChainApiState { block_headers: state.block_headers.clone() }; let chain_api = MockChainApi::new(chain_api_state); let bitfield_distribution = BitfieldDistribution::new(Metrics::try_register(&dependencies.registry).unwrap()); @@ -311,167 +236,42 @@ fn prepare_test_inner( ( TestEnvironment::new( dependencies, - config, + state.config.clone(), network, overseer, overseer_handle, - test_authorities, + state.test_authorities.clone(), with_prometheus_endpoint, ), req_cfgs, ) } -#[derive(Clone)] -pub struct TestState { - // Full test configuration - config: TestConfiguration, - // A cycle iterator on all PoV sizes used in the test. - pov_sizes: Cycle>, - // Generated candidate receipts to be used in the test - candidates: Cycle>, - // Map from pov size to candidate index - pov_size_to_candidate: HashMap, - // Map from generated candidate hashes to candidate index in `available_data` - // and `chunks`. - candidate_hashes: HashMap, - // Per candidate index receipts. - candidate_receipt_templates: Vec, - // Per candidate index `AvailableData` - available_data: Vec, - // Per candiadte index chunks - chunks: Vec>, - // Per relay chain block - candidate backed by our backing group - backed_candidates: Vec, -} - -impl TestState { - pub fn next_candidate(&mut self) -> Option { - let candidate = self.candidates.next(); - let candidate_hash = candidate.as_ref().unwrap().hash(); - gum::trace!(target: LOG_TARGET, "Next candidate selected {:?}", candidate_hash); - candidate - } - - /// Generate candidates to be used in the test. - fn generate_candidates(&mut self) { - let count = self.config.n_cores * self.config.num_blocks; - gum::info!(target: LOG_TARGET,"{}", format!("Pre-generating {} candidates.", count).bright_blue()); - - // Generate all candidates - self.candidates = (0..count) - .map(|index| { - let pov_size = self.pov_sizes.next().expect("This is a cycle; qed"); - let candidate_index = *self - .pov_size_to_candidate - .get(&pov_size) - .expect("pov_size always exists; qed"); - let mut candidate_receipt = - self.candidate_receipt_templates[candidate_index].clone(); - - // Make it unique. - candidate_receipt.descriptor.relay_parent = Hash::from_low_u64_be(index as u64); - // Store the new candidate in the state - self.candidate_hashes.insert(candidate_receipt.hash(), candidate_index); - - gum::debug!(target: LOG_TARGET, candidate_hash = ?candidate_receipt.hash(), "new candidate"); - - candidate_receipt - }) - .collect::>() - .into_iter() - .cycle(); - } - - pub fn new(config: &TestConfiguration) -> Self { - let config = config.clone(); - - let mut chunks = Vec::new(); - let mut available_data = Vec::new(); - let mut candidate_receipt_templates = Vec::new(); - let mut pov_size_to_candidate = HashMap::new(); - - // we use it for all candidates. - let persisted_validation_data = PersistedValidationData { - parent_head: HeadData(vec![7, 8, 9]), - relay_parent_number: Default::default(), - max_pov_size: 1024, - relay_parent_storage_root: Default::default(), - }; - - // For each unique pov we create a candidate receipt. - for (index, pov_size) in config.pov_sizes().iter().cloned().unique().enumerate() { - gum::info!(target: LOG_TARGET, index, pov_size, "{}", "Generating template candidate".bright_blue()); - - let mut candidate_receipt = dummy_candidate_receipt(dummy_hash()); - let pov = PoV { block_data: BlockData(vec![index as u8; pov_size]) }; - - let new_available_data = AvailableData { - validation_data: persisted_validation_data.clone(), - pov: Arc::new(pov), - }; - - let (new_chunks, erasure_root) = derive_erasure_chunks_with_proofs_and_root( - config.n_validators, - &new_available_data, - |_, _| {}, - ); - - candidate_receipt.descriptor.erasure_root = erasure_root; - - chunks.push(new_chunks); - available_data.push(new_available_data); - pov_size_to_candidate.insert(pov_size, index); - candidate_receipt_templates.push(candidate_receipt); - } - - gum::info!(target: LOG_TARGET, "{}","Created test environment.".bright_blue()); - - let mut _self = Self { - available_data, - candidate_receipt_templates, - chunks, - pov_size_to_candidate, - pov_sizes: Vec::from(config.pov_sizes()).into_iter().cycle(), - candidate_hashes: HashMap::new(), - candidates: Vec::new().into_iter().cycle(), - backed_candidates: Vec::new(), - config, - }; - - _self.generate_candidates(); - _self - } - - pub fn backed_candidates(&mut self) -> &mut Vec { - &mut self.backed_candidates - } -} - pub async fn benchmark_availability_read( benchmark_name: &str, env: &mut TestEnvironment, - mut state: TestState, + state: &TestState, ) -> BenchmarkUsage { let config = env.config().clone(); - env.import_block(new_block_import_info(Hash::repeat_byte(1), 1)).await; - - let test_start = Instant::now(); - let mut batch = FuturesUnordered::new(); - let mut availability_bytes = 0u128; - env.metrics().set_n_validators(config.n_validators); env.metrics().set_n_cores(config.n_cores); - for block_num in 1..=env.config().num_blocks { + let mut batch = FuturesUnordered::new(); + let mut availability_bytes = 0u128; + let mut candidates = state.candidates.clone(); + let test_start = Instant::now(); + for block_info in state.block_infos.iter() { + let block_num = block_info.number as usize; gum::info!(target: LOG_TARGET, "Current block {}/{}", block_num, env.config().num_blocks); env.metrics().set_current_block(block_num); let block_start_ts = Instant::now(); + env.import_block(block_info.clone()).await; + for candidate_num in 0..config.n_cores as u64 { let candidate = - state.next_candidate().expect("We always send up to n_cores*num_blocks; qed"); + candidates.next().expect("We always send up to n_cores*num_blocks; qed"); let (tx, rx) = oneshot::channel(); batch.push(rx); @@ -519,7 +319,7 @@ pub async fn benchmark_availability_read( pub async fn benchmark_availability_write( benchmark_name: &str, env: &mut TestEnvironment, - mut state: TestState, + state: &TestState, ) -> BenchmarkUsage { let config = env.config().clone(); @@ -527,7 +327,7 @@ pub async fn benchmark_availability_write( env.metrics().set_n_cores(config.n_cores); gum::info!(target: LOG_TARGET, "Seeding availability store with candidates ..."); - for backed_candidate in state.backed_candidates().clone() { + for backed_candidate in state.backed_candidates.clone() { let candidate_index = *state.candidate_hashes.get(&backed_candidate.hash()).unwrap(); let available_data = state.available_data[candidate_index].clone(); let (tx, rx) = oneshot::channel(); @@ -550,15 +350,14 @@ pub async fn benchmark_availability_write( gum::info!(target: LOG_TARGET, "Done"); let test_start = Instant::now(); - - for block_num in 1..=env.config().num_blocks { + for block_info in state.block_infos.iter() { + let block_num = block_info.number as usize; gum::info!(target: LOG_TARGET, "Current block #{}", block_num); env.metrics().set_current_block(block_num); let block_start_ts = Instant::now(); - let relay_block_hash = Hash::repeat_byte(block_num as u8); - env.import_block(new_block_import_info(relay_block_hash, block_num as BlockNumber)) - .await; + let relay_block_hash = block_info.hash; + env.import_block(block_info.clone()).await; // Inform bitfield distribution about our view of current test block let message = polkadot_node_subsystem_types::messages::BitfieldDistributionMessage::NetworkBridgeUpdate( @@ -569,20 +368,13 @@ pub async fn benchmark_availability_write( let chunk_fetch_start_ts = Instant::now(); // Request chunks of our own backed candidate from all other validators. - let mut receivers = Vec::new(); - for index in 1..config.n_validators { + let payloads = state.chunk_fetching_requests.get(block_num - 1).expect("pregenerated"); + let receivers = (1..config.n_validators).filter_map(|index| { let (pending_response, pending_response_receiver) = oneshot::channel(); - let request = RawIncomingRequest { - peer: PeerId::random(), - payload: ChunkFetchingRequest { - candidate_hash: state.backed_candidates()[block_num - 1].hash(), - index: ValidatorIndex(index as u32), - } - .encode(), - pending_response, - }; - + let peer_id = *env.authorities().peer_ids.get(index).expect("all validators have ids"); + let payload = payloads.get(index).expect("pregenerated").clone(); + let request = RawIncomingRequest { peer: peer_id, payload, pending_response }; let peer = env .authorities() .validator_authority_id @@ -592,59 +384,39 @@ pub async fn benchmark_availability_write( if env.network().is_peer_connected(peer) && env.network().send_request_from_peer(peer, request).is_ok() { - receivers.push(pending_response_receiver); + Some(pending_response_receiver) + } else { + None } - } + }); gum::info!(target: LOG_TARGET, "Waiting for all emulated peers to receive their chunk from us ..."); - for receiver in receivers.into_iter() { - let response = receiver.await.expect("Chunk is always served successfully"); - // TODO: check if chunk is the one the peer expects to receive. - assert!(response.result.is_ok()); - } - let chunk_fetch_duration = Instant::now().sub(chunk_fetch_start_ts).as_millis(); + let responses = futures::future::try_join_all(receivers) + .await + .expect("Chunk is always served successfully"); + // TODO: check if chunk is the one the peer expects to receive. + assert!(responses.iter().all(|v| v.result.is_ok())); + let chunk_fetch_duration = Instant::now().sub(chunk_fetch_start_ts).as_millis(); gum::info!(target: LOG_TARGET, "All chunks received in {}ms", chunk_fetch_duration); - let signing_context = SigningContext { session_index: 0, parent_hash: relay_block_hash }; let network = env.network().clone(); let authorities = env.authorities().clone(); - let n_validators = config.n_validators; // Spawn a task that will generate `n_validator` - 1 signed bitfiends and // send them from the emulated peers to the subsystem. // TODO: Implement topology. - env.spawn_blocking("send-bitfields", async move { - for index in 1..n_validators { - let validator_public = - authorities.validator_public.get(index).expect("All validator keys are known"); - - // Node has all the chunks in the world. - let payload: AvailabilityBitfield = - AvailabilityBitfield(bitvec![u8, bitvec::order::Lsb0; 1u8; 32]); - // TODO(soon): Use pre-signed messages. This is quite intensive on the CPU. - let signed_bitfield = Signed::::sign( - &authorities.keyring.keystore(), - payload, - &signing_context, - ValidatorIndex(index as u32), - validator_public, - ) - .ok() - .flatten() - .expect("should be signed"); - - let from_peer = &authorities.validator_authority_id[index]; - - let message = peer_bitfield_message_v2(relay_block_hash, signed_bitfield); + let messages = state.signed_bitfields.get(&relay_block_hash).expect("pregenerated").clone(); + for index in 1..config.n_validators { + let from_peer = &authorities.validator_authority_id[index]; + let message = messages.get(index).expect("pregenerated").clone(); - // Send the action from peer only if it is connected to our node. - if network.is_peer_connected(from_peer) { - let _ = network.send_message_from_peer(from_peer, message); - } + // Send the action from peer only if it is connected to our node. + if network.is_peer_connected(from_peer) { + let _ = network.send_message_from_peer(from_peer, message); } - }); + } gum::info!( "Waiting for {} bitfields to be received and processed", @@ -679,17 +451,3 @@ pub async fn benchmark_availability_write( &["availability-distribution", "bitfield-distribution", "availability-store"], ) } - -pub fn peer_bitfield_message_v2( - relay_hash: H256, - signed_bitfield: Signed, -) -> VersionedValidationProtocol { - let bitfield = polkadot_node_network_protocol::v2::BitfieldDistributionMessage::Bitfield( - relay_hash, - signed_bitfield.into(), - ); - - Versioned::V2(polkadot_node_network_protocol::v2::ValidationProtocol::BitfieldDistribution( - bitfield, - )) -} diff --git a/polkadot/node/subsystem-bench/src/lib/availability/test_state.rs b/polkadot/node/subsystem-bench/src/lib/availability/test_state.rs new file mode 100644 index 000000000000..c328ffedf916 --- /dev/null +++ b/polkadot/node/subsystem-bench/src/lib/availability/test_state.rs @@ -0,0 +1,268 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use crate::configuration::{TestAuthorities, TestConfiguration}; +use bitvec::bitvec; +use colored::Colorize; +use itertools::Itertools; +use parity_scale_codec::Encode; +use polkadot_node_network_protocol::{ + request_response::v1::ChunkFetchingRequest, Versioned, VersionedValidationProtocol, +}; +use polkadot_node_primitives::{AvailableData, BlockData, ErasureChunk, PoV}; +use polkadot_node_subsystem_test_helpers::{ + derive_erasure_chunks_with_proofs_and_root, mock::new_block_import_info, +}; +use polkadot_overseer::BlockInfo; +use polkadot_primitives::{ + AvailabilityBitfield, BlockNumber, CandidateHash, CandidateReceipt, Hash, HeadData, Header, + PersistedValidationData, Signed, SigningContext, ValidatorIndex, +}; +use polkadot_primitives_test_helpers::{dummy_candidate_receipt, dummy_hash}; +use sp_core::H256; +use std::{collections::HashMap, iter::Cycle, sync::Arc}; + +const LOG_TARGET: &str = "subsystem-bench::availability::test_state"; + +#[derive(Clone)] +pub struct TestState { + // Full test configuration + pub config: TestConfiguration, + // A cycle iterator on all PoV sizes used in the test. + pub pov_sizes: Cycle>, + // Generated candidate receipts to be used in the test + pub candidates: Cycle>, + // Map from pov size to candidate index + pub pov_size_to_candidate: HashMap, + // Map from generated candidate hashes to candidate index in `available_data` and `chunks`. + pub candidate_hashes: HashMap, + // Per candidate index receipts. + pub candidate_receipt_templates: Vec, + // Per candidate index `AvailableData` + pub available_data: Vec, + // Per candiadte index chunks + pub chunks: Vec>, + // Per relay chain block - candidate backed by our backing group + pub backed_candidates: Vec, + // Relay chain block infos + pub block_infos: Vec, + // Chung fetching requests for backed candidates + pub chunk_fetching_requests: Vec>>, + // Pregenerated signed availability bitfields + pub signed_bitfields: HashMap>, + // Relay chain block headers + pub block_headers: HashMap, + // Authority keys for the network emulation. + pub test_authorities: TestAuthorities, + // Map from generated candidate receipts + pub candidate_receipts: HashMap>, +} + +impl TestState { + pub fn new(config: &TestConfiguration) -> Self { + let mut test_state = Self { + available_data: Default::default(), + candidate_receipt_templates: Default::default(), + chunks: Default::default(), + pov_size_to_candidate: Default::default(), + pov_sizes: Vec::from(config.pov_sizes()).into_iter().cycle(), + candidate_hashes: HashMap::new(), + candidates: Vec::new().into_iter().cycle(), + backed_candidates: Vec::new(), + config: config.clone(), + block_infos: Default::default(), + chunk_fetching_requests: Default::default(), + signed_bitfields: Default::default(), + candidate_receipts: Default::default(), + block_headers: Default::default(), + test_authorities: config.generate_authorities(), + }; + + // we use it for all candidates. + let persisted_validation_data = PersistedValidationData { + parent_head: HeadData(vec![7, 8, 9]), + relay_parent_number: Default::default(), + max_pov_size: 1024, + relay_parent_storage_root: Default::default(), + }; + + // For each unique pov we create a candidate receipt. + for (index, pov_size) in config.pov_sizes().iter().cloned().unique().enumerate() { + gum::info!(target: LOG_TARGET, index, pov_size, "{}", "Generating template candidate".bright_blue()); + + let mut candidate_receipt = dummy_candidate_receipt(dummy_hash()); + let pov = PoV { block_data: BlockData(vec![index as u8; pov_size]) }; + + let new_available_data = AvailableData { + validation_data: persisted_validation_data.clone(), + pov: Arc::new(pov), + }; + + let (new_chunks, erasure_root) = derive_erasure_chunks_with_proofs_and_root( + config.n_validators, + &new_available_data, + |_, _| {}, + ); + + candidate_receipt.descriptor.erasure_root = erasure_root; + + test_state.chunks.push(new_chunks); + test_state.available_data.push(new_available_data); + test_state.pov_size_to_candidate.insert(pov_size, index); + test_state.candidate_receipt_templates.push(candidate_receipt); + } + + test_state.block_infos = (1..=config.num_blocks) + .map(|block_num| { + let relay_block_hash = Hash::repeat_byte(block_num as u8); + new_block_import_info(relay_block_hash, block_num as BlockNumber) + }) + .collect(); + + test_state.block_headers = test_state + .block_infos + .iter() + .map(|info| { + ( + info.hash, + Header { + digest: Default::default(), + number: info.number, + parent_hash: info.parent_hash, + extrinsics_root: Default::default(), + state_root: Default::default(), + }, + ) + }) + .collect::>(); + + // Generate all candidates + let candidates_count = config.n_cores * config.num_blocks; + gum::info!(target: LOG_TARGET,"{}", format!("Pre-generating {} candidates.", candidates_count).bright_blue()); + test_state.candidates = (0..candidates_count) + .map(|index| { + let pov_size = test_state.pov_sizes.next().expect("This is a cycle; qed"); + let candidate_index = *test_state + .pov_size_to_candidate + .get(&pov_size) + .expect("pov_size always exists; qed"); + let mut candidate_receipt = + test_state.candidate_receipt_templates[candidate_index].clone(); + + // Make it unique. + candidate_receipt.descriptor.relay_parent = Hash::from_low_u64_be(index as u64); + // Store the new candidate in the state + test_state.candidate_hashes.insert(candidate_receipt.hash(), candidate_index); + + gum::debug!(target: LOG_TARGET, candidate_hash = ?candidate_receipt.hash(), "new candidate"); + + candidate_receipt + }) + .collect::>() + .into_iter() + .cycle(); + + // Prepare per block candidates. + // Genesis block is always finalized, so we start at 1. + for info in test_state.block_infos.iter() { + for _ in 0..config.n_cores { + let receipt = test_state.candidates.next().expect("Cycle iterator"); + test_state.candidate_receipts.entry(info.hash).or_default().push(receipt); + } + + // First candidate is our backed candidate. + test_state.backed_candidates.push( + test_state + .candidate_receipts + .get(&info.hash) + .expect("just inserted above") + .first() + .expect("just inserted above") + .clone(), + ); + } + + test_state.chunk_fetching_requests = test_state + .backed_candidates + .iter() + .map(|candidate| { + (0..config.n_validators) + .map(|index| { + ChunkFetchingRequest { + candidate_hash: candidate.hash(), + index: ValidatorIndex(index as u32), + } + .encode() + }) + .collect::>() + }) + .collect::>(); + + test_state.signed_bitfields = test_state + .block_infos + .iter() + .map(|block_info| { + let signing_context = + SigningContext { session_index: 0, parent_hash: block_info.hash }; + let messages = (0..config.n_validators) + .map(|index| { + let validator_public = test_state + .test_authorities + .validator_public + .get(index) + .expect("All validator keys are known"); + + // Node has all the chunks in the world. + let payload: AvailabilityBitfield = + AvailabilityBitfield(bitvec![u8, bitvec::order::Lsb0; 1u8; 32]); + let signed_bitfield = Signed::::sign( + &test_state.test_authorities.keyring.keystore(), + payload, + &signing_context, + ValidatorIndex(index as u32), + validator_public, + ) + .ok() + .flatten() + .expect("should be signed"); + + peer_bitfield_message_v2(block_info.hash, signed_bitfield) + }) + .collect::>(); + + (block_info.hash, messages) + }) + .collect(); + + gum::info!(target: LOG_TARGET, "{}","Created test environment.".bright_blue()); + + test_state + } +} + +fn peer_bitfield_message_v2( + relay_hash: H256, + signed_bitfield: Signed, +) -> VersionedValidationProtocol { + let bitfield = polkadot_node_network_protocol::v2::BitfieldDistributionMessage::Bitfield( + relay_hash, + signed_bitfield.into(), + ); + + Versioned::V2(polkadot_node_network_protocol::v2::ValidationProtocol::BitfieldDistribution( + bitfield, + )) +} diff --git a/polkadot/node/subsystem-bench/src/lib/environment.rs b/polkadot/node/subsystem-bench/src/lib/environment.rs index 958ed50d0894..2d80d75a14aa 100644 --- a/polkadot/node/subsystem-bench/src/lib/environment.rs +++ b/polkadot/node/subsystem-bench/src/lib/environment.rs @@ -118,6 +118,7 @@ fn new_runtime() -> tokio::runtime::Runtime { .thread_name("subsystem-bench") .enable_all() .thread_stack_size(3 * 1024 * 1024) + .worker_threads(4) .build() .unwrap() } diff --git a/polkadot/node/subsystem-bench/src/lib/lib.rs b/polkadot/node/subsystem-bench/src/lib/lib.rs index ef2724abc989..d06f2822a895 100644 --- a/polkadot/node/subsystem-bench/src/lib/lib.rs +++ b/polkadot/node/subsystem-bench/src/lib/lib.rs @@ -26,4 +26,3 @@ pub(crate) mod keyring; pub(crate) mod mock; pub(crate) mod network; pub mod usage; -pub mod utils; diff --git a/polkadot/node/subsystem-bench/src/lib/mock/av_store.rs b/polkadot/node/subsystem-bench/src/lib/mock/av_store.rs index 9064f17940f0..080644da92a0 100644 --- a/polkadot/node/subsystem-bench/src/lib/mock/av_store.rs +++ b/polkadot/node/subsystem-bench/src/lib/mock/av_store.rs @@ -41,6 +41,7 @@ const LOG_TARGET: &str = "subsystem-bench::av-store-mock"; /// Mockup helper. Contains Ccunks and full availability data of all parachain blocks /// used in a test. +#[derive(Clone)] pub struct NetworkAvailabilityState { pub candidate_hashes: HashMap, pub available_data: Vec, diff --git a/polkadot/node/subsystem-bench/src/lib/mock/runtime_api.rs b/polkadot/node/subsystem-bench/src/lib/mock/runtime_api.rs index 53faf562f03c..3c39de870a28 100644 --- a/polkadot/node/subsystem-bench/src/lib/mock/runtime_api.rs +++ b/polkadot/node/subsystem-bench/src/lib/mock/runtime_api.rs @@ -36,6 +36,7 @@ use std::collections::HashMap; const LOG_TARGET: &str = "subsystem-bench::runtime-api-mock"; /// Minimal state to answer requests. +#[derive(Clone)] pub struct RuntimeApiState { // All authorities in the test, authorities: TestAuthorities, @@ -49,6 +50,7 @@ pub struct RuntimeApiState { } /// A mocked `runtime-api` subsystem. +#[derive(Clone)] pub struct MockRuntimeApi { state: RuntimeApiState, config: TestConfiguration, diff --git a/polkadot/node/subsystem-bench/src/lib/usage.rs b/polkadot/node/subsystem-bench/src/lib/usage.rs index ef60d67372ae..7172969a8f92 100644 --- a/polkadot/node/subsystem-bench/src/lib/usage.rs +++ b/polkadot/node/subsystem-bench/src/lib/usage.rs @@ -17,6 +17,7 @@ //! Test usage implementation use colored::Colorize; +use itertools::Itertools; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -37,10 +38,16 @@ impl std::fmt::Display for BenchmarkUsage { self.network_usage .iter() .map(|v| v.to_string()) + .sorted() .collect::>() .join("\n"), format!("{:<32}{:>12}{:>12}", "CPU usage, seconds", "total", "per block").blue(), - self.cpu_usage.iter().map(|v| v.to_string()).collect::>().join("\n") + self.cpu_usage + .iter() + .map(|v| v.to_string()) + .sorted() + .collect::>() + .join("\n") ) } } @@ -101,8 +108,8 @@ fn check_resource_usage( None } else { Some(format!( - "The resource `{}` is expected to be equal to {} with a precision {}, but the current value is {}", - resource_name, base, precision, usage.per_block + "The resource `{}` is expected to be equal to {} with a precision {}, but the current value is {} ({})", + resource_name, base, precision, usage.per_block, diff )) } } else { @@ -119,7 +126,7 @@ pub struct ResourceUsage { impl std::fmt::Display for ResourceUsage { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{:<32}{:>12.3}{:>12.3}", self.resource_name.cyan(), self.total, self.per_block) + write!(f, "{:<32}{:>12.4}{:>12.4}", self.resource_name.cyan(), self.total, self.per_block) } } diff --git a/polkadot/node/subsystem-bench/src/lib/utils.rs b/polkadot/node/subsystem-bench/src/lib/utils.rs deleted file mode 100644 index 75b72cc11b98..000000000000 --- a/polkadot/node/subsystem-bench/src/lib/utils.rs +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright (C) Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Test utils - -use crate::usage::BenchmarkUsage; -use std::io::{stdout, Write}; - -pub struct WarmUpOptions<'a> { - /// The maximum number of runs considered for marming up. - pub warm_up: usize, - /// The number of runs considered for benchmarking. - pub bench: usize, - /// The difference in CPU usage between runs considered as normal - pub precision: f64, - /// The subsystems whose CPU usage is checked during warm-up cycles - pub subsystems: &'a [&'a str], -} - -impl<'a> WarmUpOptions<'a> { - pub fn new(subsystems: &'a [&'a str]) -> Self { - Self { warm_up: 100, bench: 3, precision: 0.02, subsystems } - } -} - -pub fn warm_up_and_benchmark( - options: WarmUpOptions, - run: impl Fn() -> BenchmarkUsage, -) -> Result { - println!("Warming up..."); - let mut usages = Vec::with_capacity(options.bench); - - for n in 1..=options.warm_up { - let curr = run(); - if let Some(prev) = usages.last() { - let diffs = options - .subsystems - .iter() - .map(|&v| { - curr.cpu_usage_diff(prev, v) - .ok_or(format!("{} not found in benchmark {:?}", v, prev)) - }) - .collect::, String>>()?; - if !diffs.iter().all(|&v| v < options.precision) { - usages.clear(); - } - } - usages.push(curr); - print!("\r{}%", n * 100 / options.warm_up); - if usages.len() == options.bench { - println!("\rTook {} runs to warm up", n.saturating_sub(options.bench)); - break; - } - stdout().flush().unwrap(); - } - - if usages.len() != options.bench { - println!("Didn't warm up after {} runs", options.warm_up); - return Err("Can't warm up".to_string()) - } - - Ok(BenchmarkUsage::average(&usages)) -}