From 2f571eb80b3390e9b7e71d212d695f023c27d5e9 Mon Sep 17 00:00:00 2001 From: Bas Zalmstra Date: Mon, 16 Dec 2024 13:15:33 +0100 Subject: [PATCH] feat: improve performance when linking files using `rayon` (#985) --- Cargo.toml | 1 + crates/rattler-bin/src/commands/create.rs | 17 +- crates/rattler/Cargo.toml | 1 + crates/rattler/src/install/driver.rs | 2 +- .../src/install/installer/indicatif.rs | 4 +- crates/rattler/src/install/installer/mod.rs | 109 ++++-- crates/rattler/src/install/link.rs | 15 +- crates/rattler/src/install/mod.rs | 349 +++++++++++++++++- crates/rattler_cache/Cargo.toml | 1 + crates/rattler_cache/src/package_cache/mod.rs | 12 +- crates/rattler_cache/src/validation.rs | 160 +++++--- 11 files changed, 546 insertions(+), 125 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cee85eaf0..5f5c1793d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,6 +107,7 @@ plist = "1" purl = { version = "0.1.3", features = ["serde"] } quote = "1.0.37" rand = "0.8.5" +rayon = "1.10.0" reflink-copy = "0.1.20" regex = "1.11.1" reqwest = { version = "0.12.9", default-features = false } diff --git a/crates/rattler-bin/src/commands/create.rs b/crates/rattler-bin/src/commands/create.rs index 346e46d6f..40ebbc18b 100644 --- a/crates/rattler-bin/src/commands/create.rs +++ b/crates/rattler-bin/src/commands/create.rs @@ -27,7 +27,7 @@ use rattler_solve::{ libsolv_c::{self}, resolvo, SolverImpl, SolverTask, }; -use reqwest::Client; +use reqwest::{Client, Url}; use crate::global_multi_progress; @@ -165,11 +165,16 @@ pub async fn create(opt: Opt) -> anyhow::Result<()> { )) .with_client(download_client.clone()) .with_channel_config(rattler_repodata_gateway::ChannelConfig { - default: SourceConfig { - sharded_enabled: false, - ..SourceConfig::default() - }, - ..rattler_repodata_gateway::ChannelConfig::default() + default: SourceConfig::default(), + per_channel: [( + Url::parse("https://prefix.dev")?, + SourceConfig { + sharded_enabled: true, + ..SourceConfig::default() + }, + )] + .into_iter() + .collect(), }) .finish(); diff --git a/crates/rattler/Cargo.toml b/crates/rattler/Cargo.toml index a7b4fef0d..457ba9f4a 100644 --- a/crates/rattler/Cargo.toml +++ b/crates/rattler/Cargo.toml @@ -38,6 +38,7 @@ rattler_digest = { path = "../rattler_digest", version = "1.0.3", default-featur rattler_networking = { path = "../rattler_networking", version = "0.21.8", default-features = false } rattler_shell = { path = "../rattler_shell", version = "0.22.10", default-features = false } rattler_package_streaming = { path = "../rattler_package_streaming", version = "0.22.18", default-features = false, features = ["reqwest"] } +rayon = { workspace = true } reflink-copy = { workspace = true } regex = { workspace = true } reqwest = { workspace = true, features = ["stream", "json", "gzip"] } diff --git a/crates/rattler/src/install/driver.rs b/crates/rattler/src/install/driver.rs index 73be08f68..73c7454a3 100644 --- a/crates/rattler/src/install/driver.rs +++ b/crates/rattler/src/install/driver.rs @@ -29,7 +29,7 @@ use crate::install::link_script::LinkScriptError; /// system has available. pub struct InstallDriver { io_concurrency_semaphore: Option>, - clobber_registry: Arc>, + pub(crate) clobber_registry: Arc>, execute_link_scripts: bool, } diff --git a/crates/rattler/src/install/installer/indicatif.rs b/crates/rattler/src/install/installer/indicatif.rs index 2fc5726de..accc9bf6c 100644 --- a/crates/rattler/src/install/installer/indicatif.rs +++ b/crates/rattler/src/install/installer/indicatif.rs @@ -375,8 +375,8 @@ impl IndicatifReporterInner { .iter() .map(|&idx| (self.package_sizes[idx], &self.package_names[idx])); - let largest_package = package_iter.max_by_key(|(size, _)| *size); - if let Some((_, first)) = largest_package { + let smallest_package = package_iter.min_by_key(|(size, _)| *size); + if let Some((_, first)) = smallest_package { msg.push_str(first); } diff --git a/crates/rattler/src/install/installer/mod.rs b/crates/rattler/src/install/installer/mod.rs index 6546bcce3..bfafbe32a 100644 --- a/crates/rattler/src/install/installer/mod.rs +++ b/crates/rattler/src/install/installer/mod.rs @@ -9,13 +9,6 @@ use std::{ sync::Arc, }; -use super::{unlink_package, AppleCodeSignBehavior, InstallDriver, InstallOptions, Transaction}; -use crate::install::link_script::LinkScriptError; -use crate::{ - default_cache_dir, - install::{clobber_registry::ClobberedPath, link_script::PrePostLinkResult}, - package_cache::PackageCache, -}; pub use error::InstallerError; use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryFutureExt}; #[cfg(feature = "indicatif")] @@ -23,8 +16,8 @@ pub use indicatif::{ DefaultProgressFormatter, IndicatifReporter, IndicatifReporterBuilder, Placement, ProgressFormatter, }; -use rattler_cache::package_cache::CacheLock; -use rattler_cache::package_cache::CacheReporter; +use itertools::Itertools; +use rattler_cache::package_cache::{CacheLock, CacheReporter}; use rattler_conda_types::{ prefix_record::{Link, LinkType}, Platform, PrefixRecord, RepoDataRecord, @@ -35,6 +28,16 @@ use reqwest::Client; use simple_spawn_blocking::tokio::run_blocking_task; use tokio::{sync::Semaphore, task::JoinError}; +use super::{unlink_package, AppleCodeSignBehavior, InstallDriver, InstallOptions, Transaction}; +use crate::{ + default_cache_dir, + install::{ + clobber_registry::ClobberedPath, + link_script::{LinkScriptError, PrePostLinkResult}, + }, + package_cache::PackageCache, +}; + /// An installer that can install packages into a prefix. #[derive(Default)] pub struct Installer { @@ -344,7 +347,17 @@ impl Installer { // Execute the operations in the transaction. let mut pending_futures = FuturesUnordered::new(); - for (idx, operation) in transaction.operations.iter().enumerate() { + for (idx, operation) in transaction + .operations + .iter() + .enumerate() + .sorted_by_key(|(_, op)| { + op.record_to_install() + .and_then(|r| r.package_record.size) + .unwrap_or(0) + }) + .rev() + { let downloader = &downloader; let package_cache = &package_cache; let reporter = self.reporter.clone(); @@ -462,35 +475,45 @@ async fn link_package( install_options: InstallOptions, driver: &InstallDriver, ) -> Result<(), InstallerError> { - // Link the contents of the package into the prefix. - let paths = - crate::install::link_package(cached_package_dir, target_prefix, driver, install_options) - .await + let record = record.clone(); + let target_prefix = target_prefix.to_path_buf(); + let cached_package_dir = cached_package_dir.to_path_buf(); + let clobber_registry = driver.clobber_registry.clone(); + + let (tx, rx) = tokio::sync::oneshot::channel(); + + rayon::spawn_fifo(move || { + let inner = move || { + // Link the contents of the package into the prefix. + let paths = crate::install::link_package_sync( + &cached_package_dir, + &target_prefix, + clobber_registry, + install_options, + ) .map_err(|e| InstallerError::LinkError(record.file_name.clone(), e))?; - // Construct a PrefixRecord for the package - let prefix_record = PrefixRecord { - repodata_record: record.clone(), - package_tarball_full_path: None, - extracted_package_dir: Some(cached_package_dir.to_path_buf()), - files: paths - .iter() - .map(|entry| entry.relative_path.clone()) - .collect(), - paths_data: paths.into(), - // TODO: Retrieve the requested spec for this package from the request - requested_spec: None, - - link: Some(Link { - source: cached_package_dir.to_path_buf(), - // TODO: compute the right value here based on the options and `can_hard_link` ... - link_type: Some(LinkType::HardLink), - }), - }; + // Construct a PrefixRecord for the package + let prefix_record = PrefixRecord { + repodata_record: record.clone(), + package_tarball_full_path: None, + extracted_package_dir: Some(cached_package_dir.clone()), + files: paths + .iter() + .map(|entry| entry.relative_path.clone()) + .collect(), + paths_data: paths.into(), + // TODO: Retrieve the requested spec for this package from the request + requested_spec: None, + + link: Some(Link { + source: cached_package_dir, + // TODO: compute the right value here based on the options and `can_hard_link` + // ... + link_type: Some(LinkType::HardLink), + }), + }; - let target_prefix = target_prefix.to_path_buf(); - driver - .run_blocking_io_task(move || { let conda_meta_path = target_prefix.join("conda-meta"); std::fs::create_dir_all(&conda_meta_path).map_err(|e| { InstallerError::IoError("failed to create conda-meta directory".to_string(), e) @@ -508,9 +531,17 @@ async fn link_package( ); prefix_record .write_to_path(conda_meta_path.join(&pkg_meta_path), true) - .map_err(|e| InstallerError::IoError(format!("failed to write {pkg_meta_path}"), e)) - }) - .await + .map_err(|e| { + InstallerError::IoError(format!("failed to write {pkg_meta_path}"), e) + })?; + + Ok(()) + }; + + let _ = tx.send(inner()); + }); + + rx.await.unwrap_or(Err(InstallerError::Cancelled)) } /// Given a repodata record, fetch the package into the cache if its not already diff --git a/crates/rattler/src/install/link.rs b/crates/rattler/src/install/link.rs index 9b49a7eea..b47b66982 100644 --- a/crates/rattler/src/install/link.rs +++ b/crates/rattler/src/install/link.rs @@ -375,12 +375,15 @@ fn reflink_to_destination( loop { match reflink(source_path, destination_path) { Ok(_) => { - // Copy over filesystem permissions. We do this to ensure that the destination file has the - // same permissions as the source file. - let metadata = std::fs::metadata(source_path) - .map_err(LinkFileError::FailedToReadSourceFileMetadata)?; - std::fs::set_permissions(destination_path, metadata.permissions()) - .map_err(LinkFileError::FailedToUpdateDestinationFilePermissions)?; + #[cfg(target_os = "linux")] + { + // Copy over filesystem permissions. We do this to ensure that the destination file has the + // same permissions as the source file. + let metadata = std::fs::metadata(source_path) + .map_err(LinkFileError::FailedToReadSourceFileMetadata)?; + std::fs::set_permissions(destination_path, metadata.permissions()) + .map_err(LinkFileError::FailedToUpdateDestinationFilePermissions)?; + } return Ok(LinkMethod::Reflink); } Err(e) if e.kind() == ErrorKind::AlreadyExists => { diff --git a/crates/rattler/src/install/mod.rs b/crates/rattler/src/install/mod.rs index cc81a6b4d..49e17b2ab 100644 --- a/crates/rattler/src/install/mod.rs +++ b/crates/rattler/src/install/mod.rs @@ -31,12 +31,12 @@ mod test_utils; use std::{ cmp::Ordering, - collections::{binary_heap::PeekMut, BinaryHeap, HashSet}, + collections::{binary_heap::PeekMut, BinaryHeap, HashMap, HashSet}, fs, future::ready, io::ErrorKind, path::{Path, PathBuf}, - sync::Arc, + sync::{Arc, Mutex}, }; pub use apple_codesign::AppleCodeSignBehavior; @@ -57,16 +57,21 @@ use rattler_conda_types::{ prefix_record::PathsEntry, Platform, }; +use rayon::{ + iter::Either, + prelude::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}, +}; use simple_spawn_blocking::Cancelled; use tokio::task::JoinError; use tracing::instrument; pub use transaction::{Transaction, TransactionError, TransactionOperation}; pub use unlink::{empty_trash, unlink_package}; -use crate::install::entry_point::{ - create_unix_python_entry_point, create_windows_python_entry_point, -}; pub use crate::install::entry_point::{get_windows_launcher, python_entry_point_template}; +use crate::install::{ + clobber_registry::ClobberRegistry, + entry_point::{create_unix_python_entry_point, create_windows_python_entry_point}, +}; /// An error that might occur when installing a package. #[derive(Debug, thiserror::Error)] @@ -305,9 +310,8 @@ pub async fn link_package( let allow_ref_links = options.allow_ref_links.unwrap_or_else(|| { match reflink_copy::check_reflink_support(package_dir, target_dir) { Ok(reflink_copy::ReflinkSupport::Supported) => true, - Ok(reflink_copy::ReflinkSupport::NotSupported) => false, + Ok(reflink_copy::ReflinkSupport::NotSupported) | Err(_) => false, Ok(reflink_copy::ReflinkSupport::Unknown) => allow_hard_links, - Err(_) => false, } }); @@ -544,6 +548,283 @@ pub async fn link_package( Ok(paths) } +/// Given an extracted package archive (`package_dir`), installs its files to +/// the `target_dir`. +/// +/// Returns a [`PathsEntry`] for every file that was linked into the target +/// directory. The entries are ordered in the same order as they appear in the +/// `paths.json` file of the package. +#[instrument(skip_all, fields(package_dir = % package_dir.display()))] +pub fn link_package_sync( + package_dir: &Path, + target_dir: &Path, + clobber_registry: Arc>, + options: InstallOptions, +) -> Result, InstallError> { + // Determine the target prefix for linking + let target_prefix = options + .target_prefix + .as_deref() + .unwrap_or(target_dir) + .to_str() + .ok_or(InstallError::TargetPrefixIsNotUtf8)? + .to_owned(); + + // Ensure target directory exists + fs_err::create_dir_all(target_dir).map_err(InstallError::FailedToCreateTargetDirectory)?; + + // Reuse or read the `paths.json` and `index.json` files from the package + // directory + let paths_json = options.paths_json.map_or_else( + || { + PathsJson::from_package_directory_with_deprecated_fallback(package_dir) + .map_err(InstallError::FailedToReadPathsJson) + }, + Ok, + )?; + let index_json = options.index_json.map_or_else( + || { + IndexJson::from_package_directory(package_dir) + .map_err(InstallError::FailedToReadIndexJson) + }, + Ok, + )?; + + // Error out if this is a noarch python package but the python information is + // missing. + if index_json.noarch.is_python() && options.python_info.is_none() { + return Err(InstallError::MissingPythonInfo); + } + + // Parse the `link.json` file and extract entry points from it. + let link_json = if index_json.noarch.is_python() { + options.link_json.flatten().map_or_else( + || { + LinkJson::from_package_directory(package_dir) + .map_or_else( + |e| { + // Its ok if the file is not present. + if e.kind() == ErrorKind::NotFound { + Ok(None) + } else { + Err(e) + } + }, + |link_json| Ok(Some(link_json)), + ) + .map_err(InstallError::FailedToReadLinkJson) + }, + |value| Ok(Some(value)), + )? + } else { + None + }; + + // Determine whether or not we can use symbolic links + let allow_symbolic_links = options + .allow_symbolic_links + .unwrap_or_else(|| can_create_symlinks_sync(target_dir)); + let allow_hard_links = options + .allow_hard_links + .unwrap_or_else(|| can_create_hardlinks_sync(target_dir, package_dir)); + let allow_ref_links = options.allow_ref_links.unwrap_or_else(|| { + match reflink_copy::check_reflink_support(package_dir, target_dir) { + Ok(reflink_copy::ReflinkSupport::Supported) => true, + Ok(reflink_copy::ReflinkSupport::NotSupported) | Err(_) => false, + Ok(reflink_copy::ReflinkSupport::Unknown) => allow_hard_links, + } + }); + + // Determine the platform to use + let platform = options.platform.unwrap_or(Platform::current()); + + // compute all path renames + let mut final_paths = compute_paths(&index_json, &paths_json, options.python_info.as_ref()); + + // register all paths in the install driver path registry + let clobber_paths = clobber_registry + .lock() + .unwrap() + .register_paths(&index_json, &final_paths); + + for (_, computed_path) in final_paths.iter_mut() { + if let Some(clobber_rename) = clobber_paths.get(computed_path) { + *computed_path = clobber_rename.clone(); + } + } + + // Figure out all the directories that we are going to need + let mut directories_to_construct = HashSet::new(); + let mut paths_by_directory = HashMap::new(); + for (entry, computed_path) in final_paths { + let Some(entry_parent) = computed_path.parent() else { + continue; + }; + + // Iterate over all parent directories and create them if they do not exist. + let mut current_path = Some(entry_parent); + while let Some(path) = current_path { + if !path.as_os_str().is_empty() && directories_to_construct.insert(path.to_path_buf()) { + current_path = path.parent(); + } else { + break; + } + } + + // Store the path by directory so we can create them in parallel + paths_by_directory + .entry(entry_parent.to_path_buf()) + .or_insert_with(Vec::new) + .push((entry, computed_path)); + } + + for directory in directories_to_construct.into_iter().sorted() { + let full_path = target_dir.join(directory); + match fs::create_dir(&full_path) { + Ok(_) => (), + Err(e) if e.kind() == ErrorKind::AlreadyExists => (), + Err(e) => return Err(InstallError::FailedToCreateDirectory(full_path, e)), + } + } + + // Wrap the python info in an `Arc` so we can more easily share it with async + // tasks. + let python_info = options.python_info; + + // Link the individual files in parallel + let link_target_prefix = target_prefix.clone(); + let package_dir = package_dir.to_path_buf(); + let link_target_dir = target_dir.to_path_buf(); + let mut paths = paths_by_directory + .into_values() + .collect_vec() + .into_par_iter() + .with_min_len(100) + .flat_map(move |entries_in_subdir| { + let mut path_entries = Vec::with_capacity(entries_in_subdir.len()); + for (entry, computed_path) in entries_in_subdir { + let clobber_rename = clobber_paths.get(&entry.relative_path).cloned(); + let link_result = link_file( + &entry, + computed_path.clone(), + &package_dir, + &link_target_dir, + &link_target_prefix, + allow_symbolic_links && !entry.no_link, + allow_hard_links && !entry.no_link, + allow_ref_links && !entry.no_link, + platform, + options.apple_codesign_behavior, + ); + + let result = match link_result { + Ok(linked_file) => linked_file, + Err(e) => { + return vec![Err(InstallError::FailedToLink( + entry.relative_path.clone(), + e, + ))] + } + }; + + // Construct a `PathsEntry` from the result of the linking operation + path_entries.push(Ok(PathsEntry { + relative_path: result.relative_path, + original_path: if clobber_rename.is_some() { + Some(entry.relative_path.clone()) + } else { + None + }, + path_type: entry.path_type.into(), + no_link: entry.no_link, + sha256: entry.sha256, + sha256_in_prefix: Some(result.sha256), + size_in_bytes: Some(result.file_size), + file_mode: match result.method { + LinkMethod::Patched(file_mode) => Some(file_mode), + _ => None, + }, + prefix_placeholder: entry + .prefix_placeholder + .as_ref() + .map(|p| p.placeholder.clone()), + })); + } + + path_entries + }) + .collect::, _>>()?; + + // If this package is a noarch python package we also have to create entry + // points. + // + // Be careful with the fact that this code is currently running in parallel with + // the linking of individual files. + if let Some(link_json) = link_json { + // Parse the `link.json` file and extract entry points from it. + let entry_points = match link_json.noarch { + NoArchLinks::Python(entry_points) => entry_points.entry_points, + NoArchLinks::Generic => { + unreachable!("we only use link.json for noarch: python packages") + } + }; + + // Get python info + let python_info = python_info + .clone() + .expect("should be safe because its checked above that this contains a value"); + + let target_prefix = target_prefix.clone(); + let target_dir = target_dir.to_path_buf(); + + // Create entry points for each listed item. This is different between Windows + // and unix because on Windows, two PathEntry's are created whereas on + // Linux only one is created. + let mut entry_point_paths = if platform.is_windows() { + entry_points + .into_iter() + // .into_par_iter() + // .with_min_len(100) + .flat_map(move |entry_point| { + match create_windows_python_entry_point( + &target_dir, + &target_prefix, + &entry_point, + &python_info, + &platform, + ) { + Ok([a, b]) => Either::Left([Ok(a), Ok(b)].into_iter()), + Err(e) => Either::Right(std::iter::once(Err( + InstallError::FailedToCreatePythonEntryPoint(e), + ))), + } + }) + .collect::, _>>()? + } else { + entry_points + .into_iter() + // .into_par_iter() + // .with_min_len(100) + .map(move |entry_point| { + match create_unix_python_entry_point( + &target_dir, + &target_prefix, + &entry_point, + &python_info, + ) { + Ok(a) => Ok(a), + Err(e) => Err(InstallError::FailedToCreatePythonEntryPoint(e)), + } + }) + .collect::>()? + }; + + paths.append(&mut entry_point_paths); + }; + + Ok(paths) +} + fn compute_paths( index_json: &IndexJson, paths_json: &PathsJson, @@ -636,6 +917,33 @@ async fn read_link_json( } } +/// Returns true if it is possible to create symlinks in the target directory. +fn can_create_symlinks_sync(target_dir: &Path) -> bool { + let uuid = uuid::Uuid::new_v4(); + let symlink_path = target_dir.join(format!("symtest_{uuid}")); + #[cfg(windows)] + let result = std::os::windows::fs::symlink_file("./", &symlink_path); + #[cfg(unix)] + let result = fs_err::os::unix::fs::symlink("./", &symlink_path); + match result { + Ok(_) => { + if let Err(e) = fs_err::remove_file(&symlink_path) { + tracing::warn!( + "failed to delete temporary file '{}': {e}", + symlink_path.display() + ); + } + true + } + Err(e) => { + tracing::debug!( + "failed to create symlink in target directory: {e}. Disabling use of symlinks." + ); + false + } + } +} + /// A helper struct for a `BinaryHeap` to provides ordering to items that are /// otherwise unordered. struct OrderWrapper { @@ -697,6 +1005,12 @@ async fn can_create_hardlinks(target_dir: &Path, package_dir: &Path) -> bool { paths_have_same_filesystem(target_dir, package_dir).await } +/// Returns true if it is possible to create hard links from the target +/// directory to the package cache directory. +fn can_create_hardlinks_sync(target_dir: &Path, package_dir: &Path) -> bool { + paths_have_same_filesystem_sync(target_dir, package_dir) +} + /// Returns true if two paths share the same filesystem #[cfg(unix)] async fn paths_have_same_filesystem(a: &Path, b: &Path) -> bool { @@ -707,6 +1021,18 @@ async fn paths_have_same_filesystem(a: &Path, b: &Path) -> bool { } } +/// Returns true if two paths share the same filesystem +#[cfg(unix)] +fn paths_have_same_filesystem_sync(a: &Path, b: &Path) -> bool { + use std::os::unix::fs::MetadataExt; + let a = std::fs::metadata(a); + let b = std::fs::metadata(b); + match (a, b) { + (Ok(a), Ok(b)) => a.dev() == b.dev(), + _ => false, + } +} + /// Returns true if two paths share the same filesystem #[cfg(not(unix))] async fn paths_have_same_filesystem(a: &Path, b: &Path) -> bool { @@ -716,6 +1042,15 @@ async fn paths_have_same_filesystem(a: &Path, b: &Path) -> bool { } } +/// Returns true if two paths share the same filesystem +#[cfg(not(unix))] +fn paths_have_same_filesystem_sync(a: &Path, b: &Path) -> bool { + match (a.canonicalize(), b.canonicalize()) { + (Ok(a), Ok(b)) => a.components().next() == b.components().next(), + _ => false, + } +} + #[cfg(test)] mod test { use std::{env::temp_dir, process::Command, str::FromStr}; diff --git a/crates/rattler_cache/Cargo.toml b/crates/rattler_cache/Cargo.toml index 442f14805..d226751fe 100644 --- a/crates/rattler_cache/Cargo.toml +++ b/crates/rattler_cache/Cargo.toml @@ -31,6 +31,7 @@ reqwest-middleware.workspace = true digest.workspace = true fs4 = { workspace = true, features = ["fs-err-tokio"] } simple_spawn_blocking = { version = "1.0.0", path = "../simple_spawn_blocking", features = ["tokio"] } +rayon = { workspace = true } [dev-dependencies] assert_matches.workspace = true diff --git a/crates/rattler_cache/src/package_cache/mod.rs b/crates/rattler_cache/src/package_cache/mod.rs index 06164834f..86ab061f2 100644 --- a/crates/rattler_cache/src/package_cache/mod.rs +++ b/crates/rattler_cache/src/package_cache/mod.rs @@ -28,7 +28,7 @@ use simple_spawn_blocking::Cancelled; use tracing::instrument; use url::Url; -use crate::validation::validate_package_directory; +use crate::validation::{validate_package_directory, ValidationMode}; mod cache_key; mod cache_lock; @@ -364,8 +364,10 @@ where } // Validate the package directory. - let validation_result = - tokio::task::spawn_blocking(move || validate_package_directory(&path_inner)).await; + let validation_result = tokio::task::spawn_blocking(move || { + validate_package_directory(&path_inner, ValidationMode::Full) + }) + .await; if let Some((reporter, index)) = reporter { reporter.on_validate_complete(index); @@ -506,6 +508,7 @@ mod test { use url::Url; use super::PackageCache; + use crate::validation::ValidationMode; use crate::{package_cache::CacheKey, validation::validate_package_directory}; fn get_test_data_dir() -> PathBuf { @@ -553,7 +556,8 @@ mod test { .unwrap(); // Validate the contents of the package - let (_, current_paths) = validate_package_directory(cache_lock.path()).unwrap(); + let (_, current_paths) = + validate_package_directory(cache_lock.path(), ValidationMode::Full).unwrap(); // Make sure that the paths are the same as what we would expect from the // original tar archive. diff --git a/crates/rattler_cache/src/validation.rs b/crates/rattler_cache/src/validation.rs index bc4e2cd86..d2b6f43fb 100644 --- a/crates/rattler_cache/src/validation.rs +++ b/crates/rattler_cache/src/validation.rs @@ -1,25 +1,39 @@ //! Functionality to validate the contents of a Conda package. //! -//! Almost all Conda packages contain a file `info/paths.json` that describes all the files the -//! package contains. The [`validate_package_directory`] function validates that a directory -//! containing an extracted Conda package archive actually contains the files as described by the -//! `paths.json` file. +//! Almost all Conda packages contain a file `info/paths.json` that describes +//! all the files the package contains. The [`validate_package_directory`] +//! function validates that a directory containing an extracted Conda package +//! archive actually contains the files as described by the `paths.json` file. //! -//! Very old Conda packages do not contain a `paths.json` file. These packages contain a -//! (deprecated) `files` file as well as optionally a `has_prefix` and some other files. If the -//! `paths.json` file is missing these deprecated files are used instead to reconstruct a -//! [`PathsJson`] object. See [`PathsJson::from_deprecated_package_directory`] for more information. +//! Very old Conda packages do not contain a `paths.json` file. These packages +//! contain a (deprecated) `files` file as well as optionally a `has_prefix` and +//! some other files. If the `paths.json` file is missing these deprecated files +//! are used instead to reconstruct a [`PathsJson`] object. See +//! [`PathsJson::from_deprecated_package_directory`] for more information. -use digest::Digest; -use rattler_conda_types::package::{IndexJson, PackageFile, PathType, PathsEntry, PathsJson}; -use rattler_digest::Sha256; use std::{ - io::ErrorKind, + io::{BufReader, ErrorKind}, path::{Path, PathBuf}, }; -/// An error that is returned by [`validate_package_directory`] if the contents of the directory seems to be -/// corrupted. +use digest::Digest; +use rattler_conda_types::package::{IndexJson, PackageFile, PathType, PathsEntry, PathsJson}; +use rattler_digest::Sha256; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; +use rayon::prelude::IndexedParallelIterator; + +/// The mode in which the validation should be performed. +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum ValidationMode { + /// Only check if the files exists. Do not check if the hashes match. + Fast, + + /// Check if the files exists and the content matches the hashes. + Full, +} + +/// An error that is returned by [`validate_package_directory`] if the contents +/// of the directory seems to be corrupted. #[derive(Debug, thiserror::Error)] pub enum PackageValidationError { /// Neither a `paths.json` file nor a deprecated `files` file was found. @@ -43,7 +57,8 @@ pub enum PackageValidationError { ReadIndexJsonError(#[source] std::io::Error), } -/// An error that indicates that a specific file in a package archive directory seems to be corrupted. +/// An error that indicates that a specific file in a package archive directory +/// seems to be corrupted. #[derive(Debug, thiserror::Error)] pub enum PackageEntryValidationError { /// An error occurred while reading the metadata of the file. @@ -75,24 +90,26 @@ pub enum PackageEntryValidationError { HashMismatch(String, String), } -/// Determine whether the files in the specified directory match what is expected according to the -/// `info/paths.json` file in the same directory. +/// Determine whether the files in the specified directory match what is +/// expected according to the `info/paths.json` file in the same directory. /// -/// If the `info/paths.json` file could not be found this function tries to reconstruct the -/// information from older deprecated methods. See [`PathsJson::from_deprecated_package_directory`]. +/// If the `info/paths.json` file could not be found this function tries to +/// reconstruct the information from older deprecated methods. See +/// [`PathsJson::from_deprecated_package_directory`]. /// -/// If validation succeeds the parsed [`PathsJson`] object is returned which contains information -/// about the files in the archive. +/// If validation succeeds the parsed [`PathsJson`] object is returned which +/// contains information about the files in the archive. pub fn validate_package_directory( package_dir: &Path, + mode: ValidationMode, ) -> Result<(IndexJson, PathsJson), PackageValidationError> { // Validate that there is a valid IndexJson let index_json = IndexJson::from_package_directory(package_dir) .map_err(PackageValidationError::ReadIndexJsonError)?; - // Read the 'paths.json' file which describes all files that should be present. If the file - // could not be found try reconstructing the paths information from deprecated files in the - // package directory. + // Read the 'paths.json' file which describes all files that should be present. + // If the file could not be found try reconstructing the paths information + // from deprecated files in the package directory. let paths = match PathsJson::from_package_directory(package_dir) { Err(e) if e.kind() == ErrorKind::NotFound => { match PathsJson::from_deprecated_package_directory(package_dir) { @@ -108,48 +125,63 @@ pub fn validate_package_directory( }; // Validate all the entries - validate_package_directory_from_paths(package_dir, &paths) + validate_package_directory_from_paths(package_dir, &paths, mode) .map_err(|(path, err)| PackageValidationError::CorruptedEntry(path, err))?; Ok((index_json, paths)) } -/// Determine whether the files in the specified directory match wat is expected according to the -/// passed in [`PathsJson`]. +/// Determine whether the files in the specified directory match wat is expected +/// according to the passed in [`PathsJson`]. pub fn validate_package_directory_from_paths( package_dir: &Path, paths: &PathsJson, + mode: ValidationMode, ) -> Result<(), (PathBuf, PackageEntryValidationError)> { // Check every entry in the PathsJson object - for entry in paths.paths.iter() { - validate_package_entry(package_dir, entry).map_err(|e| (entry.relative_path.clone(), e))?; - } - - Ok(()) + paths + .paths + .par_iter() + .with_min_len(1000) + .try_for_each(|entry| { + validate_package_entry(package_dir, entry, mode) + .map_err(|e| (entry.relative_path.clone(), e)) + }) } -/// Determine whether the information in the [`PathsEntry`] matches the file in the package directory. +/// Determine whether the information in the [`PathsEntry`] matches the file in +/// the package directory. fn validate_package_entry( package_dir: &Path, entry: &PathsEntry, + mode: ValidationMode, ) -> Result<(), PackageEntryValidationError> { let path = package_dir.join(&entry.relative_path); // Validate based on the type of path match entry.path_type { - PathType::HardLink => validate_package_hard_link_entry(path, entry), - PathType::SoftLink => validate_package_soft_link_entry(path, entry), - PathType::Directory => validate_package_directory_entry(path, entry), + PathType::HardLink => validate_package_hard_link_entry(path, entry, mode), + PathType::SoftLink => validate_package_soft_link_entry(path, entry, mode), + PathType::Directory => validate_package_directory_entry(path, entry, mode), } } -/// Determine whether the information in the [`PathsEntry`] matches the file at the specified path. +/// Determine whether the information in the [`PathsEntry`] matches the file at +/// the specified path. fn validate_package_hard_link_entry( path: PathBuf, entry: &PathsEntry, + mode: ValidationMode, ) -> Result<(), PackageEntryValidationError> { debug_assert!(entry.path_type == PathType::HardLink); + if mode == ValidationMode::Fast { + if !path.is_file() { + return Err(PackageEntryValidationError::NotFound); + } + return Ok(()); + } + // Short-circuit if we have no validation reference if entry.sha256.is_none() && entry.size_in_bytes.is_none() { if !path.is_file() { @@ -159,7 +191,7 @@ fn validate_package_hard_link_entry( } // Open the file for reading - let mut file = match std::fs::File::open(&path) { + let file = match std::fs::File::open(&path) { Ok(file) => file, Err(e) if e.kind() == ErrorKind::NotFound => { return Err(PackageEntryValidationError::NotFound); @@ -184,6 +216,7 @@ fn validate_package_hard_link_entry( // Check the SHA256 hash of the file if let Some(expected_hash) = &entry.sha256 { // Determine the hash of the file on disk + let mut file = BufReader::with_capacity(64 * 1024, file); let mut hasher = Sha256::default(); std::io::copy(&mut file, &mut hasher)?; let hash = hasher.finalize(); @@ -200,11 +233,12 @@ fn validate_package_hard_link_entry( Ok(()) } -/// Determine whether the information in the [`PathsEntry`] matches the symbolic link at the specified -/// path. +/// Determine whether the information in the [`PathsEntry`] matches the symbolic +/// link at the specified path. fn validate_package_soft_link_entry( path: PathBuf, entry: &PathsEntry, + _mode: ValidationMode, ) -> Result<(), PackageEntryValidationError> { debug_assert!(entry.path_type == PathType::SoftLink); @@ -212,18 +246,21 @@ fn validate_package_soft_link_entry( return Err(PackageEntryValidationError::ExpectedSymlink); } - // TODO: Validate symlink content. Dont validate the SHA256 hash of the file because since a - // symlink will most likely point to another file added as a hardlink by the package this is - // double work. Instead check that the symlink is correct e.g. `../a` points to the same file as - // `b/../../a` but they are different. + // TODO: Validate symlink content. Dont validate the SHA256 hash of the file + // because since a symlink will most likely point to another file added as a + // hardlink by the package this is double work. Instead check that the + // symlink is correct e.g. `../a` points to the same file as `b/../../a` but + // they are different. Ok(()) } -/// Determine whether the information in the [`PathsEntry`] matches the directory at the specified path. +/// Determine whether the information in the [`PathsEntry`] matches the +/// directory at the specified path. fn validate_package_directory_entry( path: PathBuf, entry: &PathsEntry, + _mode: ValidationMode, ) -> Result<(), PackageEntryValidationError> { debug_assert!(entry.path_type == PathType::Directory); @@ -236,16 +273,18 @@ fn validate_package_directory_entry( #[cfg(test)] mod test { - use super::{ - validate_package_directory, validate_package_directory_from_paths, - PackageEntryValidationError, PackageValidationError, - }; + use std::io::Write; + use assert_matches::assert_matches; use rattler_conda_types::package::{PackageFile, PathType, PathsJson}; use rstest::rstest; - use std::io::Write; use url::Url; + use super::{ + validate_package_directory, validate_package_directory_from_paths, + PackageEntryValidationError, PackageValidationError, ValidationMode, + }; + #[rstest] #[case::conda( "https://conda.anaconda.org/conda-forge/win-64/conda-22.9.0-py38haa244fe_2.tar.bz2", @@ -270,9 +309,9 @@ mod test { rattler_package_streaming::fs::extract(&package_path, temp_dir.path()).unwrap(); - // Validate that the extracted package is correct. Since it's just been extracted this should - // work. - let result = validate_package_directory(temp_dir.path()); + // Validate that the extracted package is correct. Since it's just been + // extracted this should work. + let result = validate_package_directory(temp_dir.path(), ValidationMode::Full); if let Err(e) = result { panic!("{e}"); } @@ -295,9 +334,10 @@ mod test { file.write_all(&[255]).unwrap(); drop(file); - // Revalidate the package, given that we changed a file it should now fail with mismatched hashes. + // Revalidate the package, given that we changed a file it should now fail with + // mismatched hashes. assert_matches!( - validate_package_directory_from_paths(temp_dir.path(), &paths), + validate_package_directory_from_paths(temp_dir.path(), &paths, ValidationMode::Full), Err(( path, PackageEntryValidationError::HashMismatch(_, _) @@ -323,9 +363,9 @@ mod test { rattler_package_streaming::fs::extract(&package_path, temp_dir.path()).unwrap(); - // Validate that the extracted package is correct. Since it's just been extracted this should - // work. - let result = validate_package_directory(temp_dir.path()); + // Validate that the extracted package is correct. Since it's just been + // extracted this should work. + let result = validate_package_directory(temp_dir.path(), ValidationMode::Full); if let Err(e) = result { panic!("{e}"); } @@ -348,7 +388,7 @@ mod test { // Revalidate the package, given that we replaced the symlink, it should fail. assert_matches!( - validate_package_directory_from_paths(temp_dir.path(), &paths), + validate_package_directory_from_paths(temp_dir.path(), &paths, ValidationMode::Full), Err(( path, PackageEntryValidationError::ExpectedSymlink @@ -360,7 +400,7 @@ mod test { fn test_missing_metadata() { let temp_dir = tempfile::tempdir().unwrap(); assert_matches!( - validate_package_directory(temp_dir.path()), + validate_package_directory(temp_dir.path(), ValidationMode::Full), Err(PackageValidationError::ReadIndexJsonError(_)) ); }