Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve artifact cache unless stale #1918

Merged
merged 41 commits into from
Nov 19, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
a563e25
make pruning explicit
eagr Oct 17, 2023
f31d354
preserve cache unless stale
eagr Oct 17, 2023
195bbce
barely working
eagr Oct 17, 2023
99aa012
use ArtifactId::from_file_name()
eagr Oct 17, 2023
17973ef
ignore non-unicode file names
eagr Oct 17, 2023
e81b456
generalize concat_const!()
eagr Oct 18, 2023
851a77a
per advices
eagr Oct 18, 2023
a3b0fcf
break on IO error
eagr Oct 18, 2023
b323c28
make pruning sound
eagr Oct 18, 2023
a2808f8
log more events
eagr Oct 18, 2023
0657d41
refactor
eagr Oct 20, 2023
0384ae7
doc
eagr Oct 20, 2023
4b1bb0a
Refactor indentation
mrcnski Oct 21, 2023
a8bcce4
refactor
eagr Oct 21, 2023
50b7ccc
checksum poc
eagr Oct 22, 2023
b6c1a07
Revert "checksum poc"
eagr Oct 30, 2023
2ad5262
redo checksum p1
eagr Oct 30, 2023
3723806
p2
eagr Oct 30, 2023
e44c451
remove corrupted cache
eagr Oct 31, 2023
6c19164
diversify results
eagr Nov 1, 2023
dad5285
fix tests
eagr Nov 1, 2023
1ede201
fix pruning
eagr Nov 1, 2023
2d51b52
fix message serialization
eagr Nov 1, 2023
36a33e8
clean up
eagr Nov 1, 2023
69f6a44
retire path_prefix()
eagr Nov 2, 2023
d3254f5
improve test
eagr Nov 3, 2023
f4d22fa
Merge branch 'master' into preserve-art
mrcnski Nov 12, 2023
39448ff
Fix test
mrcnski Nov 12, 2023
3a2c1cd
cargo fmt
mrcnski Nov 12, 2023
a02cb06
as per advices
eagr Nov 13, 2023
53e4557
tag artifact with runtime version
eagr Nov 13, 2023
d4f3083
fix tests
eagr Nov 13, 2023
9d2142e
Merge branch 'master' into preserve-art
mrcnski Nov 14, 2023
931aae1
upstream build fn to substrate
eagr Nov 15, 2023
5de4e8e
glitch
eagr Nov 15, 2023
a0b71c5
wrong attribution
eagr Nov 15, 2023
c85adfe
as per suggestions
eagr Nov 17, 2023
b897f1c
glitch
eagr Nov 17, 2023
89ada31
prevent `cargo tree` from accessing network
eagr Nov 17, 2023
868426a
glitch
eagr Nov 17, 2023
652bec7
Merge branch 'master' into preserve-art
eagr Nov 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 190 additions & 65 deletions polkadot/node/core/pvf/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

//! PVF artifacts (final compiled code blobs).
//!
//! # Lifecycle of an artifact
//! # Lifecycle of an artifact
Comment on lines 17 to +19
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random thought, could we add an # Artifact integrity section here? It could explain that artifacts must remain valid to satisfy the SAFETY constraints of execute_artifact. Therefore we do two things: check file integrity and version compatibility on host startup, and also before voting against a candidate we re-check the file integrity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we do this when we have decided what to do if an artifact is found corrupted? I'll make a note of this.

//!
//! 1. During node start-up, the artifacts cache is cleaned up. This means that all local artifacts
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
//! stored on-disk are cleared, and we start with an empty [`Artifacts`] table.
Expand Down Expand Up @@ -55,18 +55,58 @@
//! older by a predefined parameter. This process is run very rarely (say, once a day). Once the
//! artifact is expired it is removed from disk eagerly atomically.

use crate::host::PrepareResultSender;
use crate::{host::PrepareResultSender, LOG_TARGET};
use always_assert::always;
use polkadot_core_primitives::Hash;
use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareStats, pvf::PvfPrepData};
use polkadot_node_primitives::NODE_VERSION;
use polkadot_parachain_primitives::primitives::ValidationCodeHash;
use polkadot_primitives::ExecutorParamsHash;
use std::{
collections::HashMap,
path::{Path, PathBuf},
str::FromStr as _,
time::{Duration, SystemTime},
};

