diff --git a/Cargo.lock b/Cargo.lock index 54a01f12f35c..4db38311fe66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13589,8 +13589,10 @@ dependencies = [ "nix 0.28.0", "parity-scale-codec", "polkadot-node-core-pvf-common", + "polkadot-node-primitives", "polkadot-parachain-primitives", "polkadot-primitives", + "sp-maybe-compressed-blob", "tracing-gum", ] @@ -13605,6 +13607,7 @@ dependencies = [ "nix 0.28.0", "parity-scale-codec", "polkadot-node-core-pvf-common", + "polkadot-node-primitives", "polkadot-primitives", "rayon", "rococo-runtime", diff --git a/polkadot/node/core/candidate-validation/Cargo.toml b/polkadot/node/core/candidate-validation/Cargo.toml index 13ab3e3fba50..fcacc38cae65 100644 --- a/polkadot/node/core/candidate-validation/Cargo.toml +++ b/polkadot/node/core/candidate-validation/Cargo.toml @@ -17,7 +17,6 @@ gum = { workspace = true, default-features = true } sp-keystore = { workspace = true } sp-application-crypto = { workspace = true } -sp-maybe-compressed-blob = { workspace = true, default-features = true } codec = { features = ["bit-vec", "derive"], workspace = true } polkadot-primitives = { workspace = true, default-features = true } @@ -36,5 +35,6 @@ sp-keyring = { workspace = true, default-features = true } futures = { features = ["thread-pool"], workspace = true } assert_matches = { workspace = true } polkadot-node-subsystem-test-helpers = { workspace = true } +sp-maybe-compressed-blob = { workspace = true, default-features = true } sp-core = { workspace = true, default-features = true } polkadot-primitives-test-helpers = { workspace = true } diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 1985964ebc51..103d29e8d269 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -27,9 +27,7 @@ use polkadot_node_core_pvf::{ InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PossiblyInvalidError, PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost, }; -use polkadot_node_primitives::{ - BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT, -}; +use polkadot_node_primitives::{InvalidCandidate, PoV, ValidationResult}; use polkadot_node_subsystem::{ errors::RuntimeApiError, messages::{ @@ -41,9 +39,7 @@ use polkadot_node_subsystem::{ }; use polkadot_node_subsystem_util as util; use polkadot_overseer::ActiveLeavesUpdate; -use polkadot_parachain_primitives::primitives::{ - ValidationParams, ValidationResult as WasmValidationResult, -}; +use polkadot_parachain_primitives::primitives::ValidationResult as WasmValidationResult; use polkadot_primitives::{ executor_params::{ DEFAULT_APPROVAL_EXECUTION_TIMEOUT, DEFAULT_BACKING_EXECUTION_TIMEOUT, @@ -504,21 +500,12 @@ where continue; }; - let pvf = match sp_maybe_compressed_blob::decompress( - &validation_code.0, - VALIDATION_CODE_BOMB_LIMIT, - ) { - Ok(code) => PvfPrepData::from_code( - code.into_owned(), - executor_params.clone(), - timeout, - PrepareJobKind::Prechecking, - ), - Err(e) => { - gum::debug!(target: LOG_TARGET, err=?e, "cannot decompress validation code"); - continue - }, - }; + let pvf = PvfPrepData::from_code( + validation_code.0, + executor_params.clone(), + timeout, + PrepareJobKind::Prechecking, + ); active_pvfs.push(pvf); processed_code_hashes.push(code_hash); @@ -651,21 +638,12 @@ where let timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Precheck); - let pvf = match 
sp_maybe_compressed_blob::decompress( - &validation_code.0, - VALIDATION_CODE_BOMB_LIMIT, - ) { - Ok(code) => PvfPrepData::from_code( - code.into_owned(), - executor_params, - timeout, - PrepareJobKind::Prechecking, - ), - Err(e) => { - gum::debug!(target: LOG_TARGET, err=?e, "precheck: cannot decompress validation code"); - return PreCheckOutcome::Invalid - }, - }; + let pvf = PvfPrepData::from_code( + validation_code.0, + executor_params, + timeout, + PrepareJobKind::Prechecking, + ); match validation_backend.precheck_pvf(pvf).await { Ok(_) => PreCheckOutcome::Valid, @@ -873,41 +851,7 @@ async fn validate_candidate_exhaustive( return Ok(ValidationResult::Invalid(e)) } - let raw_validation_code = match sp_maybe_compressed_blob::decompress( - &validation_code.0, - VALIDATION_CODE_BOMB_LIMIT, - ) { - Ok(code) => code, - Err(e) => { - gum::info!(target: LOG_TARGET, ?para_id, err=?e, "Invalid candidate (validation code)"); - - // Code already passed pre-checking, if decompression fails now this most likely means - // some local corruption happened. - return Err(ValidationFailed("Code decompression failed".to_string())) - }, - }; - metrics.observe_code_size(raw_validation_code.len()); - - metrics.observe_pov_size(pov.block_data.0.len(), true); - let raw_block_data = - match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) { - Ok(block_data) => BlockData(block_data.to_vec()), - Err(e) => { - gum::info!(target: LOG_TARGET, ?para_id, err=?e, "Invalid candidate (PoV code)"); - - // If the PoV is invalid, the candidate certainly is. - return Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure)) - }, - }; - metrics.observe_pov_size(raw_block_data.0.len(), false); - - let params = ValidationParams { - parent_head: persisted_validation_data.parent_head.clone(), - block_data: raw_block_data, - relay_parent_number: persisted_validation_data.relay_parent_number, - relay_parent_storage_root: persisted_validation_data.relay_parent_storage_root, - }; - + let persisted_validation_data = Arc::new(persisted_validation_data); let result = match exec_kind { // Retry is disabled to reduce the chance of nondeterministic blocks getting backed and // honest backers getting slashed. 
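(For orientation — a minimal sketch, not part of this patch, of the decompression step these hunks move out of the candidate-validation subsystem and into the execute worker; names and constants are the ones used elsewhere in this diff. The PoV is inflated under POV_BOMB_LIMIT on the worker side, and a failure there surfaces as InvalidCandidate::PoVDecompressionFailure instead of an error in this subsystem:)

use polkadot_node_primitives::{BlockData, PoV, POV_BOMB_LIMIT};
use polkadot_parachain_primitives::primitives::ValidationParams;
use polkadot_primitives::PersistedValidationData;

// Rebuild the ValidationParams that used to be assembled above, but only
// after decompressing the PoV under the bomb limit; `None` corresponds to
// the worker reporting JobResponse::PoVDecompressionFailure.
fn worker_side_params(pvd: &PersistedValidationData, pov: &PoV) -> Option<ValidationParams> {
    let raw = sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT).ok()?;
    Some(ValidationParams {
        parent_head: pvd.parent_head.clone(),
        block_data: BlockData(raw.to_vec()),
        relay_parent_number: pvd.relay_parent_number,
        relay_parent_storage_root: pvd.relay_parent_storage_root,
    })
}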
@@ -915,7 +859,7 @@ async fn validate_candidate_exhaustive( let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare); let exec_timeout = pvf_exec_timeout(&executor_params, exec_kind); let pvf = PvfPrepData::from_code( - raw_validation_code.to_vec(), + validation_code.0, executor_params, prep_timeout, PrepareJobKind::Compilation, @@ -925,7 +869,8 @@ async fn validate_candidate_exhaustive( .validate_candidate( pvf, exec_timeout, - params.encode(), + persisted_validation_data.clone(), + pov, polkadot_node_core_pvf::Priority::Normal, ) .await }, PvfExecKind::Approval => validation_backend .validate_candidate_with_retry( - raw_validation_code.to_vec(), + validation_code.0, pvf_exec_timeout(&executor_params, exec_kind), - params, + persisted_validation_data.clone(), + pov, executor_params, PVF_APPROVAL_EXECUTION_RETRY_DELAY, polkadot_node_core_pvf::Priority::Critical, @@ -961,6 +907,8 @@ Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)), Err(ValidationError::Invalid(WasmInvalidCandidate::WorkerReportedInvalid(e))) => Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(e))), + Err(ValidationError::Invalid(WasmInvalidCandidate::PoVDecompressionFailure)) => + Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure)), Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)) => Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError( "ambiguous worker death".to_string(), @@ -1007,7 +955,7 @@ async fn validate_candidate_exhaustive( // invalid. Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch)) } else { - Ok(ValidationResult::Valid(outputs, persisted_validation_data)) + Ok(ValidationResult::Valid(outputs, (*persisted_validation_data).clone())) } }, } @@ -1020,7 +968,8 @@ trait ValidationBackend { &mut self, pvf: PvfPrepData, exec_timeout: Duration, - encoded_params: Vec<u8>, + pvd: Arc<PersistedValidationData>, + pov: Arc<PoV>, // The priority for the preparation job. prepare_priority: polkadot_node_core_pvf::Priority, ) -> Result<WasmValidationResult, ValidationError>; @@ -1035,9 +984,10 @@ /// preparation. async fn validate_candidate_with_retry( &mut self, - raw_validation_code: Vec<u8>, + code: Vec<u8>, exec_timeout: Duration, - params: ValidationParams, + pvd: Arc<PersistedValidationData>, + pov: Arc<PoV>, executor_params: ExecutorParams, retry_delay: Duration, // The priority for the preparation job. @@ -1046,7 +996,7 @@ let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare); // Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap. let pvf = PvfPrepData::from_code( - raw_validation_code, + code, executor_params, prep_timeout, PrepareJobKind::Compilation, @@ -1057,7 +1007,13 @@ // Use `Priority::Critical` as finality trumps parachain liveliness. let mut validation_result = self - .validate_candidate(pvf.clone(), exec_timeout, params.encode(), prepare_priority) + .validate_candidate( + pvf.clone(), + exec_timeout, + pvd.clone(), + pov.clone(), + prepare_priority, + ) .await; if validation_result.is_ok() { return validation_result @@ -1130,10 +1086,14 @@ validation_result ); - // Encode the params again when re-trying. We expect the retry case to be relatively - rare, and we want to avoid unconditionally cloning data.
validation_result = self - .validate_candidate(pvf.clone(), new_timeout, params.encode(), prepare_priority) + .validate_candidate( + pvf.clone(), + new_timeout, + pvd.clone(), + pov.clone(), + prepare_priority, + ) .await; } } @@ -1153,13 +1113,13 @@ impl ValidationBackend for ValidationHost { &mut self, pvf: PvfPrepData, exec_timeout: Duration, - encoded_params: Vec<u8>, + pvd: Arc<PersistedValidationData>, + pov: Arc<PoV>, // The priority for the preparation job. prepare_priority: polkadot_node_core_pvf::Priority, ) -> Result<WasmValidationResult, ValidationError> { let (tx, rx) = oneshot::channel(); - if let Err(err) = - self.execute_pvf(pvf, exec_timeout, encoded_params, prepare_priority, tx).await + if let Err(err) = self.execute_pvf(pvf, exec_timeout, pvd, pov, prepare_priority, tx).await { return Err(InternalValidationError::HostCommunication(format!( "cannot send pvf to the validation host, it might have shut down: {:?}", diff --git a/polkadot/node/core/candidate-validation/src/metrics.rs b/polkadot/node/core/candidate-validation/src/metrics.rs index 28fc957ddb1a..1459907aa599 100644 --- a/polkadot/node/core/candidate-validation/src/metrics.rs +++ b/polkadot/node/core/candidate-validation/src/metrics.rs @@ -23,8 +23,6 @@ pub(crate) struct MetricsInner { pub(crate) validate_from_chain_state: prometheus::Histogram, pub(crate) validate_from_exhaustive: prometheus::Histogram, pub(crate) validate_candidate_exhaustive: prometheus::Histogram, - pub(crate) pov_size: prometheus::HistogramVec, - pub(crate) code_size: prometheus::Histogram, } /// Candidate validation metrics. @@ -70,21 +68,6 @@ impl Metrics { .as_ref() .map(|metrics| metrics.validate_candidate_exhaustive.start_timer()) } - - pub fn observe_code_size(&self, code_size: usize) { - if let Some(metrics) = &self.0 { - metrics.code_size.observe(code_size as f64); - } - } - - pub fn observe_pov_size(&self, pov_size: usize, compressed: bool) { - if let Some(metrics) = &self.0 { - metrics - .pov_size - .with_label_values(&[if compressed { "true" } else { "false" }]) - .observe(pov_size as f64); - } - } } impl metrics::Metrics for Metrics { @@ -121,33 +104,6 @@ impl metrics::Metrics for Metrics { ))?, registry, )?, - pov_size: prometheus::register( - prometheus::HistogramVec::new( - prometheus::HistogramOpts::new( - "polkadot_parachain_candidate_validation_pov_size", - "The compressed and decompressed size of the proof of validity of a candidate", - ) - .buckets( - prometheus::exponential_buckets(16384.0, 2.0, 10) - .expect("arguments are always valid; qed"), - ), - &["compressed"], - )?, - registry, - )?, - code_size: prometheus::register( - prometheus::Histogram::with_opts( - prometheus::HistogramOpts::new( - "polkadot_parachain_candidate_validation_code_size", - "The size of the decompressed WASM validation blob used for checking a candidate", - ) - .buckets( - prometheus::exponential_buckets(16384.0, 2.0, 10) - .expect("arguments are always valid; qed"), - ), - )?, - registry, - )?, }; Ok(Metrics(Some(metrics))) } diff --git a/polkadot/node/core/candidate-validation/src/tests.rs b/polkadot/node/core/candidate-validation/src/tests.rs index 86d855f78b45..55282fdf4ee1 100644 --- a/polkadot/node/core/candidate-validation/src/tests.rs +++ b/polkadot/node/core/candidate-validation/src/tests.rs @@ -20,6 +20,7 @@ use super::*; use assert_matches::assert_matches; use futures::executor; use polkadot_node_core_pvf::PrepareError; +use polkadot_node_primitives::{BlockData, VALIDATION_CODE_BOMB_LIMIT}; use polkadot_node_subsystem::messages::AllMessages; use polkadot_node_subsystem_util::reexports::SubsystemContext;
use polkadot_overseer::ActivatedLeaf; @@ -385,7 +386,8 @@ impl ValidationBackend for MockValidateCandidateBackend { &mut self, _pvf: PvfPrepData, _timeout: Duration, - _encoded_params: Vec<u8>, + _pvd: Arc<PersistedValidationData>, + _pov: Arc<PoV>, _prepare_priority: polkadot_node_core_pvf::Priority, ) -> Result<WasmValidationResult, ValidationError> { // This is expected to panic if called more times than expected, indicating an error in the @@ -950,115 +952,6 @@ fn compressed_code_works() { assert_matches!(v, Ok(ValidationResult::Valid(_, _))); } -#[test] -fn code_decompression_failure_is_error() { - let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; - let pov = PoV { block_data: BlockData(vec![1; 32]) }; - let head_data = HeadData(vec![1, 1, 1]); - - let raw_code = vec![2u8; VALIDATION_CODE_BOMB_LIMIT + 1]; - let validation_code = - sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1) - .map(ValidationCode) - .unwrap(); - - let descriptor = make_valid_candidate_descriptor( - ParaId::from(1_u32), - dummy_hash(), - validation_data.hash(), - pov.hash(), - validation_code.hash(), - head_data.hash(), - dummy_hash(), - Sr25519Keyring::Alice, - ); - - let validation_result = WasmValidationResult { - head_data, - new_validation_code: None, - upward_messages: Default::default(), - horizontal_messages: Default::default(), - processed_downward_messages: 0, - hrmp_watermark: 0, - }; - - let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; - - let pool = TaskExecutor::new(); - let (_ctx, _ctx_handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context::< - AllMessages, - _, - >(pool.clone()); - - let v = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), - validation_data, - validation_code, - candidate_receipt, - Arc::new(pov), - ExecutorParams::default(), - PvfExecKind::Backing, - &Default::default(), - )); - - assert_matches!(v, Err(_)); -} - -#[test] -fn pov_decompression_failure_is_invalid() { - let validation_data = - PersistedValidationData { max_pov_size: POV_BOMB_LIMIT as u32, ..Default::default() }; - let head_data = HeadData(vec![1, 1, 1]); - - let raw_block_data = vec![2u8; POV_BOMB_LIMIT + 1]; - let pov = sp_maybe_compressed_blob::compress(&raw_block_data, POV_BOMB_LIMIT + 1) - .map(|raw| PoV { block_data: BlockData(raw) }) - .unwrap(); - - let validation_code = ValidationCode(vec![2; 16]); - - let descriptor = make_valid_candidate_descriptor( - ParaId::from(1_u32), - dummy_hash(), - validation_data.hash(), - pov.hash(), - validation_code.hash(), - head_data.hash(), - dummy_hash(), - Sr25519Keyring::Alice, - ); - - let validation_result = WasmValidationResult { - head_data, - new_validation_code: None, - upward_messages: Default::default(), - horizontal_messages: Default::default(), - processed_downward_messages: 0, - hrmp_watermark: 0, - }; - - let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; - - let pool = TaskExecutor::new(); - let (_ctx, _ctx_handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context::< - AllMessages, - _, - >(pool.clone()); - - let v = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), - validation_data, - validation_code, - candidate_receipt, - Arc::new(pov), - ExecutorParams::default(), - PvfExecKind::Backing, - &Default::default(), - )); - - assert_matches!(v, Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure))); -} - struct MockPreCheckBackend { result: Result<(), PrepareError>, } @@ -1075,7 +968,8 @@ impl ValidationBackend for MockPreCheckBackend { &mut self, _pvf: PvfPrepData, _timeout: Duration, - _encoded_params: Vec<u8>, + _pvd: Arc<PersistedValidationData>, + _pov: Arc<PoV>, _prepare_priority: polkadot_node_core_pvf::Priority, ) -> Result<WasmValidationResult, ValidationError> { unreachable!() @@ -1149,70 +1043,6 @@ fn precheck_works() { executor::block_on(test_fut); } -#[test] -fn precheck_invalid_pvf_blob_compression() { - let relay_parent = [3; 32].into(); - - let raw_code = vec![2u8; VALIDATION_CODE_BOMB_LIMIT + 1]; - let validation_code = - sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1) - .map(ValidationCode) - .unwrap(); - let validation_code_hash = validation_code.hash(); - - let pool = TaskExecutor::new(); - let (mut ctx, mut ctx_handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context::< - AllMessages, - _, - >(pool.clone()); - - let (check_fut, check_result) = precheck_pvf( - ctx.sender(), - MockPreCheckBackend::with_hardcoded_result(Ok(())), - relay_parent, - validation_code_hash, - ) - .remote_handle(); - - let test_fut = async move { - assert_matches!( - ctx_handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - rp, - RuntimeApiRequest::ValidationCodeByHash( - vch, - tx - ), - )) => { - assert_eq!(vch, validation_code_hash); - assert_eq!(rp, relay_parent); - - let _ = tx.send(Ok(Some(validation_code.clone()))); - } - ); - assert_matches!( - ctx_handle.recv().await, - AllMessages::RuntimeApi( - RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) - ) => { - tx.send(Ok(1u32.into())).unwrap(); - } - ); - assert_matches!( - ctx_handle.recv().await, - AllMessages::RuntimeApi( - RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(_, tx)) - ) => { - tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); - } - ); - assert_matches!(check_result.await, PreCheckOutcome::Invalid); - }; - - let test_fut = future::join(test_fut, check_fut); - executor::block_on(test_fut); -} - #[test] fn precheck_properly_classifies_outcomes() { let inner = |prepare_result, precheck_outcome| { @@ -1292,7 +1122,8 @@ impl ValidationBackend for MockHeadsUp { &mut self, _pvf: PvfPrepData, _timeout: Duration, - _encoded_params: Vec<u8>, + _pvd: Arc<PersistedValidationData>, + _pov: Arc<PoV>, _prepare_priority: polkadot_node_core_pvf::Priority, ) -> Result<WasmValidationResult, ValidationError> { unreachable!() diff --git a/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs b/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs index 97a03e6596d1..342128b7cca2 100644 --- a/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs +++ b/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs @@ -116,7 +116,7 @@ fn host_prepare_rococo_runtime(c: &mut Criterion) { cfg.prepare_workers_hard_max_num = 1; }) .await, - pvf.clone().code(), + pvf.clone().maybe_compressed_code(), ) }, |result| async move { diff --git a/polkadot/node/core/pvf/common/src/error.rs b/polkadot/node/core/pvf/common/src/error.rs index 7ee05448d3c5..b0cdba9501db 100644 --- a/polkadot/node/core/pvf/common/src/error.rs +++ b/polkadot/node/core/pvf/common/src/error.rs @@ -94,6 +94,10 @@ pub enum PrepareError { #[codec(index = 11)] #[error("prepare: error interfacing with the kernel: {0}")] Kernel(String), + /// Code blob failed to decompress + #[codec(index = 12)] + #[error("prepare: could not decompress code blob: {0}")] + CouldNotDecompressCodeBlob(String), } impl PrepareError { @@ -106,7 +110,11 @@ impl PrepareError { pub fn is_deterministic(&self) -> bool { use PrepareError::*; match self { - Prevalidation(_) | Preparation(_) | JobError(_) | OutOfMemory => true, + Prevalidation(_) | + Preparation(_) | + JobError(_) | + OutOfMemory | + CouldNotDecompressCodeBlob(_) => true, IoErr(_) | JobDied { .. } | CreateTmpFile(_) | diff --git a/polkadot/node/core/pvf/common/src/execute.rs b/polkadot/node/core/pvf/common/src/execute.rs index 46862f9f80b6..cff3f3b86e95 100644 --- a/polkadot/node/core/pvf/common/src/execute.rs +++ b/polkadot/node/core/pvf/common/src/execute.rs @@ -35,6 +35,8 @@ pub struct WorkerResponse { pub job_response: JobResponse, /// The amount of CPU time taken by the job. pub duration: Duration, + /// The uncompressed PoV size. + pub pov_size: u32, } /// An error occurred in the worker process. @@ -77,6 +79,8 @@ pub enum JobResponse { RuntimeConstruction(String), /// The candidate is invalid. InvalidCandidate(String), + /// PoV decompression failed + PoVDecompressionFailure, } impl JobResponse { diff --git a/polkadot/node/core/pvf/common/src/prepare.rs b/polkadot/node/core/pvf/common/src/prepare.rs index 81e165a7b8a4..4cd1beb30991 100644 --- a/polkadot/node/core/pvf/common/src/prepare.rs +++ b/polkadot/node/core/pvf/common/src/prepare.rs @@ -44,6 +44,8 @@ pub struct PrepareStats { pub cpu_time_elapsed: std::time::Duration, /// The observed memory statistics for the preparation job. pub memory_stats: MemoryStats, + /// The decompressed Wasm code length observed during the preparation. + pub observed_wasm_code_len: u32, } /// Helper struct to contain all the memory stats, including `MemoryAllocationStats` and, if diff --git a/polkadot/node/core/pvf/common/src/pvf.rs b/polkadot/node/core/pvf/common/src/pvf.rs index e2ac36a2406a..4019a8d8b0d0 100644 --- a/polkadot/node/core/pvf/common/src/pvf.rs +++ b/polkadot/node/core/pvf/common/src/pvf.rs @@ -26,9 +26,9 @@ use std::{fmt, sync::Arc, time::Duration}; /// Should be cheap to clone. #[derive(Clone, Encode, Decode)] pub struct PvfPrepData { - /// Wasm code (uncompressed) - code: Arc<Vec<u8>>, - /// Wasm code hash + /// Wasm code (maybe compressed) + maybe_compressed_code: Arc<Vec<u8>>, + /// Wasm code hash.
code_hash: ValidationCodeHash, /// Executor environment parameters for the session for which artifact is prepared executor_params: Arc<ExecutorParams>, @@ -46,20 +46,20 @@ impl PvfPrepData { prep_timeout: Duration, prep_kind: PrepareJobKind, ) -> Self { - let code = Arc::new(code); - let code_hash = sp_crypto_hashing::blake2_256(&code).into(); + let maybe_compressed_code = Arc::new(code); + let code_hash = sp_crypto_hashing::blake2_256(&maybe_compressed_code).into(); let executor_params = Arc::new(executor_params); - Self { code, code_hash, executor_params, prep_timeout, prep_kind } + Self { maybe_compressed_code, code_hash, executor_params, prep_timeout, prep_kind } } - /// Returns validation code hash for the PVF + /// Returns validation code hash pub fn code_hash(&self) -> ValidationCodeHash { self.code_hash } - /// Returns PVF code - pub fn code(&self) -> Arc<Vec<u8>> { - self.code.clone() + /// Returns PVF code blob + pub fn maybe_compressed_code(&self) -> Arc<Vec<u8>> { + self.maybe_compressed_code.clone() } /// Returns executor params diff --git a/polkadot/node/core/pvf/execute-worker/Cargo.toml b/polkadot/node/core/pvf/execute-worker/Cargo.toml index f24b66dc4a0e..6ad340d25612 100644 --- a/polkadot/node/core/pvf/execute-worker/Cargo.toml +++ b/polkadot/node/core/pvf/execute-worker/Cargo.toml @@ -19,8 +19,11 @@ libc = { workspace = true } codec = { features = ["derive"], workspace = true } polkadot-node-core-pvf-common = { workspace = true, default-features = true } +polkadot-node-primitives = { workspace = true, default-features = true } polkadot-parachain-primitives = { workspace = true, default-features = true } polkadot-primitives = { workspace = true, default-features = true } +sp-maybe-compressed-blob = { workspace = true, default-features = true } + [features] builder = [] diff --git a/polkadot/node/core/pvf/execute-worker/src/lib.rs b/polkadot/node/core/pvf/execute-worker/src/lib.rs index 35858ab36cec..4b7c167cc9ec 100644 --- a/polkadot/node/core/pvf/execute-worker/src/lib.rs +++ b/polkadot/node/core/pvf/execute-worker/src/lib.rs @@ -22,6 +22,7 @@ pub use polkadot_node_core_pvf_common::{ error::ExecuteError, executor_interface::execute_artifact, }; +use polkadot_parachain_primitives::primitives::ValidationParams; // NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are // separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-execute-worker=trace`.
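(A usage sketch of the reworked PvfPrepData — hypothetical call site, not part of this patch: the on-chain blob is handed over untouched, so the hash is computed over the maybe-compressed bytes and decompression is deferred to the workers:)

use polkadot_node_core_pvf::{PrepareJobKind, PvfPrepData};
use polkadot_primitives::{ExecutorParams, ValidationCode};
use std::time::Duration;

// Build a PVF from the maybe-compressed on-chain blob; only the prepare
// worker inflates it (under VALIDATION_CODE_BOMB_LIMIT).
fn pvf_from_onchain_code(validation_code: &ValidationCode) -> PvfPrepData {
    let pvf = PvfPrepData::from_code(
        validation_code.0.clone(),
        ExecutorParams::default(),
        Duration::from_secs(60), // illustrative preparation timeout
        PrepareJobKind::Prechecking,
    );
    // Hashing the blob as-is keeps code_hash() equal to the on-chain
    // ValidationCode::hash().
    debug_assert_eq!(pvf.code_hash(), validation_code.hash());
    pvf
}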
@@ -50,8 +51,9 @@ use polkadot_node_core_pvf_common::{ }, worker_dir, }; +use polkadot_node_primitives::{BlockData, PoV, POV_BOMB_LIMIT}; use polkadot_parachain_primitives::primitives::ValidationResult; -use polkadot_primitives::ExecutorParams; +use polkadot_primitives::{ExecutorParams, PersistedValidationData}; use std::{ io::{self, Read}, os::{ @@ -85,8 +87,23 @@ fn recv_execute_handshake(stream: &mut UnixStream) -> io::Result<Handshake> { Ok(handshake) } -fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, Duration)> { - let params = framed_recv_blocking(stream)?; +fn recv_request(stream: &mut UnixStream) -> io::Result<(PersistedValidationData, PoV, Duration)> { + let pvd = framed_recv_blocking(stream)?; + let pvd = PersistedValidationData::decode(&mut &pvd[..]).map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + "execute pvf recv_request: failed to decode persisted validation data".to_string(), + ) + })?; + + let pov = framed_recv_blocking(stream)?; + let pov = PoV::decode(&mut &pov[..]).map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + "execute pvf recv_request: failed to decode PoV".to_string(), + ) + })?; + let execution_timeout = framed_recv_blocking(stream)?; let execution_timeout = Duration::decode(&mut &execution_timeout[..]).map_err(|_| { io::Error::new( @@ -94,7 +111,7 @@ fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, Duration)> { "execute pvf recv_request: failed to decode duration".to_string(), ) })?; - Ok((params, execution_timeout)) + Ok((pvd, pov, execution_timeout)) } /// Sends an error to the host and returns the original error wrapped in `io::Error`. @@ -149,7 +166,7 @@ pub fn worker_entrypoint( let execute_thread_stack_size = max_stack_size(&executor_params); loop { - let (params, execution_timeout) = recv_request(&mut stream).map_err(|e| { + let (pvd, pov, execution_timeout) = recv_request(&mut stream).map_err(|e| { map_and_send_err!( e, InternalValidationError::HostCommunication, @@ -197,7 +214,33 @@ pub fn worker_entrypoint( let stream_fd = stream.as_raw_fd(); let compiled_artifact_blob = Arc::new(compiled_artifact_blob); - let params = Arc::new(params); + + let raw_block_data = + match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) { + Ok(data) => data, + Err(_) => { + send_result::<WorkerResponse, WorkerError>( + &mut stream, + Ok(WorkerResponse { + job_response: JobResponse::PoVDecompressionFailure, + duration: Duration::ZERO, + pov_size: 0, + }), + worker_info, + )?; + continue; + }, + }; + + let pov_size = raw_block_data.len() as u32; + + let params = ValidationParams { + parent_head: pvd.parent_head.clone(), + block_data: BlockData(raw_block_data.to_vec()), + relay_parent_number: pvd.relay_parent_number, + relay_parent_storage_root: pvd.relay_parent_storage_root, + }; + let params = Arc::new(params.encode()); cfg_if::cfg_if! { if #[cfg(target_os = "linux")] { @@ -214,6 +257,7 @@ pub fn worker_entrypoint( worker_info, security_status.can_unshare_user_namespace_and_change_root, usage_before, + pov_size, )? } else { // Fall back to using fork. @@ -228,6 +272,7 @@ pub fn worker_entrypoint( execute_thread_stack_size, worker_info, usage_before, + pov_size, )?
}; } else { @@ -242,6 +287,7 @@ execute_thread_stack_size, worker_info, usage_before, + pov_size, )?; } } @@ -300,6 +346,7 @@ fn handle_clone( worker_info: &WorkerInfo, have_unshare_newuser: bool, usage_before: Usage, + pov_size: u32, ) -> io::Result<Result<WorkerResponse, WorkerError>> { use polkadot_node_core_pvf_common::worker::security; @@ -329,6 +376,7 @@ fn handle_clone( worker_info, child, usage_before, + pov_size, execution_timeout, ), Err(security::clone::Error::Clone(errno)) => @@ -347,6 +395,7 @@ fn handle_fork( execute_worker_stack_size: usize, worker_info: &WorkerInfo, usage_before: Usage, + pov_size: u32, ) -> io::Result<Result<WorkerResponse, WorkerError>> { // SAFETY: new process is spawned within a single threaded process. This invariant // is enforced by tests. @@ -367,6 +416,7 @@ fn handle_fork( worker_info, child, usage_before, + pov_size, execution_timeout, ), Err(errno) => Ok(Err(internal_error_from_errno("fork", errno))), @@ -513,6 +563,7 @@ fn handle_parent_process( worker_info: &WorkerInfo, job_pid: Pid, usage_before: Usage, + pov_size: u32, timeout: Duration, ) -> io::Result<Result<WorkerResponse, WorkerError>> { // the read end will wait until all write ends have been closed, @@ -578,7 +629,7 @@ fn handle_parent_process( )))); } - Ok(Ok(WorkerResponse { job_response, duration: cpu_tv })) + Ok(Ok(WorkerResponse { job_response, pov_size, duration: cpu_tv })) }, Err(job_error) => { gum::warn!( diff --git a/polkadot/node/core/pvf/prepare-worker/Cargo.toml b/polkadot/node/core/pvf/prepare-worker/Cargo.toml index 9e0d01fc438b..56235bd82192 100644 --- a/polkadot/node/core/pvf/prepare-worker/Cargo.toml +++ b/polkadot/node/core/pvf/prepare-worker/Cargo.toml @@ -23,10 +23,12 @@ nix = { features = ["process", "resource", "sched"], workspace = true } codec = { features = ["derive"], workspace = true } polkadot-node-core-pvf-common = { workspace = true, default-features = true } +polkadot-node-primitives = { workspace = true, default-features = true } polkadot-primitives = { workspace = true, default-features = true } sc-executor-common = { workspace = true, default-features = true } sc-executor-wasmtime = { workspace = true, default-features = true } +sp-maybe-compressed-blob = { workspace = true, default-features = true } [target.'cfg(target_os = "linux")'.dependencies] tikv-jemallocator = "0.5.0" diff --git a/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs b/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs index d531c90b64b5..49b30dc33ceb 100644 --- a/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs +++ b/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs @@ -24,7 +24,11 @@ use polkadot_primitives::ExecutorParams; use std::time::Duration; fn do_prepare_runtime(pvf: PvfPrepData) { - let blob = match prevalidate(&pvf.code()) { + let maybe_compressed_code = pvf.maybe_compressed_code(); + let raw_validation_code = + sp_maybe_compressed_blob::decompress(&maybe_compressed_code, usize::MAX).unwrap(); + + let blob = match prevalidate(&raw_validation_code) { Err(err) => panic!("{:?}", err), Ok(b) => b, }; diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs index ef33d11720eb..f8ebb6effcec 100644 --- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs +++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs @@ -38,6 +38,7 @@ use polkadot_node_core_pvf_common::{ executor_interface::{prepare, prevalidate}, worker::{pipe2_cloexec, PipeFd, WorkerInfo}, }; +use polkadot_node_primitives::VALIDATION_CODE_BOMB_LIMIT; use codec::{Decode, Encode}; use polkadot_node_core_pvf_common::{ @@ -105,6 +106,12 @@ impl AsRef<[u8]> for CompiledArtifact { } } +#[derive(Encode, Decode)] +pub struct PrepareOutcome { + pub compiled_artifact: CompiledArtifact, + pub observed_wasm_code_len: u32, +} + /// Get a worker request. fn recv_request(stream: &mut UnixStream) -> io::Result<PvfPrepData> { let pvf = framed_recv_blocking(stream)?; @@ -294,14 +301,23 @@ pub fn worker_entrypoint( ); } -fn prepare_artifact(pvf: PvfPrepData) -> Result<CompiledArtifact, PrepareError> { - let blob = match prevalidate(&pvf.code()) { +fn prepare_artifact(pvf: PvfPrepData) -> Result<PrepareOutcome, PrepareError> { + let maybe_compressed_code = pvf.maybe_compressed_code(); + let raw_validation_code = + sp_maybe_compressed_blob::decompress(&maybe_compressed_code, VALIDATION_CODE_BOMB_LIMIT) + .map_err(|e| PrepareError::CouldNotDecompressCodeBlob(e.to_string()))?; + let observed_wasm_code_len = raw_validation_code.len() as u32; + + let blob = match prevalidate(&raw_validation_code) { Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))), Ok(b) => b, }; match prepare(blob, &pvf.executor_params()) { - Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)), + Ok(compiled_artifact) => Ok(PrepareOutcome { + compiled_artifact: CompiledArtifact::new(compiled_artifact), + observed_wasm_code_len, + }), Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))), } } @@ -322,6 +338,7 @@ fn runtime_construction_check( struct JobResponse { artifact: CompiledArtifact, memory_stats: MemoryStats, + observed_wasm_code_len: u32, } #[cfg(target_os = "linux")] @@ -500,11 +517,11 @@ fn handle_child_process( "prepare worker", move || { #[allow(unused_mut)] - let mut result = prepare_artifact(pvf); + let mut result = prepare_artifact(pvf).map(|o| (o,)); // Get the `ru_maxrss` stat. If supported, call getrusage for the thread. #[cfg(target_os = "linux")] - let mut result = result.map(|artifact| (artifact, get_max_rss_thread())); + let mut result = result.map(|outcome| (outcome.0, get_max_rss_thread())); // If we are pre-checking, check for runtime construction errors. // // anyway. if let PrepareJobKind::Prechecking = prepare_job_kind { result = result.and_then(|output| { - runtime_construction_check(output.0.as_ref(), &executor_params)?; + runtime_construction_check( + output.0.compiled_artifact.as_ref(), + &executor_params, + )?; Ok(output) }); } @@ -553,9 +573,9 @@ fn handle_child_process( Ok(ok) => { cfg_if::cfg_if! { if #[cfg(target_os = "linux")] { - let (artifact, max_rss) = ok; + let (PrepareOutcome { compiled_artifact, observed_wasm_code_len }, max_rss) = ok; } else { - let artifact = ok; + let (PrepareOutcome { compiled_artifact, observed_wasm_code_len },) = ok; } } @@ -574,7 +594,11 @@ fn handle_child_process( peak_tracked_alloc: if peak_alloc > 0 { peak_alloc as u64 } else { 0u64 }, }; - Ok(JobResponse { artifact, memory_stats }) + Ok(JobResponse { + artifact: compiled_artifact, + observed_wasm_code_len, + memory_stats, + }) }, } }, @@ -665,7 +689,7 @@ fn handle_parent_process( match result { Err(err) => Err(err), - Ok(JobResponse { artifact, memory_stats }) => { + Ok(JobResponse { artifact, memory_stats, observed_wasm_code_len }) => { // The exit status should have been zero if no error occurred.
if exit_status != 0 { return Err(PrepareError::JobError(format!( @@ -696,7 +720,11 @@ fn handle_parent_process( let checksum = blake3::hash(&artifact.as_ref()).to_hex().to_string(); Ok(PrepareWorkerSuccess { checksum, - stats: PrepareStats { memory_stats, cpu_time_elapsed: cpu_tv }, + stats: PrepareStats { + memory_stats, + cpu_time_elapsed: cpu_tv, + observed_wasm_code_len, + }, }) }, } diff --git a/polkadot/node/core/pvf/src/error.rs b/polkadot/node/core/pvf/src/error.rs index 8dc96305eadb..a0634106052d 100644 --- a/polkadot/node/core/pvf/src/error.rs +++ b/polkadot/node/core/pvf/src/error.rs @@ -52,6 +52,9 @@ pub enum InvalidCandidate { /// PVF execution (compilation is not included) took more time than was allotted. #[error("invalid: hard timeout")] HardTimeout, + /// Proof-of-validity failed to decompress correctly + #[error("invalid: PoV failed to decompress")] + PoVDecompressionFailure, } /// Possibly transient issue that may resolve after retries. diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs index bb00a5a652d6..11031bf1074a 100644 --- a/polkadot/node/core/pvf/src/execute/queue.rs +++ b/polkadot/node/core/pvf/src/execute/queue.rs @@ -34,12 +34,14 @@ use polkadot_node_core_pvf_common::{ execute::{JobResponse, WorkerError, WorkerResponse}, SecurityStatus, }; -use polkadot_primitives::{ExecutorParams, ExecutorParamsHash}; +use polkadot_node_primitives::PoV; +use polkadot_primitives::{ExecutorParams, ExecutorParamsHash, PersistedValidationData}; use slotmap::HopSlotMap; use std::{ collections::VecDeque, fmt, path::PathBuf, + sync::Arc, time::{Duration, Instant}, }; @@ -68,7 +70,8 @@ pub enum FromQueue { #[derive(Debug)] pub struct PendingExecutionRequest { pub exec_timeout: Duration, - pub params: Vec<u8>, + pub pvd: Arc<PersistedValidationData>, + pub pov: Arc<PoV>, pub executor_params: ExecutorParams, pub result_tx: ResultSender, } @@ -76,7 +79,8 @@ struct ExecuteJob { artifact: ArtifactPathId, exec_timeout: Duration, - params: Vec<u8>, + pvd: Arc<PersistedValidationData>, + pov: Arc<PoV>, executor_params: ExecutorParams, result_tx: ResultSender, waiting_since: Instant, @@ -293,18 +297,20 @@ async fn purge_dead(metrics: &Metrics, workers: &mut Workers) { fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) { let ToQueue::Enqueue { artifact, pending_execution_request } = to_queue; - let PendingExecutionRequest { exec_timeout, params, executor_params, result_tx } = + let PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx } = pending_execution_request; gum::debug!( target: LOG_TARGET, validation_code_hash = ?artifact.id.code_hash, "enqueueing an artifact for execution", ); + queue.metrics.observe_pov_size(pov.block_data.0.len(), true); queue.metrics.execute_enqueued(); let job = ExecuteJob { artifact, exec_timeout, - params, + pvd, + pov, executor_params, result_tx, waiting_since: Instant::now(), @@ -352,15 +358,19 @@ async fn handle_job_finish( artifact_id: ArtifactId, result_tx: ResultSender, ) { - let (idle_worker, result, duration, sync_channel) = match worker_result { + let (idle_worker, result, duration, sync_channel, pov_size) = match worker_result { Ok(WorkerInterfaceResponse { worker_response: - WorkerResponse { job_response: JobResponse::Ok { result_descriptor }, duration }, + WorkerResponse { + job_response: JobResponse::Ok { result_descriptor }, + duration, + pov_size, + }, idle_worker, }) => { // TODO: propagate the soft timeout - (Some(idle_worker), Ok(result_descriptor), Some(duration), None, Some(pov_size)) }, Ok(WorkerInterfaceResponse { worker_response: WorkerResponse { job_response: JobResponse::InvalidCandidate(err), .. }, idle_worker, }) => ( Some(idle_worker), Err(ValidationError::Invalid(InvalidCandidate::WorkerReportedInvalid(err))), None, None, + None, ), + Ok(WorkerInterfaceResponse { + worker_response: + WorkerResponse { job_response: JobResponse::PoVDecompressionFailure, .. }, + idle_worker, + }) => ( + Some(idle_worker), + Err(ValidationError::Invalid(InvalidCandidate::PoVDecompressionFailure)), + None, + None, + None, ), Ok(WorkerInterfaceResponse { worker_response: @@ -393,39 +415,46 @@ async fn handle_job_finish( ))), None, Some(result_rx), + None, ) }, Err(WorkerInterfaceError::InternalError(err)) | Err(WorkerInterfaceError::WorkerError(WorkerError::InternalError(err))) => - (None, Err(ValidationError::Internal(err)), None, None), + (None, Err(ValidationError::Internal(err)), None, None, None), // Either the worker or the job timed out. Kill the worker in either case. Treated as // definitely-invalid, because if we timed out, there's no time left for a retry. Err(WorkerInterfaceError::HardTimeout) | Err(WorkerInterfaceError::WorkerError(WorkerError::JobTimedOut)) => - (None, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), None, None), + (None, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), None, None, None), // "Maybe invalid" errors (will retry). Err(WorkerInterfaceError::CommunicationErr(_err)) => ( None, Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)), None, None, + None, ), Err(WorkerInterfaceError::WorkerError(WorkerError::JobDied { err, .. })) => ( None, Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))), None, None, + None, ), Err(WorkerInterfaceError::WorkerError(WorkerError::JobError(err))) => ( None, Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err.to_string()))), None, None, + None, ), }; queue.metrics.execute_finished(); + if let Some(pov_size) = pov_size { + queue.metrics.observe_pov_size(pov_size as usize, false) + } if let Err(ref err) = result { gum::warn!( target: LOG_TARGET, @@ -573,7 +602,8 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) { idle, job.artifact.clone(), job.exec_timeout, - job.params, + job.pvd, + job.pov, ) .await; QueueEvent::StartWork(worker, result, job.artifact.id, job.result_tx) diff --git a/polkadot/node/core/pvf/src/execute/worker_interface.rs b/polkadot/node/core/pvf/src/execute/worker_interface.rs index d15d7c15426e..77bd6bedd75c 100644 --- a/polkadot/node/core/pvf/src/execute/worker_interface.rs +++ b/polkadot/node/core/pvf/src/execute/worker_interface.rs @@ -32,8 +32,9 @@ use polkadot_node_core_pvf_common::{ execute::{Handshake, WorkerError, WorkerResponse}, worker_dir, SecurityStatus, }; -use polkadot_primitives::ExecutorParams; -use std::{path::Path, time::Duration}; +use polkadot_node_primitives::PoV; +use polkadot_primitives::{ExecutorParams, PersistedValidationData}; +use std::{path::Path, sync::Arc, time::Duration}; use tokio::{io, net::UnixStream}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
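(The request framing introduced by the surrounding hunks, condensed into a self-contained sketch — the real code uses the framed_send/framed_recv helpers over a Unix socket, so this is illustrative only: the host sends three SCALE-encoded frames, pvd, pov, then the timeout, and the worker decodes them in the same order:)

use codec::{Decode, Encode};
use polkadot_node_primitives::PoV;
use polkadot_primitives::PersistedValidationData;
use std::time::Duration;

// Host side: the three frames, in the order the worker expects them.
fn encode_request_frames(
    pvd: &PersistedValidationData,
    pov: &PoV,
    timeout: Duration,
) -> Vec<Vec<u8>> {
    vec![pvd.encode(), pov.encode(), timeout.encode()]
}

// Worker side: decode in the same order; a mismatch here is what the
// "failed to decode ..." errors in recv_request guard against.
fn decode_request_frames(frames: &[Vec<u8>]) -> Option<(PersistedValidationData, PoV, Duration)> {
    let pvd = PersistedValidationData::decode(&mut &frames.get(0)?[..]).ok()?;
    let pov = PoV::decode(&mut &frames.get(1)?[..]).ok()?;
    let timeout = Duration::decode(&mut &frames.get(2)?[..]).ok()?;
    Some((pvd, pov, timeout))
}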
@@ -123,7 +124,8 @@ pub async fn start_work( worker: IdleWorker, artifact: ArtifactPathId, execution_timeout: Duration, - validation_params: Vec<u8>, + pvd: Arc<PersistedValidationData>, + pov: Arc<PoV>, ) -> Result<Response, Error> { let IdleWorker { mut stream, pid, worker_dir } = worker; @@ -137,18 +139,16 @@ pub async fn start_work( ); with_worker_dir_setup(worker_dir, pid, &artifact.path, |worker_dir| async move { - send_request(&mut stream, &validation_params, execution_timeout).await.map_err( - |error| { - gum::warn!( - target: LOG_TARGET, - worker_pid = %pid, - validation_code_hash = ?artifact.id.code_hash, - "failed to send an execute request: {}", - error, - ); - Error::InternalError(InternalValidationError::HostCommunication(error.to_string())) - }, - )?; + send_request(&mut stream, pvd, pov, execution_timeout).await.map_err(|error| { + gum::warn!( + target: LOG_TARGET, + worker_pid = %pid, + validation_code_hash = ?artifact.id.code_hash, + "failed to send an execute request: {}", + error, + ); + Error::InternalError(InternalValidationError::HostCommunication(error.to_string())) + })?; // We use a generous timeout here. This is in addition to the one in the child process, in // case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout @@ -288,10 +288,12 @@ async fn send_execute_handshake(stream: &mut UnixStream, handshake: Handshake) - async fn send_request( stream: &mut UnixStream, - validation_params: &[u8], + pvd: Arc<PersistedValidationData>, + pov: Arc<PoV>, execution_timeout: Duration, ) -> io::Result<()> { - framed_send(stream, validation_params).await?; + framed_send(stream, &pvd.encode()).await?; + framed_send(stream, &pov.encode()).await?; framed_send(stream, &execution_timeout.encode()).await } diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index 462631d33b52..44a4cba2fbf8 100644 --- a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -36,11 +36,14 @@ use polkadot_node_core_pvf_common::{ prepare::PrepareSuccess, pvf::PvfPrepData, }; +use polkadot_node_primitives::PoV; use polkadot_node_subsystem::{SubsystemError, SubsystemResult}; use polkadot_parachain_primitives::primitives::ValidationResult; +use polkadot_primitives::PersistedValidationData; use std::{ collections::HashMap, path::PathBuf, + sync::Arc, time::{Duration, SystemTime}, }; @@ -108,7 +111,8 @@ impl ValidationHost { &mut self, pvf: PvfPrepData, exec_timeout: Duration, - params: Vec<u8>, + pvd: Arc<PersistedValidationData>, + pov: Arc<PoV>, priority: Priority, result_tx: ResultSender, ) -> Result<(), String> { @@ -116,7 +120,8 @@ impl ValidationHost { .send(ToHost::ExecutePvf(ExecutePvfInputs { pvf, exec_timeout, - params, + pvd, + pov, priority, result_tx, })) @@ -147,7 +152,8 @@ enum ToHost { struct ExecutePvfInputs { pvf: PvfPrepData, exec_timeout: Duration, - params: Vec<u8>, + pvd: Arc<PersistedValidationData>, + pov: Arc<PoV>, priority: Priority, result_tx: ResultSender, } @@ -539,7 +545,7 @@ async fn handle_execute_pvf( awaiting_prepare: &mut AwaitingPrepare, inputs: ExecutePvfInputs, ) -> Result<(), Fatal> { - let ExecutePvfInputs { pvf, exec_timeout, params, priority, result_tx } = inputs; + let ExecutePvfInputs { pvf, exec_timeout, pvd, pov, priority, result_tx } = inputs; let artifact_id = ArtifactId::from_pvf_prep_data(&pvf); let executor_params = (*pvf.executor_params()).clone(); @@ -558,7 +564,8 @@ async fn handle_execute_pvf( artifact: ArtifactPathId::new(artifact_id, path), pending_execution_request: PendingExecutionRequest { exec_timeout, - params, + pvd, + pov, executor_params, result_tx, }, @@ -587,7 +594,8 @@
artifact_id, PendingExecutionRequest { exec_timeout, - params, + pvd, + pov, executor_params, result_tx, }, @@ -598,7 +606,7 @@ async fn handle_execute_pvf( ArtifactState::Preparing { .. } => { awaiting_prepare.add( artifact_id, - PendingExecutionRequest { exec_timeout, params, executor_params, result_tx }, + PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx }, ); }, ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => { @@ -627,7 +635,8 @@ async fn handle_execute_pvf( artifact_id, PendingExecutionRequest { exec_timeout, - params, + pvd, + pov, executor_params, result_tx, }, @@ -648,7 +657,7 @@ async fn handle_execute_pvf( pvf, priority, artifact_id, - PendingExecutionRequest { exec_timeout, params, executor_params, result_tx }, + PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx }, ) .await?; } @@ -770,7 +779,7 @@ async fn handle_prepare_done( // It's finally time to dispatch all the execution requests that were waiting for this artifact // to be prepared. let pending_requests = awaiting_prepare.take(&artifact_id); - for PendingExecutionRequest { exec_timeout, params, executor_params, result_tx } in + for PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx } in pending_requests { if result_tx.is_canceled() { @@ -793,7 +802,8 @@ async fn handle_prepare_done( artifact: ArtifactPathId::new(artifact_id.clone(), &path), pending_execution_request: PendingExecutionRequest { exec_timeout, - params, + pvd, + pov, executor_params, result_tx, }, @@ -967,6 +977,8 @@ pub(crate) mod tests { use assert_matches::assert_matches; use futures::future::BoxFuture; use polkadot_node_core_pvf_common::prepare::PrepareStats; + use polkadot_node_primitives::BlockData; + use sp_core::H256; const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30); @@ -1223,12 +1235,21 @@ pub(crate) mod tests { async fn execute_pvf_requests() { let mut test = Builder::default().build(); let mut host = test.host_handle(); + let pvd = Arc::new(PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }); + let pov1 = Arc::new(PoV { block_data: BlockData(b"pov1".to_vec()) }); + let pov2 = Arc::new(PoV { block_data: BlockData(b"pov2".to_vec()) }); let (result_tx, result_rx_pvf_1_1) = oneshot::channel(); host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf1".to_vec(), + pvd.clone(), + pov1.clone(), Priority::Normal, result_tx, ) @@ -1239,7 +1260,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf1".to_vec(), + pvd.clone(), + pov1, Priority::Critical, result_tx, ) @@ -1250,7 +1272,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(2), TEST_EXECUTION_TIMEOUT, - b"pvf2".to_vec(), + pvd, + pov2, Priority::Normal, result_tx, ) @@ -1382,6 +1405,13 @@ pub(crate) mod tests { async fn test_prepare_done() { let mut test = Builder::default().build(); let mut host = test.host_handle(); + let pvd = Arc::new(PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }); + let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) }); // Test mixed cases of receiving execute and precheck requests // for the same PVF. 
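(A caller-side sketch of the new execute_pvf entry point — a hypothetical helper mirroring what the tests below do: pvd and pov travel down to the queue and worker as cheap Arc clones instead of a pre-encoded ValidationParams buffer:)

use polkadot_node_core_pvf::{Priority, PvfPrepData, ValidationHost};
use polkadot_node_primitives::PoV;
use polkadot_primitives::PersistedValidationData;
use std::{sync::Arc, time::Duration};

// Submit one execution request and wait for its result.
async fn run_once(
    host: &mut ValidationHost,
    pvf: PvfPrepData,
    pvd: Arc<PersistedValidationData>,
    pov: Arc<PoV>,
    exec_timeout: Duration,
) {
    let (result_tx, result_rx) = futures::channel::oneshot::channel();
    host.execute_pvf(pvf, exec_timeout, pvd, pov, Priority::Normal, result_tx)
        .await
        .expect("host is alive");
    // A WasmValidationResult or a ValidationError comes back on the channel.
    let _result = result_rx.await;
}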
@@ -1391,7 +1421,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf2".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx, ) @@ -1438,7 +1469,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(2), TEST_EXECUTION_TIMEOUT, - b"pvf2".to_vec(), + pvd, + pov, Priority::Critical, result_tx, ) @@ -1534,13 +1566,21 @@ pub(crate) mod tests { async fn test_execute_prepare_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); + let pvd = Arc::new(PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }); + let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) }); // Submit a execute request that fails. let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx, ) @@ -1570,7 +1610,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx_2, ) @@ -1592,7 +1633,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx_3, ) @@ -1636,13 +1678,21 @@ pub(crate) mod tests { async fn test_execute_prepare_no_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); + let pvd = Arc::new(PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }); + let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) }); // Submit an execute request that fails. 
let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx, ) @@ -1672,7 +1722,8 @@ host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx_2, ) @@ -1694,7 +1745,8 @@ host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx_3, ) @@ -1755,12 +1807,20 @@ async fn cancellation() { let mut test = Builder::default().build(); let mut host = test.host_handle(); + let pvd = Arc::new(PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }); + let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) }); let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf1".to_vec(), + pvd, + pov, Priority::Normal, result_tx, ) diff --git a/polkadot/node/core/pvf/src/metrics.rs b/polkadot/node/core/pvf/src/metrics.rs index bc8d300037fe..c59cab464180 100644 --- a/polkadot/node/core/pvf/src/metrics.rs +++ b/polkadot/node/core/pvf/src/metrics.rs @@ -105,6 +105,21 @@ impl Metrics { .observe((memory_stats.peak_tracked_alloc / 1024) as f64); } } + + pub(crate) fn observe_code_size(&self, code_size: usize) { + if let Some(metrics) = &self.0 { + metrics.code_size.observe(code_size as f64); + } + } + + pub(crate) fn observe_pov_size(&self, pov_size: usize, compressed: bool) { + if let Some(metrics) = &self.0 { + metrics + .pov_size + .with_label_values(&[if compressed { "true" } else { "false" }]) + .observe(pov_size as f64); + } + } } #[derive(Clone)] @@ -129,6 +144,8 @@ struct MetricsInner { preparation_max_resident: prometheus::Histogram, // Peak allocation value, tracked by tracking-allocator preparation_peak_tracked_allocation: prometheus::Histogram, + pov_size: prometheus::HistogramVec, + code_size: prometheus::Histogram, } impl metrics::Metrics for Metrics { @@ -323,6 +340,35 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + // The following metrics were moved here from the candidate validation subsystem. + // Names are kept to avoid breaking existing dashboards and alerts.
+ pov_size: prometheus::register( + prometheus::HistogramVec::new( + prometheus::HistogramOpts::new( + "polkadot_parachain_candidate_validation_pov_size", + "The compressed and decompressed size of the proof of validity of a candidate", + ) + .buckets( + prometheus::exponential_buckets(16384.0, 2.0, 10) + .expect("arguments are always valid; qed"), + ), + &["compressed"], + )?, + registry, + )?, + code_size: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts::new( + "polkadot_parachain_candidate_validation_code_size", + "The size of the decompressed WASM validation blob used for checking a candidate", + ) + .buckets( + prometheus::exponential_buckets(16384.0, 2.0, 10) + .expect("arguments are always valid; qed"), + ), + )?, + registry, + )?, }; Ok(Metrics(Some(inner))) } diff --git a/polkadot/node/core/pvf/src/prepare/worker_interface.rs b/polkadot/node/core/pvf/src/prepare/worker_interface.rs index 22ee93319d84..d29d2717c4b6 100644 --- a/polkadot/node/core/pvf/src/prepare/worker_interface.rs +++ b/polkadot/node/core/pvf/src/prepare/worker_interface.rs @@ -211,7 +211,7 @@ async fn handle_response( // https://github.com/paritytech/polkadot-sdk/issues/2399 let PrepareWorkerSuccess { checksum: _, - stats: PrepareStats { cpu_time_elapsed, memory_stats }, + stats: PrepareStats { cpu_time_elapsed, memory_stats, observed_wasm_code_len }, } = match result.clone() { Ok(result) => result, // Timed out on the child. This should already be logged by the child. @@ -221,6 +221,8 @@ async fn handle_response( Err(err) => return Outcome::Concluded { worker, result: Err(err) }, }; + metrics.observe_code_size(observed_wasm_code_len as usize); + if cpu_time_elapsed > preparation_timeout { // The job didn't complete within the timeout. 
gum::warn!( @@ -267,7 +269,11 @@ async fn handle_response( result: Ok(PrepareSuccess { path: artifact_path, size, - stats: PrepareStats { cpu_time_elapsed, memory_stats: memory_stats.clone() }, + stats: PrepareStats { + cpu_time_elapsed, + memory_stats: memory_stats.clone(), + observed_wasm_code_len, + }, }), }, Err(err) => { diff --git a/polkadot/node/core/pvf/tests/it/adder.rs b/polkadot/node/core/pvf/tests/it/adder.rs index 455e8c36c88d..1a95a28fe077 100644 --- a/polkadot/node/core/pvf/tests/it/adder.rs +++ b/polkadot/node/core/pvf/tests/it/adder.rs @@ -18,29 +18,33 @@ use super::TestHost; use codec::{Decode, Encode}; +use polkadot_node_primitives::PoV; use polkadot_parachain_primitives::primitives::{ - BlockData as GenericBlockData, HeadData as GenericHeadData, RelayChainBlockNumber, - ValidationParams, + BlockData as GenericBlockData, HeadData as GenericHeadData, }; +use polkadot_primitives::PersistedValidationData; +use sp_core::H256; use test_parachain_adder::{hash_state, BlockData, HeadData}; #[tokio::test] async fn execute_good_block_on_parent() { let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; - let block_data = BlockData { state: 0, add: 512 }; + let pvd = PersistedValidationData { + parent_head: GenericHeadData(parent_head.encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(block_data.encode()) }; let host = TestHost::new().await; let ret = host .validate_candidate( test_parachain_adder::wasm_binary_unwrap(), - ValidationParams { - parent_head: GenericHeadData(parent_head.encode()), - block_data: GenericBlockData(block_data.encode()), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ) .await @@ -63,18 +67,20 @@ async fn execute_good_chain_on_parent() { for (number, add) in (0..10).enumerate() { let parent_head = HeadData { number: number as u64, parent_hash, post_state: hash_state(last_state) }; - let block_data = BlockData { state: last_state, add }; + let pvd = PersistedValidationData { + parent_head: GenericHeadData(parent_head.encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(block_data.encode()) }; let ret = host .validate_candidate( test_parachain_adder::wasm_binary_unwrap(), - ValidationParams { - parent_head: GenericHeadData(parent_head.encode()), - block_data: GenericBlockData(block_data.encode()), - relay_parent_number: number as RelayChainBlockNumber + 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ) .await @@ -94,23 +100,25 @@ async fn execute_good_chain_on_parent() { #[tokio::test] async fn execute_bad_block_on_parent() { let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; - let block_data = BlockData { state: 256, // start state is wrong. 
 		add: 256,
 	};
+	let pvd = PersistedValidationData {
+		parent_head: GenericHeadData(parent_head.encode()),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let pov = PoV { block_data: GenericBlockData(block_data.encode()) };
 
 	let host = TestHost::new().await;
 
 	let _err = host
 		.validate_candidate(
 			test_parachain_adder::wasm_binary_unwrap(),
-			ValidationParams {
-				parent_head: GenericHeadData(parent_head.encode()),
-				block_data: GenericBlockData(block_data.encode()),
-				relay_parent_number: 1,
-				relay_parent_storage_root: Default::default(),
-			},
+			pvd,
+			pov,
 			Default::default(),
 		)
 		.await
@@ -124,15 +132,18 @@ async fn stress_spawn() {
 	async fn execute(host: std::sync::Arc<TestHost>) {
 		let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) };
 		let block_data = BlockData { state: 0, add: 512 };
+		let pvd = PersistedValidationData {
+			parent_head: GenericHeadData(parent_head.encode()),
+			relay_parent_number: 1u32,
+			relay_parent_storage_root: H256::default(),
+			max_pov_size: 4096 * 1024,
+		};
+		let pov = PoV { block_data: GenericBlockData(block_data.encode()) };
 
 		let ret = host
 			.validate_candidate(
 				test_parachain_adder::wasm_binary_unwrap(),
-				ValidationParams {
-					parent_head: GenericHeadData(parent_head.encode()),
-					block_data: GenericBlockData(block_data.encode()),
-					relay_parent_number: 1,
-					relay_parent_storage_root: Default::default(),
-				},
+				pvd,
+				pov,
 				Default::default(),
 			)
 			.await
@@ -161,15 +172,18 @@ async fn execute_can_run_serially() {
 	async fn execute(host: std::sync::Arc<TestHost>) {
 		let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) };
 		let block_data = BlockData { state: 0, add: 512 };
+		let pvd = PersistedValidationData {
+			parent_head: GenericHeadData(parent_head.encode()),
+			relay_parent_number: 1u32,
+			relay_parent_storage_root: H256::default(),
+			max_pov_size: 4096 * 1024,
+		};
+		let pov = PoV { block_data: GenericBlockData(block_data.encode()) };
 
 		let ret = host
 			.validate_candidate(
 				test_parachain_adder::wasm_binary_unwrap(),
-				ValidationParams {
-					parent_head: GenericHeadData(parent_head.encode()),
-					block_data: GenericBlockData(block_data.encode()),
-					relay_parent_number: 1,
-					relay_parent_storage_root: Default::default(),
-				},
+				pvd,
+				pov,
 				Default::default(),
 			)
 			.await
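The `PersistedValidationData` fixture above recurs verbatim in every adder test; a test-only builder along these lines could cut the duplication. This is hypothetical and not part of the diff; the values simply mirror the fixtures, with `4096 * 1024` being a 4 MiB `max_pov_size`:

```rust
use polkadot_parachain_primitives::primitives::HeadData as GenericHeadData;
use polkadot_primitives::PersistedValidationData;
use sp_core::H256;

// Hypothetical test helper mirroring the literals used in the tests above.
fn dummy_pvd(parent_head: GenericHeadData) -> PersistedValidationData {
	PersistedValidationData {
		parent_head,
		relay_parent_number: 1u32,
		relay_parent_storage_root: H256::default(),
		max_pov_size: 4096 * 1024, // 4 MiB
	}
}
```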
diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs
index 9ad486657512..a4a085318957 100644
--- a/polkadot/node/core/pvf/tests/it/main.rs
+++ b/polkadot/node/core/pvf/tests/it/main.rs
@@ -17,7 +17,6 @@
 //! General PVF host integration tests checking the functionality of the PVF host itself.
 
 use assert_matches::assert_matches;
-use codec::Encode as _;
 #[cfg(all(feature = "ci-only-tests", target_os = "linux"))]
 use polkadot_node_core_pvf::SecurityStatus;
 use polkadot_node_core_pvf::{
@@ -25,10 +24,14 @@ use polkadot_node_core_pvf::{
 	PossiblyInvalidError, PrepareError, PrepareJobKind, PvfPrepData, ValidationError,
 	ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
 };
-use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult};
-use polkadot_primitives::{ExecutorParam, ExecutorParams, PvfExecKind, PvfPrepKind};
+use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT};
+use polkadot_parachain_primitives::primitives::{BlockData, ValidationResult};
+use polkadot_primitives::{
+	ExecutorParam, ExecutorParams, PersistedValidationData, PvfExecKind, PvfPrepKind,
+};
+use sp_core::H256;
 
-use std::{io::Write, time::Duration};
+use std::{io::Write, sync::Arc, time::Duration};
 use tokio::sync::Mutex;
 
 mod adder;
@@ -80,9 +83,6 @@ impl TestHost {
 	) -> Result<(), PrepareError> {
 		let (result_tx, result_rx) = futures::channel::oneshot::channel();
 
-		let code = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024)
-			.expect("Compression works");
-
 		self.host
 			.lock()
 			.await
@@ -103,14 +103,12 @@ impl TestHost {
 	async fn validate_candidate(
 		&self,
 		code: &[u8],
-		params: ValidationParams,
+		pvd: PersistedValidationData,
+		pov: PoV,
 		executor_params: ExecutorParams,
 	) -> Result<ValidationResult, ValidationError> {
 		let (result_tx, result_rx) = futures::channel::oneshot::channel();
 
-		let code = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024)
-			.expect("Compression works");
-
 		self.host
 			.lock()
 			.await
@@ -122,7 +120,8 @@
 				PrepareJobKind::Compilation,
 			),
 			TEST_EXECUTION_TIMEOUT,
-			params.encode(),
+			Arc::new(pvd),
+			Arc::new(pov),
 			polkadot_node_core_pvf::Priority::Normal,
 			result_tx,
 		)
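The hunk above is the contract change in miniature: the code blob now stays compressed inside `PvfPrepData`, and Arc-wrapped context replaces the SCALE-encoded `ValidationParams`. Spelled out, the enqueue has roughly this shape; the argument order is taken from the calls in this file, while the method name (`execute_pvf` on `ValidationHost`) and the surrounding wiring are assumptions:

```rust
// Sketch of the enqueue as driven by the updated TestHost helper.
host.execute_pvf(
	pvf,                    // PvfPrepData built from the still-compressed code
	TEST_EXECUTION_TIMEOUT, // execution timeout
	Arc::new(pvd),          // Arc<PersistedValidationData>, shared with the worker
	Arc::new(pov),          // Arc<PoV>, decompressed only inside the worker
	polkadot_node_core_pvf::Priority::Normal,
	result_tx,              // oneshot sender; the verdict comes back on result_rx
)
.await
.unwrap();
```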
@@ -159,19 +158,17 @@
 #[tokio::test]
 async fn execute_job_terminates_on_timeout() {
 	let host = TestHost::new().await;
+	let pvd = PersistedValidationData {
+		parent_head: Default::default(),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let pov = PoV { block_data: BlockData(Vec::new()) };
 
 	let start = std::time::Instant::now();
 	let result = host
-		.validate_candidate(
-			test_parachain_halt::wasm_binary_unwrap(),
-			ValidationParams {
-				block_data: BlockData(Vec::new()),
-				parent_head: Default::default(),
-				relay_parent_number: 1,
-				relay_parent_storage_root: Default::default(),
-			},
-			Default::default(),
-		)
+		.validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default())
 		.await;
 
 	match result {
@@ -189,24 +186,23 @@ async fn ensure_parallel_execution() {
 	// Run some jobs that do not complete, thus timing out.
 	let host = TestHost::new().await;
+	let pvd = PersistedValidationData {
+		parent_head: Default::default(),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let pov = PoV { block_data: BlockData(Vec::new()) };
 
 	let execute_pvf_future_1 = host.validate_candidate(
 		test_parachain_halt::wasm_binary_unwrap(),
-		ValidationParams {
-			block_data: BlockData(Vec::new()),
-			parent_head: Default::default(),
-			relay_parent_number: 1,
-			relay_parent_storage_root: Default::default(),
-		},
+		pvd.clone(),
+		pov.clone(),
 		Default::default(),
 	);
 	let execute_pvf_future_2 = host.validate_candidate(
 		test_parachain_halt::wasm_binary_unwrap(),
-		ValidationParams {
-			block_data: BlockData(Vec::new()),
-			parent_head: Default::default(),
-			relay_parent_number: 1,
-			relay_parent_storage_root: Default::default(),
-		},
+		pvd,
+		pov,
 		Default::default(),
 	);
 
@@ -237,6 +233,13 @@ async fn execute_queue_doesnt_stall_if_workers_died() {
 		cfg.execute_workers_max_num = 5;
 	})
 	.await;
+	let pvd = PersistedValidationData {
+		parent_head: Default::default(),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let pov = PoV { block_data: BlockData(Vec::new()) };
 
 	// Here we spawn 8 validation jobs for the `halt` PVF and share those between 5 workers. The
 	// first five jobs should timeout and the workers killed. For the next 3 jobs a new batch of
@@ -245,12 +248,8 @@
 	futures::future::join_all((0u8..=8).map(|_| {
 		host.validate_candidate(
 			test_parachain_halt::wasm_binary_unwrap(),
-			ValidationParams {
-				block_data: BlockData(Vec::new()),
-				parent_head: Default::default(),
-				relay_parent_number: 1,
-				relay_parent_storage_root: Default::default(),
-			},
+			pvd.clone(),
+			pov.clone(),
 			Default::default(),
 		)
 	}))
@@ -275,6 +274,13 @@ async fn execute_queue_doesnt_stall_with_varying_executor_params() {
 		cfg.execute_workers_max_num = 2;
 	})
 	.await;
+	let pvd = PersistedValidationData {
+		parent_head: Default::default(),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let pov = PoV { block_data: BlockData(Vec::new()) };
 
 	let executor_params_1 = ExecutorParams::default();
 	let executor_params_2 = ExecutorParams::from(&[ExecutorParam::StackLogicalMax(1024)][..]);
@@ -288,12 +294,8 @@
 	futures::future::join_all((0u8..6).map(|i| {
 		host.validate_candidate(
 			test_parachain_halt::wasm_binary_unwrap(),
-			ValidationParams {
-				block_data: BlockData(Vec::new()),
-				parent_head: Default::default(),
-				relay_parent_number: 1,
-				relay_parent_storage_root: Default::default(),
-			},
+			pvd.clone(),
+			pov.clone(),
 			match i % 3 {
 				0 => executor_params_1.clone(),
 				_ => executor_params_2.clone(),
@@ -324,6 +326,13 @@ async fn deleting_prepared_artifact_does_not_dispute() {
 	let host = TestHost::new().await;
 	let cache_dir = host.cache_dir.path();
+	let pvd = PersistedValidationData {
+		parent_head: Default::default(),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let pov = PoV { block_data: BlockData(Vec::new()) };
 
 	let _stats = host
 		.precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), Default::default())
 		.await
@@ -347,16 +356,7 @@
 	// Try to validate, artifact should get recreated.
 	let result = host
-		.validate_candidate(
-			test_parachain_halt::wasm_binary_unwrap(),
-			ValidationParams {
-				block_data: BlockData(Vec::new()),
-				parent_head: Default::default(),
-				relay_parent_number: 1,
-				relay_parent_storage_root: Default::default(),
-			},
-			Default::default(),
-		)
+		.validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default())
 		.await;
 
 	assert_matches!(result, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)));
@@ -367,6 +367,13 @@ async fn corrupted_prepared_artifact_does_not_dispute() {
 	let host = TestHost::new().await;
 	let cache_dir = host.cache_dir.path();
+	let pvd = PersistedValidationData {
+		parent_head: Default::default(),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let pov = PoV { block_data: BlockData(Vec::new()) };
 
 	let _stats = host
 		.precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), Default::default())
 		.await
@@ -400,16 +407,7 @@
 	// Try to validate, artifact should get removed because of the corruption.
 	let result = host
-		.validate_candidate(
-			test_parachain_halt::wasm_binary_unwrap(),
-			ValidationParams {
-				block_data: BlockData(Vec::new()),
-				parent_head: Default::default(),
-				relay_parent_number: 1,
-				relay_parent_storage_root: Default::default(),
-			},
-			Default::default(),
-		)
+		.validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default())
 		.await;
 
 	assert_matches!(
@@ -652,3 +650,65 @@ async fn artifact_does_reprepare_on_meaningful_exec_parameter_change() {
 	assert_eq!(cache_dir_contents.len(), 3); // new artifact has been added
 }
+
+// Checks that we cannot prepare oversized compressed code
+#[tokio::test]
+async fn invalid_compressed_code_fails_prechecking() {
+	let host = TestHost::new().await;
+	let raw_code = vec![2u8; VALIDATION_CODE_BOMB_LIMIT + 1];
+	let validation_code =
+		sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1).unwrap();
+
+	let res = host.precheck_pvf(&validation_code, Default::default()).await;
+
+	assert_matches!(res, Err(PrepareError::CouldNotDecompressCodeBlob(_)));
+}
+
+// Checks that we cannot validate with oversized compressed code
+#[tokio::test]
+async fn invalid_compressed_code_fails_validation() {
+	let host = TestHost::new().await;
+	let pvd = PersistedValidationData {
+		parent_head: Default::default(),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let pov = PoV { block_data: BlockData(Vec::new()) };
+
+	let raw_code = vec![2u8; VALIDATION_CODE_BOMB_LIMIT + 1];
+	let validation_code =
+		sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1).unwrap();
+
+	let result = host.validate_candidate(&validation_code, pvd, pov, Default::default()).await;
+
+	assert_matches!(
+		result,
+		Err(ValidationError::Preparation(PrepareError::CouldNotDecompressCodeBlob(_)))
+	);
+}
+
+// Checks that we cannot validate with an oversized PoV
+#[tokio::test]
+async fn invalid_compressed_pov_fails_validation() {
+	let host = TestHost::new().await;
+	let pvd = PersistedValidationData {
+		parent_head: Default::default(),
+		relay_parent_number: 1u32,
+		relay_parent_storage_root: H256::default(),
+		max_pov_size: 4096 * 1024,
+	};
+	let raw_block_data = vec![1u8; POV_BOMB_LIMIT + 1];
+	let block_data =
+		sp_maybe_compressed_blob::compress(&raw_block_data, POV_BOMB_LIMIT + 1).unwrap();
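+	// The bomb limit passed to `compress` is one byte above `POV_BOMB_LIMIT`, i.e.
+	// exactly the raw size, so compression succeeds here, while the worker-side
+	// decompression, which is bounded by `POV_BOMB_LIMIT`, has to reject the blob.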
+
+	let pov = PoV { block_data: BlockData(block_data) };
+
+	let result = host
+		.validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default())
+		.await;
+
+	assert_matches!(
+		result,
+		Err(ValidationError::Invalid(InvalidCandidate::PoVDecompressionFailure))
+	);
+}
diff --git a/polkadot/node/core/pvf/tests/it/process.rs b/polkadot/node/core/pvf/tests/it/process.rs
index b8fd2cdce0ce..b3023c8a45c3 100644
--- a/polkadot/node/core/pvf/tests/it/process.rs
+++ b/polkadot/node/core/pvf/tests/it/process.rs
@@ -23,11 +23,14 @@ use codec::Encode;
 use polkadot_node_core_pvf::{
 	InvalidCandidate, PossiblyInvalidError, PrepareError, ValidationError,
 };
+use polkadot_node_primitives::PoV;
 use polkadot_parachain_primitives::primitives::{
-	BlockData as GenericBlockData, HeadData as GenericHeadData, ValidationParams,
+	BlockData as GenericBlockData, HeadData as GenericHeadData,
 };
+use polkadot_primitives::PersistedValidationData;
 use procfs::process;
 use rusty_fork::rusty_fork_test;
+use sp_core::H256;
 use std::{future::Future, sync::Arc, time::Duration};
 use test_parachain_adder::{hash_state, BlockData, HeadData};
 
@@ -125,15 +128,18 @@
 	test_wrapper(|host, _sid| async move {
 		let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) };
 		let block_data = BlockData { state: 0, add: 512 };
+		let pvd = PersistedValidationData {
+			parent_head: GenericHeadData(parent_head.encode()),
+			relay_parent_number: 1u32,
+			relay_parent_storage_root: H256::default(),
+			max_pov_size: 4096 * 1024,
+		};
+		let pov = PoV { block_data: GenericBlockData(block_data.encode()) };
 
 		host
 			.validate_candidate(
 				test_parachain_adder::wasm_binary_unwrap(),
-				ValidationParams {
-					parent_head: GenericHeadData(parent_head.encode()),
-					block_data: GenericBlockData(block_data.encode()),
-					relay_parent_number: 1,
-					relay_parent_storage_root: Default::default(),
-				},
+				pvd,
+				pov,
 				Default::default(),
 			)
 			.await
@@ -166,17 +172,20 @@
 		// Prepare the artifact ahead of time.
 		let binary = test_parachain_halt::wasm_binary_unwrap();
 		host.precheck_pvf(binary, Default::default()).await.unwrap();
+		let pvd = PersistedValidationData {
+			parent_head: GenericHeadData(HeadData::default().encode()),
+			relay_parent_number: 1u32,
+			relay_parent_storage_root: H256::default(),
+			max_pov_size: 4096 * 1024,
+		};
+		let pov = PoV { block_data: GenericBlockData(Vec::new()) };
 
 		let (result, _) = futures::join!(
 			// Choose a job that would normally take the entire timeout.
 			host.validate_candidate(
 				binary,
-				ValidationParams {
-					block_data: GenericBlockData(Vec::new()),
-					parent_head: Default::default(),
-					relay_parent_number: 1,
-					relay_parent_storage_root: Default::default(),
-				},
+				pvd,
+				pov,
 				Default::default(),
 			),
 			// Send a stop signal to pause the worker.
@@ -218,17 +227,20 @@
 		// Prepare the artifact ahead of time.
 		let binary = test_parachain_halt::wasm_binary_unwrap();
 		host.precheck_pvf(binary, Default::default()).await.unwrap();
+		let pvd = PersistedValidationData {
+			parent_head: GenericHeadData(HeadData::default().encode()),
+			relay_parent_number: 1u32,
+			relay_parent_storage_root: H256::default(),
+			max_pov_size: 4096 * 1024,
+		};
+		let pov = PoV { block_data: GenericBlockData(Vec::new()) };
 
 		let (result, _) = futures::join!(
 			// Choose a job that would normally take the entire timeout.
 			host.validate_candidate(
 				binary,
-				ValidationParams {
-					block_data: GenericBlockData(Vec::new()),
-					parent_head: Default::default(),
-					relay_parent_number: 1,
-					relay_parent_storage_root: Default::default(),
-				},
+				pvd,
+				pov,
 				Default::default(),
 			),
 			// Run a future that kills the job while it's running.
@@ -274,17 +286,20 @@
 		// Prepare the artifact ahead of time.
 		let binary = test_parachain_halt::wasm_binary_unwrap();
 		host.precheck_pvf(binary, Default::default()).await.unwrap();
+		let pvd = PersistedValidationData {
+			parent_head: GenericHeadData(HeadData::default().encode()),
+			relay_parent_number: 1u32,
+			relay_parent_storage_root: H256::default(),
+			max_pov_size: 4096 * 1024,
+		};
+		let pov = PoV { block_data: GenericBlockData(Vec::new()) };
 
 		let (result, _) = futures::join!(
 			// Choose a job that would normally take the entire timeout.
 			host.validate_candidate(
 				binary,
-				ValidationParams {
-					block_data: GenericBlockData(Vec::new()),
-					parent_head: Default::default(),
-					relay_parent_number: 1,
-					relay_parent_storage_root: Default::default(),
-				},
+				pvd,
+				pov,
 				Default::default(),
 			),
 			// Run a future that kills the job while it's running.
@@ -342,17 +357,20 @@
 		// Prepare the artifact ahead of time.
 		let binary = test_parachain_halt::wasm_binary_unwrap();
 		host.precheck_pvf(binary, Default::default()).await.unwrap();
+		let pvd = PersistedValidationData {
+			parent_head: GenericHeadData(HeadData::default().encode()),
+			relay_parent_number: 1u32,
+			relay_parent_storage_root: H256::default(),
+			max_pov_size: 4096 * 1024,
+		};
+		let pov = PoV { block_data: GenericBlockData(Vec::new()) };
 
 		let _ = futures::join!(
 			// Choose a job that would normally take the entire timeout.
 			host.validate_candidate(
 				binary,
-				ValidationParams {
-					block_data: GenericBlockData(Vec::new()),
-					parent_head: Default::default(),
-					relay_parent_number: 1,
-					relay_parent_storage_root: Default::default(),
-				},
+				pvd,
+				pov,
 				Default::default(),
 			),
 			// Run a future that tests the thread count while the worker is running.
diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs
index 4e13d5eda76f..baaff9c7c9f6 100644
--- a/polkadot/node/overseer/src/lib.rs
+++ b/polkadot/node/overseer/src/lib.rs
@@ -466,7 +466,7 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut handle: Handle) {
 	message_capacity=2048,
 )]
 pub struct Overseer<SupportsParachains> {
-	#[subsystem(blocking, CandidateValidationMessage, sends: [
+	#[subsystem(CandidateValidationMessage, sends: [
 		RuntimeApiMessage,
 	])]
 	candidate_validation: CandidateValidation,
diff --git a/prdoc/pr_5142.prdoc b/prdoc/pr_5142.prdoc
new file mode 100644
index 000000000000..4083e5bf53cd
--- /dev/null
+++ b/prdoc/pr_5142.prdoc
@@ -0,0 +1,26 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: "Move decompression to worker processes"
+
+doc:
+  - audience: Node Dev
+    description: |
+      The candidate validation subsystem used to perform both the PVF code decompression and the
+      PoV decompression itself, which could affect the performance of the subsystem's main loop
+      and required the subsystem to run on the blocking threadpool. This change moves the
+      decompression into the PVF host workers, where it runs synchronously in separate processes.
+
+crates:
+  - name: polkadot-node-core-candidate-validation
+    bump: patch
+  - name: polkadot-overseer
+    bump: patch
+  - name: polkadot-node-core-pvf
+    bump: major
+  - name: polkadot-node-core-pvf-common
+    bump: major
+  - name: polkadot-node-core-pvf-execute-worker
+    bump: patch
+  - name: polkadot-node-core-pvf-prepare-worker
+    bump: patch
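To close the loop on the prdoc note above, the PoV half of the move looks roughly like this on the execute-worker side. This is a sketch under stated assumptions: the helper name and error plumbing are illustrative, and only `POV_BOMB_LIMIT` and the `PoVDecompressionFailure` outcome are confirmed by the tests in this diff:

```rust
use polkadot_node_primitives::{BlockData, PoV, POV_BOMB_LIMIT};

// Stand-in for whatever error type the worker maps to
// `InvalidCandidate::PoVDecompressionFailure` further up the stack.
struct PovDecompressionError;

// Hypothetical worker-side step: the PoV arrives possibly compressed and is
// only expanded here, inside the sandboxed worker process, never on the
// candidate-validation subsystem's main loop.
fn decompress_pov(pov: &PoV) -> Result<BlockData, PovDecompressionError> {
	sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT)
		.map(|raw| BlockData(raw.to_vec()))
		.map_err(|_| PovDecompressionError)
}
```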