Skip to content

Commit

Permalink
Add configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreiEres committed Jun 5, 2024
1 parent c519c51 commit a7e043b
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 33 deletions.
31 changes: 29 additions & 2 deletions polkadot/cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

pub use polkadot_node_primitives::NODE_VERSION;

use clap::Parser;
use clap::{Parser, ValueEnum};
use std::path::PathBuf;

#[allow(missing_docs)]
Expand Down Expand Up @@ -136,23 +136,50 @@ pub struct RunCmd {
/// **Dangerous!** Do not touch unless explicitly advised to.
#[arg(long)]
pub execute_workers_max_num: Option<usize>,

/// Override the maximum number of pvf workers that can be spawned in the pvf prepare
/// pool for tasks with the priority below critical.
///
/// **Dangerous!** Do not touch unless explicitly advised to.

#[arg(long)]
pub prepare_workers_soft_max_num: Option<usize>,

/// Override the absolute number of pvf workers that can be spawned in the pvf prepare pool.
///
/// **Dangerous!** Do not touch unless explicitly advised to.
#[arg(long)]
pub prepare_workers_hard_max_num: Option<usize>,

/// The strategy we use to cleanup pvf workers artifacts
///
/// **Dangerous!** Do not touch unless explicitly advised to.
#[arg(long, value_name = "STRATEGY", value_enum, default_value_t = WorkersCleanupMode::Size)]
pub workers_cleanup: WorkersCleanupMode,

/// TESTING ONLY: disable the version check between nodes and workers.
#[arg(long, hide = true)]
pub disable_worker_version_check: bool,
}

/// How we clean up pvf workers artifacts
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
#[value(rename_all = "kebab-case")]
pub enum WorkersCleanupMode {
	/// Invalidate the least used artifacts when the cache size is more than 10 GB
	Size,
	/// Invalidate artifacts last used more than 24 hours ago
	Time,
}

/// Translates the CLI-level cleanup choice into the service-level variant of the same name.
impl From<WorkersCleanupMode> for service::WorkersCleanupMode {
	fn from(mode: WorkersCleanupMode) -> Self {
		// Listing both variants explicitly (no catch-all) keeps the match exhaustive, so adding
		// a CLI variant without a service counterpart fails to compile.
		match mode {
			WorkersCleanupMode::Size => Self::Size,
			WorkersCleanupMode::Time => Self::Time,
		}
	}
}

#[allow(missing_docs)]
#[derive(Debug, Parser)]
pub struct Cli {
Expand Down
1 change: 1 addition & 0 deletions polkadot/cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ where
execute_workers_max_num: cli.run.execute_workers_max_num,
prepare_workers_hard_max_num: cli.run.prepare_workers_hard_max_num,
prepare_workers_soft_max_num: cli.run.prepare_workers_soft_max_num,
workers_cleanup: cli.run.workers_cleanup.into(),
},
)
.map(|full| full.task_manager)?;
Expand Down
17 changes: 16 additions & 1 deletion polkadot/node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

use polkadot_node_core_pvf::{
InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PossiblyInvalidError,
PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost,
PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost, WorkersCleanup,
};
use polkadot_node_primitives::{
BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT,
Expand Down Expand Up @@ -107,6 +107,15 @@ pub struct Config {
pub pvf_prepare_workers_soft_max_num: usize,
/// The absolute number of pvf workers that can be spawned in the pvf prepare pool.
pub pvf_prepare_workers_hard_max_num: usize,
/// The strategy we use to cleanup artifacts
pub pvf_workers_cleanup_mode: WorkersCleanupMode,
}

/// The strategy used to decide which prepared artifacts get cleaned up.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkersCleanupMode {
	/// Clean up artifacts based on how long ago they were last used.
	Time,
	/// Clean up the least used artifacts based on the total size of the artifacts cache.
	Size,
}

