Skip to content

Commit

Permalink
feat: improve performance when linking files using rayon (#985)
Browse files Browse the repository at this point in the history
  • Loading branch information
baszalmstra authored Dec 16, 2024
1 parent 42de2b9 commit 2f571eb
Show file tree
Hide file tree
Showing 11 changed files with 546 additions and 125 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ plist = "1"
purl = { version = "0.1.3", features = ["serde"] }
quote = "1.0.37"
rand = "0.8.5"
rayon = "1.10.0"
reflink-copy = "0.1.20"
regex = "1.11.1"
reqwest = { version = "0.12.9", default-features = false }
Expand Down
17 changes: 11 additions & 6 deletions crates/rattler-bin/src/commands/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use rattler_solve::{
libsolv_c::{self},
resolvo, SolverImpl, SolverTask,
};
use reqwest::Client;
use reqwest::{Client, Url};

use crate::global_multi_progress;

Expand Down Expand Up @@ -165,11 +165,16 @@ pub async fn create(opt: Opt) -> anyhow::Result<()> {
))
.with_client(download_client.clone())
.with_channel_config(rattler_repodata_gateway::ChannelConfig {
default: SourceConfig {
sharded_enabled: false,
..SourceConfig::default()
},
..rattler_repodata_gateway::ChannelConfig::default()
default: SourceConfig::default(),
per_channel: [(
Url::parse("https://prefix.dev")?,
SourceConfig {
sharded_enabled: true,
..SourceConfig::default()
},
)]
.into_iter()
.collect(),
})
.finish();

Expand Down
1 change: 1 addition & 0 deletions crates/rattler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ rattler_digest = { path = "../rattler_digest", version = "1.0.3", default-featur
rattler_networking = { path = "../rattler_networking", version = "0.21.8", default-features = false }
rattler_shell = { path = "../rattler_shell", version = "0.22.10", default-features = false }
rattler_package_streaming = { path = "../rattler_package_streaming", version = "0.22.18", default-features = false, features = ["reqwest"] }
rayon = { workspace = true }
reflink-copy = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true, features = ["stream", "json", "gzip"] }
Expand Down
2 changes: 1 addition & 1 deletion crates/rattler/src/install/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::install::link_script::LinkScriptError;
/// system has available.
pub struct InstallDriver {
io_concurrency_semaphore: Option<Arc<Semaphore>>,
clobber_registry: Arc<Mutex<ClobberRegistry>>,
pub(crate) clobber_registry: Arc<Mutex<ClobberRegistry>>,
execute_link_scripts: bool,
}

