Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move PVF code and PoV decompression to PVF host workers #5142

Merged
merged 16 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion polkadot/node/core/candidate-validation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ gum = { workspace = true, default-features = true }

sp-keystore = { workspace = true }
sp-application-crypto = { workspace = true }
sp-maybe-compressed-blob = { workspace = true, default-features = true }
codec = { features = ["bit-vec", "derive"], workspace = true }

polkadot-primitives = { workspace = true, default-features = true }
Expand All @@ -36,5 +35,6 @@ sp-keyring = { workspace = true, default-features = true }
futures = { features = ["thread-pool"], workspace = true }
assert_matches = { workspace = true }
polkadot-node-subsystem-test-helpers = { workspace = true }
sp-maybe-compressed-blob = { workspace = true, default-features = true }
sp-core = { workspace = true, default-features = true }
polkadot-primitives-test-helpers = { workspace = true }
134 changes: 47 additions & 87 deletions polkadot/node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ use polkadot_node_core_pvf::{
InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PossiblyInvalidError,
PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost,
};
use polkadot_node_primitives::{
BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT,
};
use polkadot_node_primitives::{InvalidCandidate, PoV, ValidationResult};
use polkadot_node_subsystem::{
errors::RuntimeApiError,
messages::{
Expand All @@ -41,9 +39,7 @@ use polkadot_node_subsystem::{
};
use polkadot_node_subsystem_util as util;
use polkadot_overseer::ActiveLeavesUpdate;
use polkadot_parachain_primitives::primitives::{
ValidationParams, ValidationResult as WasmValidationResult,
};
use polkadot_parachain_primitives::primitives::ValidationResult as WasmValidationResult;
use polkadot_primitives::{
executor_params::{
DEFAULT_APPROVAL_EXECUTION_TIMEOUT, DEFAULT_BACKING_EXECUTION_TIMEOUT,
Expand Down Expand Up @@ -504,21 +500,12 @@ where
continue;
};

let pvf = match sp_maybe_compressed_blob::decompress(
&validation_code.0,
VALIDATION_CODE_BOMB_LIMIT,
) {
Ok(code) => PvfPrepData::from_code(
code.into_owned(),
executor_params.clone(),
timeout,
PrepareJobKind::Prechecking,
),
Err(e) => {
gum::debug!(target: LOG_TARGET, err=?e, "cannot decompress validation code");
continue
},
};
let pvf = PvfPrepData::from_code(
validation_code.0,
executor_params.clone(),
timeout,
PrepareJobKind::Prechecking,
);

active_pvfs.push(pvf);
processed_code_hashes.push(code_hash);
Expand Down Expand Up @@ -651,21 +638,12 @@ where

let timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Precheck);

let pvf = match sp_maybe_compressed_blob::decompress(
&validation_code.0,
VALIDATION_CODE_BOMB_LIMIT,
) {
Ok(code) => PvfPrepData::from_code(
code.into_owned(),
executor_params,
timeout,
PrepareJobKind::Prechecking,
),
Err(e) => {
gum::debug!(target: LOG_TARGET, err=?e, "precheck: cannot decompress validation code");
return PreCheckOutcome::Invalid
},
};
let pvf = PvfPrepData::from_code(
validation_code.0,
executor_params,
timeout,
PrepareJobKind::Prechecking,
);

match validation_backend.precheck_pvf(pvf).await {
Ok(_) => PreCheckOutcome::Valid,
Expand Down Expand Up @@ -873,49 +851,15 @@ async fn validate_candidate_exhaustive(
return Ok(ValidationResult::Invalid(e))
}

let raw_validation_code = match sp_maybe_compressed_blob::decompress(
&validation_code.0,
VALIDATION_CODE_BOMB_LIMIT,
) {
Ok(code) => code,
Err(e) => {
gum::info!(target: LOG_TARGET, ?para_id, err=?e, "Invalid candidate (validation code)");

// Code already passed pre-checking, if decompression fails now this most likely means
// some local corruption happened.
return Err(ValidationFailed("Code decompression failed".to_string()))
},
};
metrics.observe_code_size(raw_validation_code.len());

metrics.observe_pov_size(pov.block_data.0.len(), true);
let raw_block_data =
match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) {
Ok(block_data) => BlockData(block_data.to_vec()),
Err(e) => {
gum::info!(target: LOG_TARGET, ?para_id, err=?e, "Invalid candidate (PoV code)");

// If the PoV is invalid, the candidate certainly is.
return Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure))
},
};
metrics.observe_pov_size(raw_block_data.0.len(), false);

let params = ValidationParams {
parent_head: persisted_validation_data.parent_head.clone(),
block_data: raw_block_data,
relay_parent_number: persisted_validation_data.relay_parent_number,
relay_parent_storage_root: persisted_validation_data.relay_parent_storage_root,
};

