Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup PVF artifact by cache limit and stale time #4662

Merged
merged 20 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions polkadot/node/core/pvf/common/src/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub struct PrepareWorkerSuccess {
pub struct PrepareSuccess {
	/// Canonical path to the compiled artifact.
	pub path: PathBuf,
	/// Size of the compiled artifact on disk, in bytes. Used by the artifact
	/// cache to account for the total cache size when pruning.
	pub size: u64,
	/// Stats of the current preparation run.
	pub stats: PrepareStats,
}
Expand Down
179 changes: 165 additions & 14 deletions polkadot/node/core/pvf/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ pub enum ArtifactState {
/// This is updated when we get the heads up for this artifact or when we just discover
/// this file.
last_time_needed: SystemTime,
/// Size in bytes
size: u64,
/// Stats produced by successful preparation.
prepare_stats: PrepareStats,
},
Expand Down Expand Up @@ -169,6 +171,33 @@ pub struct Artifacts {
inner: HashMap<ArtifactId, ArtifactState>,
}

/// Parameters we use to cleanup artifacts.
///
/// After we hit the cache limit we remove the least recently used artifacts,
/// but only those that have been stale for longer than the minimum stale time.
#[derive(Debug)]
pub struct ArtifactsCleanupConfig {
	/// Maximum total size of all artifacts, in bytes. When it is exceeded, the
	/// least recently used artifacts are deleted.
	cache_limit: u64,
	/// Inactive time after which an artifact is allowed to be deleted.
	min_stale_time: Duration,
}

impl Default for ArtifactsCleanupConfig {
	fn default() -> Self {
		Self {
			cache_limit: 10 * 1024 * 1024 * 1024,              // 10 GiB
			min_stale_time: Duration::from_secs(24 * 60 * 60), // 24 hours
		}
	}
}

#[cfg(test)]
impl ArtifactsCleanupConfig {
	/// Test-only constructor allowing tests to exercise pruning with custom limits.
	pub fn new(cache_limit: u64, min_stale_time: Duration) -> Self {
		Self { cache_limit, min_stale_time }
	}
}

impl Artifacts {
#[cfg(test)]
pub(crate) fn empty() -> Self {
Expand All @@ -180,6 +209,11 @@ impl Artifacts {
self.inner.len()
}

#[cfg(test)]
	/// Snapshot of all currently known artifact ids, in arbitrary order.
	/// Used by tests to assert which artifacts survived pruning.
	fn artifact_ids(&self) -> Vec<ArtifactId> {
		self.inner.keys().cloned().collect()
	}

/// Create an empty table and the cache directory on-disk if it doesn't exist.
pub async fn new(cache_path: &Path) -> Self {
// Make sure that the cache path directory and all its parents are created.
Expand Down Expand Up @@ -234,12 +268,16 @@ impl Artifacts {
artifact_id: ArtifactId,
path: PathBuf,
last_time_needed: SystemTime,
size: u64,
prepare_stats: PrepareStats,
) {
// See the precondition.
always!(self
.inner
.insert(artifact_id, ArtifactState::Prepared { path, last_time_needed, prepare_stats })
.insert(
artifact_id,
ArtifactState::Prepared { path, last_time_needed, size, prepare_stats }
)
.is_none());
}

Expand All @@ -251,25 +289,40 @@ impl Artifacts {
})
}

/// Remove artifacts older than the given TTL and return id and path of the removed ones.
pub fn prune(&mut self, artifact_ttl: Duration) -> Vec<(ArtifactId, PathBuf)> {
/// Remove artifacts older than the given TTL or the total artifacts size limit and return id
/// and path of the removed ones.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doc comment needs fixing: we evict LRU artifacts only if we go over cache limit.

pub fn prune(&mut self, cleanup_config: &ArtifactsCleanupConfig) -> Vec<(ArtifactId, PathBuf)> {
let mut to_remove = vec![];
let now = SystemTime::now();

let mut to_remove = vec![];
let mut total_size = 0;
let mut artifact_sizes = vec![];

for (k, v) in self.inner.iter() {
if let ArtifactState::Prepared { last_time_needed, ref path, .. } = *v {
if now
.duration_since(last_time_needed)
.map(|age| age > artifact_ttl)
.unwrap_or(false)
{
to_remove.push((k.clone(), path.clone()));
}
if let ArtifactState::Prepared { ref path, last_time_needed, size, .. } = *v {
total_size += size;
artifact_sizes.push((k.clone(), path.clone(), size, last_time_needed));
}
}
artifact_sizes
.sort_by_key(|&(_, _, _, last_time_needed)| std::cmp::Reverse(last_time_needed));

while total_size > cleanup_config.cache_limit {
let Some((artifact_id, path, size, last_time_needed)) = artifact_sizes.pop() else {
break
};

let used_recently = now
.duration_since(last_time_needed)
.map(|stale_time| stale_time < cleanup_config.min_stale_time)
.unwrap_or(true);
if used_recently {
break;
}
Comment on lines +315 to +321
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm 🤔 So, the cache may grow unbounded as long as all the artifacts are fresh? I mean, this mimics the current behavior, so no new attack vectors are introduced here for sure, but probably we should review the old ones... I don't call for any change here right now, it's just for discussion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the cache may grow unbounded as long as all the artifacts are fresh

I believe it grows no more than it does now, because we use the same 24h limit

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly. I'm just wondering if there's some scenario where someone buys some cheap coretime leftovers and starts pushing tons of PVFs around within 24 hours to overflow validators' disk space. Sounds unlikely, just paranoia, probably.


for artifact in &to_remove {
self.inner.remove(&artifact.0);
self.inner.remove(&artifact_id);
to_remove.push((artifact_id, path));
total_size -= size;
}

to_remove
Expand All @@ -278,6 +331,8 @@ impl Artifacts {

#[cfg(test)]
mod tests {
use crate::testing::artifact_id;

use super::*;

#[tokio::test]
Expand Down Expand Up @@ -307,4 +362,100 @@ mod tests {
assert!(entries.contains(&String::from("worker-prepare-test")));
assert_eq!(artifacts.len(), 0);
}

#[tokio::test]
	// With a 1500-byte limit and zero minimum stale time, pruning three 1024-byte
	// artifacts (total 3072) must evict the two least recently used ones, leaving
	// only the most recently used artifact (total 1024 <= 1500).
	async fn test_pruned_by_cache_size() {
		// Pin "now" once so the relative ages below are deterministic.
		let mock_now = SystemTime::now();
		let tempdir = tempfile::tempdir().unwrap();
		let cache_path = tempdir.path();

		let path1 = generate_artifact_path(cache_path);
		let path2 = generate_artifact_path(cache_path);
		let path3 = generate_artifact_path(cache_path);
		let artifact_id1 = artifact_id(1);
		let artifact_id2 = artifact_id(2);
		let artifact_id3 = artifact_id(3);

		let mut artifacts = Artifacts::new(cache_path).await;
		// Zero stale time means every artifact is immediately eligible for eviction.
		let cleanup_config = ArtifactsCleanupConfig::new(1500, Duration::from_secs(0));

		// Artifact 1 is the most recently used; 3 is the least recently used.
		artifacts.insert_prepared(
			artifact_id1.clone(),
			path1.clone(),
			mock_now - Duration::from_secs(5),
			1024,
			PrepareStats::default(),
		);
		artifacts.insert_prepared(
			artifact_id2.clone(),
			path2.clone(),
			mock_now - Duration::from_secs(10),
			1024,
			PrepareStats::default(),
		);
		artifacts.insert_prepared(
			artifact_id3.clone(),
			path3.clone(),
			mock_now - Duration::from_secs(15),
			1024,
			PrepareStats::default(),
		);

		let pruned = artifacts.prune(&cleanup_config);

		// Only the most recently used artifact survives; the other two are evicted.
		assert!(artifacts.artifact_ids().contains(&artifact_id1));
		assert!(!pruned.contains(&(artifact_id1, path1)));
		assert!(!artifacts.artifact_ids().contains(&artifact_id2));
		assert!(pruned.contains(&(artifact_id2, path2)));
		assert!(!artifacts.artifact_ids().contains(&artifact_id3));
		assert!(pruned.contains(&(artifact_id3, path3)));
	}

#[tokio::test]
	// Same setup as `test_pruned_by_cache_size`, but with a 12-second minimum stale
	// time: only the 15-second-old artifact is stale enough to evict. Pruning stops
	// at the 10-second-old one even though the cache (2048) still exceeds the limit
	// (1500) — fresh artifacts are never removed.
	async fn test_did_not_prune_by_cache_size_because_of_stale_time() {
		// Pin "now" once so the relative ages below are deterministic.
		let mock_now = SystemTime::now();
		let tempdir = tempfile::tempdir().unwrap();
		let cache_path = tempdir.path();

		let path1 = generate_artifact_path(cache_path);
		let path2 = generate_artifact_path(cache_path);
		let path3 = generate_artifact_path(cache_path);
		let artifact_id1 = artifact_id(1);
		let artifact_id2 = artifact_id(2);
		let artifact_id3 = artifact_id(3);

		let mut artifacts = Artifacts::new(cache_path).await;
		// Only artifacts idle for more than 12 seconds may be evicted.
		let cleanup_config = ArtifactsCleanupConfig::new(1500, Duration::from_secs(12));

		// Artifacts 1 and 2 (5s and 10s idle) are fresher than the 12s threshold.
		artifacts.insert_prepared(
			artifact_id1.clone(),
			path1.clone(),
			mock_now - Duration::from_secs(5),
			1024,
			PrepareStats::default(),
		);
		artifacts.insert_prepared(
			artifact_id2.clone(),
			path2.clone(),
			mock_now - Duration::from_secs(10),
			1024,
			PrepareStats::default(),
		);
		artifacts.insert_prepared(
			artifact_id3.clone(),
			path3.clone(),
			mock_now - Duration::from_secs(15),
			1024,
			PrepareStats::default(),
		);

		let pruned = artifacts.prune(&cleanup_config);

		// Only the stale artifact 3 is evicted; 1 and 2 remain despite the overflow.
		assert!(artifacts.artifact_ids().contains(&artifact_id1));
		assert!(!pruned.contains(&(artifact_id1, path1)));
		assert!(artifacts.artifact_ids().contains(&artifact_id2));
		assert!(!pruned.contains(&(artifact_id2, path2)));
		assert!(!artifacts.artifact_ids().contains(&artifact_id3));
		assert!(pruned.contains(&(artifact_id3, path3)));
	}
}
42 changes: 21 additions & 21 deletions polkadot/node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
//! [`ValidationHost`], that allows communication with that event-loop.

use crate::{
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts, ArtifactsCleanupConfig},
execute::{self, PendingExecutionRequest},
metrics::Metrics,
prepare, Priority, SecurityStatus, ValidationError, LOG_TARGET,
Expand Down Expand Up @@ -293,7 +293,7 @@ pub async fn start(
let run_host = async move {
run(Inner {
cleanup_pulse_interval: Duration::from_secs(3600),
artifact_ttl: Duration::from_secs(3600 * 24),
cleanup_config: ArtifactsCleanupConfig::default(),
artifacts,
to_host_rx,
to_prepare_queue_tx,
Expand Down Expand Up @@ -337,7 +337,7 @@ impl AwaitingPrepare {

struct Inner {
cleanup_pulse_interval: Duration,
artifact_ttl: Duration,
cleanup_config: ArtifactsCleanupConfig,
artifacts: Artifacts,

to_host_rx: mpsc::Receiver<ToHost>,
Expand All @@ -359,7 +359,7 @@ struct Fatal;
async fn run(
Inner {
cleanup_pulse_interval,
artifact_ttl,
cleanup_config,
mut artifacts,
to_host_rx,
from_prepare_queue_rx,
Expand Down Expand Up @@ -415,7 +415,7 @@ async fn run(
break_if_fatal!(handle_cleanup_pulse(
&mut to_sweeper_tx,
&mut artifacts,
artifact_ttl,
&cleanup_config,
).await);
},
to_host = to_host_rx.next() => {
Expand Down Expand Up @@ -803,8 +803,12 @@ async fn handle_prepare_done(
}

*state = match result {
Ok(PrepareSuccess { path, stats: prepare_stats }) =>
ArtifactState::Prepared { path, last_time_needed: SystemTime::now(), prepare_stats },
Ok(PrepareSuccess { path, stats: prepare_stats, size }) => ArtifactState::Prepared {
path,
last_time_needed: SystemTime::now(),
size,
prepare_stats,
},
Err(error) => {
let last_time_failed = SystemTime::now();
let num_failures = *num_failures + 1;
Expand Down Expand Up @@ -859,9 +863,9 @@ async fn enqueue_prepare_for_execute(
async fn handle_cleanup_pulse(
sweeper_tx: &mut mpsc::Sender<PathBuf>,
artifacts: &mut Artifacts,
artifact_ttl: Duration,
cleanup_config: &ArtifactsCleanupConfig,
) -> Result<(), Fatal> {
let to_remove = artifacts.prune(artifact_ttl);
let to_remove = artifacts.prune(cleanup_config);
gum::debug!(
target: LOG_TARGET,
"PVF pruning: {} artifacts reached their end of life",
Expand Down Expand Up @@ -959,7 +963,7 @@ fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()>
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::{artifacts::generate_artifact_path, PossiblyInvalidError};
use crate::{artifacts::generate_artifact_path, testing::artifact_id, PossiblyInvalidError};
use assert_matches::assert_matches;
use futures::future::BoxFuture;
use polkadot_node_core_pvf_common::prepare::PrepareStats;
Expand All @@ -981,14 +985,9 @@ pub(crate) mod tests {
}
}

/// Creates a new PVF which artifact id can be uniquely identified by the given number.
fn artifact_id(discriminator: u32) -> ArtifactId {
ArtifactId::from_pvf_prep_data(&PvfPrepData::from_discriminator(discriminator))
}

struct Builder {
cleanup_pulse_interval: Duration,
artifact_ttl: Duration,
cleanup_config: ArtifactsCleanupConfig,
artifacts: Artifacts,
}

Expand All @@ -997,8 +996,7 @@ pub(crate) mod tests {
Self {
// these are selected high to not interfere in tests in which pruning is irrelevant.
cleanup_pulse_interval: Duration::from_secs(3600),
artifact_ttl: Duration::from_secs(3600),

cleanup_config: ArtifactsCleanupConfig::default(),
artifacts: Artifacts::empty(),
}
}
Expand All @@ -1022,7 +1020,7 @@ pub(crate) mod tests {
}

impl Test {
fn new(Builder { cleanup_pulse_interval, artifact_ttl, artifacts }: Builder) -> Self {
fn new(Builder { cleanup_pulse_interval, artifacts, cleanup_config }: Builder) -> Self {
let (to_host_tx, to_host_rx) = mpsc::channel(10);
let (to_prepare_queue_tx, to_prepare_queue_rx) = mpsc::channel(10);
let (from_prepare_queue_tx, from_prepare_queue_rx) = mpsc::unbounded();
Expand All @@ -1032,7 +1030,7 @@ pub(crate) mod tests {

let run = run(Inner {
cleanup_pulse_interval,
artifact_ttl,
cleanup_config,
artifacts,
to_host_rx,
to_prepare_queue_tx,
Expand Down Expand Up @@ -1183,19 +1181,21 @@ pub(crate) mod tests {

let mut builder = Builder::default();
builder.cleanup_pulse_interval = Duration::from_millis(100);
builder.artifact_ttl = Duration::from_millis(500);
builder.cleanup_config = ArtifactsCleanupConfig::new(1024, Duration::from_secs(0));
let path1 = generate_artifact_path(cache_path);
let path2 = generate_artifact_path(cache_path);
builder.artifacts.insert_prepared(
artifact_id(1),
path1.clone(),
mock_now,
1024,
PrepareStats::default(),
);
builder.artifacts.insert_prepared(
artifact_id(2),
path2.clone(),
mock_now,
1024,
PrepareStats::default(),
);
let mut test = builder.build();
Expand Down
Loading
Loading