/// The candidate validation subsystem.
Expand Down Expand Up @@ -234,8 +243,13 @@ async fn run<Context>(
pvf_execute_workers_max_num,
pvf_prepare_workers_soft_max_num,
pvf_prepare_workers_hard_max_num,
pvf_workers_cleanup_mode,
}: Config,
) -> SubsystemResult<()> {
let workers_cleanup = match pvf_workers_cleanup_mode {
WorkersCleanupMode::Time => WorkersCleanup::by_time(),
WorkersCleanupMode::Size => WorkersCleanup::by_size(),
};
let (validation_host, task) = polkadot_node_core_pvf::start(
polkadot_node_core_pvf::Config::new(
artifacts_cache_path,
Expand All @@ -246,6 +260,7 @@ async fn run<Context>(
pvf_execute_workers_max_num,
pvf_prepare_workers_soft_max_num,
pvf_prepare_workers_hard_max_num,
workers_cleanup,
),
pvf_metrics,
)
Expand Down
2 changes: 2 additions & 0 deletions polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use criterion::{criterion_group, criterion_main, BatchSize, Criterion, SamplingMode};
use polkadot_node_core_pvf::{
start, testing, Config, Metrics, PrepareError, PrepareJobKind, PvfPrepData, ValidationHost,
WorkersCleanup,
};
use polkadot_primitives::ExecutorParams;
use rococo_runtime::WASM_BINARY;
Expand Down Expand Up @@ -51,6 +52,7 @@ impl TestHost {
2,
1,
2,
WorkersCleanup::by_size(),
);
f(&mut config);
let (host, task) = start(config, Metrics::default()).await.unwrap();
Expand Down
33 changes: 13 additions & 20 deletions polkadot/node/core/pvf/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@
//! older by a predefined parameter. This process is run very rarely (say, once a day). Once the
//! artifact is expired it is removed from disk eagerly atomically.

use crate::{host::PrecheckResultSender, worker_interface::WORKER_DIR_PREFIX};
use crate::{
host::{PrecheckResultSender, WorkersCleanup},
worker_interface::WORKER_DIR_PREFIX,
};
use always_assert::always;
use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareStats, pvf::PvfPrepData};
use polkadot_parachain_primitives::primitives::ValidationCodeHash;
Expand All @@ -63,7 +66,7 @@ use std::{
collections::HashMap,
fs,
path::{Path, PathBuf},
time::{Duration, SystemTime},
time::SystemTime,
};

/// The extension to use for cached artifacts.
Expand Down Expand Up @@ -171,16 +174,6 @@ pub struct Artifacts {
inner: HashMap<ArtifactId, ArtifactState>,
}

/// A condition which we use to cleanup artifacts
#[derive(Debug)]
pub enum CleanupBy {
// Inactive time after which artefact is deleted
Time(Duration),
// Max size in bytes. Reaching it the least used artefacts are deleted
#[allow(dead_code)]
Size(u64),
}

impl Artifacts {
#[cfg(test)]
pub(crate) fn empty() -> Self {
Expand Down Expand Up @@ -274,11 +267,11 @@ impl Artifacts {

/// Remove artifacts older than the given TTL or the total artifacts size limit and return id
/// and path of the removed ones.
pub fn prune(&mut self, cleanup_by: &CleanupBy) -> Vec<(ArtifactId, PathBuf)> {
pub fn prune(&mut self, cleanup: &WorkersCleanup) -> Vec<(ArtifactId, PathBuf)> {
let mut to_remove = vec![];

match cleanup_by {
CleanupBy::Time(artifact_ttl) => {
match cleanup {
WorkersCleanup::ByTime(artifact_ttl) => {
let now = SystemTime::now();
for (k, v) in self.inner.iter() {
if let ArtifactState::Prepared { last_time_needed, ref path, .. } = *v {
Expand All @@ -292,7 +285,7 @@ impl Artifacts {
}
}
},
CleanupBy::Size(size_limit) => {
WorkersCleanup::BySize(size_limit) => {
let mut total_size = 0;
let mut artifact_sizes = vec![];

Expand Down Expand Up @@ -324,9 +317,9 @@ impl Artifacts {

#[cfg(test)]
mod tests {
use crate::testing::artifact_id;

use super::*;
use crate::testing::artifact_id;
use std::time::Duration;

#[tokio::test]
async fn cache_cleared_on_startup() {
Expand Down Expand Up @@ -393,7 +386,7 @@ mod tests {
PrepareStats::default(),
);

let pruned = artifacts.prune(&CleanupBy::Time(Duration::from_secs(9)));
let pruned = artifacts.prune(&WorkersCleanup::ByTime(Duration::from_secs(9)));

assert!(artifacts.artifact_ids().contains(&artifact_id1));
assert!(!pruned.contains(&(artifact_id1, path1)));
Expand Down Expand Up @@ -440,7 +433,7 @@ mod tests {
PrepareStats::default(),
);

let pruned = artifacts.prune(&CleanupBy::Size(1500));
let pruned = artifacts.prune(&WorkersCleanup::BySize(1500));

assert!(artifacts.artifact_ids().contains(&artifact_id1));
assert!(!pruned.contains(&(artifact_id1, path1)));
Expand Down
40 changes: 32 additions & 8 deletions polkadot/node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
//! [`ValidationHost`], that allows communication with that event-loop.

use crate::{
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts, CleanupBy},
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
execute::{self, PendingExecutionRequest},
metrics::Metrics,
prepare, Priority, SecurityStatus, ValidationError, LOG_TARGET,
Expand Down Expand Up @@ -178,6 +178,8 @@ pub struct Config {
pub execute_worker_spawn_timeout: Duration,
/// The maximum number of execute workers that can run at the same time.
pub execute_workers_max_num: usize,
/// The strategy we use to cleanup artifacts
pub workers_cleanup: WorkersCleanup,
}

impl Config {
Expand All @@ -191,6 +193,7 @@ impl Config {
execute_workers_max_num: usize,
prepare_workers_soft_max_num: usize,
prepare_workers_hard_max_num: usize,
workers_cleanup: WorkersCleanup,
) -> Self {
Self {
cache_path,
Expand All @@ -205,10 +208,31 @@ impl Config {
execute_worker_program_path,
execute_worker_spawn_timeout: Duration::from_secs(3),
execute_workers_max_num,
workers_cleanup,
}
}
}

/// A condition which we use to clean up artifacts
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkersCleanup {
	/// Inactive time after which an artifact is deleted
	ByTime(Duration),
	/// Max size in bytes. When it is reached, the least used artifacts are deleted
	BySize(u64),
}

impl WorkersCleanup {
	/// Inactivity period used by [`WorkersCleanup::by_time`]: 24 hours.
	const DEFAULT_ARTIFACT_TTL: Duration = Duration::from_secs(24 * 60 * 60);

	/// Size limit in bytes used by [`WorkersCleanup::by_size`]: 10 GiB.
	const DEFAULT_SIZE_LIMIT: u64 = 10 * 1024 * 1024 * 1024;

	/// Returns the time-based condition with the default inactivity period of 24 hours.
	pub fn by_time() -> Self {
		Self::ByTime(Self::DEFAULT_ARTIFACT_TTL)
	}

	/// Returns the size-based condition with the default limit of 10 GiB.
	pub fn by_size() -> Self {
		Self::BySize(Self::DEFAULT_SIZE_LIMIT)
	}
}

/// Start the validation host.
///
/// Returns a [handle][`ValidationHost`] to the started validation host and the future. The future
Expand Down Expand Up @@ -293,7 +317,7 @@ pub async fn start(
let run_host = async move {
run(Inner {
cleanup_pulse_interval: Duration::from_secs(3600),
cleanup_by: CleanupBy::Time(Duration::from_secs(3600 * 24)),
cleanup: WorkersCleanup::ByTime(Duration::from_secs(3600 * 24)),
artifacts,
to_host_rx,
to_prepare_queue_tx,
Expand Down Expand Up @@ -337,7 +361,7 @@ impl AwaitingPrepare {

struct Inner {
cleanup_pulse_interval: Duration,
cleanup_by: CleanupBy,
cleanup: WorkersCleanup,
artifacts: Artifacts,

to_host_rx: mpsc::Receiver<ToHost>,
Expand All @@ -359,7 +383,7 @@ struct Fatal;
async fn run(
Inner {
cleanup_pulse_interval,
cleanup_by,
cleanup,
mut artifacts,
to_host_rx,
from_prepare_queue_rx,
Expand Down Expand Up @@ -415,7 +439,7 @@ async fn run(
break_if_fatal!(handle_cleanup_pulse(
&mut to_sweeper_tx,
&mut artifacts,
&cleanup_by,
&cleanup,
).await);
},
to_host = to_host_rx.next() => {
Expand Down Expand Up @@ -863,9 +887,9 @@ async fn enqueue_prepare_for_execute(
async fn handle_cleanup_pulse(
sweeper_tx: &mut mpsc::Sender<PathBuf>,
artifacts: &mut Artifacts,
cleanup_by: &CleanupBy,
cleanup: &WorkersCleanup,
) -> Result<(), Fatal> {
let to_remove = artifacts.prune(cleanup_by);
let to_remove = artifacts.prune(cleanup);
gum::debug!(
target: LOG_TARGET,
"PVF pruning: {} artifacts reached their end of life",
Expand Down Expand Up @@ -1031,7 +1055,7 @@ pub(crate) mod tests {

let run = run(Inner {
cleanup_pulse_interval,
cleanup_by: CleanupBy::Time(artifact_ttl),
cleanup: WorkersCleanup::ByTime(artifact_ttl),
artifacts,
to_host_rx,
to_prepare_queue_tx,
Expand Down
2 changes: 1 addition & 1 deletion polkadot/node/core/pvf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub mod testing;

pub use error::{InvalidCandidate, PossiblyInvalidError, ValidationError};
pub use host::{
start, Config, ValidationHost, EXECUTE_BINARY_NAME, HOST_MESSAGE_QUEUE_SIZE,
start, Config, ValidationHost, WorkersCleanup, EXECUTE_BINARY_NAME, HOST_MESSAGE_QUEUE_SIZE,
PREPARE_BINARY_NAME,
};
pub use metrics::Metrics;
Expand Down
3 changes: 2 additions & 1 deletion polkadot/node/core/pvf/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use polkadot_node_core_pvf::SecurityStatus;
use polkadot_node_core_pvf::{
start, testing::build_workers_and_get_paths, Config, InvalidCandidate, Metrics,
PossiblyInvalidError, PrepareError, PrepareJobKind, PvfPrepData, ValidationError,
ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
ValidationHost, WorkersCleanup, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
};
use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult};
use polkadot_primitives::{ExecutorParam, ExecutorParams, PvfExecKind, PvfPrepKind};
Expand Down Expand Up @@ -66,6 +66,7 @@ impl TestHost {
2,
1,
2,
WorkersCleanup::by_size(),
);
f(&mut config);
let (host, task) = start(config, Metrics::default()).await.unwrap();
Expand Down
5 changes: 5 additions & 0 deletions polkadot/node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use polkadot_node_subsystem_util::database::Database;

#[cfg(feature = "full-node")]
pub use {
polkadot_node_core_candidate_validation::WorkersCleanupMode,
polkadot_overseer::{Handle, Overseer, OverseerConnector, OverseerHandle},
polkadot_primitives::runtime_api::ParachainHost,
relay_chain_selection::SelectRelayChain,
Expand Down Expand Up @@ -654,6 +655,8 @@ pub struct NewFullParams<OverseerGenerator: OverseerGen> {
pub prepare_workers_soft_max_num: Option<usize>,
/// An optional absolute number of pvf workers that can be spawned in the pvf prepare pool.
pub prepare_workers_hard_max_num: Option<usize>,
/// The strategy we use to cleanup pvf workers artifacts
pub workers_cleanup: WorkersCleanupMode,
pub overseer_gen: OverseerGenerator,
pub overseer_message_channel_capacity_override: Option<usize>,
#[allow(dead_code)]
Expand Down Expand Up @@ -752,6 +755,7 @@ pub fn new_full<
execute_workers_max_num,
prepare_workers_soft_max_num,
prepare_workers_hard_max_num,
workers_cleanup,
}: NewFullParams<OverseerGenerator>,
) -> Result<NewFull, Error> {
use polkadot_availability_recovery::FETCH_CHUNKS_THRESHOLD;
Expand Down Expand Up @@ -971,6 +975,7 @@ pub fn new_full<
),
pvf_prepare_workers_soft_max_num: prepare_workers_soft_max_num.unwrap_or(1),
pvf_prepare_workers_hard_max_num: prepare_workers_hard_max_num.unwrap_or(2),
pvf_workers_cleanup_mode: workers_cleanup,
})
} else {
None
Expand Down

0 comments on commit a7e043b

Please sign in to comment.