let persisted_validation_data = Arc::new(persisted_validation_data);
let result = match exec_kind {
// Retry is disabled to reduce the chance of nondeterministic blocks getting backed and
// honest backers getting slashed.
PvfExecKind::Backing => {
let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
let exec_timeout = pvf_exec_timeout(&executor_params, exec_kind);
let pvf = PvfPrepData::from_code(
raw_validation_code.to_vec(),
validation_code.0,
executor_params,
prep_timeout,
PrepareJobKind::Compilation,
Expand All @@ -925,17 +869,19 @@ async fn validate_candidate_exhaustive(
.validate_candidate(
pvf,
exec_timeout,
params.encode(),
persisted_validation_data.clone(),
pov,
polkadot_node_core_pvf::Priority::Normal,
)
.await
},
PvfExecKind::Approval =>
validation_backend
.validate_candidate_with_retry(
raw_validation_code.to_vec(),
validation_code.0,
pvf_exec_timeout(&executor_params, exec_kind),
params,
persisted_validation_data.clone(),
pov,
executor_params,
PVF_APPROVAL_EXECUTION_RETRY_DELAY,
polkadot_node_core_pvf::Priority::Critical,
Expand All @@ -961,6 +907,8 @@ async fn validate_candidate_exhaustive(
Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)),
Err(ValidationError::Invalid(WasmInvalidCandidate::WorkerReportedInvalid(e))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(e))),
Err(ValidationError::Invalid(WasmInvalidCandidate::PoVDecompressionFailure)) =>
Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure)),
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(
"ambiguous worker death".to_string(),
Expand Down Expand Up @@ -1007,7 +955,7 @@ async fn validate_candidate_exhaustive(
// invalid.
Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))
} else {
Ok(ValidationResult::Valid(outputs, persisted_validation_data))
Ok(ValidationResult::Valid(outputs, (*persisted_validation_data).clone()))
}
},
}
Expand All @@ -1020,7 +968,8 @@ trait ValidationBackend {
&mut self,
pvf: PvfPrepData,
exec_timeout: Duration,
encoded_params: Vec<u8>,
pvd: Arc<PersistedValidationData>,
pov: Arc<PoV>,
// The priority for the preparation job.
prepare_priority: polkadot_node_core_pvf::Priority,
) -> Result<WasmValidationResult, ValidationError>;
Expand All @@ -1035,9 +984,10 @@ trait ValidationBackend {
/// preparation.
async fn validate_candidate_with_retry(
&mut self,
raw_validation_code: Vec<u8>,
code: Vec<u8>,
exec_timeout: Duration,
params: ValidationParams,
pvd: Arc<PersistedValidationData>,
pov: Arc<PoV>,
executor_params: ExecutorParams,
retry_delay: Duration,
// The priority for the preparation job.
Expand All @@ -1046,7 +996,7 @@ trait ValidationBackend {
let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
let pvf = PvfPrepData::from_code(
raw_validation_code,
code,
executor_params,
prep_timeout,
PrepareJobKind::Compilation,
Expand All @@ -1057,7 +1007,13 @@ trait ValidationBackend {

// Use `Priority::Critical` as finality trumps parachain liveliness.
let mut validation_result = self
.validate_candidate(pvf.clone(), exec_timeout, params.encode(), prepare_priority)
.validate_candidate(
pvf.clone(),
exec_timeout,
pvd.clone(),
pov.clone(),
prepare_priority,
)
.await;
if validation_result.is_ok() {
return validation_result
Expand Down Expand Up @@ -1130,10 +1086,14 @@ trait ValidationBackend {
validation_result
);

// Encode the params again when re-trying. We expect the retry case to be relatively
// rare, and we want to avoid unconditionally cloning data.
validation_result = self
.validate_candidate(pvf.clone(), new_timeout, params.encode(), prepare_priority)
.validate_candidate(
pvf.clone(),
new_timeout,
pvd.clone(),
pov.clone(),
prepare_priority,
)
.await;
}
}
Expand All @@ -1153,13 +1113,13 @@ impl ValidationBackend for ValidationHost {
&mut self,
pvf: PvfPrepData,
exec_timeout: Duration,
encoded_params: Vec<u8>,
pvd: Arc<PersistedValidationData>,
pov: Arc<PoV>,
// The priority for the preparation job.
prepare_priority: polkadot_node_core_pvf::Priority,
) -> Result<WasmValidationResult, ValidationError> {
let (tx, rx) = oneshot::channel();
if let Err(err) =
self.execute_pvf(pvf, exec_timeout, encoded_params, prepare_priority, tx).await
if let Err(err) = self.execute_pvf(pvf, exec_timeout, pvd, pov, prepare_priority, tx).await
{
return Err(InternalValidationError::HostCommunication(format!(
"cannot send pvf to the validation host, it might have shut down: {:?}",
Expand Down
44 changes: 0 additions & 44 deletions polkadot/node/core/candidate-validation/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ pub(crate) struct MetricsInner {
pub(crate) validate_from_chain_state: prometheus::Histogram,
pub(crate) validate_from_exhaustive: prometheus::Histogram,
pub(crate) validate_candidate_exhaustive: prometheus::Histogram,
pub(crate) pov_size: prometheus::HistogramVec,
pub(crate) code_size: prometheus::Histogram,
}

/// Candidate validation metrics.
Expand Down Expand Up @@ -70,21 +68,6 @@ impl Metrics {
.as_ref()
.map(|metrics| metrics.validate_candidate_exhaustive.start_timer())
}

pub fn observe_code_size(&self, code_size: usize) {
if let Some(metrics) = &self.0 {
metrics.code_size.observe(code_size as f64);
}
}

pub fn observe_pov_size(&self, pov_size: usize, compressed: bool) {
if let Some(metrics) = &self.0 {
metrics
.pov_size
.with_label_values(&[if compressed { "true" } else { "false" }])
.observe(pov_size as f64);
}
}
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -121,33 +104,6 @@ impl metrics::Metrics for Metrics {
))?,
registry,
)?,
pov_size: prometheus::register(
prometheus::HistogramVec::new(
prometheus::HistogramOpts::new(
"polkadot_parachain_candidate_validation_pov_size",
"The compressed and decompressed size of the proof of validity of a candidate",
)
.buckets(
prometheus::exponential_buckets(16384.0, 2.0, 10)
.expect("arguments are always valid; qed"),
),
&["compressed"],
)?,
registry,
)?,
code_size: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_candidate_validation_code_size",
"The size of the decompressed WASM validation blob used for checking a candidate",
)
.buckets(
prometheus::exponential_buckets(16384.0, 2.0, 10)
.expect("arguments are always valid; qed"),
),
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
Expand Down
Loading
Loading