diff --git a/Cargo.lock b/Cargo.lock index 7d9e4bf725ca..ccd410f1415b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12421,15 +12421,20 @@ name = "polkadot-node-core-pvf-prepare-worker" version = "1.0.0" dependencies = [ "cfg-if", + "criterion 0.4.0", "libc", "parity-scale-codec", "polkadot-node-core-pvf-common", "polkadot-primitives", "rayon", + "rococo-runtime", "sc-executor-common", "sc-executor-wasmtime", + "sp-maybe-compressed-blob", "tikv-jemalloc-ctl", + "tikv-jemallocator", "tracing-gum", + "tracking-allocator", ] [[package]] @@ -19442,6 +19447,10 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracking-allocator" +version = "1.0.0" + [[package]] name = "trie-bench" version = "0.38.0" diff --git a/polkadot/node/core/pvf/common/src/error.rs b/polkadot/node/core/pvf/common/src/error.rs index 6fdd06057c8b..82b56562d8cc 100644 --- a/polkadot/node/core/pvf/common/src/error.rs +++ b/polkadot/node/core/pvf/common/src/error.rs @@ -23,27 +23,36 @@ use std::fmt; pub type PrepareResult = Result; /// An error that occurred during the prepare part of the PVF pipeline. +// Codec indexes are intended to stabilize pre-encoded payloads (see `OOM_PAYLOAD` below) #[derive(Debug, Clone, Encode, Decode)] pub enum PrepareError { /// During the prevalidation stage of preparation an issue was found with the PVF. + #[codec(index = 0)] Prevalidation(String), /// Compilation failed for the given PVF. + #[codec(index = 1)] Preparation(String), /// Instantiation of the WASM module instance failed. + #[codec(index = 2)] RuntimeConstruction(String), /// An unexpected panic has occurred in the preparation worker. + #[codec(index = 3)] Panic(String), /// Failed to prepare the PVF due to the time limit. + #[codec(index = 4)] TimedOut, /// An IO error occurred. This state is reported by either the validation host or by the /// worker. + #[codec(index = 5)] IoErr(String), /// The temporary file for the artifact could not be created at the given cache path. This /// state is reported by the validation host (not by the worker). + #[codec(index = 6)] CreateTmpFileErr(String), /// The response from the worker is received, but the file cannot be renamed (moved) to the /// final destination location. This state is reported by the validation host (not by the /// worker). + #[codec(index = 7)] RenameTmpFileErr { err: String, // Unfortunately `PathBuf` doesn't implement `Encode`/`Decode`, so we do a fallible @@ -51,12 +60,19 @@ pub enum PrepareError { src: Option, dest: Option, }, + /// Memory limit reached + #[codec(index = 8)] + OutOfMemory, /// The response from the worker is received, but the worker cache could not be cleared. The /// worker has to be killed to avoid jobs having access to data from other jobs. This state is /// reported by the validation host (not by the worker). + #[codec(index = 9)] ClearWorkerDir(String), } +/// Pre-encoded length-prefixed `PrepareResult::Err(PrepareError::OutOfMemory)` +pub const OOM_PAYLOAD: &[u8] = b"\x02\x00\x00\x00\x00\x00\x00\x00\x01\x08"; + impl PrepareError { /// Returns whether this is a deterministic error, i.e. one that should trigger reliably. Those /// errors depend on the PVF itself and the sc-executor/wasmtime logic. @@ -67,7 +83,7 @@ impl PrepareError { pub fn is_deterministic(&self) -> bool { use PrepareError::*; match self { - Prevalidation(_) | Preparation(_) | Panic(_) => true, + Prevalidation(_) | Preparation(_) | Panic(_) | OutOfMemory => true, TimedOut | IoErr(_) | CreateTmpFileErr(_) | @@ -92,6 +108,7 @@ impl fmt::Display for PrepareError { CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err), RenameTmpFileErr { err, src, dest } => write!(f, "prepare: error renaming tmp file ({:?} -> {:?}): {}", src, dest, err), + OutOfMemory => write!(f, "prepare: out of memory"), ClearWorkerDir(err) => write!(f, "prepare: error clearing worker cache: {}", err), } } @@ -147,3 +164,11 @@ impl fmt::Display for InternalValidationError { } } } + +#[test] +fn pre_encoded_payloads() { + let oom_enc = PrepareResult::Err(PrepareError::OutOfMemory).encode(); + let mut oom_payload = oom_enc.len().to_le_bytes().to_vec(); + oom_payload.extend(oom_enc); + assert_eq!(oom_payload, OOM_PAYLOAD); +} diff --git a/polkadot/node/core/pvf/common/src/executor_intf.rs b/polkadot/node/core/pvf/common/src/executor_intf.rs index 58d2a90371cf..3a1d3ac1ba07 100644 --- a/polkadot/node/core/pvf/common/src/executor_intf.rs +++ b/polkadot/node/core/pvf/common/src/executor_intf.rs @@ -166,15 +166,36 @@ pub fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Result stack_limit.logical_max = *slm, ExecutorParam::StackNativeMax(snm) => stack_limit.native_stack_max = *snm, ExecutorParam::WasmExtBulkMemory => sem.wasm_bulk_memory = true, - // TODO: Not implemented yet; . - ExecutorParam::PrecheckingMaxMemory(_) => (), - ExecutorParam::PvfPrepTimeout(_, _) | ExecutorParam::PvfExecTimeout(_, _) => (), /* Not used here */ + ExecutorParam::PrecheckingMaxMemory(_) | + ExecutorParam::PvfPrepTimeout(_, _) | + ExecutorParam::PvfExecTimeout(_, _) => (), /* Not used here */ } } sem.deterministic_stack_limit = Some(stack_limit); Ok(sem) } +/// Runs the prevalidation on the given code. Returns a [`RuntimeBlob`] if it succeeds. +pub fn prevalidate(code: &[u8]) -> Result { + let blob = RuntimeBlob::new(code)?; + // It's assumed this function will take care of any prevalidation logic + // that needs to be done. + // + // Do nothing for now. + Ok(blob) +} + +/// Runs preparation on the given runtime blob. If successful, it returns a serialized compiled +/// artifact which can then be used to pass into `Executor::execute` after writing it to the disk. +pub fn prepare( + blob: RuntimeBlob, + executor_params: &ExecutorParams, +) -> Result, sc_executor_common::error::WasmError> { + let semantics = params_to_wasmtime_semantics(executor_params) + .map_err(|e| sc_executor_common::error::WasmError::Other(e))?; + sc_executor_wasmtime::prepare_runtime_artifact(blob, &semantics) +} + /// Available host functions. We leave out: /// /// 1. storage related stuff (PVF doesn't have a notion of a persistent storage/trie) diff --git a/polkadot/node/core/pvf/common/src/prepare.rs b/polkadot/node/core/pvf/common/src/prepare.rs index c205eddfb8b1..4436ebe4861e 100644 --- a/polkadot/node/core/pvf/common/src/prepare.rs +++ b/polkadot/node/core/pvf/common/src/prepare.rs @@ -29,12 +29,14 @@ pub struct PrepareStats { /// supported by the OS, `ru_maxrss`. #[derive(Clone, Debug, Default, Encode, Decode)] pub struct MemoryStats { - /// Memory stats from `tikv_jemalloc_ctl`. + /// Memory stats from `tikv_jemalloc_ctl`, polling-based and not very precise. #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] pub memory_tracker_stats: Option, /// `ru_maxrss` from `getrusage`. `None` if an error occurred. #[cfg(target_os = "linux")] pub max_rss: Option, + /// Peak allocation in bytes measured by tracking allocator + pub peak_tracked_alloc: u64, } /// Statistics of collected memory metrics. diff --git a/polkadot/node/core/pvf/prepare-worker/Cargo.toml b/polkadot/node/core/pvf/prepare-worker/Cargo.toml index e5a08f8a153d..dae88afc555d 100644 --- a/polkadot/node/core/pvf/prepare-worker/Cargo.toml +++ b/polkadot/node/core/pvf/prepare-worker/Cargo.toml @@ -11,7 +11,9 @@ cfg-if = "1.0" gum = { package = "tracing-gum", path = "../../../gum" } libc = "0.2.139" rayon = "1.5.1" +tracking-allocator = { path = "../../../tracking-allocator" } tikv-jemalloc-ctl = { version = "0.5.0", optional = true } +tikv-jemallocator = { version = "0.5.0", optional = true } parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] } @@ -22,11 +24,22 @@ sc-executor-common = { path = "../../../../../substrate/client/executor/common" sc-executor-wasmtime = { path = "../../../../../substrate/client/executor/wasmtime" } [target.'cfg(target_os = "linux")'.dependencies] +tikv-jemallocator = "0.5.0" tikv-jemalloc-ctl = "0.5.0" [features] builder = [] jemalloc-allocator = [ "dep:tikv-jemalloc-ctl", + "dep:tikv-jemallocator", "polkadot-node-core-pvf-common/jemalloc-allocator", ] + +[dev-dependencies] +criterion = { version = "0.4.0", default-features = false, features = ["cargo_bench_support"] } +rococo-runtime = { path = "../../../../runtime/rococo" } +sp-maybe-compressed-blob = { path = "../../../../../substrate/primitives/maybe-compressed-blob" } + +[[bench]] +name = "prepare_rococo_runtime" +harness = false diff --git a/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs b/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs new file mode 100644 index 000000000000..ba2568cd80cc --- /dev/null +++ b/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs @@ -0,0 +1,65 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use criterion::{criterion_group, criterion_main, Criterion, SamplingMode}; +use polkadot_node_core_pvf_common::{ + executor_intf::{prepare, prevalidate}, + prepare::PrepareJobKind, + pvf::PvfPrepData, +}; +use polkadot_primitives::ExecutorParams; +use std::time::Duration; + +fn do_prepare_runtime(pvf: PvfPrepData) { + let blob = match prevalidate(&pvf.code()) { + Err(err) => panic!("{:?}", err), + Ok(b) => b, + }; + + match prepare(blob, &pvf.executor_params()) { + Ok(_) => (), + Err(err) => panic!("{:?}", err), + } +} + +fn prepare_rococo_runtime(c: &mut Criterion) { + let blob = rococo_runtime::WASM_BINARY.unwrap(); + let pvf = match sp_maybe_compressed_blob::decompress(&blob, 64 * 1024 * 1024) { + Ok(code) => PvfPrepData::from_code( + code.into_owned(), + ExecutorParams::default(), + Duration::from_secs(360), + PrepareJobKind::Compilation, + ), + Err(e) => { + panic!("Cannot decompress blob: {:?}", e); + }, + }; + + let mut group = c.benchmark_group("rococo"); + group.sampling_mode(SamplingMode::Flat); + group.sample_size(20); + group.measurement_time(Duration::from_secs(240)); + group.bench_function("prepare Rococo runtime", |b| { + // `PvfPrepData` is designed to be cheap to clone, so cloning shouldn't affect the + // benchmark accuracy + b.iter(|| do_prepare_runtime(pvf.clone())) + }); + group.finish(); +} + +criterion_group!(preparation, prepare_rococo_runtime); +criterion_main!(preparation); diff --git a/polkadot/node/core/pvf/prepare-worker/src/executor_intf.rs b/polkadot/node/core/pvf/prepare-worker/src/executor_intf.rs deleted file mode 100644 index 1f88f6a6dd6e..000000000000 --- a/polkadot/node/core/pvf/prepare-worker/src/executor_intf.rs +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright (C) Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Interface to the Substrate Executor - -use polkadot_node_core_pvf_common::executor_intf::params_to_wasmtime_semantics; -use polkadot_primitives::ExecutorParams; -use sc_executor_common::runtime_blob::RuntimeBlob; - -/// Runs the prevalidation on the given code. Returns a [`RuntimeBlob`] if it succeeds. -pub fn prevalidate(code: &[u8]) -> Result { - let blob = RuntimeBlob::new(code)?; - // It's assumed this function will take care of any prevalidation logic - // that needs to be done. - // - // Do nothing for now. - Ok(blob) -} - -/// Runs preparation on the given runtime blob. If successful, it returns a serialized compiled -/// artifact which can then be used to pass into `Executor::execute` after writing it to the disk. -pub fn prepare( - blob: RuntimeBlob, - executor_params: &ExecutorParams, -) -> Result, sc_executor_common::error::WasmError> { - let semantics = params_to_wasmtime_semantics(executor_params) - .map_err(|e| sc_executor_common::error::WasmError::Other(e))?; - sc_executor_wasmtime::prepare_runtime_artifact(blob, &semantics) -} diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs index 926d9cabe182..37a4dd06075e 100644 --- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs +++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs @@ -16,10 +16,9 @@ //! Contains the logic for preparing PVFs. Used by the polkadot-prepare-worker binary. -mod executor_intf; mod memory_stats; -pub use executor_intf::{prepare, prevalidate}; +use polkadot_node_core_pvf_common::executor_intf::{prepare, prevalidate}; // NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are // separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-prepare-worker=trace`. @@ -31,7 +30,7 @@ use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop}; use parity_scale_codec::{Decode, Encode}; use polkadot_node_core_pvf_common::{ - error::{PrepareError, PrepareResult}, + error::{PrepareError, PrepareResult, OOM_PAYLOAD}, executor_intf::create_runtime_from_artifact_bytes, framed_recv_blocking, framed_send_blocking, prepare::{MemoryStats, PrepareJobKind, PrepareStats}, @@ -46,11 +45,24 @@ use polkadot_node_core_pvf_common::{ use polkadot_primitives::ExecutorParams; use std::{ fs, io, - os::unix::net::UnixStream, + os::{ + fd::{AsRawFd, RawFd}, + unix::net::UnixStream, + }, path::PathBuf, sync::{mpsc::channel, Arc}, time::Duration, }; +use tracking_allocator::TrackingAllocator; + +#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] +#[global_allocator] +static ALLOC: TrackingAllocator = + TrackingAllocator(tikv_jemallocator::Jemalloc); + +#[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))] +#[global_allocator] +static ALLOC: TrackingAllocator = TrackingAllocator(std::alloc::System); /// Contains the bytes for a successfully compiled artifact. pub struct CompiledArtifact(Vec); @@ -83,6 +95,44 @@ fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<( framed_send_blocking(stream, &result.encode()) } +fn start_memory_tracking(fd: RawFd, limit: Option) { + unsafe { + // SAFETY: Inside the failure handler, the allocator is locked and no allocations or + // deallocations are possible. For Linux, that always holds for the code below, so it's + // safe. For MacOS, that technically holds at the time of writing, but there are no future + // guarantees. + // The arguments of unsafe `libc` calls are valid, the payload validity is covered with + // a test. + ALLOC.start_tracking( + limit, + Some(Box::new(move || { + #[cfg(target_os = "linux")] + { + // Syscalls never allocate or deallocate, so this is safe. + libc::syscall(libc::SYS_write, fd, OOM_PAYLOAD.as_ptr(), OOM_PAYLOAD.len()); + libc::syscall(libc::SYS_close, fd); + libc::syscall(libc::SYS_exit, 1); + } + #[cfg(not(target_os = "linux"))] + { + // Syscalls are not available on MacOS, so we have to use `libc` wrappers. + // Technicaly, there may be allocations inside, although they shouldn't be + // there. In that case, we'll see deadlocks on MacOS after the OOM condition + // triggered. As we consider running a validator on MacOS unsafe, and this + // code is only run by a validator, it's a lesser evil. + libc::write(fd, OOM_PAYLOAD.as_ptr().cast(), OOM_PAYLOAD.len()); + libc::close(fd); + std::process::exit(1); + } + })), + ); + } +} + +fn end_memory_tracking() -> isize { + ALLOC.end_tracking() +} + /// The entrypoint that the spawned prepare worker should start with. /// /// # Parameters @@ -172,6 +222,22 @@ pub fn worker_entrypoint( Arc::clone(&condvar), WaitOutcome::TimedOut, )?; + + start_memory_tracking( + stream.as_raw_fd(), + executor_params.prechecking_max_memory().map(|v| { + v.try_into().unwrap_or_else(|_| { + gum::warn!( + LOG_TARGET, + %worker_pid, + "Illegal pre-checking max memory value {} discarded", + v, + ); + 0 + }) + }), + ); + // Spawn another thread for preparation. let prepare_thread = thread::spawn_worker_thread( "prepare thread", @@ -207,6 +273,17 @@ pub fn worker_entrypoint( let outcome = thread::wait_for_threads(condvar); + let peak_alloc = { + let peak = end_memory_tracking(); + gum::debug!( + target: LOG_TARGET, + %worker_pid, + "prepare job peak allocation is {} bytes", + peak, + ); + peak + }; + let result = match outcome { WaitOutcome::Finished => { let _ = cpu_time_monitor_tx.send(()); @@ -238,6 +315,14 @@ pub fn worker_entrypoint( memory_tracker_stats, #[cfg(target_os = "linux")] max_rss: extract_max_rss_stat(max_rss, worker_pid), + // Negative peak allocation values are legit; they are narrow + // corner cases and shouldn't affect overall statistics + // significantly + peak_tracked_alloc: if peak_alloc > 0 { + peak_alloc as u64 + } else { + 0u64 + }, }; // Write the serialized artifact into a temp file. diff --git a/polkadot/node/core/pvf/src/metrics.rs b/polkadot/node/core/pvf/src/metrics.rs index 3d792793498b..7fd876cf1740 100644 --- a/polkadot/node/core/pvf/src/metrics.rs +++ b/polkadot/node/core/pvf/src/metrics.rs @@ -93,6 +93,10 @@ impl Metrics { metrics.preparation_max_resident.observe(max_resident_kb); metrics.preparation_max_allocated.observe(max_allocated_kb); } + + metrics + .preparation_peak_tracked_allocation + .observe((memory_stats.peak_tracked_alloc / 1024) as f64); } } } @@ -110,10 +114,14 @@ struct MetricsInner { execution_time: prometheus::Histogram, #[cfg(target_os = "linux")] preparation_max_rss: prometheus::Histogram, + // Max. allocated memory, tracked by Jemallocator, polling-based #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] preparation_max_allocated: prometheus::Histogram, + // Max. resident memory, tracked by Jemallocator, polling-based #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] preparation_max_resident: prometheus::Histogram, + // Peak allocation value, tracked by tracking-allocator + preparation_peak_tracked_allocation: prometheus::Histogram, } impl metrics::Metrics for Metrics { @@ -271,6 +279,18 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + preparation_peak_tracked_allocation: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts::new( + "polkadot_pvf_preparation_peak_tracked_allocation", + "peak allocation observed for preparation (in kilobytes)", + ).buckets( + prometheus::exponential_buckets(8192.0, 2.0, 10) + .expect("arguments are always valid; qed"), + ), + )?, + registry, + )?, }; Ok(Metrics(Some(inner))) } diff --git a/polkadot/node/core/pvf/src/prepare/pool.rs b/polkadot/node/core/pvf/src/prepare/pool.rs index 7933b0319a6f..6bb6ca5b6445 100644 --- a/polkadot/node/core/pvf/src/prepare/pool.rs +++ b/polkadot/node/core/pvf/src/prepare/pool.rs @@ -399,6 +399,20 @@ fn handle_mux( )?; } + Ok(()) + }, + Outcome::OutOfMemory => { + if attempt_retire(metrics, spawned, worker) { + reply( + from_pool, + FromPool::Concluded { + worker, + rip: true, + result: Err(PrepareError::OutOfMemory), + }, + )?; + } + Ok(()) }, } diff --git a/polkadot/node/core/pvf/src/prepare/worker_intf.rs b/polkadot/node/core/pvf/src/prepare/worker_intf.rs index 1f6ab0b76556..0e50caf1feb5 100644 --- a/polkadot/node/core/pvf/src/prepare/worker_intf.rs +++ b/polkadot/node/core/pvf/src/prepare/worker_intf.rs @@ -98,6 +98,8 @@ pub enum Outcome { /// /// This doesn't return an idle worker instance, thus this worker is no longer usable. IoErr(String), + /// The worker ran out of memory and is aborting. The worker should be ripped. + OutOfMemory, } /// Given the idle token of a worker and parameters of work, communicates with the worker and @@ -234,6 +236,7 @@ async fn handle_response( Ok(result) => result, // Timed out on the child. This should already be logged by the child. Err(PrepareError::TimedOut) => return Outcome::TimedOut, + Err(PrepareError::OutOfMemory) => return Outcome::OutOfMemory, Err(_) => return Outcome::Concluded { worker, result }, }; diff --git a/polkadot/node/core/pvf/src/testing.rs b/polkadot/node/core/pvf/src/testing.rs index 169b55d34b56..4c038896f7f9 100644 --- a/polkadot/node/core/pvf/src/testing.rs +++ b/polkadot/node/core/pvf/src/testing.rs @@ -36,8 +36,8 @@ pub fn validate_candidate( code: &[u8], params: &[u8], ) -> Result, Box> { + use polkadot_node_core_pvf_common::executor_intf::{prepare, prevalidate}; use polkadot_node_core_pvf_execute_worker::execute_artifact; - use polkadot_node_core_pvf_prepare_worker::{prepare, prevalidate}; let code = sp_maybe_compressed_blob::decompress(code, 10 * 1024 * 1024) .expect("Decompressing code failed"); diff --git a/polkadot/node/core/pvf/tests/it/adder.rs b/polkadot/node/core/pvf/tests/it/adder.rs index e8d8a9a6b63e..9a7ddcb40890 100644 --- a/polkadot/node/core/pvf/tests/it/adder.rs +++ b/polkadot/node/core/pvf/tests/it/adder.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +//! PVF host integration tests checking the chain production pipeline. + use super::TestHost; use adder::{hash_state, BlockData, HeadData}; use parity_scale_codec::{Decode, Encode}; diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs index a69a488adb98..f4fd7f802f5e 100644 --- a/polkadot/node/core/pvf/tests/it/main.rs +++ b/polkadot/node/core/pvf/tests/it/main.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +//! General PVF host integration tests checking the functionality of the PVF host itself. + use assert_matches::assert_matches; use parity_scale_codec::Encode as _; use polkadot_node_core_pvf::{ @@ -22,13 +24,10 @@ use polkadot_node_core_pvf::{ JOB_TIMEOUT_WALL_CLOCK_FACTOR, }; use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult}; -use polkadot_primitives::ExecutorParams; +use polkadot_primitives::{ExecutorParam, ExecutorParams}; #[cfg(target_os = "linux")] use rusty_fork::rusty_fork_test; -#[cfg(feature = "ci-only-tests")] -use polkadot_primitives::ExecutorParam; - use std::time::Duration; use tokio::sync::Mutex; @@ -434,6 +433,43 @@ async fn deleting_prepared_artifact_does_not_dispute() { } } +// This test checks if the adder parachain runtime can be prepared with 10Mb preparation memory +// limit enforced. At the moment of writing, the limit if far enough to prepare the PVF. If it +// starts failing, either Wasmtime version has changed, or the PVF code itself has changed, and +// more memory is required now. Multi-threaded preparation, if ever enabled, may also affect +// memory consumption. +#[tokio::test] +async fn prechecking_within_memory_limits() { + let host = TestHost::new().await; + let result = host + .precheck_pvf( + ::adder::wasm_binary_unwrap(), + ExecutorParams::from(&[ExecutorParam::PrecheckingMaxMemory(10 * 1024 * 1024)][..]), + ) + .await; + + assert_matches!(result, Ok(_)); +} + +// This test checks if the adder parachain runtime can be prepared with 512Kb preparation memory +// limit enforced. At the moment of writing, the limit if not enough to prepare the PVF, and the +// preparation is supposed to generate an error. If the test starts failing, either Wasmtime +// version has changed, or the PVF code itself has changed, and less memory is required now. +#[tokio::test] +async fn prechecking_out_of_memory() { + use polkadot_node_core_pvf::PrepareError; + + let host = TestHost::new().await; + let result = host + .precheck_pvf( + ::adder::wasm_binary_unwrap(), + ExecutorParams::from(&[ExecutorParam::PrecheckingMaxMemory(512 * 1024)][..]), + ) + .await; + + assert_matches!(result, Err(PrepareError::OutOfMemory)); +} + // With one worker, run multiple preparation jobs serially. They should not conflict. #[tokio::test] async fn prepare_can_run_serially() { diff --git a/polkadot/node/malus/Cargo.toml b/polkadot/node/malus/Cargo.toml index 9ce725f16822..f52f0cc0282f 100644 --- a/polkadot/node/malus/Cargo.toml +++ b/polkadot/node/malus/Cargo.toml @@ -48,7 +48,7 @@ erasure = { package = "polkadot-erasure-coding", path = "../../erasure-coding" } rand = "0.8.5" # Required for worker binaries to build. -polkadot-node-core-pvf-common = { path = "../core/pvf/common", features = ["test-utils"] } +polkadot-node-core-pvf-common = { path = "../core/pvf/common" } polkadot-node-core-pvf-execute-worker = { path = "../core/pvf/execute-worker" } polkadot-node-core-pvf-prepare-worker = { path = "../core/pvf/prepare-worker" } diff --git a/polkadot/node/tracking-allocator/Cargo.toml b/polkadot/node/tracking-allocator/Cargo.toml new file mode 100644 index 000000000000..2ea3f69c7d0d --- /dev/null +++ b/polkadot/node/tracking-allocator/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "tracking-allocator" +description = "Tracking allocator to control the amount of memory consumed by the process" +version = "1.0.0" +authors.workspace = true +edition.workspace = true diff --git a/polkadot/node/tracking-allocator/src/lib.rs b/polkadot/node/tracking-allocator/src/lib.rs new file mode 100644 index 000000000000..8441bd1ffafa --- /dev/null +++ b/polkadot/node/tracking-allocator/src/lib.rs @@ -0,0 +1,242 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Tracking/limiting global allocator. Calculates the peak allocation between two checkpoints for +//! the whole process. Accepts an optional limit and a failure handler which is called if the limit +//! is overflown. + +use core::{ + alloc::{GlobalAlloc, Layout}, + ops::{Deref, DerefMut}, +}; +use std::{ + cell::UnsafeCell, + ptr::null_mut, + sync::atomic::{AtomicBool, Ordering}, +}; + +struct Spinlock { + lock: AtomicBool, + data: UnsafeCell, +} + +struct SpinlockGuard<'a, T: 'a> { + lock: &'a Spinlock, +} + +// SAFETY: We require that the data inside of the `SpinLock` is `Send`, so it can be sent +// and accessed by any thread as long as it's accessed by only one thread at a time. +// The `SpinLock` provides an exclusive lock over it, so it guarantees that multiple +// threads cannot access it at the same time, hence it implements `Sync` (that is, it can be +// accessed concurrently from multiple threads, even though the `T` itself might not +// necessarily be `Sync` too). +unsafe impl Sync for Spinlock {} + +impl Spinlock { + pub const fn new(t: T) -> Spinlock { + Spinlock { lock: AtomicBool::new(false), data: UnsafeCell::new(t) } + } + + #[inline] + pub fn lock(&self) -> SpinlockGuard { + loop { + // Try to acquire the lock. + if self + .lock + .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + return SpinlockGuard { lock: self } + } + // We failed to acquire the lock; wait until it's unlocked. + // + // In theory this should result in less coherency traffic as unlike `compare_exchange` + // it is a read-only operation, so multiple cores can execute it simultaneously + // without taking an exclusive lock over the cache line. + while self.lock.load(Ordering::Relaxed) { + std::hint::spin_loop(); + } + } + } + + #[inline] + fn unlock(&self) { + self.lock.store(false, Ordering::Release); + } +} + +impl Deref for SpinlockGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + // SAFETY: It is safe to dereference a guard to the `UnsafeCell` underlying data as the + // presence of the guard means the data is already locked. + unsafe { &*self.lock.data.get() } + } +} + +impl DerefMut for SpinlockGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + // SAFETY: Same as for `Deref::deref`. + unsafe { &mut *self.lock.data.get() } + } +} + +impl Drop for SpinlockGuard<'_, T> { + fn drop(&mut self) { + self.lock.unlock(); + } +} + +struct TrackingAllocatorData { + current: isize, + peak: isize, + limit: isize, + failure_handler: Option>, +} + +impl TrackingAllocatorData { + fn start_tracking( + mut guard: SpinlockGuard, + limit: isize, + failure_handler: Option>, + ) { + guard.current = 0; + guard.peak = 0; + guard.limit = limit; + // Cannot drop it yet, as it would trigger a deallocation + let old_handler = guard.failure_handler.take(); + guard.failure_handler = failure_handler; + drop(guard); + drop(old_handler); + } + + fn end_tracking(mut guard: SpinlockGuard) -> isize { + let peak = guard.peak; + guard.limit = 0; + // Cannot drop it yet, as it would trigger a deallocation + let old_handler = guard.failure_handler.take(); + drop(guard); + drop(old_handler); + peak + } + + #[inline] + fn track_and_check_limits( + mut guard: SpinlockGuard, + alloc: isize, + ) -> Option> { + guard.current += alloc; + if guard.current > guard.peak { + guard.peak = guard.current; + } + if guard.limit == 0 || guard.peak <= guard.limit { + None + } else { + Some(guard) + } + } +} + +static ALLOCATOR_DATA: Spinlock = + Spinlock::new(TrackingAllocatorData { current: 0, peak: 0, limit: 0, failure_handler: None }); + +pub struct TrackingAllocator(pub A); + +impl TrackingAllocator { + /// Start tracking memory allocations and deallocations. + /// + /// # Safety + /// + /// Failure handler is called with the allocator being in the locked state. Thus, no + /// allocations or deallocations are allowed inside the failure handler; otherwise, a + /// deadlock will occur. + pub unsafe fn start_tracking( + &self, + limit: Option, + failure_handler: Option>, + ) { + TrackingAllocatorData::start_tracking( + ALLOCATOR_DATA.lock(), + limit.unwrap_or(0), + failure_handler, + ); + } + + /// End tracking and return the peak allocation value in bytes (as `isize`). Peak allocation + /// value is not guaranteed to be neither non-zero nor positive. + pub fn end_tracking(&self) -> isize { + TrackingAllocatorData::end_tracking(ALLOCATOR_DATA.lock()) + } +} + +#[cold] +#[inline(never)] +unsafe fn fail_allocation(guard: SpinlockGuard) -> *mut u8 { + if let Some(failure_handler) = &guard.failure_handler { + failure_handler() + } + null_mut() +} + +unsafe impl GlobalAlloc for TrackingAllocator { + // SAFETY: + // * The wrapped methods are as safe as the underlying allocator implementation is + + #[inline] + unsafe fn alloc(&self, layout: Layout) -> *mut u8 { + let guard = ALLOCATOR_DATA.lock(); + if let Some(guard) = + TrackingAllocatorData::track_and_check_limits(guard, layout.size() as isize) + { + fail_allocation(guard) + } else { + self.0.alloc(layout) + } + } + + #[inline] + unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 { + let guard = ALLOCATOR_DATA.lock(); + if let Some(guard) = + TrackingAllocatorData::track_and_check_limits(guard, layout.size() as isize) + { + fail_allocation(guard) + } else { + self.0.alloc_zeroed(layout) + } + } + + #[inline] + unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) -> () { + let guard = ALLOCATOR_DATA.lock(); + TrackingAllocatorData::track_and_check_limits(guard, -(layout.size() as isize)); + self.0.dealloc(ptr, layout) + } + + #[inline] + unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 { + let guard = ALLOCATOR_DATA.lock(); + if let Some(guard) = TrackingAllocatorData::track_and_check_limits( + guard, + (new_size as isize) - (layout.size() as isize), + ) { + fail_allocation(guard) + } else { + self.0.realloc(ptr, layout, new_size) + } + } +} diff --git a/polkadot/primitives/src/v6/executor_params.rs b/polkadot/primitives/src/v6/executor_params.rs index 8ea0889bd23b..bb9980f68796 100644 --- a/polkadot/primitives/src/v6/executor_params.rs +++ b/polkadot/primitives/src/v6/executor_params.rs @@ -197,6 +197,16 @@ impl ExecutorParams { None } + /// Returns pre-checking memory limit, if any + pub fn prechecking_max_memory(&self) -> Option { + for param in &self.0 { + if let ExecutorParam::PrecheckingMaxMemory(limit) = param { + return Some(*limit) + } + } + None + } + /// Check params coherence. pub fn check_consistency(&self) -> Result<(), ExecutorParamError> { use ExecutorParam::*; diff --git a/polkadot/src/main.rs b/polkadot/src/main.rs index 5986d8cea7bb..1a96bf8fb00f 100644 --- a/polkadot/src/main.rs +++ b/polkadot/src/main.rs @@ -24,7 +24,7 @@ use color_eyre::eyre; /// `memory_stats::MemoryAllocationTracker`. #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] #[global_allocator] -pub static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; +static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; fn main() -> eyre::Result<()> { color_eyre::install()?;