Expand Down
4 changes: 2 additions & 2 deletions crates/rattler/src/install/installer/indicatif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ impl<F: ProgressFormatter> IndicatifReporterInner<F> {
.iter()
.map(|&idx| (self.package_sizes[idx], &self.package_names[idx]));

let largest_package = package_iter.max_by_key(|(size, _)| *size);
if let Some((_, first)) = largest_package {
let smallest_package = package_iter.min_by_key(|(size, _)| *size);
if let Some((_, first)) = smallest_package {
msg.push_str(first);
}

Expand Down
109 changes: 70 additions & 39 deletions crates/rattler/src/install/installer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,15 @@ use std::{
sync::Arc,
};

use super::{unlink_package, AppleCodeSignBehavior, InstallDriver, InstallOptions, Transaction};
use crate::install::link_script::LinkScriptError;
use crate::{
default_cache_dir,
install::{clobber_registry::ClobberedPath, link_script::PrePostLinkResult},
package_cache::PackageCache,
};
pub use error::InstallerError;
use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryFutureExt};
#[cfg(feature = "indicatif")]
pub use indicatif::{
DefaultProgressFormatter, IndicatifReporter, IndicatifReporterBuilder, Placement,
ProgressFormatter,
};
use rattler_cache::package_cache::CacheLock;
use rattler_cache::package_cache::CacheReporter;
use itertools::Itertools;
use rattler_cache::package_cache::{CacheLock, CacheReporter};
use rattler_conda_types::{
prefix_record::{Link, LinkType},
Platform, PrefixRecord, RepoDataRecord,
Expand All @@ -35,6 +28,16 @@ use reqwest::Client;
use simple_spawn_blocking::tokio::run_blocking_task;
use tokio::{sync::Semaphore, task::JoinError};

use super::{unlink_package, AppleCodeSignBehavior, InstallDriver, InstallOptions, Transaction};
use crate::{
default_cache_dir,
install::{
clobber_registry::ClobberedPath,
link_script::{LinkScriptError, PrePostLinkResult},
},
package_cache::PackageCache,
};

/// An installer that can install packages into a prefix.
#[derive(Default)]
pub struct Installer {
Expand Down Expand Up @@ -344,7 +347,17 @@ impl Installer {

// Execute the operations in the transaction.
let mut pending_futures = FuturesUnordered::new();
for (idx, operation) in transaction.operations.iter().enumerate() {
for (idx, operation) in transaction
.operations
.iter()
.enumerate()
.sorted_by_key(|(_, op)| {
op.record_to_install()
.and_then(|r| r.package_record.size)
.unwrap_or(0)
})
.rev()
{
let downloader = &downloader;
let package_cache = &package_cache;
let reporter = self.reporter.clone();
Expand Down Expand Up @@ -462,35 +475,45 @@ async fn link_package(
install_options: InstallOptions,
driver: &InstallDriver,
) -> Result<(), InstallerError> {
// Link the contents of the package into the prefix.
let paths =
crate::install::link_package(cached_package_dir, target_prefix, driver, install_options)
.await
let record = record.clone();
let target_prefix = target_prefix.to_path_buf();
let cached_package_dir = cached_package_dir.to_path_buf();
let clobber_registry = driver.clobber_registry.clone();

let (tx, rx) = tokio::sync::oneshot::channel();

rayon::spawn_fifo(move || {
let inner = move || {
// Link the contents of the package into the prefix.
let paths = crate::install::link_package_sync(
&cached_package_dir,
&target_prefix,
clobber_registry,
install_options,
)
.map_err(|e| InstallerError::LinkError(record.file_name.clone(), e))?;

// Construct a PrefixRecord for the package
let prefix_record = PrefixRecord {
repodata_record: record.clone(),
package_tarball_full_path: None,
extracted_package_dir: Some(cached_package_dir.to_path_buf()),
files: paths
.iter()
.map(|entry| entry.relative_path.clone())
.collect(),
paths_data: paths.into(),
// TODO: Retrieve the requested spec for this package from the request
requested_spec: None,

link: Some(Link {
source: cached_package_dir.to_path_buf(),
// TODO: compute the right value here based on the options and `can_hard_link` ...
link_type: Some(LinkType::HardLink),
}),
};
// Construct a PrefixRecord for the package
let prefix_record = PrefixRecord {
repodata_record: record.clone(),
package_tarball_full_path: None,
extracted_package_dir: Some(cached_package_dir.clone()),
files: paths
.iter()
.map(|entry| entry.relative_path.clone())
.collect(),
paths_data: paths.into(),
// TODO: Retrieve the requested spec for this package from the request
requested_spec: None,

link: Some(Link {
source: cached_package_dir,
// TODO: compute the right value here based on the options and `can_hard_link`
// ...
link_type: Some(LinkType::HardLink),
}),
};

let target_prefix = target_prefix.to_path_buf();
driver
.run_blocking_io_task(move || {
let conda_meta_path = target_prefix.join("conda-meta");
std::fs::create_dir_all(&conda_meta_path).map_err(|e| {
InstallerError::IoError("failed to create conda-meta directory".to_string(), e)
Expand All @@ -508,9 +531,17 @@ async fn link_package(
);
prefix_record
.write_to_path(conda_meta_path.join(&pkg_meta_path), true)
.map_err(|e| InstallerError::IoError(format!("failed to write {pkg_meta_path}"), e))
})
.await
.map_err(|e| {
InstallerError::IoError(format!("failed to write {pkg_meta_path}"), e)
})?;

Ok(())
};

let _ = tx.send(inner());
});

rx.await.unwrap_or(Err(InstallerError::Cancelled))
}

/// Given a repodata record, fetch the package into the cache if its not already
Expand Down
15 changes: 9 additions & 6 deletions crates/rattler/src/install/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,12 +375,15 @@ fn reflink_to_destination(
loop {
match reflink(source_path, destination_path) {
Ok(_) => {
// Copy over filesystem permissions. We do this to ensure that the destination file has the
// same permissions as the source file.
let metadata = std::fs::metadata(source_path)
.map_err(LinkFileError::FailedToReadSourceFileMetadata)?;
std::fs::set_permissions(destination_path, metadata.permissions())
.map_err(LinkFileError::FailedToUpdateDestinationFilePermissions)?;
#[cfg(target_os = "linux")]
{
// Copy over filesystem permissions. We do this to ensure that the destination file has the
// same permissions as the source file.
let metadata = std::fs::metadata(source_path)
.map_err(LinkFileError::FailedToReadSourceFileMetadata)?;
std::fs::set_permissions(destination_path, metadata.permissions())
.map_err(LinkFileError::FailedToUpdateDestinationFilePermissions)?;
}
return Ok(LinkMethod::Reflink);
}
Err(e) if e.kind() == ErrorKind::AlreadyExists => {
Expand Down
Loading

0 comments on commit 2f571eb

Please sign in to comment.