diff --git a/src/environment.rs b/src/environment.rs index 8e28f4221..dd6c3957a 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -225,7 +225,7 @@ pub async fn update_prefix_pypi( name: &str, prefix: &Prefix, platform: Platform, - package_db: &PackageDb, + package_db: Arc<PackageDb>, conda_records: &[RepoDataRecord], pypi_records: &[(PypiPackageData, PypiPackageEnvironmentData)], status: &PythonStatus, @@ -388,7 +388,7 @@ impl<'p> LockFileDerivedData<'p> { environment.name().as_str(), &prefix, platform, - &package_db, + package_db, &repodata_records, &pypi_records, &python_status, diff --git a/src/install_pypi.rs b/src/install_pypi.rs index 014203d17..79f108b85 100644 --- a/src/install_pypi.rs +++ b/src/install_pypi.rs @@ -12,6 +12,7 @@ use crate::consts::PROJECT_MANIFEST; use crate::project::manifest::SystemRequirements; use crate::pypi_marker_env::determine_marker_environment; use crate::pypi_tags::{is_python_record, project_platform_tags}; +use pep508_rs::MarkerEnvironment; use rattler_conda_types::{Platform, RepoDataRecord}; use rattler_lock::{PypiPackageData, PypiPackageEnvironmentData}; use rip::artifacts::wheel::{InstallPaths, UnpackWheelOptions}; @@ -19,6 +20,7 @@ use rip::artifacts::Wheel; use rip::index::PackageDb; use rip::python_env::{ find_distributions_in_venv, uninstall_distribution, Distribution, PythonLocation, WheelTag, + WheelTags, }; use rip::resolve::{ResolveOptions, SDistResolution}; use rip::types::{ @@ -28,6 +30,7 @@ use rip::wheel_builder::WheelBuilder; use std::collections::{HashMap, HashSet}; use std::path::Path; use std::str::FromStr; +use std::sync::Arc; use std::time::Duration; use tokio::task::JoinError; @@ -40,7 +43,7 @@ type CombinedPypiPackageData = (PypiPackageData, PypiPackageEnvironmentData); // TODO: refactor arguments in struct #[allow(clippy::too_many_arguments)] pub async fn update_python_distributions( - package_db: &PackageDb, + package_db: Arc<PackageDb>, prefix: &Prefix, conda_package: &[RepoDataRecord], python_packages: &[CombinedPypiPackageData], @@ -91,31 +94,32 @@ pub async fn update_python_distributions( .ok_or_else(|| miette::miette!("could not resolve pypi dependencies because no python interpreter is added to the dependencies of the project.\nMake sure to add a python interpreter to the [dependencies] section of the {PROJECT_MANIFEST}, or run:\n\n\tpixi add python"))?; // Determine the environment markers - let marker_environment = determine_marker_environment(platform, python_record.as_ref())?; + let marker_environment = Arc::new(determine_marker_environment( + platform, + python_record.as_ref(), + )?); // Determine the compatible tags - let compatible_tags = - project_platform_tags(platform, system_requirements, python_record.as_ref()); - - let wheel_builder = WheelBuilder::new( - package_db, - &marker_environment, - Some(&compatible_tags), - &ResolveOptions { - sdist_resolution, - python_location: PythonLocation::Custom(python_location), - ..Default::default() - }, - HashMap::default(), - ) - .into_diagnostic() - .context("error in construction of WheelBuilder for `pypi-dependencies` installation")?; + let compatible_tags = Arc::new(project_platform_tags( + platform, + system_requirements, + python_record.as_ref(), + )); + + // Define the resolve options for local wheel building + let resolve_options = Arc::new(ResolveOptions { + sdist_resolution, + python_location: PythonLocation::Custom(python_location), + ..Default::default() + }); // Start downloading the python packages that we want in the background. let (package_stream, package_stream_pb) = stream_python_artifacts( package_db, + marker_environment, + compatible_tags, + resolve_options, python_distributions_to_install.clone(), - Some(&wheel_builder), ); // Remove python packages that need to be removed @@ -229,12 +233,14 @@ async fn install_python_distributions( /// Creates a stream which downloads the specified python packages. The stream will download the /// packages in parallel and yield them as soon as they become available. -fn stream_python_artifacts<'a>( - package_db: &'a PackageDb, - packages_to_download: Vec<&'a CombinedPypiPackageData>, - wheel_builder: Option<&'a WheelBuilder<'a, 'a>>, +fn stream_python_artifacts( + package_db: Arc<PackageDb>, + marker_environment: Arc<MarkerEnvironment>, + compatible_tags: Arc<WheelTags>, + resolve_options: Arc<ResolveOptions>, + packages_to_download: Vec<&CombinedPypiPackageData>, ) -> ( - impl Stream<Item = miette::Result<(Option<String>, HashSet<Extra>, Wheel)>> + 'a, + impl Stream<Item = miette::Result<(Option<String>, HashSet<Extra>, Wheel)>> + '_, Option<ProgressBar>, ) { if packages_to_download.is_empty() { @@ -261,6 +267,11 @@ fn stream_python_artifacts<'a>( .map(move |(pkg_data, pkg_env_data)| { let pb = stream_pb.clone(); let message_formatter = message_formatter.clone(); + let marker_environment = marker_environment.clone(); + let compatible_tags = compatible_tags.clone(); + let resolve_options = resolve_options.clone(); + let package_db = package_db.clone(); + async move { // Determine the filename from the let filename = pkg_data @@ -293,9 +304,31 @@ fn stream_python_artifacts<'a>( yanked: Default::default(), }; - // TODO: Maybe we should have a cache of wheels separate from the package_db. Since a - // wheel can just be identified by its hash or url. - let wheel: Wheel = package_db.get_wheel(&artifact_info, wheel_builder).await?; + let wheel = tokio::spawn({ + let marker_environment = marker_environment.clone(); + let compatible_tags = compatible_tags.clone(); + let resolve_options = resolve_options.clone(); + let package_db = package_db.clone(); + async move { + let wheel_builder = WheelBuilder::new( + &package_db, + &marker_environment, + Some(&compatible_tags), + &resolve_options, + HashMap::default(), + ) + .into_diagnostic() + .context("error in construction of WheelBuilder for `pypi-dependencies` installation")?; + + // TODO: Maybe we should have a cache of wheels separate from the package_db. Since a + // wheel can just be identified by its hash or url. + package_db.get_wheel(&artifact_info, Some(&wheel_builder)).await + } + }) + .await.unwrap_or_else(|e| match e.try_into_panic() { + Ok(panic) => std::panic::resume_unwind(panic), + Err(_) => Err(miette::miette!("operation was cancelled")) + })?; // Update the progress bar pb_task.finish().await; @@ -322,8 +355,7 @@ fn stream_python_artifacts<'a>( )) } }) - // TODO: put this back on 20 when there is not deadlock anymore. - .buffer_unordered(1) + .buffer_unordered(20) .right_stream(); (download_stream, Some(pb))