Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Implement worker binary embedding and extraction
Browse files Browse the repository at this point in the history
  • Loading branch information
mrcnski committed Apr 24, 2023
1 parent 37920a1 commit aecc106
Show file tree
Hide file tree
Showing 19 changed files with 105 additions and 139 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 0 additions & 19 deletions cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,6 @@ pub enum Subcommand {
/// Revert the chain to a previous state.
Revert(sc_cli::RevertCmd),

#[allow(missing_docs)]
#[command(name = "prepare-worker", hide = true)]
PvfPrepareWorker(ValidationWorkerCommand),

#[allow(missing_docs)]
#[command(name = "execute-worker", hide = true)]
PvfExecuteWorker(ValidationWorkerCommand),

/// Sub-commands concerned with benchmarking.
/// The pallet benchmarking moved to the `pallet` sub-command.
#[command(subcommand)]
Expand All @@ -75,17 +67,6 @@ pub enum Subcommand {
ChainInfo(sc_cli::ChainInfoCmd),
}

// Command-line arguments shared by the hidden `prepare-worker` and `execute-worker`
// subcommands (`Subcommand::PvfPrepareWorker` / `Subcommand::PvfExecuteWorker`).
// NOTE(review): kept as a plain `//` comment on purpose; with `derive(Parser)` a `///` comment
// on the struct presumably ends up in the generated help text. Confirm before converting.
#[allow(missing_docs)]
#[derive(Debug, Parser)]
pub struct ValidationWorkerCommand {
/// The path to the validation host's socket.
#[arg(long)]
pub socket_path: String,
/// Calling node implementation version
#[arg(long)]
pub node_impl_version: String,
}

#[allow(missing_docs)]
#[derive(Debug, Parser)]
#[group(skip)]
Expand Down
44 changes: 0 additions & 44 deletions cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,50 +479,6 @@ pub fn run() -> Result<()> {
))
})?)
},
Some(Subcommand::PvfPrepareWorker(cmd)) => {
let mut builder = sc_cli::LoggerBuilder::new("");
builder.with_colors(false);
let _ = builder.init();

#[cfg(target_os = "android")]
{
return Err(sc_cli::Error::Input(
"PVF preparation workers are not supported under this platform".into(),
)
.into())
}

#[cfg(not(target_os = "android"))]
{
polkadot_node_core_pvf_worker::prepare_worker_entrypoint(
&cmd.socket_path,
Some(&cmd.node_impl_version),
);
Ok(())
}
},
Some(Subcommand::PvfExecuteWorker(cmd)) => {
let mut builder = sc_cli::LoggerBuilder::new("");
builder.with_colors(false);
let _ = builder.init();

#[cfg(target_os = "android")]
{
return Err(sc_cli::Error::Input(
"PVF execution workers are not supported under this platform".into(),
)
.into())
}

#[cfg(not(target_os = "android"))]
{
polkadot_node_core_pvf_worker::execute_worker_entrypoint(
&cmd.socket_path,
Some(&cmd.node_impl_version),
);
Ok(())
}
},
Some(Subcommand::Benchmark(cmd)) => {
let runner = cli.create_runner(cmd)?;
let chain_spec = &runner.config().chain_spec;
Expand Down
10 changes: 7 additions & 3 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@ const DEFAULT_APPROVAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(12);
pub struct Config {
/// The path where candidate validation can store compiled artifacts for PVFs.
pub artifacts_cache_path: PathBuf,
/// The path to extract the PVF workers to, if `program_path` is `None`.
pub pvf_workers_path: PathBuf,
/// The path to the executable which can be used for spawning PVF compilation & validation
/// workers.
pub program_path: PathBuf,
pub program_path: Option<PathBuf>,
}

/// The candidate validation subsystem.
Expand Down Expand Up @@ -117,6 +119,7 @@ impl<Context> CandidateValidationSubsystem {
self.metrics,
self.pvf_metrics,
self.config.artifacts_cache_path,
self.config.pvf_workers_path,
self.config.program_path,
)
.map_err(|e| SubsystemError::with_origin("candidate-validation", e))
Expand All @@ -131,10 +134,11 @@ async fn run<Context>(
metrics: Metrics,
pvf_metrics: polkadot_node_core_pvf::Metrics,
cache_path: PathBuf,
program_path: PathBuf,
workers_path: PathBuf,
program_path: Option<PathBuf>,
) -> SubsystemResult<()> {
let (validation_host, task) = polkadot_node_core_pvf::start(
polkadot_node_core_pvf::Config::new(cache_path, program_path),
polkadot_node_core_pvf::Config::new(cache_path, workers_path, program_path),
pvf_metrics,
);
ctx.spawn_blocking("pvf-validation-host", task.boxed())?;
Expand Down
2 changes: 1 addition & 1 deletion node/core/pvf/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ mod tests {

#[tokio::test]
async fn artifacts_removes_cache_on_startup() {
let fake_cache_path = crate::worker_common::tmpfile("test-cache").await.unwrap();
let fake_cache_path = crate::worker_intf::tmpfile("test-cache").await.unwrap();
let fake_artifact_path = {
let mut p = fake_cache_path.clone();
p.push("wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234");
Expand Down
2 changes: 1 addition & 1 deletion node/core/pvf/src/execute/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
artifacts::{ArtifactId, ArtifactPathId},
host::ResultSender,
metrics::Metrics,
worker_common::{IdleWorker, WorkerHandle},
worker_intf::{IdleWorker, WorkerHandle},
InvalidCandidate, ValidationError, LOG_TARGET,
};
use futures::{
Expand Down
4 changes: 2 additions & 2 deletions node/core/pvf/src/execute/worker_intf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use crate::{
artifacts::ArtifactPathId,
worker_common::{
worker_intf::{
framed_recv, framed_send, path_to_bytes, spawn_with_program_path, IdleWorker, SpawnErr,
WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
},
Expand All @@ -45,7 +45,7 @@ pub async fn spawn(
let (mut idle_worker, worker_handle) = spawn_with_program_path(
"execute",
program_path,
&["--node-impl-version", env!("SUBSTRATE_CLI_IMPL_VERSION")],
&["execute-worker", "--node-impl-version", env!("SUBSTRATE_CLI_IMPL_VERSION")],
spawn_timeout,
)
.await?;
Expand Down
62 changes: 58 additions & 4 deletions node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use std::{
path::{Path, PathBuf},
time::{Duration, SystemTime},
};
use tokio;

/// The time period after which a failed preparation artifact is considered ready to be retried.
/// Note that we will only retry if another request comes in after this cooldown has passed.
Expand All @@ -49,6 +50,15 @@ pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_millis(200);
/// The amount of times we will retry failed prepare jobs.
pub const NUM_PREPARE_RETRIES: u32 = 5;

// HACK: The worker binaries are located by walking up from `OUT_DIR`, which is a bit ugly but
// seems to work. Should eventually use something like wasm-builder:
// <https://github.com/paritytech/substrate/issues/13982>.
// NOTE(review): this presumably relies on `OUT_DIR` sitting three levels below the directory
// that holds the built `prepare_worker` / `execute_worker` binaries — confirm against Cargo's
// target directory layout.
/// Raw bytes of the prepare worker binary, embedded at compile time.
const PREPARE_EXE: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/../../../prepare_worker"));
/// Raw bytes of the execute worker binary, embedded at compile time.
const EXECUTE_EXE: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/../../../execute_worker"));

/// An alias to not spell the type for the oneshot sender for the PVF execution result.
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;

Expand Down Expand Up @@ -140,6 +150,8 @@ struct ExecutePvfInputs {
pub struct Config {
/// The root directory where the prepared artifacts can be stored.
pub cache_path: PathBuf,
/// If we are using the embedded worker binaries, the directory where they are extracted to.
pub workers_path: Option<PathBuf>,
/// The path to the program that can be used to spawn the prepare workers.
pub prepare_worker_program_path: PathBuf,
/// The time allotted for a prepare worker to spawn and report to the host.
Expand All @@ -159,18 +171,28 @@ pub struct Config {

impl Config {
/// Create a new instance of the configuration.
///
/// The binary at `program_path` will be used if that is `Some`, otherwise the embedded workers
/// will be extracted to `workers_path` and used.
pub fn new(
cache_path: std::path::PathBuf,
prepare_worker_path: std::path::PathBuf,
execute_worker_path: std::path::PathBuf,
workers_path: std::path::PathBuf,
program_path: Option<std::path::PathBuf>,
) -> Self {
// Do not contaminate the other parts of the codebase with the types from `tokio`.
let cache_path = PathBuf::from(cache_path);
let prepare_worker_path = PathBuf::from(prepare_worker_path);
let execute_worker_path = PathBuf::from(execute_worker_path);

let (prepare_worker_path, execute_worker_path, workers_path) =
if let Some(path) = program_path {
let path = PathBuf::from(path);
(path.clone(), path, None)
} else {
(worker_path(&workers_path, "prepare"), worker_path(&workers_path, "execute"), Some(workers_path))
};

Self {
cache_path,
workers_path,
prepare_worker_program_path: prepare_worker_path.clone(),
prepare_worker_spawn_timeout: Duration::from_secs(3),
prepare_workers_soft_max_num: 1,
Expand Down Expand Up @@ -222,6 +244,17 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<O
let run_sweeper = sweeper_task(to_sweeper_rx);

let run_host = async move {
if let Some(workers_path) = config.workers_path {
// Make sure that the workers path directory and all its parents are created.
// TODO?: First delete any existing binaries.
// let _ = tokio::fs::remove_dir_all(config.workers_path).await;
let _ = tokio::fs::create_dir_all(workers_path).await;
extract_worker_binaries(
&config.prepare_worker_program_path,
&config.execute_worker_program_path,
)
.await;
}
let artifacts = Artifacts::new(&config.cache_path).await;

run(Inner {
Expand Down Expand Up @@ -856,6 +889,27 @@ fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()>
.map(|_| ())
}

// TODO: Should we purge unneeded binaries?
/// Extracts the worker binaries embedded in this binary onto disk at the given paths.
///
/// `PREPARE_EXE` is written to `prepare_worker_path` and `EXECUTE_EXE` to
/// `execute_worker_path`. A binary is only written when nothing exists at its target path yet,
/// so an already-extracted worker is left untouched.
///
/// Extraction is best-effort: I/O errors are ignored and nothing is returned, so the caller
/// gets no indication of failure from this function.
async fn extract_worker_binaries(prepare_worker_path: &Path, execute_worker_path: &Path) {
	write_worker_binary(prepare_worker_path, PREPARE_EXE).await;
	write_worker_binary(execute_worker_path, EXECUTE_EXE).await;
}

/// Writes `bytes` to `path` and marks the file executable, unless `path` already exists.
async fn write_worker_binary(path: &Path, bytes: &[u8]) {
	// Skip extraction if the binary is already present.
	if path.exists() {
		return
	}
	if tokio::fs::write(path, bytes).await.is_err() {
		// Best-effort: nothing usable was written, so there is nothing to make executable.
		return
	}
	// A freshly written file carries no execute bits on Unix, so without this step the
	// extracted worker could not be spawned as a program.
	#[cfg(unix)]
	{
		use std::os::unix::fs::PermissionsExt;
		let _ = tokio::fs::set_permissions(path, std::fs::Permissions::from_mode(0o755)).await;
	}
}

/// Builds the on-disk location of the worker binary for `job_kind` inside `workers_path`.
///
/// The file name has the form `<job_kind>-worker_<version>`, where `<version>` is the
/// `SUBSTRATE_CLI_IMPL_VERSION` value captured at compile time. Embedding the version (which,
/// per the original note, includes the commit) avoids conflicts with other versions of polkadot
/// running on the same machine, e.g. in testnets.
fn worker_path(workers_path: &Path, job_kind: &str) -> PathBuf {
	let version = env!("SUBSTRATE_CLI_IMPL_VERSION");
	let mut path = workers_path.to_path_buf();
	path.push(format!("{}-worker_{}", job_kind, version));
	path
}

#[cfg(test)]
pub(crate) mod tests {
use super::*;
Expand Down
9 changes: 2 additions & 7 deletions node/core/pvf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ mod metrics;
mod prepare;
mod priority;
mod pvf;
mod worker_common;
mod worker_intf;

pub use artifacts::CompiledArtifact;
pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError};
Expand All @@ -109,11 +109,6 @@ pub use pvf::PvfPrepData;

pub use host::{start, Config, ValidationHost};
pub use metrics::Metrics;
pub use worker_common::{framed_recv, framed_send, JOB_TIMEOUT_WALL_CLOCK_FACTOR};
pub use worker_intf::{framed_recv, framed_send, JOB_TIMEOUT_WALL_CLOCK_FACTOR};

const LOG_TARGET: &str = "parachain::pvf";

#[doc(hidden)]
pub mod testing {
pub use crate::worker_common::{spawn_with_program_path, SpawnErr};
}
2 changes: 1 addition & 1 deletion node/core/pvf/src/prepare/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
error::{PrepareError, PrepareResult},
metrics::Metrics,
pvf::PvfPrepData,
worker_common::{IdleWorker, WorkerHandle},
worker_intf::{IdleWorker, WorkerHandle},
LOG_TARGET,
};
use always_assert::never;
Expand Down
2 changes: 1 addition & 1 deletion node/core/pvf/src/prepare/worker_intf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
metrics::Metrics,
prepare::PrepareStats,
pvf::PvfPrepData,
worker_common::{
worker_intf::{
framed_recv, framed_send, path_to_bytes, spawn_with_program_path, tmpfile_in, IdleWorker,
SpawnErr, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
},
Expand Down
File renamed without changes.
11 changes: 0 additions & 11 deletions node/core/pvf/worker/.cargo/config.toml

This file was deleted.

7 changes: 0 additions & 7 deletions node/core/pvf/worker/.cargo/musl-g++

This file was deleted.

13 changes: 0 additions & 13 deletions node/core/pvf/worker/.cargo/musl-gcc

This file was deleted.

3 changes: 1 addition & 2 deletions node/core/pvf/worker/bin/execute_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,5 @@

//! Execute worker.
// TODO: Build with musl.
// TODO: Embed into polkadot binary.

polkadot_node_core_pvf_worker::decl_worker_main!(execute);
polkadot_node_core_pvf_worker::decl_worker_main!("execute-worker");
3 changes: 1 addition & 2 deletions node/core/pvf/worker/bin/prepare_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,5 @@

//! Prepare worker.
// TODO: Build with musl.
// TODO: Embed into polkadot binary.

polkadot_node_core_pvf_worker::decl_worker_main!(prepare);
polkadot_node_core_pvf_worker::decl_worker_main!("prepare-worker");
Loading

0 comments on commit aecc106

Please sign in to comment.