diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index 900d3107b034..59db8732a429 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -26,7 +26,7 @@ use polkadot_node_primitives::{ approval::{ BlockApprovalMeta, DelayTranche, IndirectAssignmentCert, IndirectSignedApprovalVote, }, - ValidationResult, APPROVAL_EXECUTION_TIMEOUT, + ValidationResult, }; use polkadot_node_subsystem::{ errors::RecoveryError, @@ -50,8 +50,8 @@ use polkadot_node_subsystem_util::{ }; use polkadot_primitives::{ ApprovalVote, BlockNumber, CandidateHash, CandidateIndex, CandidateReceipt, DisputeStatement, - GroupIndex, Hash, SessionIndex, SessionInfo, ValidDisputeStatementKind, ValidatorId, - ValidatorIndex, ValidatorPair, ValidatorSignature, + GroupIndex, Hash, PvfExecTimeoutKind, SessionIndex, SessionInfo, ValidDisputeStatementKind, + ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature, }; use sc_keystore::LocalKeystore; use sp_application_crypto::Pair; @@ -2399,7 +2399,7 @@ async fn launch_approval( validation_code, candidate.clone(), available_data.pov, - APPROVAL_EXECUTION_TIMEOUT, + PvfExecTimeoutKind::Approval, val_tx, )) .await; diff --git a/node/core/approval-voting/src/tests.rs b/node/core/approval-voting/src/tests.rs index ee13db7bcf54..396d945de6b2 100644 --- a/node/core/approval-voting/src/tests.rs +++ b/node/core/approval-voting/src/tests.rs @@ -2427,7 +2427,7 @@ pub async fn handle_double_assignment_import( }, AllMessages::CandidateValidation( CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx), - ) if timeout == APPROVAL_EXECUTION_TIMEOUT => { + ) if timeout == PvfExecTimeoutKind::Approval => { tx.send(Ok(ValidationResult::Valid(Default::default(), Default::default()))) .unwrap(); }, diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs index d3b26458084a..520c466af6f8 100644 --- a/node/core/backing/src/lib.rs +++ b/node/core/backing/src/lib.rs @@ -32,7 +32,6 @@ use futures::{ use error::{Error, FatalResult}; use polkadot_node_primitives::{ AvailableData, InvalidCandidate, PoV, SignedFullStatement, Statement, ValidationResult, - BACKING_EXECUTION_TIMEOUT, }; use polkadot_node_subsystem::{ jaeger, @@ -50,8 +49,8 @@ use polkadot_node_subsystem_util::{ }; use polkadot_primitives::{ BackedCandidate, CandidateCommitments, CandidateHash, CandidateReceipt, CollatorId, - CommittedCandidateReceipt, CoreIndex, CoreState, Hash, Id as ParaId, SigningContext, - ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation, + CommittedCandidateReceipt, CoreIndex, CoreState, Hash, Id as ParaId, PvfExecTimeoutKind, + SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation, }; use sp_keystore::SyncCryptoStorePtr; use statement_table::{ @@ -650,7 +649,7 @@ async fn request_candidate_validation( .send_message(CandidateValidationMessage::ValidateFromChainState( candidate_receipt, pov, - BACKING_EXECUTION_TIMEOUT, + PvfExecTimeoutKind::Backing, tx, )) .await; diff --git a/node/core/backing/src/tests.rs b/node/core/backing/src/tests.rs index c4aac0700d42..96e8c809ec6c 100644 --- a/node/core/backing/src/tests.rs +++ b/node/core/backing/src/tests.rs @@ -32,7 +32,7 @@ use polkadot_node_subsystem::{ use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_primitives::{ CandidateDescriptor, CollatorId, GroupRotationInfo, HeadData, PersistedValidationData, - ScheduledCore, + PvfExecTimeoutKind, ScheduledCore, }; use sp_application_crypto::AppKey; use sp_keyring::Sr25519Keyring; @@ -307,7 +307,7 @@ fn backing_second_works() { timeout, tx, ) - ) if pov == pov && &candidate_receipt.descriptor == candidate.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && candidate.commitments.hash() == candidate_receipt.commitments_hash => { + ) if pov == pov && &candidate_receipt.descriptor == candidate.descriptor() && timeout == PvfExecTimeoutKind::Backing && candidate.commitments.hash() == candidate_receipt.commitments_hash => { tx.send(Ok( ValidationResult::Valid(CandidateCommitments { head_data: expected_head_data.clone(), @@ -453,7 +453,7 @@ fn backing_works() { timeout, tx, ) - ) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && c.commitments_hash == candidate_a_commitments_hash=> { + ) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == PvfExecTimeoutKind::Backing && c.commitments_hash == candidate_a_commitments_hash=> { tx.send(Ok( ValidationResult::Valid(CandidateCommitments { head_data: expected_head_data.clone(), @@ -625,7 +625,7 @@ fn backing_works_while_validation_ongoing() { timeout, tx, ) - ) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && candidate_a_commitments_hash == c.commitments_hash => { + ) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == PvfExecTimeoutKind::Backing && candidate_a_commitments_hash == c.commitments_hash => { // we never validate the candidate. our local node // shouldn't issue any statements. std::mem::forget(tx); @@ -777,7 +777,7 @@ fn backing_misbehavior_works() { timeout, tx, ) - ) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && candidate_a_commitments_hash == c.commitments_hash => { + ) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == PvfExecTimeoutKind::Backing && candidate_a_commitments_hash == c.commitments_hash => { tx.send(Ok( ValidationResult::Valid(CandidateCommitments { head_data: expected_head_data.clone(), @@ -921,7 +921,7 @@ fn backing_dont_second_invalid() { timeout, tx, ) - ) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT => { + ) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == PvfExecTimeoutKind::Backing => { tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::BadReturn))).unwrap(); } ); @@ -950,7 +950,7 @@ fn backing_dont_second_invalid() { timeout, tx, ) - ) if pov == pov && c.descriptor() == candidate_b.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT => { + ) if pov == pov && c.descriptor() == candidate_b.descriptor() && timeout == PvfExecTimeoutKind::Backing => { tx.send(Ok( ValidationResult::Valid(CandidateCommitments { head_data: expected_head_data.clone(), @@ -1065,7 +1065,7 @@ fn backing_second_after_first_fails_works() { timeout, tx, ) - ) if pov == pov && c.descriptor() == candidate.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && c.commitments_hash == candidate.commitments.hash() => { + ) if pov == pov && c.descriptor() == candidate.descriptor() && timeout == PvfExecTimeoutKind::Backing && c.commitments_hash == candidate.commitments.hash() => { tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::BadReturn))).unwrap(); } ); @@ -1191,7 +1191,7 @@ fn backing_works_after_failed_validation() { timeout, tx, ) - ) if pov == pov && c.descriptor() == candidate.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && c.commitments_hash == candidate.commitments.hash() => { + ) if pov == pov && c.descriptor() == candidate.descriptor() && timeout == PvfExecTimeoutKind::Backing && c.commitments_hash == candidate.commitments.hash() => { tx.send(Err(ValidationFailed("Internal test error".into()))).unwrap(); } ); @@ -1544,7 +1544,7 @@ fn retry_works() { timeout, _tx, ) - ) if pov == pov && c.descriptor() == candidate.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && c.commitments_hash == candidate.commitments.hash() + ) if pov == pov && c.descriptor() == candidate.descriptor() && timeout == PvfExecTimeoutKind::Backing && c.commitments_hash == candidate.commitments.hash() ); virtual_overseer }); diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index cf84486f322a..fdd6ff002dd1 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -24,7 +24,7 @@ #![warn(missing_docs)] use polkadot_node_core_pvf::{ - InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, PvfWithExecutorParams, + InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, PvfPrepData, ValidationError, ValidationHost, }; use polkadot_node_primitives::{ @@ -43,7 +43,8 @@ use polkadot_node_subsystem_util::executor_params_at_relay_parent; use polkadot_parachain::primitives::{ValidationParams, ValidationResult as WasmValidationResult}; use polkadot_primitives::{ vstaging::ExecutorParams, CandidateCommitments, CandidateDescriptor, CandidateReceipt, Hash, - OccupiedCoreAssumption, PersistedValidationData, ValidationCode, ValidationCodeHash, + OccupiedCoreAssumption, PersistedValidationData, PvfExecTimeoutKind, PvfPrepTimeoutKind, + ValidationCode, ValidationCodeHash, }; use parity_scale_codec::Encode; @@ -68,6 +69,13 @@ const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3); #[cfg(test)] const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200); +// Default PVF timeouts. Must never be changed! Use executor environment parameters in +// `session_info` pallet to adjust them. See also `PvfTimeoutKind` docs. +const DEFAULT_PRECHECK_PREPARATION_TIMEOUT: Duration = Duration::from_secs(60); +const DEFAULT_LENIENT_PREPARATION_TIMEOUT: Duration = Duration::from_secs(360); +const DEFAULT_BACKING_EXECUTION_TIMEOUT: Duration = Duration::from_secs(2); +const DEFAULT_APPROVAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(12); + /// Configuration for the candidate validation subsystem #[derive(Clone)] pub struct Config { @@ -330,18 +338,20 @@ where return PreCheckOutcome::Invalid }; - let pvf_with_params = match sp_maybe_compressed_blob::decompress( + let timeout = pvf_prep_timeout(&executor_params, PvfPrepTimeoutKind::Precheck); + + let pvf = match sp_maybe_compressed_blob::decompress( &validation_code.0, VALIDATION_CODE_BOMB_LIMIT, ) { - Ok(code) => PvfWithExecutorParams::from_code(code.into_owned(), executor_params), + Ok(code) => PvfPrepData::from_code(code.into_owned(), executor_params, timeout), Err(e) => { gum::debug!(target: LOG_TARGET, err=?e, "precheck: cannot decompress validation code"); return PreCheckOutcome::Invalid }, }; - match validation_backend.precheck_pvf(pvf_with_params).await { + match validation_backend.precheck_pvf(pvf).await { Ok(_) => PreCheckOutcome::Valid, Err(prepare_err) => if prepare_err.is_deterministic() { @@ -465,7 +475,7 @@ async fn validate_from_chain_state( validation_host: ValidationHost, candidate_receipt: CandidateReceipt, pov: Arc, - timeout: Duration, + exec_timeout_kind: PvfExecTimeoutKind, metrics: &Metrics, ) -> Result where @@ -485,7 +495,7 @@ where validation_code, candidate_receipt.clone(), pov, - timeout, + exec_timeout_kind, metrics, ) .await; @@ -521,7 +531,7 @@ async fn validate_candidate_exhaustive( validation_code: ValidationCode, candidate_receipt: CandidateReceipt, pov: Arc, - timeout: Duration, + exec_timeout_kind: PvfExecTimeoutKind, metrics: &Metrics, ) -> Result where @@ -606,7 +616,7 @@ where let result = validation_backend .validate_candidate_with_retry( raw_validation_code.to_vec(), - timeout, + pvf_exec_timeout(&executor_params, exec_timeout_kind), params, executor_params, ) @@ -667,8 +677,8 @@ trait ValidationBackend { /// Tries executing a PVF a single time (no retries). async fn validate_candidate( &mut self, - pvf_with_params: PvfWithExecutorParams, - timeout: Duration, + pvf: PvfPrepData, + exec_timeout: Duration, encoded_params: Vec, ) -> Result; @@ -677,16 +687,16 @@ trait ValidationBackend { async fn validate_candidate_with_retry( &mut self, raw_validation_code: Vec, - timeout: Duration, + exec_timeout: Duration, params: ValidationParams, executor_params: ExecutorParams, ) -> Result { // Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap. - let pvf_with_params = - PvfWithExecutorParams::from_code(raw_validation_code, executor_params); + let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepTimeoutKind::Lenient); + let pvf = PvfPrepData::from_code(raw_validation_code, executor_params, prep_timeout); let mut validation_result = - self.validate_candidate(pvf_with_params.clone(), timeout, params.encode()).await; + self.validate_candidate(pvf.clone(), exec_timeout, params.encode()).await; // If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the // assumption that the conditions that caused this error may have been transient. Note that @@ -699,23 +709,19 @@ trait ValidationBackend { gum::warn!( target: LOG_TARGET, - ?pvf_with_params, + ?pvf, "Re-trying failed candidate validation due to AmbiguousWorkerDeath." ); // Encode the params again when re-trying. We expect the retry case to be relatively // rare, and we want to avoid unconditionally cloning data. - validation_result = - self.validate_candidate(pvf_with_params, timeout, params.encode()).await; + validation_result = self.validate_candidate(pvf, exec_timeout, params.encode()).await; } validation_result } - async fn precheck_pvf( - &mut self, - pvf_with_params: PvfWithExecutorParams, - ) -> Result; + async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result; } #[async_trait] @@ -723,16 +729,14 @@ impl ValidationBackend for ValidationHost { /// Tries executing a PVF a single time (no retries). async fn validate_candidate( &mut self, - pvf_with_params: PvfWithExecutorParams, - timeout: Duration, + pvf: PvfPrepData, + exec_timeout: Duration, encoded_params: Vec, ) -> Result { let priority = polkadot_node_core_pvf::Priority::Normal; let (tx, rx) = oneshot::channel(); - if let Err(err) = - self.execute_pvf(pvf_with_params, timeout, encoded_params, priority, tx).await - { + if let Err(err) = self.execute_pvf(pvf, exec_timeout, encoded_params, priority, tx).await { return Err(ValidationError::InternalError(format!( "cannot send pvf to the validation host: {:?}", err @@ -743,12 +747,9 @@ impl ValidationBackend for ValidationHost { .map_err(|_| ValidationError::InternalError("validation was cancelled".into()))? } - async fn precheck_pvf( - &mut self, - pvf_with_params: PvfWithExecutorParams, - ) -> Result { + async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result { let (tx, rx) = oneshot::channel(); - if let Err(err) = self.precheck_pvf(pvf_with_params, tx).await { + if let Err(err) = self.precheck_pvf(pvf, tx).await { // Return an IO error if there was an error communicating with the host. return Err(PrepareError::IoErr(err)) } @@ -788,3 +789,23 @@ fn perform_basic_checks( Ok(()) } + +fn pvf_prep_timeout(executor_params: &ExecutorParams, kind: PvfPrepTimeoutKind) -> Duration { + if let Some(timeout) = executor_params.pvf_prep_timeout(kind) { + return timeout + } + match kind { + PvfPrepTimeoutKind::Precheck => DEFAULT_PRECHECK_PREPARATION_TIMEOUT, + PvfPrepTimeoutKind::Lenient => DEFAULT_LENIENT_PREPARATION_TIMEOUT, + } +} + +fn pvf_exec_timeout(executor_params: &ExecutorParams, kind: PvfExecTimeoutKind) -> Duration { + if let Some(timeout) = executor_params.pvf_exec_timeout(kind) { + return timeout + } + match kind { + PvfExecTimeoutKind::Backing => DEFAULT_BACKING_EXECUTION_TIMEOUT, + PvfExecTimeoutKind::Approval => DEFAULT_APPROVAL_EXECUTION_TIMEOUT, + } +} diff --git a/node/core/candidate-validation/src/tests.rs b/node/core/candidate-validation/src/tests.rs index f69fc924e659..9e920f0e9746 100644 --- a/node/core/candidate-validation/src/tests.rs +++ b/node/core/candidate-validation/src/tests.rs @@ -396,7 +396,7 @@ impl MockValidateCandidateBackend { impl ValidationBackend for MockValidateCandidateBackend { async fn validate_candidate( &mut self, - _pvf_with_params: PvfWithExecutorParams, + _pvf: PvfPrepData, _timeout: Duration, _encoded_params: Vec, ) -> Result { @@ -408,10 +408,7 @@ impl ValidationBackend for MockValidateCandidateBackend { result } - async fn precheck_pvf( - &mut self, - _pvf_with_params: PvfWithExecutorParams, - ) -> Result { + async fn precheck_pvf(&mut self, _pvf: PvfPrepData) -> Result { unreachable!() } } @@ -476,7 +473,7 @@ fn candidate_validation_ok_is_ok() { validation_code, candidate_receipt, Arc::new(pov), - Duration::from_secs(0), + PvfExecTimeoutKind::Backing, &metrics, ) }) @@ -535,7 +532,7 @@ fn candidate_validation_bad_return_is_invalid() { validation_code, candidate_receipt, Arc::new(pov), - Duration::from_secs(0), + PvfExecTimeoutKind::Backing, &metrics, ) }); @@ -606,7 +603,7 @@ fn candidate_validation_one_ambiguous_error_is_valid() { validation_code, candidate_receipt, Arc::new(pov), - Duration::from_secs(0), + PvfExecTimeoutKind::Backing, &metrics, ) }) @@ -666,7 +663,7 @@ fn candidate_validation_multiple_ambiguous_errors_is_invalid() { validation_code, candidate_receipt, Arc::new(pov), - Duration::from_secs(0), + PvfExecTimeoutKind::Backing, &metrics, ) }) @@ -718,7 +715,7 @@ fn candidate_validation_timeout_is_internal_error() { validation_code, candidate_receipt, Arc::new(pov), - Duration::from_secs(0), + PvfExecTimeoutKind::Backing, &metrics, ) }); @@ -770,7 +767,7 @@ fn candidate_validation_commitment_hash_mismatch_is_invalid() { validation_code, candidate_receipt, Arc::new(pov), - Duration::from_secs(0), + PvfExecTimeoutKind::Backing, &metrics, ) }) @@ -821,7 +818,7 @@ fn candidate_validation_code_mismatch_is_invalid() { validation_code, candidate_receipt, Arc::new(pov), - Duration::from_secs(0), + PvfExecTimeoutKind::Backing, &Default::default(), )) .unwrap(); @@ -884,7 +881,7 @@ fn compressed_code_works() { validation_code, candidate_receipt, Arc::new(pov), - Duration::from_secs(0), + PvfExecTimeoutKind::Backing, &metrics, ) }); @@ -937,7 +934,7 @@ fn code_decompression_failure_is_error() { validation_code, candidate_receipt, Arc::new(pov), - Duration::from_secs(0), + PvfExecTimeoutKind::Backing, &Default::default(), )); @@ -990,7 +987,7 @@ fn pov_decompression_failure_is_invalid() { validation_code, candidate_receipt, Arc::new(pov), - Duration::from_secs(0), + PvfExecTimeoutKind::Backing, &Default::default(), )); @@ -1011,17 +1008,14 @@ impl MockPreCheckBackend { impl ValidationBackend for MockPreCheckBackend { async fn validate_candidate( &mut self, - _pvf_with_params: PvfWithExecutorParams, + _pvf: PvfPrepData, _timeout: Duration, _encoded_params: Vec, ) -> Result { unreachable!() } - async fn precheck_pvf( - &mut self, - _pvf_with_params: PvfWithExecutorParams, - ) -> Result { + async fn precheck_pvf(&mut self, _pvf: PvfPrepData) -> Result { self.result.clone() } } diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index f813b216b6ad..51ad52f1bace 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -25,13 +25,15 @@ use futures::{ #[cfg(test)] use futures_timer::Delay; -use polkadot_node_primitives::{ValidationResult, APPROVAL_EXECUTION_TIMEOUT}; +use polkadot_node_primitives::ValidationResult; use polkadot_node_subsystem::{ messages::{AvailabilityRecoveryMessage, CandidateValidationMessage}, overseer, ActiveLeavesUpdate, RecoveryError, }; use polkadot_node_subsystem_util::runtime::get_validation_code_by_hash; -use polkadot_primitives::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex}; +use polkadot_primitives::{ + BlockNumber, CandidateHash, CandidateReceipt, Hash, PvfExecTimeoutKind, SessionIndex, +}; use crate::LOG_TARGET; @@ -348,7 +350,7 @@ async fn participate( validation_code, req.candidate_receipt().clone(), available_data.pov, - APPROVAL_EXECUTION_TIMEOUT, + PvfExecTimeoutKind::Approval, validation_tx, )) .await; diff --git a/node/core/dispute-coordinator/src/participation/tests.rs b/node/core/dispute-coordinator/src/participation/tests.rs index 273c27261081..134324f69e26 100644 --- a/node/core/dispute-coordinator/src/participation/tests.rs +++ b/node/core/dispute-coordinator/src/participation/tests.rs @@ -120,7 +120,7 @@ pub async fn participation_full_happy_path( ctx_handle.recv().await, AllMessages::CandidateValidation( CandidateValidationMessage::ValidateFromExhaustive(_, _, candidate_receipt, _, timeout, tx) - ) if timeout == APPROVAL_EXECUTION_TIMEOUT => { + ) if timeout == PvfExecTimeoutKind::Approval => { if expected_commitments_hash != candidate_receipt.commitments_hash { tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))).unwrap(); } else { @@ -454,7 +454,7 @@ fn cast_invalid_vote_if_validation_fails_or_is_invalid() { ctx_handle.recv().await, AllMessages::CandidateValidation( CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx) - ) if timeout == APPROVAL_EXECUTION_TIMEOUT => { + ) if timeout == PvfExecTimeoutKind::Approval => { tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::Timeout))).unwrap(); }, "overseer did not receive candidate validation message", @@ -491,7 +491,7 @@ fn cast_invalid_vote_if_commitments_dont_match() { ctx_handle.recv().await, AllMessages::CandidateValidation( CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx) - ) if timeout == APPROVAL_EXECUTION_TIMEOUT => { + ) if timeout == PvfExecTimeoutKind::Approval => { tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))).unwrap(); }, "overseer did not receive candidate validation message", @@ -528,7 +528,7 @@ fn cast_valid_vote_if_validation_passes() { ctx_handle.recv().await, AllMessages::CandidateValidation( CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx) - ) if timeout == APPROVAL_EXECUTION_TIMEOUT => { + ) if timeout == PvfExecTimeoutKind::Approval => { tx.send(Ok(ValidationResult::Valid(dummy_candidate_commitments(None), PersistedValidationData::default()))).unwrap(); }, "overseer did not receive candidate validation message", diff --git a/node/core/pvf/src/execute/queue.rs b/node/core/pvf/src/execute/queue.rs index 9f5b0451dc6c..9a89c46e7a14 100644 --- a/node/core/pvf/src/execute/queue.rs +++ b/node/core/pvf/src/execute/queue.rs @@ -30,7 +30,6 @@ use futures::{ stream::{FuturesUnordered, StreamExt as _}, Future, FutureExt, }; -use polkadot_node_primitives::BACKING_EXECUTION_TIMEOUT; use polkadot_primitives::vstaging::{ExecutorParams, ExecutorParamsHash}; use slotmap::HopSlotMap; use std::{ @@ -45,8 +44,7 @@ use std::{ /// re-spawn a new worker to execute the job immediately. /// To make any sense and not to break things, the value should be greater than minimal execution /// timeout in use, and less than the block time. -const MAX_KEEP_WAITING: Duration = - Duration::from_millis(BACKING_EXECUTION_TIMEOUT.as_millis() as u64 * 2); +const MAX_KEEP_WAITING: Duration = Duration::from_secs(4); slotmap::new_key_type! { struct Worker; } @@ -54,7 +52,7 @@ slotmap::new_key_type! { struct Worker; } pub enum ToQueue { Enqueue { artifact: ArtifactPathId, - execution_timeout: Duration, + exec_timeout: Duration, params: Vec, executor_params: ExecutorParams, result_tx: ResultSender, @@ -63,7 +61,7 @@ pub enum ToQueue { struct ExecuteJob { artifact: ArtifactPathId, - execution_timeout: Duration, + exec_timeout: Duration, params: Vec, executor_params: ExecutorParams, result_tx: ResultSender, @@ -261,8 +259,7 @@ async fn purge_dead(metrics: &Metrics, workers: &mut Workers) { } fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) { - let ToQueue::Enqueue { artifact, execution_timeout, params, executor_params, result_tx } = - to_queue; + let ToQueue::Enqueue { artifact, exec_timeout, params, executor_params, result_tx } = to_queue; gum::debug!( target: LOG_TARGET, validation_code_hash = ?artifact.id.code_hash, @@ -271,7 +268,7 @@ fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) { queue.metrics.execute_enqueued(); let job = ExecuteJob { artifact, - execution_timeout, + exec_timeout, params, executor_params, result_tx, @@ -457,13 +454,9 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) { queue.mux.push( async move { let _timer = execution_timer; - let outcome = super::worker::start_work( - idle, - job.artifact.clone(), - job.execution_timeout, - job.params, - ) - .await; + let outcome = + super::worker::start_work(idle, job.artifact.clone(), job.exec_timeout, job.params) + .await; QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx) } .boxed(), diff --git a/node/core/pvf/src/executor_intf.rs b/node/core/pvf/src/executor_intf.rs index 95f31f89a333..a1b5f9a4886c 100644 --- a/node/core/pvf/src/executor_intf.rs +++ b/node/core/pvf/src/executor_intf.rs @@ -126,6 +126,7 @@ fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Result stack_limit.logical_max = *slm, ExecutorParam::StackNativeMax(snm) => stack_limit.native_stack_max = *snm, ExecutorParam::PrecheckingMaxMemory(_) => (), // TODO: Not implemented yet + ExecutorParam::PvfPrepTimeout(_, _) | ExecutorParam::PvfExecTimeout(_, _) => (), // Not used here } } sem.deterministic_stack_limit = Some(stack_limit); diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 93a4a59c832d..d4071064d41a 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -25,7 +25,7 @@ use crate::{ error::PrepareError, execute, metrics::Metrics, - prepare, PrepareResult, Priority, PvfWithExecutorParams, ValidationError, LOG_TARGET, + prepare, PrepareResult, Priority, PvfPrepData, ValidationError, LOG_TARGET, }; use always_assert::never; use futures::{ @@ -40,17 +40,6 @@ use std::{ time::{Duration, SystemTime}, }; -/// For prechecking requests, the time period after which the preparation worker is considered -/// unresponsive and will be killed. -// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric. -pub const PRECHECK_PREPARATION_TIMEOUT: Duration = Duration::from_secs(60); - -/// For execution and heads-up requests, the time period after which the preparation worker is -/// considered unresponsive and will be killed. More lenient than the timeout for prechecking to -/// prevent honest validators from timing out on valid PVFs. -// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric. -pub const LENIENT_PREPARATION_TIMEOUT: Duration = Duration::from_secs(360); - /// The time period after which a failed preparation artifact is considered ready to be retried. /// Note that we will only retry if another request comes in after this cooldown has passed. #[cfg(not(test))] @@ -84,11 +73,11 @@ impl ValidationHost { /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down. pub async fn precheck_pvf( &mut self, - pvf_with_params: PvfWithExecutorParams, + pvf: PvfPrepData, result_tx: PrepareResultSender, ) -> Result<(), String> { self.to_host_tx - .send(ToHost::PrecheckPvf { pvf_with_params, result_tx }) + .send(ToHost::PrecheckPvf { pvf, result_tx }) .await .map_err(|_| "the inner loop hung up".to_string()) } @@ -102,16 +91,16 @@ impl ValidationHost { /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down. pub async fn execute_pvf( &mut self, - pvf_with_params: PvfWithExecutorParams, - execution_timeout: Duration, + pvf: PvfPrepData, + exec_timeout: Duration, params: Vec, priority: Priority, result_tx: ResultSender, ) -> Result<(), String> { self.to_host_tx .send(ToHost::ExecutePvf(ExecutePvfInputs { - pvf_with_params, - execution_timeout, + pvf, + exec_timeout, params, priority, result_tx, @@ -126,10 +115,7 @@ impl ValidationHost { /// situations this function should return immediately. /// /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down. - pub async fn heads_up( - &mut self, - active_pvfs: Vec, - ) -> Result<(), String> { + pub async fn heads_up(&mut self, active_pvfs: Vec) -> Result<(), String> { self.to_host_tx .send(ToHost::HeadsUp { active_pvfs }) .await @@ -138,14 +124,14 @@ impl ValidationHost { } enum ToHost { - PrecheckPvf { pvf_with_params: PvfWithExecutorParams, result_tx: PrepareResultSender }, + PrecheckPvf { pvf: PvfPrepData, result_tx: PrepareResultSender }, ExecutePvf(ExecutePvfInputs), - HeadsUp { active_pvfs: Vec }, + HeadsUp { active_pvfs: Vec }, } struct ExecutePvfInputs { - pvf_with_params: PvfWithExecutorParams, - execution_timeout: Duration, + pvf: PvfPrepData, + exec_timeout: Duration, params: Vec, priority: Priority, result_tx: ResultSender, @@ -267,7 +253,7 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future, executor_params: ExecutorParams, result_tx: ResultSender, @@ -282,13 +268,13 @@ impl AwaitingPrepare { fn add( &mut self, artifact_id: ArtifactId, - execution_timeout: Duration, + exec_timeout: Duration, params: Vec, executor_params: ExecutorParams, result_tx: ResultSender, ) { self.0.entry(artifact_id).or_default().push(PendingExecutionRequest { - execution_timeout, + exec_timeout, params, executor_params, result_tx, @@ -427,8 +413,8 @@ async fn handle_to_host( to_host: ToHost, ) -> Result<(), Fatal> { match to_host { - ToHost::PrecheckPvf { pvf_with_params, result_tx } => { - handle_precheck_pvf(artifacts, prepare_queue, pvf_with_params, result_tx).await?; + ToHost::PrecheckPvf { pvf, result_tx } => { + handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await?; }, ToHost::ExecutePvf(inputs) => { handle_execute_pvf( @@ -450,16 +436,17 @@ async fn handle_to_host( /// Handles PVF prechecking requests. /// -/// This tries to prepare the PVF by compiling the WASM blob within a given timeout ([`PRECHECK_PREPARATION_TIMEOUT`]). +/// This tries to prepare the PVF by compiling the WASM blob within a timeout set in +/// `PvfPrepData`. /// /// If the prepare job failed previously, we may retry it under certain conditions. async fn handle_precheck_pvf( artifacts: &mut Artifacts, prepare_queue: &mut mpsc::Sender, - pvf_with_params: PvfWithExecutorParams, + pvf: PvfPrepData, result_sender: PrepareResultSender, ) -> Result<(), Fatal> { - let artifact_id = pvf_with_params.as_artifact_id(); + let artifact_id = pvf.as_artifact_id(); if let Some(state) = artifacts.artifact_state_mut(&artifact_id) { match state { @@ -477,15 +464,8 @@ async fn handle_precheck_pvf( } } else { artifacts.insert_preparing(artifact_id, vec![result_sender]); - send_prepare( - prepare_queue, - prepare::ToQueue::Enqueue { - priority: Priority::Normal, - pvf_with_params, - preparation_timeout: PRECHECK_PREPARATION_TIMEOUT, - }, - ) - .await?; + send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf }) + .await?; } Ok(()) } @@ -507,9 +487,8 @@ async fn handle_execute_pvf( awaiting_prepare: &mut AwaitingPrepare, inputs: ExecutePvfInputs, ) -> Result<(), Fatal> { - let ExecutePvfInputs { pvf_with_params, execution_timeout, params, priority, result_tx } = - inputs; - let artifact_id = pvf_with_params.as_artifact_id(); + let ExecutePvfInputs { pvf, exec_timeout, params, priority, result_tx } = inputs; + let artifact_id = pvf.as_artifact_id(); if let Some(state) = artifacts.artifact_state_mut(&artifact_id) { match state { @@ -521,9 +500,9 @@ async fn handle_execute_pvf( execute_queue, execute::ToQueue::Enqueue { artifact: ArtifactPathId::new(artifact_id, cache_path), - execution_timeout, + exec_timeout, params, - executor_params: (*pvf_with_params.executor_params()).clone(), + executor_params: (*pvf.executor_params()).clone(), result_tx, }, ) @@ -532,9 +511,9 @@ async fn handle_execute_pvf( ArtifactState::Preparing { .. } => { awaiting_prepare.add( artifact_id, - execution_timeout, + exec_timeout, params, - (*pvf_with_params.executor_params()).clone(), + (*pvf.executor_params()).clone(), result_tx, ); }, @@ -542,7 +521,7 @@ async fn handle_execute_pvf( if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) { gum::warn!( target: LOG_TARGET, - ?pvf_with_params, + ?pvf, ?artifact_id, ?last_time_failed, %num_failures, @@ -556,22 +535,15 @@ async fn handle_execute_pvf( waiting_for_response: Vec::new(), num_failures: *num_failures, }; - let executor_params = (*pvf_with_params.executor_params()).clone(); - send_prepare( - prepare_queue, - prepare::ToQueue::Enqueue { - priority, - pvf_with_params, - preparation_timeout: LENIENT_PREPARATION_TIMEOUT, - }, - ) - .await?; + let executor_params = (*pvf.executor_params()).clone(); + send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }) + .await?; // Add an execution request that will wait to run after this prepare job has // finished. awaiting_prepare.add( artifact_id, - execution_timeout, + exec_timeout, params, executor_params, result_tx, @@ -584,20 +556,12 @@ async fn handle_execute_pvf( } else { // Artifact is unknown: register it and enqueue a job with the corresponding priority and // PVF. - let executor_params = (*pvf_with_params.executor_params()).clone(); + let executor_params = (*pvf.executor_params()).clone(); artifacts.insert_preparing(artifact_id.clone(), Vec::new()); - send_prepare( - prepare_queue, - prepare::ToQueue::Enqueue { - priority, - pvf_with_params, - preparation_timeout: LENIENT_PREPARATION_TIMEOUT, - }, - ) - .await?; + send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?; // Add an execution request that will wait to run after this prepare job has finished. - awaiting_prepare.add(artifact_id, execution_timeout, params, executor_params, result_tx); + awaiting_prepare.add(artifact_id, exec_timeout, params, executor_params, result_tx); } Ok(()) @@ -606,7 +570,7 @@ async fn handle_execute_pvf( async fn handle_heads_up( artifacts: &mut Artifacts, prepare_queue: &mut mpsc::Sender, - active_pvfs: Vec, + active_pvfs: Vec, ) -> Result<(), Fatal> { let now = SystemTime::now(); @@ -642,8 +606,7 @@ async fn handle_heads_up( prepare_queue, prepare::ToQueue::Enqueue { priority: Priority::Normal, - pvf_with_params: active_pvf, - preparation_timeout: LENIENT_PREPARATION_TIMEOUT, + pvf: active_pvf, }, ) .await?; @@ -656,11 +619,7 @@ async fn handle_heads_up( send_prepare( prepare_queue, - prepare::ToQueue::Enqueue { - priority: Priority::Normal, - pvf_with_params: active_pvf, - preparation_timeout: LENIENT_PREPARATION_TIMEOUT, - }, + prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf: active_pvf }, ) .await?; } @@ -722,7 +681,7 @@ async fn handle_prepare_done( // It's finally time to dispatch all the execution requests that were waiting for this artifact // to be prepared. let pending_requests = awaiting_prepare.take(&artifact_id); - for PendingExecutionRequest { execution_timeout, params, executor_params, result_tx } in + for PendingExecutionRequest { exec_timeout, params, executor_params, result_tx } in pending_requests { if result_tx.is_canceled() { @@ -741,7 +700,7 @@ async fn handle_prepare_done( execute_queue, execute::ToQueue::Enqueue { artifact: ArtifactPathId::new(artifact_id.clone(), cache_path), - execution_timeout, + exec_timeout, params, executor_params, result_tx, @@ -858,13 +817,14 @@ fn pulse_every(interval: std::time::Duration) -> impl futures::Stream } #[cfg(test)] -mod tests { +pub(crate) mod tests { use super::*; use crate::{prepare::PrepareStats, InvalidCandidate, PrepareError}; use assert_matches::assert_matches; use futures::future::BoxFuture; const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); + pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30); #[tokio::test] async fn pulse_test() { @@ -882,7 +842,7 @@ mod tests { /// Creates a new PVF which artifact id can be uniquely identified by the given number. fn artifact_id(descriminator: u32) -> ArtifactId { - PvfWithExecutorParams::from_discriminator(descriminator).as_artifact_id() + PvfPrepData::from_discriminator(descriminator).as_artifact_id() } fn artifact_path(descriminator: u32) -> PathBuf { @@ -1091,7 +1051,7 @@ mod tests { let mut test = builder.build(); let mut host = test.host_handle(); - host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap(); + host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap(); let to_sweeper_rx = &mut test.to_sweeper_rx; run_until( @@ -1105,7 +1065,7 @@ mod tests { // Extend TTL for the first artifact and make sure we don't receive another file removal // request. - host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap(); + host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap(); test.poll_ensure_to_sweeper_is_empty().await; } @@ -1116,7 +1076,7 @@ mod tests { let (result_tx, result_rx_pvf_1_1) = oneshot::channel(); host.execute_pvf( - PvfWithExecutorParams::from_discriminator(1), + PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf1".to_vec(), Priority::Normal, @@ -1127,7 +1087,7 @@ mod tests { let (result_tx, result_rx_pvf_1_2) = oneshot::channel(); host.execute_pvf( - PvfWithExecutorParams::from_discriminator(1), + PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf1".to_vec(), Priority::Critical, @@ -1138,7 +1098,7 @@ mod tests { let (result_tx, result_rx_pvf_2) = oneshot::channel(); host.execute_pvf( - PvfWithExecutorParams::from_discriminator(2), + PvfPrepData::from_discriminator(2), TEST_EXECUTION_TIMEOUT, b"pvf2".to_vec(), Priority::Normal, @@ -1216,9 +1176,7 @@ mod tests { // First, test a simple precheck request. let (result_tx, result_rx) = oneshot::channel(); - host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx) - .await - .unwrap(); + host.precheck_pvf(PvfPrepData::from_discriminator(1), result_tx).await.unwrap(); // The queue received the prepare request. assert_matches!( @@ -1242,9 +1200,7 @@ mod tests { let mut precheck_receivers = Vec::new(); for _ in 0..3 { let (result_tx, result_rx) = oneshot::channel(); - host.precheck_pvf(PvfWithExecutorParams::from_discriminator(2), result_tx) - .await - .unwrap(); + host.precheck_pvf(PvfPrepData::from_discriminator(2), result_tx).await.unwrap(); precheck_receivers.push(result_rx); } // Received prepare request. @@ -1279,7 +1235,7 @@ mod tests { // Send PVF for the execution and request the prechecking for it. let (result_tx, result_rx_execute) = oneshot::channel(); host.execute_pvf( - PvfWithExecutorParams::from_discriminator(1), + PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf2".to_vec(), Priority::Critical, @@ -1294,9 +1250,7 @@ mod tests { ); let (result_tx, result_rx) = oneshot::channel(); - host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx) - .await - .unwrap(); + host.precheck_pvf(PvfPrepData::from_discriminator(1), result_tx).await.unwrap(); // Suppose the preparation failed, the execution queue is empty and both // "clients" receive their results. @@ -1318,15 +1272,13 @@ mod tests { let mut precheck_receivers = Vec::new(); for _ in 0..3 { let (result_tx, result_rx) = oneshot::channel(); - host.precheck_pvf(PvfWithExecutorParams::from_discriminator(2), result_tx) - .await - .unwrap(); + host.precheck_pvf(PvfPrepData::from_discriminator(2), result_tx).await.unwrap(); precheck_receivers.push(result_rx); } let (result_tx, _result_rx_execute) = oneshot::channel(); host.execute_pvf( - PvfWithExecutorParams::from_discriminator(2), + PvfPrepData::from_discriminator(2), TEST_EXECUTION_TIMEOUT, b"pvf2".to_vec(), Priority::Critical, @@ -1366,9 +1318,7 @@ mod tests { // Submit a precheck request that fails. let (result_tx, result_rx) = oneshot::channel(); - host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx) - .await - .unwrap(); + host.precheck_pvf(PvfPrepData::from_discriminator(1), result_tx).await.unwrap(); // The queue received the prepare request. assert_matches!( @@ -1390,7 +1340,7 @@ mod tests { // Submit another precheck request. let (result_tx_2, result_rx_2) = oneshot::channel(); - host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx_2) + host.precheck_pvf(PvfPrepData::from_discriminator(1), result_tx_2) .await .unwrap(); @@ -1406,7 +1356,7 @@ mod tests { // Submit another precheck request. let (result_tx_3, result_rx_3) = oneshot::channel(); - host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx_3) + host.precheck_pvf(PvfPrepData::from_discriminator(1), result_tx_3) .await .unwrap(); @@ -1428,7 +1378,7 @@ mod tests { // Submit a execute request that fails. let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( - PvfWithExecutorParams::from_discriminator(1), + PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf".to_vec(), Priority::Critical, @@ -1458,7 +1408,7 @@ mod tests { // Submit another execute request. We shouldn't try to prepare again, yet. let (result_tx_2, result_rx_2) = oneshot::channel(); host.execute_pvf( - PvfWithExecutorParams::from_discriminator(1), + PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf".to_vec(), Priority::Critical, @@ -1480,7 +1430,7 @@ mod tests { // Submit another execute request. let (result_tx_3, result_rx_3) = oneshot::channel(); host.execute_pvf( - PvfWithExecutorParams::from_discriminator(1), + PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf".to_vec(), Priority::Critical, @@ -1530,7 +1480,7 @@ mod tests { // Submit an execute request that fails. let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( - PvfWithExecutorParams::from_discriminator(1), + PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf".to_vec(), Priority::Critical, @@ -1563,7 +1513,7 @@ mod tests { // Submit another execute request. let (result_tx_2, result_rx_2) = oneshot::channel(); host.execute_pvf( - PvfWithExecutorParams::from_discriminator(1), + PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf".to_vec(), Priority::Critical, @@ -1588,7 +1538,7 @@ mod tests { // Submit another execute request. let (result_tx_3, result_rx_3) = oneshot::channel(); host.execute_pvf( - PvfWithExecutorParams::from_discriminator(1), + PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf".to_vec(), Priority::Critical, @@ -1615,7 +1565,7 @@ mod tests { let mut host = test.host_handle(); // Submit a heads-up request that fails. - host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap(); + host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap(); // The queue received the prepare request. assert_matches!( @@ -1632,7 +1582,7 @@ mod tests { .unwrap(); // Submit another heads-up request. - host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap(); + host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap(); // Assert the prepare queue is empty. test.poll_ensure_to_prepare_queue_is_empty().await; @@ -1641,7 +1591,7 @@ mod tests { futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await; // Submit another heads-up request. - host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap(); + host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap(); // Assert the prepare queue contains the request. assert_matches!( @@ -1657,7 +1607,7 @@ mod tests { let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( - PvfWithExecutorParams::from_discriminator(1), + PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, b"pvf1".to_vec(), Priority::Normal, diff --git a/node/core/pvf/src/lib.rs b/node/core/pvf/src/lib.rs index 9462b4cab19d..8c40bbb8b939 100644 --- a/node/core/pvf/src/lib.rs +++ b/node/core/pvf/src/lib.rs @@ -110,7 +110,7 @@ pub use sp_tracing; pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError}; pub use prepare::PrepareStats; pub use priority::Priority; -pub use pvf::PvfWithExecutorParams; +pub use pvf::PvfPrepData; pub use host::{start, Config, ValidationHost}; pub use metrics::Metrics; diff --git a/node/core/pvf/src/metrics.rs b/node/core/pvf/src/metrics.rs index 5995261d5c22..72e6d01e7a2f 100644 --- a/node/core/pvf/src/metrics.rs +++ b/node/core/pvf/src/metrics.rs @@ -183,9 +183,9 @@ impl metrics::Metrics for Metrics { "Time spent in preparing PVF artifacts in seconds", ) .buckets(vec![ - // This is synchronized with the PRECHECK_PREPARATION_TIMEOUT=60s - // and LENIENT_PREPARATION_TIMEOUT=360s constants found in - // src/prepare/worker.rs + // This is synchronized with the `DEFAULT_PRECHECK_PREPARATION_TIMEOUT=60s` + // and `DEFAULT_LENIENT_PREPARATION_TIMEOUT=360s` constants found in + // node/core/candidate-validation/src/lib.rs 0.1, 0.5, 1.0, @@ -209,8 +209,9 @@ impl metrics::Metrics for Metrics { "polkadot_pvf_execution_time", "Time spent in executing PVFs", ).buckets(vec![ - // This is synchronized with `APPROVAL_EXECUTION_TIMEOUT` and - // `BACKING_EXECUTION_TIMEOUT` constants in `node/primitives/src/lib.rs` + // This is synchronized with `DEFAULT_APPROVAL_EXECUTION_TIMEOUT` and + // `DEFAULT_BACKING_EXECUTION_TIMEOUT` constants in + // node/core/candidate-validation/src/lib.rs 0.01, 0.025, 0.05, diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 5d1bb248182e..8587fcb1a6b6 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -18,7 +18,7 @@ use super::worker::{self, Outcome}; use crate::{ error::{PrepareError, PrepareResult}, metrics::Metrics, - pvf::PvfWithExecutorParams, + pvf::PvfPrepData, worker_common::{IdleWorker, WorkerHandle}, LOG_TARGET, }; @@ -65,12 +65,7 @@ pub enum ToPool { /// /// In either case, the worker is considered busy and no further `StartWork` messages should be /// sent until either `Concluded` or `Rip` message is received. - StartWork { - worker: Worker, - pvf_with_params: PvfWithExecutorParams, - artifact_path: PathBuf, - preparation_timeout: Duration, - }, + StartWork { worker: Worker, pvf: PvfPrepData, artifact_path: PathBuf }, } /// A message sent from pool to its client. @@ -214,7 +209,7 @@ fn handle_to_pool( metrics.prepare_worker().on_begin_spawn(); mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed()); }, - ToPool::StartWork { worker, pvf_with_params, artifact_path, preparation_timeout } => { + ToPool::StartWork { worker, pvf, artifact_path } => { if let Some(data) = spawned.get_mut(worker) { if let Some(idle) = data.idle.take() { let preparation_timer = metrics.time_preparation(); @@ -223,10 +218,9 @@ fn handle_to_pool( metrics.clone(), worker, idle, - pvf_with_params, + pvf, cache_path.to_owned(), artifact_path, - preparation_timeout, preparation_timer, ) .boxed(), @@ -272,21 +266,12 @@ async fn start_work_task( metrics: Metrics, worker: Worker, idle: IdleWorker, - pvf_with_params: PvfWithExecutorParams, + pvf: PvfPrepData, cache_path: PathBuf, artifact_path: PathBuf, - preparation_timeout: Duration, _preparation_timer: Option, ) -> PoolEvent { - let outcome = worker::start_work( - &metrics, - idle, - pvf_with_params, - &cache_path, - artifact_path, - preparation_timeout, - ) - .await; + let outcome = worker::start_work(&metrics, idle, pvf, &cache_path, artifact_path).await; PoolEvent::StartWork(worker, outcome) } diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index 8eee1289ab18..38db463a33b7 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -18,17 +18,18 @@ use super::pool::{self, Worker}; use crate::{ - artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, PvfWithExecutorParams, - LOG_TARGET, + artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, PvfPrepData, LOG_TARGET, }; use always_assert::{always, never}; use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt}; use std::{ collections::{HashMap, VecDeque}, path::PathBuf, - time::Duration, }; +#[cfg(test)] +use std::time::Duration; + /// A request to pool. #[derive(Debug)] pub enum ToQueue { @@ -36,11 +37,7 @@ pub enum ToQueue { /// /// Note that it is incorrect to enqueue the same PVF again without first receiving the /// [`FromQueue`] response. - Enqueue { - priority: Priority, - pvf_with_params: PvfWithExecutorParams, - preparation_timeout: Duration, - }, + Enqueue { priority: Priority, pvf: PvfPrepData }, } /// A response from queue. @@ -85,9 +82,7 @@ slotmap::new_key_type! { pub struct Job; } struct JobData { /// The priority of this job. Can be bumped. priority: Priority, - pvf_with_params: PvfWithExecutorParams, - /// The timeout for the preparation job. - preparation_timeout: Duration, + pvf: PvfPrepData, worker: Option, } @@ -215,8 +210,8 @@ impl Queue { async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fatal> { match to_queue { - ToQueue::Enqueue { priority, pvf_with_params, preparation_timeout } => { - handle_enqueue(queue, priority, pvf_with_params, preparation_timeout).await?; + ToQueue::Enqueue { priority, pvf } => { + handle_enqueue(queue, priority, pvf).await?; }, } Ok(()) @@ -225,19 +220,18 @@ async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fat async fn handle_enqueue( queue: &mut Queue, priority: Priority, - pvf_with_params: PvfWithExecutorParams, - preparation_timeout: Duration, + pvf: PvfPrepData, ) -> Result<(), Fatal> { gum::debug!( target: LOG_TARGET, - validation_code_hash = ?pvf_with_params.code_hash(), + validation_code_hash = ?pvf.code_hash(), ?priority, - ?preparation_timeout, + preparation_timeout = ?pvf.prep_timeout, "PVF is enqueued for preparation.", ); queue.metrics.prepare_enqueued(); - let artifact_id = pvf_with_params.as_artifact_id(); + let artifact_id = pvf.as_artifact_id(); if never!( queue.artifact_id_to_job.contains_key(&artifact_id), "second Enqueue sent for a known artifact" @@ -254,10 +248,7 @@ async fn handle_enqueue( return Ok(()) } - let job = - queue - .jobs - .insert(JobData { priority, pvf_with_params, preparation_timeout, worker: None }); + let job = queue.jobs.insert(JobData { priority, pvf, worker: None }); queue.artifact_id_to_job.insert(artifact_id, job); if let Some(available) = find_idle_worker(queue) { @@ -348,7 +339,7 @@ async fn handle_worker_concluded( // this can't be None; // qed. let job_data = never_none!(queue.jobs.remove(job)); - let artifact_id = job_data.pvf_with_params.as_artifact_id(); + let artifact_id = job_data.pvf.as_artifact_id(); queue.artifact_id_to_job.remove(&artifact_id); @@ -434,7 +425,7 @@ async fn spawn_extra_worker(queue: &mut Queue, critical: bool) -> Result<(), Fat async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal> { let job_data = &mut queue.jobs[job]; - let artifact_id = job_data.pvf_with_params.as_artifact_id(); + let artifact_id = job_data.pvf.as_artifact_id(); let artifact_path = artifact_id.path(&queue.cache_path); job_data.worker = Some(worker); @@ -443,12 +434,7 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal send_pool( &mut queue.to_pool_tx, - pool::ToPool::StartWork { - worker, - pvf_with_params: job_data.pvf_with_params.clone(), - artifact_path, - preparation_timeout: job_data.preparation_timeout, - }, + pool::ToPool::StartWork { worker, pvf: job_data.pvf.clone(), artifact_path }, ) .await?; @@ -503,9 +489,7 @@ pub fn start( mod tests { use super::*; use crate::{ - error::PrepareError, - host::{LENIENT_PREPARATION_TIMEOUT, PRECHECK_PREPARATION_TIMEOUT}, - prepare::PrepareStats, + error::PrepareError, host::tests::TEST_PREPARATION_TIMEOUT, prepare::PrepareStats, }; use assert_matches::assert_matches; use futures::{future::BoxFuture, FutureExt}; @@ -513,8 +497,8 @@ mod tests { use std::task::Poll; /// Creates a new PVF which artifact id can be uniquely identified by the given number. - fn pvf_with_params(descriminator: u32) -> PvfWithExecutorParams { - PvfWithExecutorParams::from_discriminator(descriminator) + fn pvf(discriminator: u32) -> PvfPrepData { + PvfPrepData::from_discriminator(discriminator) } async fn run_until( @@ -621,11 +605,7 @@ mod tests { async fn properly_concludes() { let mut test = Test::new(2, 2); - test.send_queue(ToQueue::Enqueue { - priority: Priority::Normal, - pvf_with_params: pvf_with_params(1), - preparation_timeout: PRECHECK_PREPARATION_TIMEOUT, - }); + test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); let w = test.workers.insert(()); @@ -636,10 +616,7 @@ mod tests { result: Ok(PrepareStats::default()), }); - assert_eq!( - test.poll_and_recv_from_queue().await.artifact_id, - pvf_with_params(1).as_artifact_id() - ); + assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); } #[tokio::test] @@ -647,22 +624,12 @@ mod tests { let mut test = Test::new(2, 3); let priority = Priority::Normal; - let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; - test.send_queue(ToQueue::Enqueue { - priority, - pvf_with_params: PvfWithExecutorParams::from_discriminator(1), - preparation_timeout, - }); - test.send_queue(ToQueue::Enqueue { - priority, - pvf_with_params: PvfWithExecutorParams::from_discriminator(2), - preparation_timeout, - }); + test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(1) }); + test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(2) }); // Start a non-precheck preparation for this one. test.send_queue(ToQueue::Enqueue { priority, - pvf_with_params: PvfWithExecutorParams::from_discriminator(3), - preparation_timeout: LENIENT_PREPARATION_TIMEOUT, + pvf: PvfPrepData::from_discriminator_and_timeout(3, TEST_PREPARATION_TIMEOUT * 3), }); // Receive only two spawns. @@ -690,8 +657,7 @@ mod tests { // Enqueue a critical job. test.send_queue(ToQueue::Enqueue { priority: Priority::Critical, - pvf_with_params: PvfWithExecutorParams::from_discriminator(4), - preparation_timeout, + pvf: PvfPrepData::from_discriminator(4), }); // 2 out of 2 are working, but there is a critical job incoming. That means that spawning @@ -702,12 +668,10 @@ mod tests { #[tokio::test] async fn cull_unwanted() { let mut test = Test::new(1, 2); - let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, - pvf_with_params: PvfWithExecutorParams::from_discriminator(1), - preparation_timeout, + pvf: PvfPrepData::from_discriminator(1), }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); let w1 = test.workers.insert(()); @@ -717,8 +681,7 @@ mod tests { // Enqueue a critical job, which warrants spawning over the soft limit. test.send_queue(ToQueue::Enqueue { priority: Priority::Critical, - pvf_with_params: PvfWithExecutorParams::from_discriminator(2), - preparation_timeout, + pvf: PvfPrepData::from_discriminator(2), }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -742,22 +705,12 @@ mod tests { let mut test = Test::new(2, 2); let priority = Priority::Normal; - let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; - test.send_queue(ToQueue::Enqueue { - priority, - pvf_with_params: PvfWithExecutorParams::from_discriminator(1), - preparation_timeout, - }); - test.send_queue(ToQueue::Enqueue { - priority, - pvf_with_params: PvfWithExecutorParams::from_discriminator(2), - preparation_timeout, - }); + test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(1) }); + test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(2) }); // Start a non-precheck preparation for this one. test.send_queue(ToQueue::Enqueue { priority, - pvf_with_params: PvfWithExecutorParams::from_discriminator(3), - preparation_timeout: LENIENT_PREPARATION_TIMEOUT, + pvf: PvfPrepData::from_discriminator_and_timeout(3, TEST_PREPARATION_TIMEOUT * 3), }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -782,10 +735,7 @@ mod tests { // Since there is still work, the queue requested one extra worker to spawn to handle the // remaining enqueued work items. assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); - assert_eq!( - test.poll_and_recv_from_queue().await.artifact_id, - pvf_with_params(1).as_artifact_id() - ); + assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); } #[tokio::test] @@ -794,8 +744,7 @@ mod tests { test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, - pvf_with_params: PvfWithExecutorParams::from_discriminator(1), - preparation_timeout: PRECHECK_PREPARATION_TIMEOUT, + pvf: PvfPrepData::from_discriminator(1), }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -819,8 +768,7 @@ mod tests { test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, - pvf_with_params: PvfWithExecutorParams::from_discriminator(1), - preparation_timeout: PRECHECK_PREPARATION_TIMEOUT, + pvf: PvfPrepData::from_discriminator(1), }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index 015cd2e836ce..962ad2742bf8 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -24,7 +24,7 @@ use crate::{ error::{PrepareError, PrepareResult}, metrics::Metrics, prepare::PrepareStats, - pvf::PvfWithExecutorParams, + pvf::PvfPrepData, worker_common::{ bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes, spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, @@ -84,10 +84,9 @@ pub enum Outcome { pub async fn start_work( metrics: &Metrics, worker: IdleWorker, - pvf_with_params: PvfWithExecutorParams, + pvf: PvfPrepData, cache_path: &Path, artifact_path: PathBuf, - preparation_timeout: Duration, ) -> Outcome { let IdleWorker { stream, pid } = worker; @@ -99,9 +98,8 @@ pub async fn start_work( ); with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move { - if let Err(err) = - send_request(&mut stream, pvf_with_params, &tmp_file, preparation_timeout).await - { + let preparation_timeout = pvf.prep_timeout; + if let Err(err) = send_request(&mut stream, pvf, &tmp_file).await { gum::warn!( target: LOG_TARGET, worker_pid = %pid, @@ -273,42 +271,30 @@ where async fn send_request( stream: &mut UnixStream, - pvf_with_params: PvfWithExecutorParams, + pvf: PvfPrepData, tmp_file: &Path, - preparation_timeout: Duration, ) -> io::Result<()> { - framed_send(stream, &pvf_with_params.encode()).await?; + framed_send(stream, &pvf.encode()).await?; framed_send(stream, path_to_bytes(tmp_file)).await?; - framed_send(stream, &preparation_timeout.encode()).await?; Ok(()) } -async fn recv_request( - stream: &mut UnixStream, -) -> io::Result<(PvfWithExecutorParams, PathBuf, Duration)> { - let pvf_with_params = framed_recv(stream).await?; - let pvf_with_params = - PvfWithExecutorParams::decode(&mut &pvf_with_params[..]).map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!("prepare pvf recv_request: failed to decode PvfWithExecutorParams: {}", e), - ) - })?; - let tmp_file = framed_recv(stream).await?; - let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| { +async fn recv_request(stream: &mut UnixStream) -> io::Result<(PvfPrepData, PathBuf)> { + let pvf = framed_recv(stream).await?; + let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| { io::Error::new( io::ErrorKind::Other, - "prepare pvf recv_request: non utf-8 artifact path".to_string(), + format!("prepare pvf recv_request: failed to decode PvfPrepData: {}", e), ) })?; - let preparation_timeout = framed_recv(stream).await?; - let preparation_timeout = Duration::decode(&mut &preparation_timeout[..]).map_err(|e| { + let tmp_file = framed_recv(stream).await?; + let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| { io::Error::new( io::ErrorKind::Other, - format!("prepare pvf recv_request: failed to decode duration: {:?}", e), + "prepare pvf recv_request: non utf-8 artifact path".to_string(), ) })?; - Ok((pvf_with_params, tmp_file, preparation_timeout)) + Ok((pvf, tmp_file)) } async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> { @@ -360,7 +346,7 @@ pub fn worker_entrypoint(socket_path: &str) { worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move { loop { let worker_pid = std::process::id(); - let (pvf_with_params, dest, preparation_timeout) = recv_request(&mut stream).await?; + let (pvf, dest) = recv_request(&mut stream).await?; gum::debug!( target: LOG_TARGET, %worker_pid, @@ -368,6 +354,7 @@ pub fn worker_entrypoint(socket_path: &str) { ); let cpu_time_start = ProcessTime::now(); + let preparation_timeout = pvf.prep_timeout; // Run the memory tracker. #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] @@ -385,7 +372,7 @@ pub fn worker_entrypoint(socket_path: &str) { // Spawn another thread for preparation. let prepare_fut = rt_handle .spawn_blocking(move || { - let result = prepare_artifact(pvf_with_params); + let result = prepare_artifact(pvf); // Get the `ru_maxrss` stat. If supported, call getrusage for the thread. #[cfg(target_os = "linux")] @@ -467,16 +454,14 @@ pub fn worker_entrypoint(socket_path: &str) { }); } -fn prepare_artifact( - pvf_with_params: PvfWithExecutorParams, -) -> Result { +fn prepare_artifact(pvf: PvfPrepData) -> Result { panic::catch_unwind(|| { - let blob = match crate::executor_intf::prevalidate(&pvf_with_params.code()) { + let blob = match crate::executor_intf::prevalidate(&pvf.code()) { Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))), Ok(b) => b, }; - match crate::executor_intf::prepare(blob, &pvf_with_params.executor_params()) { + match crate::executor_intf::prepare(blob, &pvf.executor_params()) { Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)), Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))), } diff --git a/node/core/pvf/src/pvf.rs b/node/core/pvf/src/pvf.rs index 439e36a373e3..3a40b972f09a 100644 --- a/node/core/pvf/src/pvf.rs +++ b/node/core/pvf/src/pvf.rs @@ -23,26 +23,39 @@ use std::{ cmp::{Eq, PartialEq}, fmt, sync::Arc, + time::Duration, }; -/// A struct that carries code of a parachain validation function, its hash, and a corresponding -/// set of executor parameters. +#[cfg(test)] +use crate::host::tests::TEST_PREPARATION_TIMEOUT; + +/// A struct that carries the exhaustive set of data to prepare an artifact out of plain +/// Wasm binary /// /// Should be cheap to clone. #[derive(Clone, Encode, Decode)] -pub struct PvfWithExecutorParams { +pub struct PvfPrepData { + /// Wasm code (uncompressed) pub(crate) code: Arc>, + /// Wasm code hash pub(crate) code_hash: ValidationCodeHash, + /// Executor environment parameters for the session for which artifact is prepared pub(crate) executor_params: Arc, + /// Preparation timeout + pub(crate) prep_timeout: Duration, } -impl PvfWithExecutorParams { +impl PvfPrepData { /// Returns an instance of the PVF out of the given PVF code and executor params. - pub fn from_code(code: Vec, executor_params: ExecutorParams) -> Self { + pub fn from_code( + code: Vec, + executor_params: ExecutorParams, + prep_timeout: Duration, + ) -> Self { let code = Arc::new(code); let code_hash = blake2_256(&code).into(); let executor_params = Arc::new(executor_params); - Self { code, code_hash, executor_params } + Self { code, code_hash, executor_params, prep_timeout } } /// Returns artifact ID that corresponds to the PVF with given executor params @@ -67,27 +80,32 @@ impl PvfWithExecutorParams { /// Creates a structure for tests #[cfg(test)] - pub(crate) fn from_discriminator(num: u32) -> Self { + pub(crate) fn from_discriminator_and_timeout(num: u32, timeout: Duration) -> Self { let descriminator_buf = num.to_le_bytes().to_vec(); - Self::from_code(descriminator_buf, ExecutorParams::default()) + Self::from_code(descriminator_buf, ExecutorParams::default(), timeout) + } + + #[cfg(test)] + pub(crate) fn from_discriminator(num: u32) -> Self { + Self::from_discriminator_and_timeout(num, TEST_PREPARATION_TIMEOUT) } } -impl fmt::Debug for PvfWithExecutorParams { +impl fmt::Debug for PvfPrepData { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "Pvf {{ code, code_hash: {:?}, executor_params: {:?} }}", - self.code_hash, self.executor_params + "Pvf {{ code, code_hash: {:?}, executor_params: {:?}, prep_timeout: {:?} }}", + self.code_hash, self.executor_params, self.prep_timeout ) } } -impl PartialEq for PvfWithExecutorParams { +impl PartialEq for PvfPrepData { fn eq(&self, other: &Self) -> bool { self.code_hash == other.code_hash && self.executor_params.hash() == other.executor_params.hash() } } -impl Eq for PvfWithExecutorParams {} +impl Eq for PvfPrepData {} diff --git a/node/core/pvf/tests/it/main.rs b/node/core/pvf/tests/it/main.rs index 9fc329d59132..c35a638ba43b 100644 --- a/node/core/pvf/tests/it/main.rs +++ b/node/core/pvf/tests/it/main.rs @@ -17,8 +17,8 @@ use assert_matches::assert_matches; use parity_scale_codec::Encode as _; use polkadot_node_core_pvf::{ - start, Config, InvalidCandidate, Metrics, PvfWithExecutorParams, ValidationError, - ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR, + start, Config, InvalidCandidate, Metrics, PvfPrepData, ValidationError, ValidationHost, + JOB_TIMEOUT_WALL_CLOCK_FACTOR, }; use polkadot_parachain::primitives::{BlockData, ValidationParams, ValidationResult}; use polkadot_primitives::vstaging::{ExecutorParam, ExecutorParams}; @@ -30,6 +30,7 @@ mod worker_common; const PUPPET_EXE: &str = env!("CARGO_BIN_EXE_puppet_worker"); const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); +const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(3); struct TestHost { _cache_dir: tempfile::TempDir, @@ -69,7 +70,7 @@ impl TestHost { .lock() .await .execute_pvf( - PvfWithExecutorParams::from_code(code.into(), executor_params), + PvfPrepData::from_code(code.into(), executor_params, TEST_PREPARATION_TIMEOUT), TEST_EXECUTION_TIMEOUT, params.encode(), polkadot_node_core_pvf::Priority::Normal, diff --git a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs index 31c38eea07e8..ac81a231daa0 100644 --- a/node/overseer/examples/minimal-example.rs +++ b/node/overseer/examples/minimal-example.rs @@ -32,7 +32,7 @@ use polkadot_overseer::{ gen::{FromOrchestra, SpawnedSubsystem}, HeadSupportsParachains, SubsystemError, }; -use polkadot_primitives::{CandidateReceipt, Hash}; +use polkadot_primitives::{CandidateReceipt, Hash, PvfExecTimeoutKind}; struct AlwaysSupportsParachains; @@ -76,7 +76,7 @@ impl Subsystem1 { let msg = CandidateValidationMessage::ValidateFromChainState( candidate_receipt, PoV { block_data: BlockData(Vec::new()) }.into(), - Default::default(), + PvfExecTimeoutKind::Backing, tx, ); ctx.send_message(msg).await; diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index f119cf1c809d..bc26402aedea 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -30,8 +30,8 @@ use polkadot_node_subsystem_types::{ ActivatedLeaf, LeafStatus, }; use polkadot_primitives::{ - CandidateHash, CandidateReceipt, CollatorPair, InvalidDisputeStatementKind, SessionIndex, - ValidDisputeStatementKind, ValidatorIndex, + CandidateHash, CandidateReceipt, CollatorPair, InvalidDisputeStatementKind, PvfExecTimeoutKind, + SessionIndex, ValidDisputeStatementKind, ValidatorIndex, }; use crate::{ @@ -106,7 +106,7 @@ where ctx.send_message(CandidateValidationMessage::ValidateFromChainState( candidate_receipt, PoV { block_data: BlockData(Vec::new()) }.into(), - Default::default(), + PvfExecTimeoutKind::Backing, tx, )) .await; @@ -779,7 +779,7 @@ fn test_candidate_validation_msg() -> CandidateValidationMessage { CandidateValidationMessage::ValidateFromChainState( candidate_receipt, pov, - Duration::default(), + PvfExecTimeoutKind::Backing, sender, ) } diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index 46964e119b33..18b7fa18a0c8 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -22,7 +22,7 @@ #![deny(missing_docs)] -use std::{pin::Pin, time::Duration}; +use std::pin::Pin; use bounded_vec::BoundedVec; use futures::Future; @@ -64,20 +64,6 @@ pub const VALIDATION_CODE_BOMB_LIMIT: usize = (MAX_CODE_SIZE * 4u32) as usize; /// The bomb limit for decompressing PoV blobs. pub const POV_BOMB_LIMIT: usize = (MAX_POV_SIZE * 4u32) as usize; -/// The amount of time to spend on execution during backing. -pub const BACKING_EXECUTION_TIMEOUT: Duration = Duration::from_secs(2); - -/// The amount of time to spend on execution during approval or disputes. -/// -/// This is deliberately much longer than the backing execution timeout to -/// ensure that in the absence of extremely large disparities between hardware, -/// blocks that pass backing are considered executable by approval checkers or -/// dispute participants. -/// -/// NOTE: If this value is increased significantly, also check the dispute coordinator to consider -/// candidates longer into finalization: `DISPUTE_CANDIDATE_LIFETIME_AFTER_FINALIZATION`. -pub const APPROVAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(12); - /// How many blocks after finalization an information about backed/included candidate should be /// kept. /// diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 1ef5cea7941a..e5b9db9e5e97 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -43,15 +43,14 @@ use polkadot_primitives::{ CandidateHash, CandidateIndex, CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreState, DisputeState, GroupIndex, GroupRotationInfo, Hash, Header as BlockHeader, Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, MultiDisputeStatementSet, - OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, SessionIndex, SessionInfo, - SignedAvailabilityBitfield, SignedAvailabilityBitfields, ValidationCode, ValidationCodeHash, - ValidatorId, ValidatorIndex, ValidatorSignature, + OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, PvfExecTimeoutKind, + SessionIndex, SessionInfo, SignedAvailabilityBitfield, SignedAvailabilityBitfields, + ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, }; use polkadot_statement_table::v2::Misbehavior; use std::{ collections::{BTreeMap, HashMap, HashSet}, sync::Arc, - time::Duration, }; /// Network events as transmitted to other subsystems, wrapped in their message types. @@ -121,7 +120,7 @@ pub enum CandidateValidationMessage { CandidateReceipt, Arc, /// Execution timeout - Duration, + PvfExecTimeoutKind, oneshot::Sender>, ), /// Validate a candidate with provided, exhaustive parameters for validation. @@ -139,7 +138,7 @@ pub enum CandidateValidationMessage { CandidateReceipt, Arc, /// Execution timeout - Duration, + PvfExecTimeoutKind, oneshot::Sender>, ), /// Try to compile the given validation code and send back diff --git a/primitives/src/lib.rs b/primitives/src/lib.rs index c166b91b8bde..e242c7c0800c 100644 --- a/primitives/src/lib.rs +++ b/primitives/src/lib.rs @@ -46,16 +46,16 @@ pub use v2::{ Header, HrmpChannelId, Id, InboundDownwardMessage, InboundHrmpMessage, IndexedVec, InherentData, InvalidDisputeStatementKind, Moment, MultiDisputeStatementSet, Nonce, OccupiedCore, OccupiedCoreAssumption, OutboundHrmpMessage, ParathreadClaim, ParathreadEntry, - PersistedValidationData, PvfCheckStatement, RuntimeMetricLabel, RuntimeMetricLabelValue, - RuntimeMetricLabelValues, RuntimeMetricLabels, RuntimeMetricOp, RuntimeMetricUpdate, - ScheduledCore, ScrapedOnChainVotes, SessionIndex, SessionInfo, Signature, Signed, - SignedAvailabilityBitfield, SignedAvailabilityBitfields, SignedStatement, SigningContext, Slot, - UncheckedSigned, UncheckedSignedAvailabilityBitfield, UncheckedSignedAvailabilityBitfields, - UncheckedSignedStatement, UpgradeGoAhead, UpgradeRestriction, UpwardMessage, - ValidDisputeStatementKind, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, - ValidatorSignature, ValidityAttestation, ValidityError, ASSIGNMENT_KEY_TYPE_ID, - LOWEST_PUBLIC_ID, MAX_CODE_SIZE, MAX_HEAD_DATA_SIZE, MAX_POV_SIZE, - PARACHAINS_INHERENT_IDENTIFIER, PARACHAIN_KEY_TYPE_ID, + PersistedValidationData, PvfCheckStatement, PvfExecTimeoutKind, PvfPrepTimeoutKind, + RuntimeMetricLabel, RuntimeMetricLabelValue, RuntimeMetricLabelValues, RuntimeMetricLabels, + RuntimeMetricOp, RuntimeMetricUpdate, ScheduledCore, ScrapedOnChainVotes, SessionIndex, + SessionInfo, Signature, Signed, SignedAvailabilityBitfield, SignedAvailabilityBitfields, + SignedStatement, SigningContext, Slot, UncheckedSigned, UncheckedSignedAvailabilityBitfield, + UncheckedSignedAvailabilityBitfields, UncheckedSignedStatement, UpgradeGoAhead, + UpgradeRestriction, UpwardMessage, ValidDisputeStatementKind, ValidationCode, + ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation, + ValidityError, ASSIGNMENT_KEY_TYPE_ID, LOWEST_PUBLIC_ID, MAX_CODE_SIZE, MAX_HEAD_DATA_SIZE, + MAX_POV_SIZE, PARACHAINS_INHERENT_IDENTIFIER, PARACHAIN_KEY_TYPE_ID, }; #[cfg(feature = "std")] diff --git a/primitives/src/v2/mod.rs b/primitives/src/v2/mod.rs index 9191ebea5b2c..27ad3fb278c5 100644 --- a/primitives/src/v2/mod.rs +++ b/primitives/src/v2/mod.rs @@ -1689,6 +1689,33 @@ impl PvfCheckStatement { } } +/// Type discriminator for PVF preparation timeouts +#[derive(Encode, Decode, TypeInfo, Clone, Copy, Debug, PartialEq, Eq)] +pub enum PvfPrepTimeoutKind { + /// For prechecking requests, the time period after which the preparation worker is considered + /// unresponsive and will be killed. + Precheck, + + /// For execution and heads-up requests, the time period after which the preparation worker is + /// considered unresponsive and will be killed. More lenient than the timeout for prechecking + /// to prevent honest validators from timing out on valid PVFs. + Lenient, +} + +/// Type discriminator for PVF execution timeouts +#[derive(Encode, Decode, TypeInfo, Clone, Copy, Debug, PartialEq, Eq)] +pub enum PvfExecTimeoutKind { + /// The amount of time to spend on execution during backing. + Backing, + + /// The amount of time to spend on execution during approval or disputes. + /// + /// This should be much longer than the backing execution timeout to ensure that in the + /// absence of extremely large disparities between hardware, blocks that pass backing are + /// considered executable by approval checkers or dispute participants. + Approval, +} + #[cfg(test)] mod tests { use super::*; diff --git a/primitives/src/vstaging/executor_params.rs b/primitives/src/vstaging/executor_params.rs index 831d95bc1f5c..56d3e446d0c5 100644 --- a/primitives/src/vstaging/executor_params.rs +++ b/primitives/src/vstaging/executor_params.rs @@ -21,24 +21,34 @@ //! by the first element of the vector). Decoding to a usable semantics structure is //! done in `polkadot-node-core-pvf`. -use crate::{BlakeTwo256, HashT as _}; +use crate::{BlakeTwo256, HashT as _, PvfExecTimeoutKind, PvfPrepTimeoutKind}; use parity_scale_codec::{Decode, Encode}; use polkadot_core_primitives::Hash; use scale_info::TypeInfo; -use sp_std::{ops::Deref, vec, vec::Vec}; +use sp_std::{ops::Deref, time::Duration, vec, vec::Vec}; /// The different executor parameters for changing the execution environment semantics. #[derive(Clone, Debug, Encode, Decode, PartialEq, Eq, TypeInfo)] pub enum ExecutorParam { /// Maximum number of memory pages (64KiB bytes per page) the executor can allocate. + #[codec(index = 1)] MaxMemoryPages(u32), /// Wasm logical stack size limit (max. number of Wasm values on stack) + #[codec(index = 2)] StackLogicalMax(u32), /// Executor machine stack size limit, in bytes + #[codec(index = 3)] StackNativeMax(u32), /// Max. amount of memory the preparation worker is allowed to use during /// pre-checking, in bytes + #[codec(index = 4)] PrecheckingMaxMemory(u64), + /// PVF preparation timeouts, millisec + #[codec(index = 5)] + PvfPrepTimeout(PvfPrepTimeoutKind, u64), + /// PVF execution timeouts, millisec + #[codec(index = 6)] + PvfExecTimeout(PvfExecTimeoutKind, u64), } /// Unit type wrapper around [`type@Hash`] that represents an execution parameter set hash. @@ -92,6 +102,30 @@ impl ExecutorParams { pub fn hash(&self) -> ExecutorParamsHash { ExecutorParamsHash(BlakeTwo256::hash(&self.encode())) } + + /// Returns a PVF preparation timeout, if any + pub fn pvf_prep_timeout(&self, kind: PvfPrepTimeoutKind) -> Option { + for param in &self.0 { + if let ExecutorParam::PvfPrepTimeout(k, timeout) = param { + if kind == *k { + return Some(Duration::from_millis(*timeout)) + } + } + } + None + } + + /// Returns a PVF execution timeout, if any + pub fn pvf_exec_timeout(&self, kind: PvfExecTimeoutKind) -> Option { + for param in &self.0 { + if let ExecutorParam::PvfExecTimeout(k, timeout) = param { + if kind == *k { + return Some(Duration::from_millis(*timeout)) + } + } + } + None + } } impl Deref for ExecutorParams {