Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change prepare worker to use fork instead of threads #1685

Merged
merged 58 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
6009a85
change prepare worker to use fork instead of threads
jpserrat Sep 24, 2023
cbed258
get total cpu time for current child
jpserrat Sep 24, 2023
08c7766
fix fmt and int cast
jpserrat Sep 24, 2023
40429a2
exit from child job on success for prepare worker
jpserrat Sep 25, 2023
4300566
prepare worker: change entrypoint docstring, remove cpu time thread
jpserrat Oct 1, 2023
92d25a2
Merge branch 'master' of github.com:paritytech/polkadot-sdk into jpse…
jpserrat Oct 2, 2023
d0a261a
prepare worker: use scale codec for encode and decode and fix docstring
jpserrat Oct 4, 2023
9970ef8
Merge branch 'master' of github.com:paritytech/polkadot-sdk into jpse…
jpserrat Oct 4, 2023
5f100d9
prepare worker: fix artifact result map
jpserrat Oct 5, 2023
7b9421c
Merge branch 'master' of github.com:paritytech/polkadot-sdk into jpse…
jpserrat Oct 5, 2023
e64ff83
prepare worker: remove elapsed time
jpserrat Oct 5, 2023
462d73c
prepare worker: fix result hadling when target os is linux
jpserrat Oct 5, 2023
7ef719d
prepare worker: remove useless use of format!
jpserrat Oct 5, 2023
b0fa175
prepare worker: remove unused preparation_timeout
jpserrat Oct 5, 2023
220f069
prepare worker: move pipe read before child wait
jpserrat Oct 9, 2023
96de656
prepare worker: use getrusage for timeout
jpserrat Oct 12, 2023
ab51e84
Merge branch 'master' of github.com:paritytech/polkadot-sdk into jpse…
jpserrat Oct 14, 2023
6c96fd9
Merge branch 'master' into jpserrat-pvf-fork-instead-threads
mrcnski Oct 17, 2023
fdb3462
Do some cleanup / addressing of issues
mrcnski Oct 21, 2023
f4bea3a
prepare worker: add docstring and pipe error from child process to pa…
jpserrat Oct 22, 2023
6615598
prepare worker: closes stream in child process
jpserrat Oct 25, 2023
2196c5f
prepare worker: fix prepare job
jpserrat Oct 25, 2023
1682ed0
prepare worker: handle prepare job when target_os is linux
jpserrat Oct 25, 2023
5c9b1d8
Merge branch 'master' into jpserrat-pvf-fork-instead-threads
mrcnski Oct 31, 2023
962fccb
Some minor updates
mrcnski Nov 1, 2023
45b0648
Merge branch 'master' into jpserrat-pvf-fork-instead-threads
mrcnski Nov 1, 2023
7baf706
Clean up `fork` usage a bit
mrcnski Nov 1, 2023
7687015
Update prepare-worker-syscalls list
mrcnski Nov 1, 2023
c584441
Merge branch 'master' into jpserrat-pvf-fork-instead-threads
mrcnski Nov 2, 2023
0d5eb73
Add test when forked process dies (e.g. OOM, seccomp violation)
mrcnski Nov 2, 2023
a932a7c
Update for seccomp (add detection for job death); fix some errors
mrcnski Nov 2, 2023
7e1414e
Some clarifications
mrcnski Nov 2, 2023
d9fcb5d
change execute worker from thread to fork
jpserrat Nov 2, 2023
f830b05
Fix bench
mrcnski Nov 3, 2023
d9fcc7c
cargo fmt
mrcnski Nov 3, 2023
7d86911
Fix some issues with prepare worker
mrcnski Nov 3, 2023
3b855d8
Fix some issues with execute worker/job; update errors
mrcnski Nov 3, 2023
0c04842
Fix some test failures
mrcnski Nov 3, 2023
87e96d0
add cpu monitor thread back to prepare and execute worker
jpserrat Nov 4, 2023
9e3d65c
Merge branch 'jpserrat-pvf-fork-instead-threads' of github.com:Jpserr…
jpserrat Nov 4, 2023
de87f5f
Merge branch 'master' into jpserrat-pvf-fork-instead-threads
mrcnski Nov 5, 2023
2e497a7
Merge remote-tracking branch 'Jpserrat/jpserrat-pvf-fork-instead-thre…
mrcnski Nov 5, 2023
6bd7d20
cargo fmt
mrcnski Nov 5, 2023
1fb8c13
Clean up error handling a bit
mrcnski Nov 5, 2023
9c666b4
Fix some tests
mrcnski Nov 5, 2023
8c7c519
Make error reporting more robust and secure
mrcnski Nov 6, 2023
af96dfa
Some fixes
mrcnski Nov 6, 2023
67736c4
Make sure `pre_encoded_payloads` tests the correct type
mrcnski Nov 6, 2023
e25344c
Some fixes
mrcnski Nov 7, 2023
203ce25
Fix `prechecking_out_of_memory` test
mrcnski Nov 7, 2023
de7d19d
Add tests, fix some more issues
mrcnski Nov 7, 2023
cec6477
Fix test
mrcnski Nov 7, 2023
89cfd2e
Make naming of some types more clear
mrcnski Nov 7, 2023
58b814c
Update execute worker syscalls
mrcnski Nov 7, 2023
a28cf2e
Update prepare worker syscalls
mrcnski Nov 7, 2023
b03c695
Address review comments
mrcnski Nov 9, 2023
8070074
Add tests for num_threads of child processes
mrcnski Nov 10, 2023
05555e7
Update impl guide
mrcnski Nov 10, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 15 additions & 9 deletions polkadot/node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,14 +639,19 @@ async fn validate_candidate_exhaustive(
},
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout)) =>
Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::WorkerReportedError(e))) =>
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::WorkerReportedInvalid(e))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(e))),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(
"ambiguous worker death".to_string(),
))),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic(err))) =>
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError(err))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))),

Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousJobDeath(err))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(format!(
"ambiguous job death: {err}"
)))),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::PrepareError(e))) => {
// In principle if preparation of the `WASM` fails, the current candidate can not be the
// reason for that. So we can't say whether it is invalid or not. In addition, with
Expand Down Expand Up @@ -738,9 +743,9 @@ trait ValidationBackend {
};

// Allow limited retries for each kind of error.
let mut num_death_retries_left = 1;
let mut num_job_error_retries_left = 1;
let mut num_internal_retries_left = 1;
let mut num_awd_retries_left = 1;
let mut num_panic_retries_left = 1;
loop {
// Stop retrying if we exceeded the timeout.
if total_time_start.elapsed() + retry_delay > exec_timeout {
Expand All @@ -749,11 +754,12 @@ trait ValidationBackend {

match validation_result {
Err(ValidationError::InvalidCandidate(
WasmInvalidCandidate::AmbiguousWorkerDeath,
)) if num_awd_retries_left > 0 => num_awd_retries_left -= 1,
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic(_)))
if num_panic_retries_left > 0 =>
num_panic_retries_left -= 1,
WasmInvalidCandidate::AmbiguousWorkerDeath |
WasmInvalidCandidate::AmbiguousJobDeath(_),
)) if num_death_retries_left > 0 => num_death_retries_left -= 1,
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError(_)))
if num_job_error_retries_left > 0 =>
num_job_error_retries_left -= 1,
Err(ValidationError::InternalError(_)) if num_internal_retries_left > 0 =>
num_internal_retries_left -= 1,
_ => break,
Expand Down
12 changes: 7 additions & 5 deletions polkadot/node/core/candidate-validation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,11 +695,13 @@ fn candidate_validation_retry_panic_errors() {

let v = executor::block_on(validate_candidate_exhaustive(
MockValidateCandidateBackend::with_hardcoded_result_list(vec![
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic("foo".into()))),
// Throw an AWD error, we should still retry again.
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError("foo".into()))),
// Throw an AJD error, we should still retry again.
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousJobDeath(
"baz".into(),
))),
// Throw another panic error.
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic("bar".into()))),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError("bar".into()))),
]),
validation_data,
validation_code,
Expand Down Expand Up @@ -1216,7 +1218,7 @@ fn precheck_properly_classifies_outcomes() {

inner(Err(PrepareError::Prevalidation("foo".to_owned())), PreCheckOutcome::Invalid);
inner(Err(PrepareError::Preparation("bar".to_owned())), PreCheckOutcome::Invalid);
inner(Err(PrepareError::Panic("baz".to_owned())), PreCheckOutcome::Invalid);
inner(Err(PrepareError::JobError("baz".to_owned())), PreCheckOutcome::Invalid);

inner(Err(PrepareError::TimedOut), PreCheckOutcome::Failed);
inner(Err(PrepareError::IoErr("fizz".to_owned())), PreCheckOutcome::Failed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl TestHost {
where
F: FnOnce(&mut Config),
{
let (prepare_worker_path, execute_worker_path) = testing::get_and_check_worker_paths();
let (prepare_worker_path, execute_worker_path) = testing::build_workers_and_get_paths(true);

let cache_dir = tempfile::tempdir().unwrap();
let mut config = Config::new(
Expand Down
2 changes: 1 addition & 1 deletion polkadot/node/core/pvf/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ cpu-time = "1.0.0"
futures = "0.3.21"
gum = { package = "tracing-gum", path = "../../../gum" }
libc = "0.2.139"
thiserror = "1.0.31"

parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }

Expand All @@ -30,7 +31,6 @@ sp-tracing = { path = "../../../../../substrate/primitives/tracing" }
[target.'cfg(target_os = "linux")'.dependencies]
landlock = "0.3.0"
seccompiler = "0.4.0"
thiserror = "1.0.31"

[dev-dependencies]
assert_matches = "1.4.0"
Expand Down
55 changes: 27 additions & 28 deletions polkadot/node/core/pvf/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ pub enum PrepareError {
/// Instantiation of the WASM module instance failed.
#[codec(index = 2)]
RuntimeConstruction(String),
/// An unexpected panic has occurred in the preparation worker.
/// An unexpected error has occurred in the preparation job.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to explicitly define somewhere what a "preparation job" is and how it is different from a "preparation worker". It's a new idea that may not be obvious to a new code reader.

#[codec(index = 3)]
Panic(String),
JobError(String),
/// Failed to prepare the PVF due to the time limit.
#[codec(index = 4)]
TimedOut,
Expand All @@ -48,12 +48,12 @@ pub enum PrepareError {
/// The temporary file for the artifact could not be created at the given cache path. This
/// state is reported by the validation host (not by the worker).
#[codec(index = 6)]
CreateTmpFileErr(String),
CreateTmpFile(String),
/// The response from the worker is received, but the file cannot be renamed (moved) to the
/// final destination location. This state is reported by the validation host (not by the
/// worker).
#[codec(index = 7)]
RenameTmpFileErr {
RenameTmpFile {
err: String,
// Unfortunately `PathBuf` doesn't implement `Encode`/`Decode`, so we do a fallible
// conversion to `Option<String>`.
Expand All @@ -68,11 +68,14 @@ pub enum PrepareError {
/// reported by the validation host (not by the worker).
#[codec(index = 9)]
ClearWorkerDir(String),
/// The preparation job process died, due to OOM, a seccomp violation, or some other factor.
JobDied(String),
#[codec(index = 10)]
/// Some error occurred when interfacing with the kernel.
#[codec(index = 11)]
Kernel(String),
}

/// Pre-encoded length-prefixed `PrepareResult::Err(PrepareError::OutOfMemory)`
pub const OOM_PAYLOAD: &[u8] = b"\x02\x00\x00\x00\x00\x00\x00\x00\x01\x08";

impl PrepareError {
/// Returns whether this is a deterministic error, i.e. one that should trigger reliably. Those
/// errors depend on the PVF itself and the sc-executor/wasmtime logic.
Expand All @@ -83,12 +86,15 @@ impl PrepareError {
pub fn is_deterministic(&self) -> bool {
use PrepareError::*;
match self {
Prevalidation(_) | Preparation(_) | Panic(_) | OutOfMemory => true,
TimedOut |
Prevalidation(_) | Preparation(_) | JobError(_) | OutOfMemory => true,
IoErr(_) |
CreateTmpFileErr(_) |
RenameTmpFileErr { .. } |
ClearWorkerDir(_) => false,
JobDied(_) |
CreateTmpFile(_) |
RenameTmpFile { .. } |
ClearWorkerDir(_) |
Kernel(_) => false,
// Can occur due to issues with the PVF, but also due to factors like local load.
TimedOut => false,
// Can occur due to issues with the PVF, but also due to local errors.
RuntimeConstruction(_) => false,
}
Expand All @@ -102,14 +108,16 @@ impl fmt::Display for PrepareError {
Prevalidation(err) => write!(f, "prevalidation: {}", err),
Preparation(err) => write!(f, "preparation: {}", err),
RuntimeConstruction(err) => write!(f, "runtime construction: {}", err),
Panic(err) => write!(f, "panic: {}", err),
JobError(err) => write!(f, "panic: {}", err),
TimedOut => write!(f, "prepare: timeout"),
IoErr(err) => write!(f, "prepare: io error while receiving response: {}", err),
CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err),
RenameTmpFileErr { err, src, dest } =>
JobDied(err) => write!(f, "prepare: prepare job died: {}", err),
CreateTmpFile(err) => write!(f, "prepare: error creating tmp file: {}", err),
RenameTmpFile { err, src, dest } =>
write!(f, "prepare: error renaming tmp file ({:?} -> {:?}): {}", src, dest, err),
OutOfMemory => write!(f, "prepare: out of memory"),
ClearWorkerDir(err) => write!(f, "prepare: error clearing worker cache: {}", err),
Kernel(err) => write!(f, "prepare: error interfacing with the kernel: {}", err),
}
}
}
Expand All @@ -133,9 +141,9 @@ pub enum InternalValidationError {
// conversion to `Option<String>`.
path: Option<String>,
},
/// An error occurred in the CPU time monitor thread. Should be totally unrelated to
/// validation.
CpuTimeMonitorThread(String),
/// Some error occurred when interfacing with the kernel.
Kernel(String),

/// Some non-deterministic preparation error occurred.
NonDeterministicPrepareError(PrepareError),
}
Expand All @@ -158,17 +166,8 @@ impl fmt::Display for InternalValidationError {
"validation: host could not clear the worker cache ({:?}) after a job: {}",
path, err
),
CpuTimeMonitorThread(err) =>
write!(f, "validation: an error occurred in the CPU time monitor thread: {}", err),
Kernel(err) => write!(f, "validation: error interfacing with the kernel: {}", err),
NonDeterministicPrepareError(err) => write!(f, "validation: prepare: {}", err),
}
}
}

#[test]
fn pre_encoded_payloads() {
let oom_enc = PrepareResult::Err(PrepareError::OutOfMemory).encode();
let mut oom_payload = oom_enc.len().to_le_bytes().to_vec();
oom_payload.extend(oom_enc);
assert_eq!(oom_payload, OOM_PAYLOAD);
}
49 changes: 43 additions & 6 deletions polkadot/node/core/pvf/common/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ pub struct Handshake {
pub executor_params: ExecutorParams,
}

/// The response from an execution job on the worker.
/// The response from the execution worker.
#[derive(Debug, Encode, Decode)]
pub enum Response {
pub enum WorkerResponse {
/// The job completed successfully.
Ok {
/// The result of parachain validation.
Expand All @@ -41,14 +41,38 @@ pub enum Response {
/// The candidate is invalid.
InvalidCandidate(String),
/// The job timed out.
TimedOut,
/// An unexpected panic has occurred in the execution worker.
Panic(String),
JobTimedOut,
/// The job process has died. We must kill the worker just in case.
///
/// We cannot treat this as an internal error because malicious code may have killed the job.
/// We still retry it, because in the non-malicious case it is likely spurious.
JobDied(String),
/// An unexpected error occurred in the job process, e.g. failing to spawn a thread, panic,
/// etc.
///
/// Because malicious code can cause a job error, we must not treat it as an internal error. We
/// still retry it, because in the non-malicious case it is likely spurious.
JobError(String),

/// Some internal error occurred.
InternalError(InternalValidationError),
}

impl Response {
/// The result of a job on the execution worker.
pub type JobResult = Result<JobResponse, JobError>;

/// The successful response from a job on the execution worker.
#[derive(Debug, Encode, Decode)]
pub enum JobResponse {
Ok {
/// The result of parachain validation.
result_descriptor: ValidationResult,
},
/// The candidate is invalid.
InvalidCandidate(String),
}

impl JobResponse {
/// Creates an invalid response from a context `ctx` and a message `msg` (which can be empty).
pub fn format_invalid(ctx: &'static str, msg: &str) -> Self {
if msg.is_empty() {
Expand All @@ -58,3 +82,16 @@ impl Response {
}
}
}

/// An unexpected error occurred in the execution job.
#[derive(thiserror::Error, Debug, Encode, Decode)]
pub enum JobError {
#[error("The job timed out")]
TimedOut,
#[error("An unexpected panic has occurred in the execution job: {0}")]
Panic(String),
#[error("Could not spawn the requested thread: {0}")]
CouldNotSpawnThread(String),
#[error("An error occurred in the CPU time monitor thread: {0}")]
CpuTimeMonitorThread(String),
}
3 changes: 3 additions & 0 deletions polkadot/node/core/pvf/execute-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ license.workspace = true
[dependencies]
cpu-time = "1.0.0"
gum = { package = "tracing-gum", path = "../../../gum" }
os_pipe = "1.1.4"
nix = { version = "0.27.1", features = ["resource", "process"]}
libc = "0.2.139"

parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }

Expand Down
Loading
Loading