diff --git a/polkadot/node/core/pvf/common/src/prepare.rs b/polkadot/node/core/pvf/common/src/prepare.rs
index 28ab682ec136..87c097ad7e39 100644
--- a/polkadot/node/core/pvf/common/src/prepare.rs
+++ b/polkadot/node/core/pvf/common/src/prepare.rs
@@ -31,6 +31,8 @@ pub struct PrepareWorkerSuccess {
 pub struct PrepareSuccess {
 	/// Canonical path to the compiled artifact.
 	pub path: PathBuf,
+	/// Size of the compiled artifact in bytes.
+	pub size: u64,
 	/// Stats of the current preparation run.
 	pub stats: PrepareStats,
 }
diff --git a/polkadot/node/core/pvf/src/artifacts.rs b/polkadot/node/core/pvf/src/artifacts.rs
index a3a48b61acb1..119af34082a9 100644
--- a/polkadot/node/core/pvf/src/artifacts.rs
+++ b/polkadot/node/core/pvf/src/artifacts.rs
@@ -142,6 +142,8 @@ pub enum ArtifactState {
 		/// This is updated when we get the heads up for this artifact or when we just discover
 		/// this file.
 		last_time_needed: SystemTime,
+		/// Size of the compiled artifact in bytes.
+		size: u64,
 		/// Stats produced by successful preparation.
 		prepare_stats: PrepareStats,
 	},
@@ -169,6 +171,33 @@ pub struct Artifacts {
 	inner: HashMap<ArtifactId, ArtifactState>,
 }
 
+/// Parameters we use to clean up artifacts.
+/// After we hit the cache limit we remove the least recently used artifacts,
+/// but only those that have been stale longer than the minimum stale time.
+#[derive(Debug)]
+pub struct ArtifactsCleanupConfig {
+	// Max cache size in bytes. Once it is reached, the least recently used artifacts are deleted.
+	cache_limit: u64,
+	// Inactive time after which an artifact is allowed to be deleted.
+	min_stale_time: Duration,
+}
+
+impl Default for ArtifactsCleanupConfig {
+	fn default() -> Self {
+		Self {
+			cache_limit: 10 * 1024 * 1024 * 1024,              // 10 GiB
+			min_stale_time: Duration::from_secs(24 * 60 * 60), // 24 hours
+		}
+	}
+}
+
+#[cfg(test)]
+impl ArtifactsCleanupConfig {
+	pub fn new(cache_limit: u64, min_stale_time: Duration) -> Self {
+		Self { cache_limit, min_stale_time }
+	}
+}
+
 impl Artifacts {
 	#[cfg(test)]
 	pub(crate) fn empty() -> Self {
@@ -180,6 +209,11 @@ impl Artifacts {
 		self.inner.len()
 	}
 
+	#[cfg(test)]
+	fn artifact_ids(&self) -> Vec<ArtifactId> {
+		self.inner.keys().cloned().collect()
+	}
+
 	/// Create an empty table and the cache directory on-disk if it doesn't exist.
 	pub async fn new(cache_path: &Path) -> Self {
 		// Make sure that the cache path directory and all its parents are created.
@@ -234,12 +268,16 @@ impl Artifacts {
 		artifact_id: ArtifactId,
 		path: PathBuf,
 		last_time_needed: SystemTime,
+		size: u64,
 		prepare_stats: PrepareStats,
 	) {
 		// See the precondition.
 		always!(self
 			.inner
-			.insert(artifact_id, ArtifactState::Prepared { path, last_time_needed, prepare_stats })
+			.insert(
+				artifact_id,
+				ArtifactState::Prepared { path, last_time_needed, size, prepare_stats }
+			)
 			.is_none());
 	}
 
@@ -251,25 +289,40 @@ impl Artifacts {
 		})
 	}
 
-	/// Remove artifacts older than the given TTL and return id and path of the removed ones.
-	pub fn prune(&mut self, artifact_ttl: Duration) -> Vec<(ArtifactId, PathBuf)> {
+	/// When the total artifact size exceeds the cache limit, remove the least recently used
+	/// artifacts that are stale longer than the minimum stale time; return id and path of the removed ones.
+	pub fn prune(&mut self, cleanup_config: &ArtifactsCleanupConfig) -> Vec<(ArtifactId, PathBuf)> {
+		let mut to_remove = vec![];
 		let now = SystemTime::now();
-		let mut to_remove = vec![];
+		let mut total_size = 0;
+		let mut artifact_sizes = vec![];
+
 		for (k, v) in self.inner.iter() {
-			if let ArtifactState::Prepared { last_time_needed, ref path, .. } = *v {
-				if now
-					.duration_since(last_time_needed)
-					.map(|age| age > artifact_ttl)
-					.unwrap_or(false)
-				{
-					to_remove.push((k.clone(), path.clone()));
-				}
+			if let ArtifactState::Prepared { ref path, last_time_needed, size, .. } = *v {
+				total_size += size;
+				artifact_sizes.push((k.clone(), path.clone(), size, last_time_needed));
 			}
 		}
+		artifact_sizes
+			.sort_by_key(|&(_, _, _, last_time_needed)| std::cmp::Reverse(last_time_needed));
+
+		while total_size > cleanup_config.cache_limit {
+			let Some((artifact_id, path, size, last_time_needed)) = artifact_sizes.pop() else {
+				break
+			};
+
+			let used_recently = now
+				.duration_since(last_time_needed)
+				.map(|stale_time| stale_time < cleanup_config.min_stale_time)
+				.unwrap_or(true);
+			if used_recently {
+				break;
+			}
 
-		for artifact in &to_remove {
-			self.inner.remove(&artifact.0);
+			self.inner.remove(&artifact_id);
+			to_remove.push((artifact_id, path));
+			total_size -= size;
 		}
 
 		to_remove
@@ -278,6 +331,8 @@ impl Artifacts {
 
 #[cfg(test)]
 mod tests {
+	use crate::testing::artifact_id;
+
 	use super::*;
 
 	#[tokio::test]
@@ -307,4 +362,100 @@ mod tests {
 		assert!(entries.contains(&String::from("worker-prepare-test")));
 		assert_eq!(artifacts.len(), 0);
 	}
+
+	#[tokio::test]
+	async fn test_pruned_by_cache_size() {
+		let mock_now = SystemTime::now();
+		let tempdir = tempfile::tempdir().unwrap();
+		let cache_path = tempdir.path();
+
+		let path1 = generate_artifact_path(cache_path);
+		let path2 = generate_artifact_path(cache_path);
+		let path3 = generate_artifact_path(cache_path);
+		let artifact_id1 = artifact_id(1);
+		let artifact_id2 = artifact_id(2);
+		let artifact_id3 = artifact_id(3);
+
+		let mut artifacts = Artifacts::new(cache_path).await;
+		let cleanup_config = ArtifactsCleanupConfig::new(1500, Duration::from_secs(0));
+
+		artifacts.insert_prepared(
+			artifact_id1.clone(),
+			path1.clone(),
+			mock_now - Duration::from_secs(5),
+			1024,
+			PrepareStats::default(),
+		);
+		artifacts.insert_prepared(
+			artifact_id2.clone(),
+			path2.clone(),
+			mock_now - Duration::from_secs(10),
+			1024,
+			PrepareStats::default(),
+		);
+		artifacts.insert_prepared(
+			artifact_id3.clone(),
+			path3.clone(),
+			mock_now - Duration::from_secs(15),
+			1024,
+			PrepareStats::default(),
+		);
+
+		let pruned = artifacts.prune(&cleanup_config);
+
+		assert!(artifacts.artifact_ids().contains(&artifact_id1));
+		assert!(!pruned.contains(&(artifact_id1, path1)));
+		assert!(!artifacts.artifact_ids().contains(&artifact_id2));
+		assert!(pruned.contains(&(artifact_id2, path2)));
+		assert!(!artifacts.artifact_ids().contains(&artifact_id3));
+		assert!(pruned.contains(&(artifact_id3, path3)));
+	}
+
+	#[tokio::test]
+	async fn test_did_not_prune_by_cache_size_because_of_stale_time() {
+		let mock_now = SystemTime::now();
+		let tempdir = tempfile::tempdir().unwrap();
+		let cache_path = tempdir.path();
+
+		let path1 = generate_artifact_path(cache_path);
+		let path2 = generate_artifact_path(cache_path);
+		let path3 = generate_artifact_path(cache_path);
+		let artifact_id1 = artifact_id(1);
+		let artifact_id2 = artifact_id(2);
+		let artifact_id3 = artifact_id(3);
+
+		let mut artifacts = Artifacts::new(cache_path).await;
+		let cleanup_config = ArtifactsCleanupConfig::new(1500, Duration::from_secs(12));
+
+		artifacts.insert_prepared(
+			artifact_id1.clone(),
+			path1.clone(),
+			mock_now - Duration::from_secs(5),
+			1024,
+			PrepareStats::default(),
+		);
+		artifacts.insert_prepared(
+			artifact_id2.clone(),
+			path2.clone(),
+			mock_now - Duration::from_secs(10),
+			1024,
+			PrepareStats::default(),
+		);
+		artifacts.insert_prepared(
+			artifact_id3.clone(),
+			path3.clone(),
+			mock_now - Duration::from_secs(15),
+			1024,
+			PrepareStats::default(),
+		);
+
+		let pruned = artifacts.prune(&cleanup_config);
+
+		assert!(artifacts.artifact_ids().contains(&artifact_id1));
+		assert!(!pruned.contains(&(artifact_id1, path1)));
+		assert!(artifacts.artifact_ids().contains(&artifact_id2));
+		assert!(!pruned.contains(&(artifact_id2, path2)));
+		assert!(!artifacts.artifact_ids().contains(&artifact_id3));
+		assert!(pruned.contains(&(artifact_id3, path3)));
+	}
 }
diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs
index 4065598a3ac4..462631d33b52 100644
--- a/polkadot/node/core/pvf/src/host.rs
+++ b/polkadot/node/core/pvf/src/host.rs
@@ -21,7 +21,7 @@
 //! [`ValidationHost`], that allows communication with that event-loop.
 use crate::{
-	artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
+	artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts, ArtifactsCleanupConfig},
 	execute::{self, PendingExecutionRequest},
 	metrics::Metrics,
 	prepare, Priority, SecurityStatus, ValidationError, LOG_TARGET,
 };
@@ -293,7 +293,7 @@ pub async fn start(
 	let run_host = async move {
 		run(Inner {
 			cleanup_pulse_interval: Duration::from_secs(3600),
-			artifact_ttl: Duration::from_secs(3600 * 24),
+			cleanup_config: ArtifactsCleanupConfig::default(),
 			artifacts,
 			to_host_rx,
 			to_prepare_queue_tx,
@@ -337,7 +337,7 @@ impl AwaitingPrepare {
 struct Inner {
 	cleanup_pulse_interval: Duration,
-	artifact_ttl: Duration,
+	cleanup_config: ArtifactsCleanupConfig,
 	artifacts: Artifacts,
 
 	to_host_rx: mpsc::Receiver<ToHost>,
@@ -359,7 +359,7 @@ struct Fatal;
 async fn run(
 	Inner {
 		cleanup_pulse_interval,
-		artifact_ttl,
+		cleanup_config,
 		mut artifacts,
 		to_host_rx,
 		from_prepare_queue_rx,
@@ -415,7 +415,7 @@ async fn run(
 				break_if_fatal!(handle_cleanup_pulse(
 					&mut to_sweeper_tx,
 					&mut artifacts,
-					artifact_ttl,
+					&cleanup_config,
 				).await);
 			},
 			to_host = to_host_rx.next() => {
@@ -803,8 +803,12 @@ async fn handle_prepare_done(
 	}
 
 	*state = match result {
-		Ok(PrepareSuccess { path, stats: prepare_stats }) =>
-			ArtifactState::Prepared { path, last_time_needed: SystemTime::now(), prepare_stats },
+		Ok(PrepareSuccess { path, stats: prepare_stats, size }) => ArtifactState::Prepared {
+			path,
+			last_time_needed: SystemTime::now(),
+			size,
+			prepare_stats,
+		},
 		Err(error) => {
 			let last_time_failed = SystemTime::now();
 			let num_failures = *num_failures + 1;
@@ -859,9 +863,9 @@ async fn enqueue_prepare_for_execute(
 async fn handle_cleanup_pulse(
 	sweeper_tx: &mut mpsc::Sender<PathBuf>,
 	artifacts: &mut Artifacts,
-	artifact_ttl: Duration,
+	cleanup_config: &ArtifactsCleanupConfig,
 ) -> Result<(), Fatal> {
-	let to_remove = artifacts.prune(artifact_ttl);
+	let to_remove = artifacts.prune(cleanup_config);
 	gum::debug!(
 		target: LOG_TARGET,
 		"PVF pruning: {} artifacts reached their end of life",
@@ -959,7 +963,7 @@ fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()>
 #[cfg(test)]
 pub(crate) mod tests {
 	use super::*;
-	use crate::{artifacts::generate_artifact_path, PossiblyInvalidError};
+	use crate::{artifacts::generate_artifact_path, testing::artifact_id, PossiblyInvalidError};
 	use assert_matches::assert_matches;
 	use futures::future::BoxFuture;
 	use polkadot_node_core_pvf_common::prepare::PrepareStats;
@@ -981,14 +985,9 @@ pub(crate) mod tests {
 		}
 	}
 
-	/// Creates a new PVF which artifact id can be uniquely identified by the given number.
-	fn artifact_id(discriminator: u32) -> ArtifactId {
-		ArtifactId::from_pvf_prep_data(&PvfPrepData::from_discriminator(discriminator))
-	}
-
 	struct Builder {
 		cleanup_pulse_interval: Duration,
-		artifact_ttl: Duration,
+		cleanup_config: ArtifactsCleanupConfig,
 		artifacts: Artifacts,
 	}
 
@@ -997,8 +996,7 @@ pub(crate) mod tests {
 			Self {
 				// these are selected high to not interfere in tests in which pruning is irrelevant.
 				cleanup_pulse_interval: Duration::from_secs(3600),
-				artifact_ttl: Duration::from_secs(3600),
-
+				cleanup_config: ArtifactsCleanupConfig::default(),
 				artifacts: Artifacts::empty(),
 			}
 		}
@@ -1022,7 +1020,7 @@ pub(crate) mod tests {
 	}
 
 	impl Test {
-		fn new(Builder { cleanup_pulse_interval, artifact_ttl, artifacts }: Builder) -> Self {
+		fn new(Builder { cleanup_pulse_interval, artifacts, cleanup_config }: Builder) -> Self {
 			let (to_host_tx, to_host_rx) = mpsc::channel(10);
 			let (to_prepare_queue_tx, to_prepare_queue_rx) = mpsc::channel(10);
 			let (from_prepare_queue_tx, from_prepare_queue_rx) = mpsc::unbounded();
@@ -1032,7 +1030,7 @@ pub(crate) mod tests {
 			let run = run(Inner {
 				cleanup_pulse_interval,
-				artifact_ttl,
+				cleanup_config,
 				artifacts,
 				to_host_rx,
 				to_prepare_queue_tx,
@@ -1183,19 +1181,21 @@ pub(crate) mod tests {
 		let mut builder = Builder::default();
 		builder.cleanup_pulse_interval = Duration::from_millis(100);
-		builder.artifact_ttl = Duration::from_millis(500);
+		builder.cleanup_config = ArtifactsCleanupConfig::new(1024, Duration::from_secs(0));
 		let path1 = generate_artifact_path(cache_path);
 		let path2 = generate_artifact_path(cache_path);
 		builder.artifacts.insert_prepared(
 			artifact_id(1),
 			path1.clone(),
 			mock_now,
+			1024,
 			PrepareStats::default(),
 		);
 		builder.artifacts.insert_prepared(
 			artifact_id(2),
 			path2.clone(),
 			mock_now,
+			1024,
 			PrepareStats::default(),
 		);
 
 		let mut test = builder.build();
diff --git a/polkadot/node/core/pvf/src/prepare/worker_interface.rs b/polkadot/node/core/pvf/src/prepare/worker_interface.rs
index d64ee1510cad..193d5ed690a4 100644
--- a/polkadot/node/core/pvf/src/prepare/worker_interface.rs
+++ b/polkadot/node/core/pvf/src/prepare/worker_interface.rs
@@ -234,6 +234,19 @@ async fn handle_response(
 		return Outcome::TimedOut
 	}
 
+	let size = match tokio::fs::metadata(cache_path).await {
+		Ok(metadata) => metadata.len(),
+		Err(err) => {
+			gum::warn!(
+				target: LOG_TARGET,
+				?cache_path,
+				"failed to read size of the artifact: {}",
+				err,
+			);
+			return Outcome::IoErr(err.to_string())
+		},
+	};
+
 	// The file name should uniquely identify the artifact even across restarts. In case the cache
 	// for some reason is not cleared correctly, we cannot
 	// accidentally execute an artifact compiled under a different wasmtime version, host
@@ -253,6 +266,7 @@ async fn handle_response(
 			worker,
 			result: Ok(PrepareSuccess {
 				path: artifact_path,
+				size,
 				stats: PrepareStats { cpu_time_elapsed, memory_stats: memory_stats.clone() },
 			}),
 		},
diff --git a/polkadot/node/core/pvf/src/testing.rs b/polkadot/node/core/pvf/src/testing.rs
index 60b0b4b8d3d0..8c75dafa69c2 100644
--- a/polkadot/node/core/pvf/src/testing.rs
+++ b/polkadot/node/core/pvf/src/testing.rs
@@ -21,8 +21,9 @@ pub use crate::{
 	worker_interface::{spawn_with_program_path, SpawnErr},
 };
 
-use crate::get_worker_version;
+use crate::{artifacts::ArtifactId, get_worker_version};
 use is_executable::IsExecutable;
+use polkadot_node_core_pvf_common::pvf::PvfPrepData;
 use polkadot_node_primitives::NODE_VERSION;
 use polkadot_primitives::ExecutorParams;
 use std::{
@@ -126,3 +127,8 @@ pub fn build_workers_and_get_paths() -> (PathBuf, PathBuf) {
 	let guard = mutex.lock().unwrap();
 	(guard.0.clone(), guard.1.clone())
 }
+
+/// Creates a new PVF whose artifact id can be uniquely identified by the given number.
+pub fn artifact_id(discriminator: u32) -> ArtifactId {
+	ArtifactId::from_pvf_prep_data(&PvfPrepData::from_discriminator(discriminator))
+}
diff --git a/prdoc/pr_4662.prdoc b/prdoc/pr_4662.prdoc
new file mode 100644
index 000000000000..50f8a5bfd011
--- /dev/null
+++ b/prdoc/pr_4662.prdoc
@@ -0,0 +1,17 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Clean up PVF artifacts by cache limit and stale time
+
+doc:
+  - audience: Node Operator
+    description: |
+      Extend the PVF artifacts cleanup strategy. Previously, we pruned artifacts that had been stale for more than 24 hours.
+      After this change we attempt to prune artifacts only once they reach the 10 GiB cache limit. If the least recently
+      used artifact has been stale for less than 24 hours, we don't remove it.
+
+crates:
+  - name: polkadot-node-core-pvf-common
+    bump: patch
+  - name: polkadot-node-core-pvf
+    bump: patch
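
For readers who want the eviction policy in isolation rather than spread across the `artifacts.rs` hunks above, here is a minimal standalone sketch of the size-plus-staleness logic that `Artifacts::prune` now implements. The `Entry` struct, the free-standing `prune` function and the numbers in `main` are illustrative assumptions using only `std` types; they are not code from the patch and only mirror its shape.

```rust
use std::time::{Duration, SystemTime};

// Hypothetical stand-in for a prepared artifact entry (not part of the patch).
struct Entry {
    id: u32,
    size: u64,
    last_time_needed: SystemTime,
}

/// Evict least recently used entries while the total size exceeds `cache_limit`,
/// but never evict an entry that was used less than `min_stale_time` ago.
fn prune(entries: &mut Vec<Entry>, cache_limit: u64, min_stale_time: Duration) -> Vec<u32> {
    let now = SystemTime::now();
    let mut total: u64 = entries.iter().map(|e| e.size).sum();

    // Sort most recently used first, so `pop()` always yields the least recently used entry.
    entries.sort_by_key(|e| std::cmp::Reverse(e.last_time_needed));

    let mut removed = Vec::new();
    while total > cache_limit {
        let Some(candidate) = entries.pop() else { break };
        // Entries used more recently than the minimum stale time are kept.
        let used_recently = now
            .duration_since(candidate.last_time_needed)
            .map(|stale_time| stale_time < min_stale_time)
            .unwrap_or(true);
        if used_recently {
            // Every remaining entry is even newer, so nothing else can be evicted.
            entries.push(candidate);
            break;
        }
        total -= candidate.size;
        removed.push(candidate.id);
    }
    removed
}

fn main() {
    let now = SystemTime::now();
    let mut entries = vec![
        Entry { id: 1, size: 1024, last_time_needed: now - Duration::from_secs(5) },
        Entry { id: 2, size: 1024, last_time_needed: now - Duration::from_secs(10) },
        Entry { id: 3, size: 1024, last_time_needed: now - Duration::from_secs(15) },
    ];
    // 1500-byte limit, no minimum stale time: the two least recently used entries are evicted.
    let removed = prune(&mut entries, 1500, Duration::from_secs(0));
    assert_eq!(removed, vec![3, 2]);
    assert_eq!(entries.len(), 1);
}
```

Sorting most-recently-used first means the loop can stop at the first candidate that is not yet stale: everything still in the vector was used even more recently, which is why the patch breaks out of the `while` loop instead of scanning further.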