Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Check spawned worker version vs node version before PVF preparation #6861

Merged
merged 20 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
df6c683
Check spawned worker version vs node version before PVF preparation
s0me0ne-unkn0wn Mar 12, 2023
335495b
Address discussions
s0me0ne-unkn0wn Mar 12, 2023
b96cc31
Propagate errors and shutdown preparation and execution pipelines pro…
s0me0ne-unkn0wn Mar 13, 2023
54b11c6
Add logs; Fix execution worker checks
s0me0ne-unkn0wn Mar 14, 2023
3cd790f
Revert "Propagate errors and shutdown preparation and execution pipel…
s0me0ne-unkn0wn Mar 14, 2023
acc94f7
Don't try to shut down; report the condition and exit worker
s0me0ne-unkn0wn Mar 15, 2023
0db3ed3
Merge branch 's0me0ne/worker-version-v2' into s0me0ne/worker-version
s0me0ne-unkn0wn Mar 15, 2023
89c7897
Get rid of `VersionMismatch` preparation error
s0me0ne-unkn0wn Mar 15, 2023
fb5ccc9
Merge master
s0me0ne-unkn0wn Mar 15, 2023
db26c64
Add docs; Fix tests
s0me0ne-unkn0wn Mar 15, 2023
5a344de
Merge branch 'master' into s0me0ne/worker-version
s0me0ne-unkn0wn Mar 15, 2023
b696baa
Update Cargo.lock
s0me0ne-unkn0wn Mar 15, 2023
501d4be
Kill again, but only the main node process
s0me0ne-unkn0wn Mar 18, 2023
a42d0a5
Merge branch 'master' into s0me0ne/worker-version
s0me0ne-unkn0wn Mar 20, 2023
1b4678b
Move unsafe code to a common safe function
s0me0ne-unkn0wn Mar 20, 2023
ec9c4ca
Fix libc dependency error on MacOS
mrcnski Mar 20, 2023
0bd760f
pvf spawning: Add some logging, add a small integration test
mrcnski Mar 22, 2023
a9562c3
Merge branch 'master' into s0me0ne/worker-version
s0me0ne-unkn0wn Mar 29, 2023
0ecce2e
Minor fixes
s0me0ne-unkn0wn Mar 29, 2023
456314c
Restart CI
s0me0ne-unkn0wn Mar 29, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ pub enum Subcommand {
#[derive(Debug, Parser)]
pub struct ValidationWorkerCommand {
/// The path to the validation host's socket.
#[arg(long)]
pub socket_path: String,
/// Calling node implementation version
#[arg(long)]
pub node_impl_version: String,
}

#[allow(missing_docs)]
Expand Down
10 changes: 8 additions & 2 deletions cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,10 @@ pub fn run() -> Result<()> {

#[cfg(not(target_os = "android"))]
{
polkadot_node_core_pvf::prepare_worker_entrypoint(&cmd.socket_path);
polkadot_node_core_pvf::prepare_worker_entrypoint(
&cmd.socket_path,
Some(&cmd.node_impl_version),
);
Ok(())
}
},
Expand All @@ -513,7 +516,10 @@ pub fn run() -> Result<()> {

#[cfg(not(target_os = "android"))]
{
polkadot_node_core_pvf::execute_worker_entrypoint(&cmd.socket_path);
polkadot_node_core_pvf::execute_worker_entrypoint(
&cmd.socket_path,
Some(&cmd.node_impl_version),
);
Ok(())
}
},
Expand Down
5 changes: 4 additions & 1 deletion node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ cpu-time = "1.0.0"
futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }
libc = "0.2.139"
pin-project = "1.0.9"
rand = "0.8.5"
rayon = "1.5.1"
Expand All @@ -41,8 +42,10 @@ sp-wasm-interface = { git = "https://github.com/paritytech/substrate", branch =
sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }

[build-dependencies]
substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" }

[target.'cfg(target_os = "linux")'.dependencies]
libc = "0.2.139"
tikv-jemalloc-ctl = "0.5.0"

[dev-dependencies]
Expand Down
19 changes: 19 additions & 0 deletions node/core/pvf/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2017-2023 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

/// Build script entry point for this crate.
///
/// Delegates entirely to `substrate_build_script_utils::generate_cargo_keys()`.
// NOTE(review): presumably this call is what makes `SUBSTRATE_CLI_IMPL_VERSION` available to
// the `env!` lookups in this crate's worker code - confirm against the
// `substrate-build-script-utils` documentation.
fn main() {
substrate_build_script_utils::generate_cargo_keys();
}
34 changes: 26 additions & 8 deletions node/core/pvf/src/execute/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@ pub async fn spawn(
executor_params: ExecutorParams,
spawn_timeout: Duration,
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
let (mut idle_worker, worker_handle) =
spawn_with_program_path("execute", program_path, &["execute-worker"], spawn_timeout)
.await?;
let (mut idle_worker, worker_handle) = spawn_with_program_path(
"execute",
program_path,
&["execute-worker", "--node-impl-version", env!("SUBSTRATE_CLI_IMPL_VERSION")],
spawn_timeout,
)
.await?;
send_handshake(&mut idle_worker.stream, Handshake { executor_params })
.await
.map_err(|error| {
Expand Down Expand Up @@ -260,11 +264,25 @@ impl Response {
}

/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host.
pub fn worker_entrypoint(socket_path: &str) {
/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
/// is checked against the worker version. On a mismatch (e.g. after an in-place upgrade of the
/// node binary) the worker sends `SIGKILL` to its parent node process and terminates immediately.
/// `None` is used for tests and in other situations when version check is not necessary.
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
s0me0ne-unkn0wn marked this conversation as resolved.
Show resolved Hide resolved
worker_event_loop("execute", socket_path, |rt_handle, mut stream| async move {
let handshake = recv_handshake(&mut stream).await?;
let worker_pid = std::process::id();
if let Some(version) = node_version {
if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
gum::error!(
target: LOG_TARGET,
%worker_pid,
"Node and worker version mismatch, node needs restarting, forcing shutdown",
);
crate::kill_parent_node_in_emergency();
return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"))
}
}

let handshake = recv_handshake(&mut stream).await?;
let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
})?);
Expand All @@ -273,7 +291,7 @@ pub fn worker_entrypoint(socket_path: &str) {
let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
gum::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
%worker_pid,
"worker: validating artifact {}",
artifact_path.display(),
);
Expand Down Expand Up @@ -307,7 +325,7 @@ pub fn worker_entrypoint(socket_path: &str) {
// Log if we exceed the timeout and the other thread hasn't finished.
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
%worker_pid,
"execute job took {}ms cpu time, exceeded execute timeout {}ms",
cpu_time_elapsed.as_millis(),
execution_timeout.as_millis(),
Expand Down
1 change: 1 addition & 0 deletions node/core/pvf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ pub use pvf::PvfPrepData;

pub use host::{start, Config, ValidationHost};
pub use metrics::Metrics;
pub(crate) use worker_common::kill_parent_node_in_emergency;
pub use worker_common::JOB_TIMEOUT_WALL_CLOCK_FACTOR;

pub use execute::worker_entrypoint as execute_worker_entrypoint;
Expand Down
28 changes: 24 additions & 4 deletions node/core/pvf/src/prepare/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ pub async fn spawn(
program_path: &Path,
spawn_timeout: Duration,
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
spawn_with_program_path("prepare", program_path, &["prepare-worker"], spawn_timeout).await
spawn_with_program_path(
"prepare",
program_path,
&["prepare-worker", "--node-impl-version", env!("SUBSTRATE_CLI_IMPL_VERSION")],
spawn_timeout,
)
.await
s0me0ne-unkn0wn marked this conversation as resolved.
Show resolved Hide resolved
}

pub enum Outcome {
Expand Down Expand Up @@ -321,7 +327,9 @@ async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareR
}

/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host.
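/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
/// is checked against the worker version. On a mismatch (e.g. after an in-place upgrade of the
/// node binary) the worker sends `SIGKILL` to its parent node process and terminates immediately.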
Copy link
Contributor

@mrcnski mrcnski Mar 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can this mismatch occur? (This is a question a reader would have.) This should be thoroughly documented, maybe in its own e.g. # Version Mismatch section.

/// `None` is used for tests and in other situations when version check is not necessary.
///
/// # Flow
///
Expand All @@ -342,10 +350,22 @@ async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareR
///
/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
/// send that in the `PrepareResult`.
pub fn worker_entrypoint(socket_path: &str) {
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move {
let worker_pid = std::process::id();
s0me0ne-unkn0wn marked this conversation as resolved.
Show resolved Hide resolved
if let Some(version) = node_version {
if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
gum::error!(
target: LOG_TARGET,
%worker_pid,
"Node and worker version mismatch, node needs restarting, forcing shutdown",
);
crate::kill_parent_node_in_emergency();
return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"))
}
}

loop {
let worker_pid = std::process::id();
let (pvf, dest) = recv_request(&mut stream).await?;
gum::debug!(
target: LOG_TARGET,
Expand Down
22 changes: 17 additions & 5 deletions node/core/pvf/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,22 +61,34 @@ macro_rules! decl_puppet_worker_main {
$crate::sp_tracing::try_init_simple();

let args = std::env::args().collect::<Vec<_>>();
if args.len() < 2 {
if args.len() < 3 {
panic!("wrong number of arguments");
}

let mut version = None;
let mut socket_path: &str = "";

// Everything after the subcommand (`args[1]`) is scanned for `--flag value` pairs.
// The host appends the node version as `--node-impl-version` when it spawns a worker, so
// that is the flag that has to be recognised here; otherwise `version` stays `None` and the
// version check is silently skipped. The older `--node-version` spelling is still accepted
// so that existing invocations keep working.
for i in 2..args.len() {
    match args[i].as_ref() {
        "--socket-path" => socket_path = args[i + 1].as_str(),
        "--node-impl-version" | "--node-version" => version = Some(args[i + 1].as_str()),
        _ => (),
    }
}

let subcommand = &args[1];
match subcommand.as_ref() {
"exit" => {
std::process::exit(1);
},
"sleep" => {
std::thread::sleep(std::time::Duration::from_secs(5));
},
"prepare-worker" => {
let socket_path = &args[2];
$crate::prepare_worker_entrypoint(socket_path);
$crate::prepare_worker_entrypoint(&socket_path, version);
},
"execute-worker" => {
let socket_path = &args[2];
$crate::execute_worker_entrypoint(socket_path);
$crate::execute_worker_entrypoint(&socket_path, version);
},
other => panic!("unknown subcommand: {}", other),
}
Expand Down
43 changes: 41 additions & 2 deletions node/core/pvf/src/worker_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,21 @@ pub async fn spawn_with_program_path(
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
"cannot bind unix socket: {:?}",
err,
);
SpawnErr::Bind
})?;

let handle =
WorkerHandle::spawn(program_path, extra_args, socket_path).map_err(|err| {
WorkerHandle::spawn(&program_path, extra_args, socket_path).map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
"cannot spawn a worker: {:?}",
err,
);
Expand All @@ -84,6 +88,8 @@ pub async fn spawn_with_program_path(
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
"cannot accept a worker: {:?}",
err,
);
Expand All @@ -92,6 +98,14 @@ pub async fn spawn_with_program_path(
Ok((IdleWorker { stream, pid: handle.id() }, handle))
}
_ = Delay::new(spawn_timeout).fuse() => {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?spawn_timeout,
"spawning and connecting to socket timed out",
);
Err(SpawnErr::AcceptTimeout)
}
}
Expand Down Expand Up @@ -162,6 +176,13 @@ where
F: FnMut(Handle, UnixStream) -> Fut,
Fut: futures::Future<Output = io::Result<Never>>,
{
gum::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"starting pvf worker ({})",
debug_id,
);

let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
let handle = rt.handle();
let err = rt
Expand All @@ -179,7 +200,7 @@ where
gum::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"pvf worker ({}): {:?}",
"quitting pvf worker ({}): {:?}",
debug_id,
err,
);
Expand Down Expand Up @@ -280,6 +301,7 @@ impl WorkerHandle {
) -> io::Result<Self> {
let mut child = process::Command::new(program.as_ref())
.args(extra_args)
.arg("--socket-path")
.arg(socket_path.as_ref().as_os_str())
.stdout(std::process::Stdio::piped())
.kill_on_drop(true)
Expand Down Expand Up @@ -393,3 +415,20 @@ pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8>
r.read_exact(&mut buf).await?;
Ok(buf)
}

/// Emergency stop for the node that spawned this worker.
///
/// Used when the node and worker versions do not match (as a result of an in-place upgrade):
/// the parent node is sent `SIGKILL` to tear it down and prevent it from raising disputes on
/// valid candidates. Restarting the node is left to the node owner. As the node exits, the unix
/// sockets opened to workers get closed by the OS, so the other workers receive an error on
/// socket read and exit as well. Preparation jobs are written to temporary files that are
/// renamed to real artifacts on the node side, so no leftover artifacts are possible.
pub(crate) fn kill_parent_node_in_emergency() {
    // SAFETY: `getppid()` takes no arguments and never fails.
    let parent_pid = unsafe { libc::getppid() };

    // In some corner cases the call reports "no-parent" (0) or "parent-init" (1); neither of
    // those must ever be signalled.
    if parent_pid <= 1 {
        return;
    }

    // SAFETY: `kill()` is only handed plain integer arguments. Its return value is not checked.
    unsafe {
        libc::kill(parent_pid, libc::SIGKILL);
    }
}
9 changes: 9 additions & 0 deletions node/core/pvf/tests/it/worker_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ use crate::PUPPET_EXE;
use polkadot_node_core_pvf::testing::worker_common::{spawn_with_program_path, SpawnErr};
use std::time::Duration;

// Spawn a program that immediately exits with a failure code. The process is expected to be
// gone before it connects to the host socket, so the spawn must end with
// `SpawnErr::AcceptTimeout`.
#[tokio::test]
async fn spawn_immediate_exit() {
    let spawn_timeout = Duration::from_secs(2);
    let result =
        spawn_with_program_path("integration-test", PUPPET_EXE, &["exit"], spawn_timeout).await;
    assert!(matches!(result, Err(SpawnErr::AcceptTimeout)));
}

#[tokio::test]
async fn spawn_timeout() {
let result =
Expand Down
4 changes: 2 additions & 2 deletions node/malus/src/malus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl MalusCli {

#[cfg(not(target_os = "android"))]
{
polkadot_node_core_pvf::prepare_worker_entrypoint(&cmd.socket_path);
polkadot_node_core_pvf::prepare_worker_entrypoint(&cmd.socket_path, None);
}
},
NemesisVariant::PvfExecuteWorker(cmd) => {
Expand All @@ -108,7 +108,7 @@ impl MalusCli {

#[cfg(not(target_os = "android"))]
{
polkadot_node_core_pvf::execute_worker_entrypoint(&cmd.socket_path);
polkadot_node_core_pvf::execute_worker_entrypoint(&cmd.socket_path, None);
}
},
}
Expand Down