diff --git a/Cargo.lock b/Cargo.lock
index 7bf5215b6dec..951f2548d34d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7575,9 +7575,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67"
[[package]]
name = "libc"
-version = "0.2.152"
+version = "0.2.153"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7"
+checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd"
[[package]]
name = "libflate"
@@ -13303,7 +13303,6 @@ dependencies = [
"slotmap",
"sp-core",
"sp-maybe-compressed-blob",
- "sp-wasm-interface 20.0.0",
"tempfile",
"test-parachain-adder",
"test-parachain-halt",
@@ -13340,7 +13339,6 @@ name = "polkadot-node-core-pvf-common"
version = "7.0.0"
dependencies = [
"assert_matches",
- "cfg-if",
"cpu-time",
"futures",
"landlock",
diff --git a/polkadot/node/core/pvf/Cargo.toml b/polkadot/node/core/pvf/Cargo.toml
index a0233d6b7517..8bfe2baa42fd 100644
--- a/polkadot/node/core/pvf/Cargo.toml
+++ b/polkadot/node/core/pvf/Cargo.toml
@@ -17,8 +17,7 @@ cfg-if = "1.0"
futures = "0.3.30"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }
-is_executable = "1.0.1"
-libc = "0.2.152"
+is_executable = { version = "1.0.1", optional = true }
pin-project = "1.0.9"
rand = "0.8.5"
slotmap = "1.0"
@@ -26,7 +25,9 @@ tempfile = "3.3.0"
thiserror = { workspace = true }
tokio = { version = "1.24.2", features = ["fs", "process"] }
-parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }
+parity-scale-codec = { version = "3.6.1", default-features = false, features = [
+ "derive",
+] }
polkadot-parachain-primitives = { path = "../../../parachain" }
polkadot-core-primitives = { path = "../../../core-primitives" }
@@ -37,14 +38,16 @@ polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-primitives = { path = "../../../primitives" }
sp-core = { path = "../../../../substrate/primitives/core" }
-sp-wasm-interface = { path = "../../../../substrate/primitives/wasm-interface" }
-sp-maybe-compressed-blob = { path = "../../../../substrate/primitives/maybe-compressed-blob" }
+sp-maybe-compressed-blob = { path = "../../../../substrate/primitives/maybe-compressed-blob", optional = true }
polkadot-node-core-pvf-prepare-worker = { path = "prepare-worker", optional = true }
polkadot-node-core-pvf-execute-worker = { path = "execute-worker", optional = true }
[dev-dependencies]
assert_matches = "1.4.0"
-criterion = { version = "0.4.0", default-features = false, features = ["async_tokio", "cargo_bench_support"] }
+criterion = { version = "0.4.0", default-features = false, features = [
+ "async_tokio",
+ "cargo_bench_support",
+] }
hex-literal = "0.4.1"
polkadot-node-core-pvf-common = { path = "common", features = ["test-utils"] }
@@ -57,6 +60,7 @@ adder = { package = "test-parachain-adder", path = "../../../parachain/test-para
halt = { package = "test-parachain-halt", path = "../../../parachain/test-parachains/halt" }
[target.'cfg(target_os = "linux")'.dev-dependencies]
+libc = "0.2.153"
procfs = "0.16.0"
rusty-fork = "0.3.0"
sc-sysinfo = { path = "../../../../substrate/client/sysinfo" }
@@ -70,6 +74,8 @@ ci-only-tests = []
jemalloc-allocator = ["polkadot-node-core-pvf-common/jemalloc-allocator"]
# This feature is used to export test code to other crates without putting it in the production build.
test-utils = [
- "polkadot-node-core-pvf-execute-worker",
- "polkadot-node-core-pvf-prepare-worker",
+ "dep:is_executable",
+ "dep:polkadot-node-core-pvf-execute-worker",
+ "dep:polkadot-node-core-pvf-prepare-worker",
+ "dep:sp-maybe-compressed-blob",
]
diff --git a/polkadot/node/core/pvf/common/Cargo.toml b/polkadot/node/core/pvf/common/Cargo.toml
index f3eb9d919aae..e1ce6e79cb99 100644
--- a/polkadot/node/core/pvf/common/Cargo.toml
+++ b/polkadot/node/core/pvf/common/Cargo.toml
@@ -10,14 +10,16 @@ license.workspace = true
workspace = true
[dependencies]
-cfg-if = "1.0"
cpu-time = "1.0.0"
futures = "0.3.30"
gum = { package = "tracing-gum", path = "../../../gum" }
libc = "0.2.152"
+nix = { version = "0.27.1", features = ["resource", "sched"] }
thiserror = { workspace = true }
-parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }
+parity-scale-codec = { version = "3.6.1", default-features = false, features = [
+ "derive",
+] }
polkadot-parachain-primitives = { path = "../../../../parachain" }
polkadot-primitives = { path = "../../../../primitives" }
@@ -34,7 +36,6 @@ sp-tracing = { path = "../../../../../substrate/primitives/tracing" }
[target.'cfg(target_os = "linux")'.dependencies]
landlock = "0.3.0"
-nix = { version = "0.27.1", features = ["sched"] }
[target.'cfg(all(target_os = "linux", target_arch = "x86_64"))'.dependencies]
seccompiler = "0.4.0"
diff --git a/polkadot/node/core/pvf/common/src/error.rs b/polkadot/node/core/pvf/common/src/error.rs
index cf274044456f..adeb40c0b195 100644
--- a/polkadot/node/core/pvf/common/src/error.rs
+++ b/polkadot/node/core/pvf/common/src/error.rs
@@ -136,6 +136,9 @@ pub enum InternalValidationError {
/// Could not find or open compiled artifact file.
#[error("validation: could not find or open compiled artifact file: {0}")]
CouldNotOpenFile(String),
+ /// Could not create a pipe between the worker and a child process.
+ #[error("validation: could not create pipe: {0}")]
+ CouldNotCreatePipe(String),
/// Host could not clear the worker cache after a job.
#[error("validation: host could not clear the worker cache ({path:?}) after a job: {err}")]
CouldNotClearWorkerDir {
diff --git a/polkadot/node/core/pvf/common/src/execute.rs b/polkadot/node/core/pvf/common/src/execute.rs
index 18c97b03cbcd..ae6096cacec4 100644
--- a/polkadot/node/core/pvf/common/src/execute.rs
+++ b/polkadot/node/core/pvf/common/src/execute.rs
@@ -30,35 +30,36 @@ pub struct Handshake {
/// The response from the execution worker.
#[derive(Debug, Encode, Decode)]
-pub enum WorkerResponse {
- /// The job completed successfully.
- Ok {
- /// The result of parachain validation.
- result_descriptor: ValidationResult,
- /// The amount of CPU time taken by the job.
- duration: Duration,
- },
- /// The candidate is invalid.
- InvalidCandidate(String),
- /// Instantiation of the WASM module instance failed during an execution.
- /// Possibly related to local issues or dirty node update. May be retried with re-preparation.
- RuntimeConstruction(String),
+pub struct WorkerResponse {
+ /// The response from the execute job process.
+ pub job_response: JobResponse,
+ /// The amount of CPU time taken by the job.
+ pub duration: Duration,
+}
+
+/// An error occurred in the worker process.
+#[derive(thiserror::Error, Debug, Clone, Encode, Decode)]
+pub enum WorkerError {
/// The job timed out.
+ #[error("The job timed out")]
JobTimedOut,
/// The job process has died. We must kill the worker just in case.
///
/// We cannot treat this as an internal error because malicious code may have killed the job.
/// We still retry it, because in the non-malicious case it is likely spurious.
+ #[error("The job process (pid {job_pid}) has died: {err}")]
JobDied { err: String, job_pid: i32 },
/// An unexpected error occurred in the job process, e.g. failing to spawn a thread, panic,
/// etc.
///
/// Because malicious code can cause a job error, we must not treat it as an internal error. We
/// still retry it, because in the non-malicious case it is likely spurious.
- JobError(String),
+ #[error("An unexpected error occurred in the job process: {0}")]
+ JobError(#[from] JobError),
/// Some internal error occurred.
- InternalError(InternalValidationError),
+ #[error("An internal error occurred: {0}")]
+ InternalError(#[from] InternalValidationError),
}
/// The result of a job on the execution worker.
@@ -101,7 +102,7 @@ impl JobResponse {
/// An unexpected error occurred in the execution job process. Because this comes from the job,
/// which executes untrusted code, this error must likewise be treated as untrusted. That is, we
/// cannot raise an internal error based on this.
-#[derive(thiserror::Error, Debug, Encode, Decode)]
+#[derive(thiserror::Error, Clone, Debug, Encode, Decode)]
pub enum JobError {
#[error("The job timed out")]
TimedOut,
@@ -114,4 +115,7 @@ pub enum JobError {
CouldNotSpawnThread(String),
#[error("An error occurred in the CPU time monitor thread: {0}")]
CpuTimeMonitorThread(String),
+ /// Since the job can return any exit status it wants, we have to treat this as untrusted.
+ #[error("Unexpected exit status: {0}")]
+ UnexpectedExitStatus(i32),
}
diff --git a/polkadot/node/core/pvf/common/src/lib.rs b/polkadot/node/core/pvf/common/src/lib.rs
index 15097dbd3af5..0cd928201639 100644
--- a/polkadot/node/core/pvf/common/src/lib.rs
+++ b/polkadot/node/core/pvf/common/src/lib.rs
@@ -15,6 +15,7 @@
// along with Polkadot. If not, see .
//! Contains functionality related to PVFs that is shared by the PVF host and the PVF workers.
+#![deny(unused_crate_dependencies)]
pub mod error;
pub mod execute;
diff --git a/polkadot/node/core/pvf/common/src/pvf.rs b/polkadot/node/core/pvf/common/src/pvf.rs
index 340dffe07c3f..5f248f49b9a3 100644
--- a/polkadot/node/core/pvf/common/src/pvf.rs
+++ b/polkadot/node/core/pvf/common/src/pvf.rs
@@ -18,12 +18,7 @@ use crate::prepare::PrepareJobKind;
use parity_scale_codec::{Decode, Encode};
use polkadot_parachain_primitives::primitives::ValidationCodeHash;
use polkadot_primitives::ExecutorParams;
-use std::{
- cmp::{Eq, PartialEq},
- fmt,
- sync::Arc,
- time::Duration,
-};
+use std::{fmt, sync::Arc, time::Duration};
/// A struct that carries the exhaustive set of data to prepare an artifact out of plain
/// Wasm binary
diff --git a/polkadot/node/core/pvf/common/src/worker/mod.rs b/polkadot/node/core/pvf/common/src/worker/mod.rs
index d7c95d9e7047..67e7bece407d 100644
--- a/polkadot/node/core/pvf/common/src/worker/mod.rs
+++ b/polkadot/node/core/pvf/common/src/worker/mod.rs
@@ -18,10 +18,13 @@
pub mod security;
-use crate::{framed_recv_blocking, SecurityStatus, WorkerHandshake, LOG_TARGET};
+use crate::{
+ framed_recv_blocking, framed_send_blocking, SecurityStatus, WorkerHandshake, LOG_TARGET,
+};
use cpu_time::ProcessTime;
use futures::never::Never;
-use parity_scale_codec::Decode;
+use nix::{errno::Errno, sys::resource::Usage};
+use parity_scale_codec::{Decode, Encode};
use std::{
any::Any,
fmt::{self},
@@ -58,8 +61,6 @@ macro_rules! decl_worker_main {
$crate::sp_tracing::try_init_simple();
- let worker_pid = std::process::id();
-
let args = std::env::args().collect::>();
if args.len() == 1 {
print_help($expected_command);
@@ -548,6 +549,81 @@ fn recv_worker_handshake(stream: &mut UnixStream) -> io::Result
Ok(worker_handshake)
}
+/// Calculate the total CPU time from the given `usage` structure, returned from
+/// [`nix::sys::resource::getrusage`], and calculates the total CPU time spent, including both user
+/// and system time.
+///
+/// # Arguments
+///
+/// - `rusage`: Contains resource usage information.
+///
+/// # Returns
+///
+/// Returns a `Duration` representing the total CPU time.
+pub fn get_total_cpu_usage(rusage: Usage) -> Duration {
+ let micros = (((rusage.user_time().tv_sec() + rusage.system_time().tv_sec()) * 1_000_000) +
+ (rusage.system_time().tv_usec() + rusage.user_time().tv_usec()) as i64) as u64;
+
+ return Duration::from_micros(micros)
+}
+
+/// Get a job response.
+pub fn recv_child_response(
+ received_data: &mut io::BufReader<&[u8]>,
+ context: &'static str,
+) -> io::Result
+where
+ T: Decode,
+{
+ let response_bytes = framed_recv_blocking(received_data)?;
+ T::decode(&mut response_bytes.as_slice()).map_err(|e| {
+ io::Error::new(
+ io::ErrorKind::Other,
+ format!("{} pvf recv_child_response: decode error: {}", context, e),
+ )
+ })
+}
+
+pub fn send_result(
+ stream: &mut UnixStream,
+ result: Result,
+ worker_info: &WorkerInfo,
+) -> io::Result<()>
+where
+ T: std::fmt::Debug,
+ E: std::fmt::Debug + std::fmt::Display,
+ Result: Encode,
+{
+ if let Err(ref err) = result {
+ gum::warn!(
+ target: LOG_TARGET,
+ ?worker_info,
+ "worker: error occurred: {}",
+ err
+ );
+ }
+ gum::trace!(
+ target: LOG_TARGET,
+ ?worker_info,
+ "worker: sending result to host: {:?}",
+ result
+ );
+
+ framed_send_blocking(stream, &result.encode()).map_err(|err| {
+ gum::warn!(
+ target: LOG_TARGET,
+ ?worker_info,
+ "worker: error occurred sending result to host: {}",
+ err
+ );
+ err
+ })
+}
+
+pub fn stringify_errno(context: &'static str, errno: Errno) -> String {
+ format!("{}: {}: {}", context, errno, io::Error::last_os_error())
+}
+
/// Functionality related to threads spawned by the workers.
///
/// The motivation for this module is to coordinate worker threads without using async Rust.
diff --git a/polkadot/node/core/pvf/execute-worker/src/lib.rs b/polkadot/node/core/pvf/execute-worker/src/lib.rs
index bd7e76010a6d..55f5290bd87e 100644
--- a/polkadot/node/core/pvf/execute-worker/src/lib.rs
+++ b/polkadot/node/core/pvf/execute-worker/src/lib.rs
@@ -16,6 +16,9 @@
//! Contains the logic for executing PVFs. Used by the polkadot-execute-worker binary.
+#![deny(unused_crate_dependencies)]
+#![warn(missing_docs)]
+
pub use polkadot_node_core_pvf_common::{
error::ExecuteError, executor_interface::execute_artifact,
};
@@ -36,11 +39,12 @@ use nix::{
use parity_scale_codec::{Decode, Encode};
use polkadot_node_core_pvf_common::{
error::InternalValidationError,
- execute::{Handshake, JobError, JobResponse, JobResult, WorkerResponse},
+ execute::{Handshake, JobError, JobResponse, JobResult, WorkerError, WorkerResponse},
executor_interface::params_to_wasmtime_semantics,
framed_recv_blocking, framed_send_blocking,
worker::{
- cpu_time_monitor_loop, pipe2_cloexec, run_worker, stringify_panic_payload,
+ cpu_time_monitor_loop, get_total_cpu_usage, pipe2_cloexec, recv_child_response, run_worker,
+ send_result, stringify_errno, stringify_panic_payload,
thread::{self, WaitOutcome},
PipeFd, WorkerInfo, WorkerKind,
},
@@ -93,8 +97,14 @@ fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec, Duration)> {
Ok((params, execution_timeout))
}
-fn send_response(stream: &mut UnixStream, response: WorkerResponse) -> io::Result<()> {
- framed_send_blocking(stream, &response.encode())
+/// Sends an error to the host and returns the original error wrapped in `io::Error`.
+macro_rules! map_and_send_err {
+ ($error:expr, $err_constructor:expr, $stream:expr, $worker_info:expr) => {{
+ let err: WorkerError = $err_constructor($error.to_string()).into();
+ let io_err = io::Error::new(io::ErrorKind::Other, err.to_string());
+ let _ = send_result::($stream, Err(err), $worker_info);
+ io_err
+ }};
}
/// The entrypoint that the spawned execute worker should start with.
@@ -110,8 +120,6 @@ fn send_response(stream: &mut UnixStream, response: WorkerResponse) -> io::Resul
/// check is not necessary.
///
/// - `worker_version`: see above
-///
-/// - `security_status`: contains the detected status of security features.
pub fn worker_entrypoint(
socket_path: PathBuf,
worker_dir_path: PathBuf,
@@ -127,13 +135,28 @@ pub fn worker_entrypoint(
|mut stream, worker_info, security_status| {
let artifact_path = worker_dir::execute_artifact(&worker_info.worker_dir_path);
- let Handshake { executor_params } = recv_execute_handshake(&mut stream)?;
+ let Handshake { executor_params } =
+ recv_execute_handshake(&mut stream).map_err(|e| {
+ map_and_send_err!(
+ e,
+ InternalValidationError::HostCommunication,
+ &mut stream,
+ worker_info
+ )
+ })?;
let executor_params: Arc = Arc::new(executor_params);
let execute_thread_stack_size = max_stack_size(&executor_params);
loop {
- let (params, execution_timeout) = recv_request(&mut stream)?;
+ let (params, execution_timeout) = recv_request(&mut stream).map_err(|e| {
+ map_and_send_err!(
+ e,
+ InternalValidationError::HostCommunication,
+ &mut stream,
+ worker_info
+ )
+ })?;
gum::debug!(
target: LOG_TARGET,
?worker_info,
@@ -143,27 +166,34 @@ pub fn worker_entrypoint(
);
// Get the artifact bytes.
- let compiled_artifact_blob = match std::fs::read(&artifact_path) {
- Ok(bytes) => bytes,
- Err(err) => {
- let response = WorkerResponse::InternalError(
- InternalValidationError::CouldNotOpenFile(err.to_string()),
- );
- send_response(&mut stream, response)?;
- continue
- },
- };
-
- let (pipe_read_fd, pipe_write_fd) = pipe2_cloexec()?;
-
- let usage_before = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
- Ok(usage) => usage,
- Err(errno) => {
- let response = internal_error_from_errno("getrusage before", errno);
- send_response(&mut stream, response)?;
- continue
- },
- };
+ let compiled_artifact_blob = std::fs::read(&artifact_path).map_err(|e| {
+ map_and_send_err!(
+ e,
+ InternalValidationError::CouldNotOpenFile,
+ &mut stream,
+ worker_info
+ )
+ })?;
+
+ let (pipe_read_fd, pipe_write_fd) = pipe2_cloexec().map_err(|e| {
+ map_and_send_err!(
+ e,
+ InternalValidationError::CouldNotCreatePipe,
+ &mut stream,
+ worker_info
+ )
+ })?;
+
+ let usage_before = nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN)
+ .map_err(|errno| {
+ let e = stringify_errno("getrusage before", errno);
+ map_and_send_err!(
+ e,
+ InternalValidationError::Kernel,
+ &mut stream,
+ worker_info
+ )
+ })?;
let stream_fd = stream.as_raw_fd();
let compiled_artifact_blob = Arc::new(compiled_artifact_blob);
@@ -222,7 +252,7 @@ pub fn worker_entrypoint(
"worker: sending result to host: {:?}",
result
);
- send_response(&mut stream, result)?;
+ send_result(&mut stream, result, worker_info)?;
}
},
);
@@ -270,7 +300,7 @@ fn handle_clone(
worker_info: &WorkerInfo,
have_unshare_newuser: bool,
usage_before: Usage,
-) -> io::Result {
+) -> io::Result> {
use polkadot_node_core_pvf_common::worker::security;
// SAFETY: new process is spawned within a single threaded process. This invariant
@@ -301,7 +331,8 @@ fn handle_clone(
usage_before,
execution_timeout,
),
- Err(security::clone::Error::Clone(errno)) => Ok(internal_error_from_errno("clone", errno)),
+ Err(security::clone::Error::Clone(errno)) =>
+ Ok(Err(internal_error_from_errno("clone", errno))),
}
}
@@ -316,7 +347,7 @@ fn handle_fork(
execute_worker_stack_size: usize,
worker_info: &WorkerInfo,
usage_before: Usage,
-) -> io::Result {
+) -> io::Result> {
// SAFETY: new process is spawned within a single threaded process. This invariant
// is enforced by tests.
match unsafe { nix::unistd::fork() } {
@@ -338,7 +369,7 @@ fn handle_fork(
usage_before,
execution_timeout,
),
- Err(errno) => Ok(internal_error_from_errno("fork", errno)),
+ Err(errno) => Ok(Err(internal_error_from_errno("fork", errno))),
}
}
@@ -483,11 +514,11 @@ fn handle_parent_process(
job_pid: Pid,
usage_before: Usage,
timeout: Duration,
-) -> io::Result {
+) -> io::Result> {
// the read end will wait until all write ends have been closed,
// this drop is necessary to avoid deadlock
if let Err(errno) = nix::unistd::close(pipe_write_fd) {
- return Ok(internal_error_from_errno("closing pipe write fd", errno));
+ return Ok(Err(internal_error_from_errno("closing pipe write fd", errno)));
};
// SAFETY: pipe_read_fd is an open and owned file descriptor at this point.
@@ -512,7 +543,7 @@ fn handle_parent_process(
let usage_after = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
Ok(usage) => usage,
- Err(errno) => return Ok(internal_error_from_errno("getrusage after", errno)),
+ Err(errno) => return Ok(Err(internal_error_from_errno("getrusage after", errno))),
};
// Using `getrusage` is needed to check whether child has timedout since we cannot rely on
@@ -530,32 +561,25 @@ fn handle_parent_process(
cpu_tv.as_millis(),
timeout.as_millis(),
);
- return Ok(WorkerResponse::JobTimedOut)
+ return Ok(Err(WorkerError::JobTimedOut))
}
match status {
Ok(WaitStatus::Exited(_, exit_status)) => {
let mut reader = io::BufReader::new(received_data.as_slice());
- let result = match recv_child_response(&mut reader) {
- Ok(result) => result,
- Err(err) => return Ok(WorkerResponse::JobError(err.to_string())),
- };
+ let result = recv_child_response(&mut reader, "execute")?;
match result {
- Ok(JobResponse::Ok { result_descriptor }) => {
+ Ok(job_response) => {
// The exit status should have been zero if no error occurred.
if exit_status != 0 {
- return Ok(WorkerResponse::JobError(format!(
- "unexpected exit status: {}",
- exit_status
- )))
+ return Ok(Err(WorkerError::JobError(JobError::UnexpectedExitStatus(
+ exit_status,
+ ))));
}
- Ok(WorkerResponse::Ok { result_descriptor, duration: cpu_tv })
+ Ok(Ok(WorkerResponse { job_response, duration: cpu_tv }))
},
- Ok(JobResponse::InvalidCandidate(err)) => Ok(WorkerResponse::InvalidCandidate(err)),
- Ok(JobResponse::RuntimeConstruction(err)) =>
- Ok(WorkerResponse::RuntimeConstruction(err)),
Err(job_error) => {
gum::warn!(
target: LOG_TARGET,
@@ -565,9 +589,9 @@ fn handle_parent_process(
job_error,
);
if matches!(job_error, JobError::TimedOut) {
- Ok(WorkerResponse::JobTimedOut)
+ Ok(Err(WorkerError::JobTimedOut))
} else {
- Ok(WorkerResponse::JobError(job_error.to_string()))
+ Ok(Err(WorkerError::JobError(job_error.into())))
}
},
}
@@ -576,50 +600,21 @@ fn handle_parent_process(
//
// The job gets SIGSYS on seccomp violations, but this signal may have been sent for some
// other reason, so we still need to check for seccomp violations elsewhere.
- Ok(WaitStatus::Signaled(_pid, signal, _core_dump)) => Ok(WorkerResponse::JobDied {
+ Ok(WaitStatus::Signaled(_pid, signal, _core_dump)) => Ok(Err(WorkerError::JobDied {
err: format!("received signal: {signal:?}"),
job_pid: job_pid.as_raw(),
- }),
- Err(errno) => Ok(internal_error_from_errno("waitpid", errno)),
+ })),
+ Err(errno) => Ok(Err(internal_error_from_errno("waitpid", errno))),
// It is within an attacker's power to send an unexpected exit status. So we cannot treat
// this as an internal error (which would make us abstain), but must vote against.
- Ok(unexpected_wait_status) => Ok(WorkerResponse::JobDied {
+ Ok(unexpected_wait_status) => Ok(Err(WorkerError::JobDied {
err: format!("unexpected status from wait: {unexpected_wait_status:?}"),
job_pid: job_pid.as_raw(),
- }),
+ })),
}
}
-/// Calculate the total CPU time from the given `usage` structure, returned from
-/// [`nix::sys::resource::getrusage`], and calculates the total CPU time spent, including both user
-/// and system time.
-///
-/// # Arguments
-///
-/// - `rusage`: Contains resource usage information.
-///
-/// # Returns
-///
-/// Returns a `Duration` representing the total CPU time.
-fn get_total_cpu_usage(rusage: Usage) -> Duration {
- let micros = (((rusage.user_time().tv_sec() + rusage.system_time().tv_sec()) * 1_000_000) +
- (rusage.system_time().tv_usec() + rusage.user_time().tv_usec()) as i64) as u64;
-
- return Duration::from_micros(micros)
-}
-
-/// Get a job response.
-fn recv_child_response(received_data: &mut io::BufReader<&[u8]>) -> io::Result {
- let response_bytes = framed_recv_blocking(received_data)?;
- JobResult::decode(&mut response_bytes.as_slice()).map_err(|e| {
- io::Error::new(
- io::ErrorKind::Other,
- format!("execute pvf recv_child_response: decode error: {}", e),
- )
- })
-}
-
/// Write a job response to the pipe and exit process after.
///
/// # Arguments
@@ -638,15 +633,10 @@ fn send_child_response(pipe_write: &mut PipeFd, response: JobResult) -> ! {
}
}
-fn internal_error_from_errno(context: &'static str, errno: Errno) -> WorkerResponse {
- WorkerResponse::InternalError(InternalValidationError::Kernel(format!(
- "{}: {}: {}",
- context,
- errno,
- io::Error::last_os_error()
- )))
+fn internal_error_from_errno(context: &'static str, errno: Errno) -> WorkerError {
+ WorkerError::InternalError(InternalValidationError::Kernel(stringify_errno(context, errno)))
}
fn job_error_from_errno(context: &'static str, errno: Errno) -> JobResult {
- Err(JobError::Kernel(format!("{}: {}: {}", context, errno, io::Error::last_os_error())))
+ Err(JobError::Kernel(stringify_errno(context, errno)))
}
diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs
index 82a56107ef53..d1b218f48ae8 100644
--- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs
+++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs
@@ -26,7 +26,6 @@ const LOG_TARGET: &str = "parachain::pvf-prepare-worker";
use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread};
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
-use libc;
use nix::{
errno::Errno,
sys::{
@@ -48,7 +47,8 @@ use polkadot_node_core_pvf_common::{
prepare::{MemoryStats, PrepareJobKind, PrepareStats, PrepareWorkerSuccess},
pvf::PvfPrepData,
worker::{
- cpu_time_monitor_loop, run_worker, stringify_panic_payload,
+ cpu_time_monitor_loop, get_total_cpu_usage, recv_child_response, run_worker, send_result,
+ stringify_errno, stringify_panic_payload,
thread::{self, spawn_worker_thread, WaitOutcome},
WorkerKind,
},
@@ -117,11 +117,6 @@ fn recv_request(stream: &mut UnixStream) -> io::Result {
Ok(pvf)
}
-/// Send a worker response.
-fn send_response(stream: &mut UnixStream, result: PrepareWorkerResult) -> io::Result<()> {
- framed_send_blocking(stream, &result.encode())
-}
-
fn start_memory_tracking(fd: RawFd, limit: Option) {
unsafe {
// SAFETY: Inside the failure handler, the allocator is locked and no allocations or
@@ -178,8 +173,6 @@ fn end_memory_tracking() -> isize {
///
/// - `worker_version`: see above
///
-/// - `security_status`: contains the detected status of security features.
-///
/// # Flow
///
/// This runs the following in a loop:
@@ -233,8 +226,9 @@ pub fn worker_entrypoint(
let usage_before = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
Ok(usage) => usage,
Err(errno) => {
- let result = Err(error_from_errno("getrusage before", errno));
- send_response(&mut stream, result)?;
+ let result: PrepareWorkerResult =
+ Err(error_from_errno("getrusage before", errno));
+ send_result(&mut stream, result, worker_info)?;
continue
},
};
@@ -294,7 +288,7 @@ pub fn worker_entrypoint(
"worker: sending result to host: {:?}",
result
);
- send_response(&mut stream, result)?;
+ send_result(&mut stream, result, worker_info)?;
}
},
);
@@ -666,7 +660,7 @@ fn handle_parent_process(
match status {
Ok(WaitStatus::Exited(_pid, exit_status)) => {
let mut reader = io::BufReader::new(received_data.as_slice());
- let result = recv_child_response(&mut reader)
+ let result = recv_child_response(&mut reader, "prepare")
.map_err(|err| PrepareError::JobError(err.to_string()))?;
match result {
@@ -726,35 +720,6 @@ fn handle_parent_process(
}
}
-/// Calculate the total CPU time from the given `usage` structure, returned from
-/// [`nix::sys::resource::getrusage`], and calculates the total CPU time spent, including both user
-/// and system time.
-///
-/// # Arguments
-///
-/// - `rusage`: Contains resource usage information.
-///
-/// # Returns
-///
-/// Returns a `Duration` representing the total CPU time.
-fn get_total_cpu_usage(rusage: Usage) -> Duration {
- let micros = (((rusage.user_time().tv_sec() + rusage.system_time().tv_sec()) * 1_000_000) +
- (rusage.system_time().tv_usec() + rusage.user_time().tv_usec()) as i64) as u64;
-
- return Duration::from_micros(micros)
-}
-
-/// Get a job response.
-fn recv_child_response(received_data: &mut io::BufReader<&[u8]>) -> io::Result {
- let response_bytes = framed_recv_blocking(received_data)?;
- JobResult::decode(&mut response_bytes.as_slice()).map_err(|e| {
- io::Error::new(
- io::ErrorKind::Other,
- format!("prepare pvf recv_child_response: decode error: {:?}", e),
- )
- })
-}
-
/// Write a job response to the pipe and exit process after.
///
/// # Arguments
@@ -774,7 +739,7 @@ fn send_child_response(pipe_write: &mut PipeFd, response: JobResult) -> ! {
}
fn error_from_errno(context: &'static str, errno: Errno) -> PrepareError {
- PrepareError::Kernel(format!("{}: {}: {}", context, errno, io::Error::last_os_error()))
+ PrepareError::Kernel(stringify_errno(context, errno))
}
type JobResult = Result;
diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs
index bdc3c7327b06..af147a2ba227 100644
--- a/polkadot/node/core/pvf/src/execute/queue.rs
+++ b/polkadot/node/core/pvf/src/execute/queue.rs
@@ -16,7 +16,7 @@
//! A queue that handles requests for PVF execution.
-use super::worker_interface::Outcome;
+use super::worker_interface::{Error as WorkerInterfaceError, Response as WorkerInterfaceResponse};
use crate::{
artifacts::{ArtifactId, ArtifactPathId},
host::ResultSender,
@@ -30,7 +30,10 @@ use futures::{
stream::{FuturesUnordered, StreamExt as _},
Future, FutureExt,
};
-use polkadot_node_core_pvf_common::SecurityStatus;
+use polkadot_node_core_pvf_common::{
+ execute::{JobResponse, WorkerError, WorkerResponse},
+ SecurityStatus,
+};
use polkadot_primitives::{ExecutorParams, ExecutorParamsHash};
use slotmap::HopSlotMap;
use std::{
@@ -133,7 +136,12 @@ impl Workers {
enum QueueEvent {
Spawn(IdleWorker, WorkerHandle, ExecuteJob),
- StartWork(Worker, Outcome, ArtifactId, ResultSender),
+ StartWork(
+ Worker,
+ Result,
+ ArtifactId,
+ ResultSender,
+ ),
}
type Mux = FuturesUnordered>;
@@ -340,23 +348,34 @@ fn handle_worker_spawned(
async fn handle_job_finish(
queue: &mut Queue,
worker: Worker,
- outcome: Outcome,
+ worker_result: Result,
artifact_id: ArtifactId,
result_tx: ResultSender,
) {
- let (idle_worker, result, duration, sync_channel) = match outcome {
- Outcome::Ok { result_descriptor, duration, idle_worker } => {
+ let (idle_worker, result, duration, sync_channel) = match worker_result {
+ Ok(WorkerInterfaceResponse {
+ worker_response:
+ WorkerResponse { job_response: JobResponse::Ok { result_descriptor }, duration },
+ idle_worker,
+ }) => {
// TODO: propagate the soft timeout
(Some(idle_worker), Ok(result_descriptor), Some(duration), None)
},
- Outcome::InvalidCandidate { err, idle_worker } => (
+ Ok(WorkerInterfaceResponse {
+ worker_response: WorkerResponse { job_response: JobResponse::InvalidCandidate(err), .. },
+ idle_worker,
+ }) => (
Some(idle_worker),
Err(ValidationError::Invalid(InvalidCandidate::WorkerReportedInvalid(err))),
None,
None,
),
- Outcome::RuntimeConstruction { err, idle_worker } => {
+ Ok(WorkerInterfaceResponse {
+ worker_response:
+ WorkerResponse { job_response: JobResponse::RuntimeConstruction(err), .. },
+ idle_worker,
+ }) => {
// The task for artifact removal is executed concurrently with
// the message to the host on the execution result.
let (result_tx, result_rx) = oneshot::channel();
@@ -376,27 +395,31 @@ async fn handle_job_finish(
Some(result_rx),
)
},
- Outcome::InternalError { err } => (None, Err(ValidationError::Internal(err)), None, None),
+
+ Err(WorkerInterfaceError::InternalError(err)) |
+ Err(WorkerInterfaceError::WorkerError(WorkerError::InternalError(err))) =>
+ (None, Err(ValidationError::Internal(err)), None, None),
// Either the worker or the job timed out. Kill the worker in either case. Treated as
// definitely-invalid, because if we timed out, there's no time left for a retry.
- Outcome::HardTimeout =>
+ Err(WorkerInterfaceError::HardTimeout) |
+ Err(WorkerInterfaceError::WorkerError(WorkerError::JobTimedOut)) =>
(None, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), None, None),
// "Maybe invalid" errors (will retry).
- Outcome::WorkerIntfErr => (
+ Err(WorkerInterfaceError::CommunicationErr(_err)) => (
None,
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)),
None,
None,
),
- Outcome::JobDied { err } => (
+ Err(WorkerInterfaceError::WorkerError(WorkerError::JobDied { err, .. })) => (
None,
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))),
None,
None,
),
- Outcome::JobError { err } => (
+ Err(WorkerInterfaceError::WorkerError(WorkerError::JobError(err))) => (
None,
- Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err))),
+ Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err.to_string()))),
None,
None,
),
@@ -543,14 +566,14 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
queue.mux.push(
async move {
let _timer = execution_timer;
- let outcome = super::worker_interface::start_work(
+ let result = super::worker_interface::start_work(
idle,
job.artifact.clone(),
job.exec_timeout,
job.params,
)
.await;
- QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx)
+ QueueEvent::StartWork(worker, result, job.artifact.id, job.result_tx)
}
.boxed(),
);
diff --git a/polkadot/node/core/pvf/src/execute/worker_interface.rs b/polkadot/node/core/pvf/src/execute/worker_interface.rs
index db81da118d7b..9dcadfb4c2a7 100644
--- a/polkadot/node/core/pvf/src/execute/worker_interface.rs
+++ b/polkadot/node/core/pvf/src/execute/worker_interface.rs
@@ -29,10 +29,9 @@ use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode};
use polkadot_node_core_pvf_common::{
error::InternalValidationError,
- execute::{Handshake, WorkerResponse},
+ execute::{Handshake, WorkerError, WorkerResponse},
worker_dir, SecurityStatus,
};
-use polkadot_parachain_primitives::primitives::ValidationResult;
use polkadot_primitives::ExecutorParams;
use std::{path::Path, time::Duration};
use tokio::{io, net::UnixStream};
@@ -69,7 +68,8 @@ pub async fn spawn(
gum::warn!(
target: LOG_TARGET,
worker_pid = %idle_worker.pid,
- %err
+ "failed to send a handshake to the spawned worker: {}",
+ error
);
err
})?;
@@ -78,39 +78,40 @@ pub async fn spawn(
/// Outcome of PVF execution.
///
-/// If the idle worker token is not returned, it means the worker must be terminated.
-pub enum Outcome {
- /// PVF execution completed successfully and the result is returned. The worker is ready for
- /// another job.
- Ok { result_descriptor: ValidationResult, duration: Duration, idle_worker: IdleWorker },
- /// The candidate validation failed. It may be for example because the wasm execution triggered
- /// a trap. Errors related to the preparation process are not expected to be encountered by the
- /// execution workers.
- InvalidCandidate { err: String, idle_worker: IdleWorker },
- /// The error is probably transient. It may be for example
- /// because the artifact was prepared with a Wasmtime version different from the version
- /// in the current execution environment.
- RuntimeConstruction { err: String, idle_worker: IdleWorker },
+/// PVF execution completed and the result is returned. The worker is ready for
+/// another job.
+pub struct Response {
+ /// The response (valid/invalid) from the worker.
+ pub worker_response: WorkerResponse,
+ /// Returning the idle worker token means the worker can be reused.
+ pub idle_worker: IdleWorker,
+}
+/// The idle worker token is not returned for any of these cases, meaning the worker must be
+/// terminated.
+///
+/// NOTE: Errors related to the preparation process are not expected to be encountered by the
+/// execution workers.
+#[derive(thiserror::Error, Debug)]
+pub enum Error {
/// The execution time exceeded the hard limit. The worker is terminated.
+ #[error("The communication with the worker exceeded the hard limit")]
HardTimeout,
/// An I/O error happened during communication with the worker. This may mean that the worker
/// process already died. The token is not returned in any case.
- WorkerIntfErr,
- /// The job process has died. We must kill the worker just in case.
- ///
- /// We cannot treat this as an internal error because malicious code may have caused this.
- JobDied { err: String },
- /// An unexpected error occurred in the job process.
- ///
- /// Because malicious code can cause a job error, we must not treat it as an internal error.
- JobError { err: String },
+ #[error("An I/O error happened during communication with the worker: {0}")]
+ CommunicationErr(#[from] io::Error),
+ /// The worker reported an error (can be from itself or from the job). The worker should not be
+ /// reused.
+ #[error("The worker reported an error: {0}")]
+ WorkerError(#[from] WorkerError),
/// An internal error happened during the validation. Such an error is most likely related to
/// some transient glitch.
///
/// Should only ever be used for errors independent of the candidate and PVF. Therefore it may
/// be a problem with the worker, so we terminate it.
- InternalError { err: InternalValidationError },
+ #[error("An internal error occurred: {0}")]
+ InternalError(#[from] InternalValidationError),
}
/// Given the idle token of a worker and parameters of work, communicates with the worker and
@@ -123,7 +124,7 @@ pub async fn start_work(
artifact: ArtifactPathId,
execution_timeout: Duration,
validation_params: Vec,
-) -> Outcome {
+) -> Result {
let IdleWorker { mut stream, pid, worker_dir } = worker;
gum::debug!(
@@ -136,16 +137,18 @@ pub async fn start_work(
);
with_worker_dir_setup(worker_dir, pid, &artifact.path, |worker_dir| async move {
- if let Err(error) = send_request(&mut stream, &validation_params, execution_timeout).await {
- gum::warn!(
- target: LOG_TARGET,
- worker_pid = %pid,
- validation_code_hash = ?artifact.id.code_hash,
- ?error,
- "failed to send an execute request",
- );
- return Outcome::WorkerIntfErr
- }
+ send_request(&mut stream, &validation_params, execution_timeout).await.map_err(
+ |error| {
+ gum::warn!(
+ target: LOG_TARGET,
+ worker_pid = %pid,
+ validation_code_hash = ?artifact.id.code_hash,
+ "failed to send an execute request: {}",
+ error,
+ );
+ Error::InternalError(InternalValidationError::HostCommunication(error.to_string()))
+ },
+ )?;
// We use a generous timeout here. This is in addition to the one in the child process, in
// case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout
@@ -153,12 +156,12 @@ pub async fn start_work(
// load, but the CPU resources of the child can only be measured from the parent after the
// child process terminates.
let timeout = execution_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
- let response = futures::select! {
- response = recv_response(&mut stream).fuse() => {
- match response {
- Ok(response) =>
- handle_response(
- response,
+ let worker_result = futures::select! {
+ worker_result = recv_result(&mut stream).fuse() => {
+ match worker_result {
+ Ok(result) =>
+ handle_result(
+ result,
pid,
execution_timeout,
)
@@ -168,11 +171,11 @@ pub async fn start_work(
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
- ?error,
- "failed to recv an execute response",
+ "failed to recv an execute result: {}",
+ error,
);
- return Outcome::WorkerIntfErr
+ return Err(Error::CommunicationErr(error))
},
}
},
@@ -183,29 +186,16 @@ pub async fn start_work(
validation_code_hash = ?artifact.id.code_hash,
"execution worker exceeded lenient timeout for execution, child worker likely stalled",
);
- WorkerResponse::JobTimedOut
+ return Err(Error::HardTimeout)
},
};
- match response {
- WorkerResponse::Ok { result_descriptor, duration } => Outcome::Ok {
- result_descriptor,
- duration,
- idle_worker: IdleWorker { stream, pid, worker_dir },
- },
- WorkerResponse::InvalidCandidate(err) => Outcome::InvalidCandidate {
- err,
- idle_worker: IdleWorker { stream, pid, worker_dir },
- },
- WorkerResponse::RuntimeConstruction(err) => Outcome::RuntimeConstruction {
- err,
+ match worker_result {
+ Ok(worker_response) => Ok(Response {
+ worker_response,
idle_worker: IdleWorker { stream, pid, worker_dir },
- },
- WorkerResponse::JobTimedOut => Outcome::HardTimeout,
- WorkerResponse::JobDied { err, job_pid: _ } => Outcome::JobDied { err },
- WorkerResponse::JobError(err) => Outcome::JobError { err },
-
- WorkerResponse::InternalError(err) => Outcome::InternalError { err },
+ }),
+ Err(worker_error) => Err(worker_error.into()),
}
})
.await
@@ -215,12 +205,12 @@ pub async fn start_work(
///
/// Here we know the artifact exists, but is still located in a temporary file which will be cleared
/// by [`with_worker_dir_setup`].
-async fn handle_response(
- response: WorkerResponse,
+async fn handle_result(
+ worker_result: Result,
worker_pid: u32,
execution_timeout: Duration,
-) -> WorkerResponse {
- if let WorkerResponse::Ok { duration, .. } = response {
+) -> Result {
+ if let Ok(WorkerResponse { duration, .. }) = worker_result {
if duration > execution_timeout {
// The job didn't complete within the timeout.
gum::warn!(
@@ -232,11 +222,11 @@ async fn handle_response(
);
// Return a timeout error.
- return WorkerResponse::JobTimedOut
+ return Err(WorkerError::JobTimedOut)
}
}
- response
+ worker_result
}
/// Create a temporary file for an artifact in the worker cache, execute the given future/closure
@@ -249,9 +239,9 @@ async fn with_worker_dir_setup(
pid: u32,
artifact_path: &Path,
f: F,
-) -> Outcome
+) -> Result
where
- Fut: futures::Future