change prepare worker to use fork instead of threads #1685

Merged on Nov 14, 2023 (58 commits)
Changes from 1 commit
Commits
6009a85
change prepare worker to use fork instead of threads
jpserrat Sep 24, 2023
cbed258
get total cpu time for current child
jpserrat Sep 24, 2023
08c7766
fix fmt and int cast
jpserrat Sep 24, 2023
40429a2
exit from child job on success for prepare worker
jpserrat Sep 25, 2023
4300566
prepare worker: change entrypoint docstring, remove cpu time thread
jpserrat Oct 1, 2023
92d25a2
Merge branch 'master' of github.com:paritytech/polkadot-sdk into jpse…
jpserrat Oct 2, 2023
d0a261a
prepare worker: use scale codec for encode and decode and fix docstring
jpserrat Oct 4, 2023
9970ef8
Merge branch 'master' of github.com:paritytech/polkadot-sdk into jpse…
jpserrat Oct 4, 2023
5f100d9
prepare worker: fix artifact result map
jpserrat Oct 5, 2023
7b9421c
Merge branch 'master' of github.com:paritytech/polkadot-sdk into jpse…
jpserrat Oct 5, 2023
e64ff83
prepare worker: remove elapsed time
jpserrat Oct 5, 2023
462d73c
prepare worker: fix result hadling when target os is linux
jpserrat Oct 5, 2023
7ef719d
prepare worker: remove useless use of format!
jpserrat Oct 5, 2023
b0fa175
prepare worker: remove unused preparation_timeout
jpserrat Oct 5, 2023
220f069
prepare worker: move pipe read before child wait
jpserrat Oct 9, 2023
96de656
prepare worker: use getrusage for timeout
jpserrat Oct 12, 2023
ab51e84
Merge branch 'master' of github.com:paritytech/polkadot-sdk into jpse…
jpserrat Oct 14, 2023
6c96fd9
Merge branch 'master' into jpserrat-pvf-fork-instead-threads
mrcnski Oct 17, 2023
fdb3462
Do some cleanup / addressing of issues
mrcnski Oct 21, 2023
f4bea3a
prepare worker: add docstring and pipe error from child process to pa…
jpserrat Oct 22, 2023
6615598
prepare worker: closes stream in child process
jpserrat Oct 25, 2023
2196c5f
prepare worker: fix prepare job
jpserrat Oct 25, 2023
1682ed0
prepare worker: handle prepare job when target_os is linux
jpserrat Oct 25, 2023
5c9b1d8
Merge branch 'master' into jpserrat-pvf-fork-instead-threads
mrcnski Oct 31, 2023
962fccb
Some minor updates
mrcnski Nov 1, 2023
45b0648
Merge branch 'master' into jpserrat-pvf-fork-instead-threads
mrcnski Nov 1, 2023
7baf706
Clean up `fork` usage a bit
mrcnski Nov 1, 2023
7687015
Update prepare-worker-syscalls list
mrcnski Nov 1, 2023
c584441
Merge branch 'master' into jpserrat-pvf-fork-instead-threads
mrcnski Nov 2, 2023
0d5eb73
Add test when forked process dies (e.g. OOM, seccomp violation)
mrcnski Nov 2, 2023
a932a7c
Update for seccomp (add detection for job death); fix some errors
mrcnski Nov 2, 2023
7e1414e
Some clarifications
mrcnski Nov 2, 2023
d9fcb5d
change execute worker from thread to fork
jpserrat Nov 2, 2023
f830b05
Fix bench
mrcnski Nov 3, 2023
d9fcc7c
cargo fmt
mrcnski Nov 3, 2023
7d86911
Fix some issues with prepare worker
mrcnski Nov 3, 2023
3b855d8
Fix some issues with execute worker/job; update errors
mrcnski Nov 3, 2023
0c04842
Fix some test failures
mrcnski Nov 3, 2023
87e96d0
add cpu monitor thread back to prepare and execute worker
jpserrat Nov 4, 2023
9e3d65c
Merge branch 'jpserrat-pvf-fork-instead-threads' of github.com:Jpserr…
jpserrat Nov 4, 2023
de87f5f
Merge branch 'master' into jpserrat-pvf-fork-instead-threads
mrcnski Nov 5, 2023
2e497a7
Merge remote-tracking branch 'Jpserrat/jpserrat-pvf-fork-instead-thre…
mrcnski Nov 5, 2023
6bd7d20
cargo fmt
mrcnski Nov 5, 2023
1fb8c13
Clean up error handling a bit
mrcnski Nov 5, 2023
9c666b4
Fix some tests
mrcnski Nov 5, 2023
8c7c519
Make error reporting more robust and secure
mrcnski Nov 6, 2023
af96dfa
Some fixes
mrcnski Nov 6, 2023
67736c4
Make sure `pre_encoded_payloads` tests the correct type
mrcnski Nov 6, 2023
e25344c
Some fixes
mrcnski Nov 7, 2023
203ce25
Fix `prechecking_out_of_memory` test
mrcnski Nov 7, 2023
de7d19d
Add tests, fix some more issues
mrcnski Nov 7, 2023
cec6477
Fix test
mrcnski Nov 7, 2023
89cfd2e
Make naming of some types more clear
mrcnski Nov 7, 2023
58b814c
Update execute worker syscalls
mrcnski Nov 7, 2023
a28cf2e
Update prepare worker syscalls
mrcnski Nov 7, 2023
b03c695
Address review comments
mrcnski Nov 9, 2023
8070074
Add tests for num_threads of child processes
mrcnski Nov 10, 2023
05555e7
Update impl guide
mrcnski Nov 10, 2023
80 changes: 43 additions & 37 deletions polkadot/node/core/pvf/execute-worker/src/lib.rs
@@ -33,7 +33,7 @@ use nix::{
 },
 unistd::{ForkResult, Pid},
 };
-use os_pipe::PipeWriter;
+use os_pipe::{self, PipeReader, PipeWriter};
 use parity_scale_codec::{Decode, Encode};
 use polkadot_node_core_pvf_common::{
 error::InternalValidationError,
@@ -265,7 +265,7 @@ fn validate_using_artifact(
 ///
 /// # Arguments
 ///
-/// - `pipe_write`: A `os_pipe::PipeWriter` structure, the writing end of a pipe.
+/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe.
 ///
 /// - `compiled_artifact_blob`: The artifact bytes from compiled by the prepare worker`.
 ///
@@ -279,7 +279,7 @@ fn validate_using_artifact(
 ///
 /// - pipe back `JobResponse` to the parent process.
 fn handle_child_process(
-pipe_write: os_pipe::PipeWriter,
+mut pipe_write: PipeWriter,
 compiled_artifact_blob: Vec<u8>,
 executor_params: ExecutorParams,
 params: Vec<u8>,
@@ -304,7 +304,7 @@ fn handle_child_process(
 WaitOutcome::TimedOut,
 )
 .unwrap_or_else(|err| {
-send_child_response(&pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string())))
+send_child_response(&mut pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string())))
 });

 let executor_params_2 = executor_params.clone();
@@ -316,7 +316,7 @@ fn handle_child_process(
 EXECUTE_THREAD_STACK_SIZE,
 )
 .unwrap_or_else(|err| {
-send_child_response(&pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string())))
+send_child_response(&mut pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string())))
 });

 let outcome = thread::wait_for_threads(condvar);
@@ -339,7 +339,7 @@ fn handle_child_process(
 unreachable!("we run wait_while until the outcome is no longer pending; qed"),
 };

-send_child_response(&pipe_write, response);
+send_child_response(&mut pipe_write, response);
 }

 /// Waits for child process to finish and handle child response from pipe.
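
The `unreachable!` message in the hunk above relies on `wait_while` returning only once the shared outcome has left its pending state. A minimal, std-only sketch of that pattern follows. It is not the worker's actual `thread` module: `spawn_reporting` and the `Cond` alias are hypothetical names, and the `WaitOutcome::Pending` variant is assumed from the wording of the message.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Outcome shared between the worker threads and the waiting thread.
/// `Pending` is an assumed variant name; `Finished` and `TimedOut` appear in the diff.
#[derive(Clone, Copy, Debug, PartialEq)]
enum WaitOutcome {
    Pending,
    Finished,
    TimedOut,
}

/// Hypothetical alias for the shared state plus its condition variable.
type Cond = Arc<(Mutex<WaitOutcome>, Condvar)>;

/// Hypothetical helper: run `f` on a new thread, then record `outcome`
/// (only if nothing was recorded yet) and wake the waiter.
fn spawn_reporting<F>(cond: Cond, outcome: WaitOutcome, f: F) -> thread::JoinHandle<()>
where
    F: FnOnce() + Send + 'static,
{
    thread::spawn(move || {
        f();
        let (lock, cvar) = &*cond;
        let mut flag = lock.lock().unwrap();
        if *flag == WaitOutcome::Pending {
            *flag = outcome;
        }
        cvar.notify_all();
    })
}

/// Block until some thread has replaced `Pending` with a final outcome.
fn wait_for_threads(cond: Cond) -> WaitOutcome {
    let (lock, cvar) = &*cond;
    let guard = cvar
        .wait_while(lock.lock().unwrap(), |o| *o == WaitOutcome::Pending)
        .unwrap();
    *guard
}
```

Because `wait_while` rechecks the predicate after every wakeup, the value returned by `wait_for_threads` in this sketch can never be `Pending`, which is the property the `unreachable!` arm depends on.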
@@ -358,15 +358,17 @@ fn handle_child_process(
 ///
 /// - The response, either `Ok` or some error state.
 fn handle_parent_process(
-mut pipe_read: os_pipe::PipeReader,
+mut pipe_read: PipeReader,
 child: Pid,
 worker_pid: u32,
 usage_before: Usage,
 timeout: Duration,
 ) -> io::Result<WorkerResponse> {
 // Read from the child.
-let mut received_data = Vec::new();
-pipe_read.read_to_end(&mut received_data)?;
+let result = recv_child_response(&mut pipe_read)
+// Could not decode job response. There is either a bug or the job was hijacked.
+// Should retry at any rate.
+.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;

 let status = nix::sys::wait::waitpid(child, None);
 gum::trace!(
@@ -382,7 +384,7 @@ fn handle_parent_process(
 };

 // Using `getrusage` is needed to check whether child has timedout since we cannot rely on
-// child. to report its own time.
+// child to report its own time.
 // As `getrusage` returns resource usage from all terminated child processes,
 // it is necessary to subtract the usage before the current child process to isolate its cpu
 // time
@@ -399,29 +401,23 @@ fn handle_parent_process(
 }

 match status {
-Ok(WaitStatus::Exited(_, libc::EXIT_SUCCESS)) => {
-match JobResult::decode(&mut received_data.as_slice()) {
-Ok(Ok(JobResponse::Ok { result_descriptor })) =>
-Ok(WorkerResponse::Ok { result_descriptor, duration: cpu_tv }),
-Ok(Ok(JobResponse::InvalidCandidate(err))) =>
-Ok(WorkerResponse::InvalidCandidate(err)),
-Ok(Err(job_error)) => {
-gum::warn!(
-target: LOG_TARGET,
-%worker_pid,
-"execute job error: {}",
-job_error,
-);
-if matches!(job_error, JobError::TimedOut) {
-Ok(WorkerResponse::JobTimedOut)
-} else {
-Ok(WorkerResponse::JobError(job_error.to_string()))
-}
-},
-// Could not decode job response. There is either a bug or the job was hijacked.
-// Should retry at any rate.
-Err(err) => Err(io::Error::new(io::ErrorKind::Other, err.to_string())),
-}
+Ok(WaitStatus::Exited(_, _exit_status)) => match result {
+Ok(JobResponse::Ok { result_descriptor }) =>
+Ok(WorkerResponse::Ok { result_descriptor, duration: cpu_tv }),
+Ok(JobResponse::InvalidCandidate(err)) => Ok(WorkerResponse::InvalidCandidate(err)),
+Err(job_error) => {
+gum::warn!(
+target: LOG_TARGET,
+%worker_pid,
+"execute job error: {}",
+job_error,
+);
+if matches!(job_error, JobError::TimedOut) {
+Ok(WorkerResponse::JobTimedOut)
+} else {
+Ok(WorkerResponse::JobError(job_error.to_string()))
+}
+},
 },
 // The job was killed by the given signal.
 //
@@ -457,16 +453,26 @@ fn get_total_cpu_usage(rusage: Usage) -> Duration {
 return Duration::from_micros(micros)
 }

+/// Get a job response.
+fn recv_child_response(pipe_read: &mut PipeReader) -> io::Result<JobResult> {
+let response_bytes = framed_recv_blocking(pipe_read)?;
+JobResult::decode(&mut response_bytes.as_slice()).map_err(|e| {
+io::Error::new(
+io::ErrorKind::Other,
+format!("execute pvf recv_child_response: decode error: {:?}", e),
+)
+})
+}
+
 /// Write response to the pipe and exit process after.
 ///
 /// # Arguments
 ///
-/// - `pipe_write`: A `os_pipe::PipeWriter` structure, the writing end of a pipe.
+/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe.
 ///
 /// - `response`: Child process response, or error.
-fn send_child_response(mut pipe_write: &PipeWriter, response: JobResult) -> ! {
-pipe_write
-.write_all(response.encode().as_slice())
+fn send_child_response(pipe_write: &mut PipeWriter, response: JobResult) -> ! {
+framed_send_blocking(pipe_write, response.encode().as_slice())
 .unwrap_or_else(|_| process::exit(libc::EXIT_FAILURE));

 if response.is_ok() {
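
The `getrusage` comment in `handle_parent_process` explains that the reported usage covers all terminated children, so the snapshot taken before the fork has to be subtracted to isolate the current job. The arithmetic can be sketched with the standard library alone. `CpuTimes`, `child_cpu_time`, and `timed_out` are hypothetical names standing in for the `Usage` values and `get_total_cpu_usage`, and the `>=` comparison against the timeout is an assumption, not taken from the diff.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the user and system CPU times that `getrusage` reports.
#[derive(Clone, Copy, Debug)]
struct CpuTimes {
    user: Duration,
    system: Duration,
}

impl CpuTimes {
    /// Total CPU time: user plus system.
    fn total(self) -> Duration {
        self.user + self.system
    }
}

/// CPU time attributable to the child reaped between the two snapshots.
/// `before` is taken prior to forking, `after` once the child has been waited on.
fn child_cpu_time(before: CpuTimes, after: CpuTimes) -> Duration {
    // Saturate at zero rather than panic if the snapshots are inconsistent.
    after.total().saturating_sub(before.total())
}

/// Assumed policy: the job timed out if its CPU time reached the timeout.
fn timed_out(before: CpuTimes, after: CpuTimes, timeout: Duration) -> bool {
    child_cpu_time(before, after) >= timeout
}
```

For example, with 400 ms of accumulated child CPU time before the job and 1900 ms afterwards, the job itself accounts for 1500 ms, which exceeds a 1 s timeout but not a 2 s one.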
63 changes: 37 additions & 26 deletions polkadot/node/core/pvf/prepare-worker/src/lib.rs
@@ -37,7 +37,7 @@ use nix::{
 },
 unistd::{ForkResult, Pid},
 };
-use os_pipe::{self, PipeWriter};
+use os_pipe::{self, PipeReader, PipeWriter};
 use parity_scale_codec::{Decode, Encode};
 use polkadot_node_core_pvf_common::{
 error::{PrepareError, PrepareResult},
@@ -93,6 +93,7 @@ impl AsRef<[u8]> for CompiledArtifact {
 }
 }

+/// Get a worker request.
 fn recv_request(stream: &mut UnixStream) -> io::Result<PvfPrepData> {
 let pvf = framed_recv_blocking(stream)?;
 let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| {
@@ -104,6 +105,7 @@ fn recv_request(stream: &mut UnixStream) -> io::Result<PvfPrepData> {
 Ok(pvf)
 }

+/// Send a worker response.
 fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> {
 framed_send_blocking(stream, &result.encode())
 }
@@ -124,7 +126,11 @@ fn start_memory_tracking(fd: RawFd, limit: Option<isize>) {
 // Syscalls never allocate or deallocate, so this is safe.
 libc::syscall(libc::SYS_write, fd, OOM_PAYLOAD.as_ptr(), OOM_PAYLOAD.len());
 libc::syscall(libc::SYS_close, fd);
-libc::syscall(libc::SYS_exit, 1);
+// Make sure we exit from all threads. Copied from libc.
+libc::syscall(libc::SYS_exit_group, 1);
+loop {
Review comment (Contributor):

RETURN VALUE
This system call does not return.

So, what's the point of the loop? I don't think it ever gets executed.

Reply (Contributor):

We noticed it too, seems the documentation is simply wrong or incomplete. From @s0me0ne-unkn0wn:

According to docs, single exit_group should fix it all and never return, but it didn't. And it's followed by exit in two separate libc implementations so they probably know something we don't know. Maybe exit_group docs are out of date.
The primary issue, however, as I see it, is that we were issuing exit syscall from a thread while the process main thread was blocked in the futex syscall which exit is supposed to clean up. I didn't dig deeper to the exact point where it deadlocks but seems like a fair hypothesis. And exit_group forces all the threads to exit, so, I suppose, the main thread breaks out of futex syscall and everything shuts down as expected.

Reply (Contributor):

@alexggh tbh, we've just copy-pasted the idea from musl: https://github.com/rofl0r/musl/blob/master/src/exit/_Exit.c
glibc behavior is similar: https://codebrowser.dev/glibc/glibc/sysdeps/unix/sysv/linux/_exit.c.html

I myself would like to understand why it is like it is, but I need a bit more spare time than I have to dig deeper into the kernel.

+libc::syscall(libc::SYS_exit, 1);
+}
 }
 #[cfg(not(target_os = "linux"))]
 {
@@ -135,7 +141,7 @@ fn start_memory_tracking(fd: RawFd, limit: Option<isize>) {
 // code is only run by a validator, it's a lesser evil.
 libc::write(fd, OOM_PAYLOAD.as_ptr().cast(), OOM_PAYLOAD.len());
 libc::close(fd);
-std::process::exit(1);
+libc::_exit(1);
 }
 })),
 );
@@ -309,7 +315,7 @@ struct Response {
 ///
 /// - `pvf`: `PvfPrepData` structure, containing data to prepare the artifact
 ///
-/// - `pipe_write`: A `os_pipe::PipeWriter` structure, the writing end of a pipe.
+/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe.
 ///
 /// - `preparation_timeout`: The timeout in `Duration`.
 ///
@@ -324,7 +330,7 @@ struct Response {
 /// - If success, pipe back `Response`.
 fn handle_child_process(
 pvf: PvfPrepData,
-pipe_write: os_pipe::PipeWriter,
+mut pipe_write: PipeWriter,
 preparation_timeout: Duration,
 prepare_job_kind: PrepareJobKind,
 executor_params: Arc<ExecutorParams>,
@@ -373,7 +379,7 @@ fn handle_child_process(
 WaitOutcome::TimedOut,
 )
 .unwrap_or_else(|err| {
-send_child_response(&pipe_write, Err(PrepareError::Panic(err.to_string())))
+send_child_response(&mut pipe_write, Err(PrepareError::Panic(err.to_string())))
 });

 let prepare_thread = spawn_worker_thread(
@@ -403,7 +409,7 @@ fn handle_child_process(
 WaitOutcome::Finished,
 )
 .unwrap_or_else(|err| {
-send_child_response(&pipe_write, Err(PrepareError::IoErr(err.to_string())))
+send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string())))
 });

 let outcome = thread::wait_for_threads(condvar);
@@ -425,7 +431,7 @@ fn handle_child_process(

 match prepare_thread.join().unwrap_or_else(|err| {
 send_child_response(
-&pipe_write,
+&mut pipe_write,
 Err(PrepareError::Panic(stringify_panic_payload(err))),
 )
 }) {
@@ -470,7 +476,7 @@ fn handle_child_process(
 unreachable!("we run wait_while until the outcome is no longer pending; qed"),
 };

-send_child_response(&pipe_write, result);
+send_child_response(&mut pipe_write, result);
 }

 /// Waits for child process to finish and handle child response from pipe.
@@ -498,18 +504,16 @@ fn handle_child_process(
 ///
 /// - If the child process timeout, it returns `PrepareError::TimedOut`.
 fn handle_parent_process(
-mut pipe_read: os_pipe::PipeReader,
+mut pipe_read: PipeReader,
 child: Pid,
 temp_artifact_dest: PathBuf,
 worker_pid: u32,
 usage_before: Usage,
 timeout: Duration,
 ) -> Result<PrepareStats, PrepareError> {
 // Read from the child.
-let mut received_data = Vec::new();
-pipe_read
-.read_to_end(&mut received_data)
-.map_err(|err| PrepareError::IoErr(err.to_string()))?;
+let result =
+recv_child_response(&mut pipe_read).map_err(|err| PrepareError::IoErr(err.to_string()))?;

 let status = nix::sys::wait::waitpid(child, None);
 gum::trace!(
@@ -540,11 +544,7 @@ fn handle_parent_process(
 }

 match status {
-Ok(WaitStatus::Exited(_pid, libc::EXIT_SUCCESS)) => {
-let result: Result<Response, PrepareError> =
-Result::decode(&mut received_data.as_slice())
-// There is either a bug or the job was hijacked.
-.map_err(|err| PrepareError::IoErr(err.to_string()))?;
+Ok(WaitStatus::Exited(_pid, _exit_status)) => {
 match result {
 Err(err) => Err(err),
 Ok(response) => {
@@ -607,16 +607,26 @@ fn get_total_cpu_usage(rusage: Usage) -> Duration {
 return Duration::from_micros(micros)
 }

-/// Write response to the pipe and exit process after.
+/// Get a job response.
+fn recv_child_response(pipe_read: &mut PipeReader) -> io::Result<JobResponse> {
+let response_bytes = framed_recv_blocking(pipe_read)?;
+JobResponse::decode(&mut response_bytes.as_slice()).map_err(|e| {
+io::Error::new(
+io::ErrorKind::Other,
+format!("prepare pvf recv_child_response: decode error: {:?}", e),
+)
+})
+}
+
+/// Write a job response to the pipe and exit process after.
 ///
 /// # Arguments
 ///
-/// - `pipe_write`: A `os_pipe::PipeWriter` structure, the writing end of a pipe.
+/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe.
 ///
 /// - `response`: Child process response
-fn send_child_response(mut pipe_write: &PipeWriter, response: JobResponse) -> ! {
-pipe_write
-.write_all(response.encode().as_slice())
+fn send_child_response(pipe_write: &mut PipeWriter, response: JobResponse) -> ! {
+framed_send_blocking(pipe_write, response.encode().as_slice())
 .unwrap_or_else(|_| process::exit(libc::EXIT_FAILURE));

 if response.is_ok() {
@@ -638,8 +648,9 @@ const OOM_PAYLOAD: &[u8] = b"\x02\x00\x00\x00\x00\x00\x00\x00\x01\x08";
 #[test]
 fn pre_encoded_payloads() {
 // NOTE: This must match the type of `response` in `send_child_response`.
-let unencoded: JobResponse = Result::Err(PrepareError::OutOfMemory);
-let oom_encoded = unencoded.encode();
+let oom_unencoded: JobResponse = Result::Err(PrepareError::OutOfMemory);
+let oom_encoded = oom_unencoded.encode();
+// The payload is prefixed with its length in `framed_send`.
 let mut oom_payload = oom_encoded.len().to_le_bytes().to_vec();
 oom_payload.extend(oom_encoded);
 assert_eq!(oom_payload, OOM_PAYLOAD);
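
The test above builds the expected payload as a little-endian length followed by the encoded response, which is the framing the pipe now uses in place of `read_to_end`. A rough, std-only sketch of such length-prefixed framing is shown here. `send_frame` and `recv_frame` are hypothetical names, not the real `framed_send_blocking` and `framed_recv_blocking`, and the prefix is fixed at 8 bytes (a `u64`) as an assumption that matches the 8-byte prefix visible in `OOM_PAYLOAD`.

```rust
use std::io::{self, Read, Write};

/// Hypothetical helper: write `payload` preceded by its length as 8 little-endian bytes.
fn send_frame<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    w.write_all(&(payload.len() as u64).to_le_bytes())?;
    w.write_all(payload)
}

/// Hypothetical helper: read one frame written by `send_frame`.
/// Fails with `UnexpectedEof` if the stream ends before the frame is complete.
fn recv_frame<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 8];
    r.read_exact(&mut len_buf)?;
    // Sketch only: a production reader would bound this length before allocating.
    let len = u64::from_le_bytes(len_buf) as usize;
    let mut payload = vec![0u8; len];
    r.read_exact(&mut payload)?;
    Ok(payload)
}
```

Framing the two trailing bytes of `OOM_PAYLOAD` (`0x01, 0x08`) with `send_frame` reproduces the full ten-byte constant. A reader also gets a clear error when the child dies mid-write, because `read_exact` reports the truncated frame instead of handing back partial bytes.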