macro_rules! concat_const {
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
($($arg:tt),*) => {{
// ensure inputs to be strings
$(const _: &str = $arg;)*

const LEN: usize = 0 $(+ $arg.len())*;

const CAT: [u8; LEN] = {
let mut cat = [0u8; LEN];
// for turning off unused warning
let mut _offset = 0;

$({
const BYTES: &[u8] = $arg.as_bytes();

let mut i = 0;
let len = BYTES.len();
while i < len {
cat[_offset + i] = BYTES[i];
i += 1;
}
_offset += len;
})*

cat
};

match std::str::from_utf8(&CAT) {
Ok(s) => s,
Err(_) => panic!("Error converting bytes to str"),
}
}}
}

const RUNTIME_PREFIX: &str = "wasmtime_";
const NODE_PREFIX: &str = "polkadot_v";
const ARTIFACT_PREFIX: &str = concat_const!(RUNTIME_PREFIX, NODE_PREFIX, NODE_VERSION);

/// Identifier of an artifact. Encodes a code hash of the PVF and a hash of executor parameter set.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ArtifactId {
Expand All @@ -75,9 +115,6 @@ pub struct ArtifactId {
}

impl ArtifactId {
const PREFIX: &'static str = "wasmtime_";
const NODE_VERSION_PREFIX: &'static str = "polkadot_v";

/// Creates a new artifact ID with the given hash.
pub fn new(code_hash: ValidationCodeHash, executor_params_hash: ExecutorParamsHash) -> Self {
Self { code_hash, executor_params_hash }
Expand All @@ -88,38 +125,32 @@ impl ArtifactId {
Self::new(pvf.code_hash(), pvf.executor_params().hash())
}

/// Returns the expected path to this artifact given the root of the cache.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should update the docstring.

pub fn path(&self, cache_path: &Path) -> PathBuf {
let file_name =
format!("{}_{:#x}_{:#x}", ARTIFACT_PREFIX, self.code_hash, self.executor_params_hash);
cache_path.join(file_name)
}

/// Tries to recover the artifact id from the given file name.
#[cfg(test)]
pub fn from_file_name(file_name: &str) -> Option<Self> {
use polkadot_core_primitives::Hash;
use std::str::FromStr as _;
fn from_file_name(file_name: &str) -> Option<Self> {
let file_name = file_name.strip_prefix(ARTIFACT_PREFIX)?.strip_prefix('_')?;

let file_name =
file_name.strip_prefix(Self::PREFIX)?.strip_prefix(Self::NODE_VERSION_PREFIX)?;
// [ code hash | param hash ]
let hashes: Vec<&str> = file_name.split('_').collect();

// [ node version | code hash | param hash ]
let parts: Vec<&str> = file_name.split('_').collect();
let (_node_ver, code_hash_str, executor_params_hash_str) = (parts[0], parts[1], parts[2]);
if hashes.len() != 2 {
return None
}

let (code_hash_str, executor_params_hash_str) = (hashes[0], hashes[1]);

let code_hash = Hash::from_str(code_hash_str).ok()?.into();
let executor_params_hash =
ExecutorParamsHash::from_hash(Hash::from_str(executor_params_hash_str).ok()?);

Some(Self { code_hash, executor_params_hash })
}

/// Returns the expected path to this artifact given the root of the cache.
pub fn path(&self, cache_path: &Path) -> PathBuf {
let file_name = format!(
"{}{}{}_{:#x}_{:#x}",
Self::PREFIX,
Self::NODE_VERSION_PREFIX,
NODE_VERSION,
self.code_hash,
self.executor_params_hash
);
cache_path.join(file_name)
}
}

/// A bundle of the artifact ID and the path.
Expand Down Expand Up @@ -176,32 +207,85 @@ pub enum ArtifactState {

/// A container of all known artifact ids and their states.
pub struct Artifacts {
artifacts: HashMap<ArtifactId, ArtifactState>,
inner: HashMap<ArtifactId, ArtifactState>,
}

impl Artifacts {
/// Initialize a blank cache at the given path. This will clear everything present at the
/// given path, to be populated over time.
///
/// The recognized artifacts will be filled in the table and unrecognized will be removed.
pub async fn new(cache_path: &Path) -> Self {
// First delete the entire cache. This includes artifacts and any leftover worker dirs (see
// [`WorkerDir`]). Nodes are long-running so this should populate shortly.
let _ = tokio::fs::remove_dir_all(cache_path).await;
#[cfg(test)]
pub(crate) fn empty() -> Self {
Self { inner: HashMap::new() }
}

#[cfg(test)]
pub(crate) fn len(&self) -> usize {
self.inner.len()
}

/// Create an empty table and populate it with valid artifacts as [`ArtifactState::Prepared`],
/// if any. The existing caches will be checked by their file name to determine whether they are
/// valid, e.g., matching the current node version. The ones deemed invalid will be pruned.
pub async fn new_and_prune(cache_path: &Path) -> Self {
let mut artifacts = Self { inner: HashMap::new() };
artifacts.insert_and_prune(cache_path).await;
artifacts
}

async fn insert_and_prune(&mut self, cache_path: impl AsRef<Path>) {
fn is_fresh(file_name: &str) -> bool {
file_name.starts_with(ARTIFACT_PREFIX)
}

// Make sure that the cache path directory and all its parents are created.
let cache_path = cache_path.as_ref();
let _ = tokio::fs::create_dir_all(cache_path).await;

Self { artifacts: HashMap::new() }
}
if let Ok(mut dir) = tokio::fs::read_dir(cache_path).await {
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
let mut prunes = vec![];

loop {
match dir.next_entry().await {
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
Ok(None) => break,
Ok(Some(entry)) => {
let file_name = entry.file_name();
if let Some(file_name) = file_name.to_str() {
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
if is_fresh(file_name) {
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
if let Some(id) = ArtifactId::from_file_name(file_name) {
self.insert_prepared(id, SystemTime::now(), Default::default());
gum::debug!(
target: LOG_TARGET,
"reusing valid artifact found on disk for current node version v{}",
NODE_VERSION,
);
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
continue
}
}

prunes.push(tokio::fs::remove_file(cache_path.join(file_name)));
gum::debug!(
target: LOG_TARGET,
"discarding invalid artifact {}",
file_name,
);
}
},
Err(err) => {
gum::error!(
target: LOG_TARGET,
?err,
"I/O error while collecting stale artifacts",
);
break
},
}
}

#[cfg(test)]
pub(crate) fn empty() -> Self {
Self { artifacts: HashMap::new() }
futures::future::join_all(prunes).await;
}
}

/// Returns the state of the given artifact by its ID.
pub fn artifact_state_mut(&mut self, artifact_id: &ArtifactId) -> Option<&mut ArtifactState> {
self.artifacts.get_mut(artifact_id)
self.inner.get_mut(artifact_id)
}

/// Inform the table about the artifact with the given ID. The state will be set to "preparing".
Expand All @@ -215,7 +299,7 @@ impl Artifacts {
) {
// See the precondition.
always!(self
.artifacts
.inner
.insert(artifact_id, ArtifactState::Preparing { waiting_for_response, num_failures: 0 })
.is_none());
}
Expand All @@ -224,16 +308,15 @@ impl Artifacts {
///
/// This function must be used only for brand-new artifacts and should never be used for
/// replacing existing ones.
#[cfg(test)]
pub fn insert_prepared(
pub(crate) fn insert_prepared(
&mut self,
artifact_id: ArtifactId,
last_time_needed: SystemTime,
prepare_stats: PrepareStats,
) {
// See the precondition.
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
always!(self
.artifacts
.inner
.insert(artifact_id, ArtifactState::Prepared { last_time_needed, prepare_stats })
.is_none());
}
Expand All @@ -244,7 +327,7 @@ impl Artifacts {
let now = SystemTime::now();

let mut to_remove = vec![];
for (k, v) in self.artifacts.iter() {
for (k, v) in self.inner.iter() {
if let ArtifactState::Prepared { last_time_needed, .. } = *v {
if now
.duration_since(last_time_needed)
Expand All @@ -257,7 +340,7 @@ impl Artifacts {
}

for artifact in &to_remove {
self.artifacts.remove(artifact);
self.inner.remove(artifact);
}

to_remove
Expand All @@ -266,15 +349,57 @@ impl Artifacts {

#[cfg(test)]
mod tests {
use super::{ArtifactId, Artifacts, NODE_VERSION};
use super::{ArtifactId, Artifacts, ARTIFACT_PREFIX, NODE_VERSION};
use polkadot_primitives::ExecutorParamsHash;
use rand::Rng;
use sp_core::H256;
use std::{path::Path, str::FromStr};
use std::{
fs,
path::{Path, PathBuf},
str::FromStr,
};

fn rand_hash() -> String {
let mut rng = rand::thread_rng();
let hex: Vec<_> = "0123456789abcdef".chars().collect();
(0..64).map(|_| hex[rng.gen_range(0..hex.len())]).collect()
}

fn file_name(code_hash: &str, param_hash: &str) -> String {
format!("wasmtime_polkadot_v{}_0x{}_0x{}", NODE_VERSION, code_hash, param_hash)
}

fn fake_artifact_path(
dir: impl AsRef<Path>,
prefix: &str,
code_hash: impl AsRef<str>,
params_hash: impl AsRef<str>,
) -> PathBuf {
let mut path = dir.as_ref().to_path_buf();
let file_name = format!("{}_0x{}_0x{}", prefix, code_hash.as_ref(), params_hash.as_ref());
path.push(file_name);
path
}

fn create_artifact(
dir: impl AsRef<Path>,
prefix: &str,
code_hash: impl AsRef<str>,
params_hash: impl AsRef<str>,
) {
let path = fake_artifact_path(dir, prefix, code_hash, params_hash);
fs::File::create(path).unwrap();
}

fn create_rand_artifact(dir: impl AsRef<Path>, prefix: &str) {
create_artifact(dir, prefix, rand_hash(), rand_hash());
}

#[test]
fn artifact_prefix() {
assert_eq!(ARTIFACT_PREFIX, format!("wasmtime_polkadot_v{}", NODE_VERSION),)
}

#[test]
fn from_file_name() {
assert!(ArtifactId::from_file_name("").is_none());
Expand Down Expand Up @@ -318,26 +443,26 @@ mod tests {
}

#[tokio::test]
async fn artifacts_removes_cache_on_startup() {
let fake_cache_path = crate::worker_intf::tmppath("test-cache").await.unwrap();
let fake_artifact_path = {
let mut p = fake_cache_path.clone();
p.push("wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234");
p
};
async fn remove_stale_cache_on_startup() {
let cache_dir = crate::worker_intf::tmppath("test-cache").await.unwrap();

// create a tmp cache with 1 artifact.
fs::create_dir_all(&cache_dir).unwrap();

std::fs::create_dir_all(&fake_cache_path).unwrap();
std::fs::File::create(fake_artifact_path).unwrap();
// 5 invalid, 1 valid
create_rand_artifact(&cache_dir, "");
create_rand_artifact(&cache_dir, "wasmtime_polkadot_v");
create_rand_artifact(&cache_dir, "wasmtime_polkadot_v1.0.0");
create_rand_artifact(&cache_dir, ARTIFACT_PREFIX);
create_artifact(&cache_dir, ARTIFACT_PREFIX, "", "");
create_artifact(&cache_dir, ARTIFACT_PREFIX, "000", "000000");

// this should remove it and re-create.
assert_eq!(fs::read_dir(&cache_dir).unwrap().count(), 6);

let p = &fake_cache_path;
Artifacts::new(p).await;
let artifacts = Artifacts::new_and_prune(&cache_dir).await;

assert_eq!(std::fs::read_dir(&fake_cache_path).unwrap().count(), 0);
assert_eq!(fs::read_dir(&cache_dir).unwrap().count(), 1);
assert_eq!(artifacts.len(), 1);

std::fs::remove_dir_all(fake_cache_path).unwrap();
fs::remove_dir_all(cache_dir).unwrap();
}
}
2 changes: 1 addition & 1 deletion polkadot/node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<O
let run_sweeper = sweeper_task(to_sweeper_rx);

let run_host = async move {
let artifacts = Artifacts::new(&config.cache_path).await;
let artifacts = Artifacts::new_and_prune(&config.cache_path).await;

run(Inner {
cache_path: config.cache_path,
Expand Down
Loading