Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: download wheels in parallel to avoid deadlock #752

Merged
merged 2 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ pub async fn update_prefix_pypi(
name: &str,
prefix: &Prefix,
platform: Platform,
package_db: &PackageDb,
package_db: Arc<PackageDb>,
conda_records: &[RepoDataRecord],
pypi_records: &[(PypiPackageData, PypiPackageEnvironmentData)],
status: &PythonStatus,
Expand Down Expand Up @@ -388,7 +388,7 @@ impl<'p> LockFileDerivedData<'p> {
environment.name().as_str(),
&prefix,
platform,
&package_db,
package_db,
&repodata_records,
&pypi_records,
&python_status,
Expand Down
90 changes: 61 additions & 29 deletions src/install_pypi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ use crate::consts::PROJECT_MANIFEST;
use crate::project::manifest::SystemRequirements;
use crate::pypi_marker_env::determine_marker_environment;
use crate::pypi_tags::{is_python_record, project_platform_tags};
use pep508_rs::MarkerEnvironment;
use rattler_conda_types::{Platform, RepoDataRecord};
use rattler_lock::{PypiPackageData, PypiPackageEnvironmentData};
use rip::artifacts::wheel::{InstallPaths, UnpackWheelOptions};
use rip::artifacts::Wheel;
use rip::index::PackageDb;
use rip::python_env::{
find_distributions_in_venv, uninstall_distribution, Distribution, PythonLocation, WheelTag,
WheelTags,
};
use rip::resolve::{ResolveOptions, SDistResolution};
use rip::types::{
Expand All @@ -28,6 +30,7 @@ use rip::wheel_builder::WheelBuilder;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinError;

Expand All @@ -40,7 +43,7 @@ type CombinedPypiPackageData = (PypiPackageData, PypiPackageEnvironmentData);
// TODO: refactor arguments in struct
#[allow(clippy::too_many_arguments)]
pub async fn update_python_distributions(
package_db: &PackageDb,
package_db: Arc<PackageDb>,
prefix: &Prefix,
conda_package: &[RepoDataRecord],
python_packages: &[CombinedPypiPackageData],
Expand Down Expand Up @@ -91,31 +94,32 @@ pub async fn update_python_distributions(
.ok_or_else(|| miette::miette!("could not resolve pypi dependencies because no python interpreter is added to the dependencies of the project.\nMake sure to add a python interpreter to the [dependencies] section of the {PROJECT_MANIFEST}, or run:\n\n\tpixi add python"))?;

// Determine the environment markers
let marker_environment = determine_marker_environment(platform, python_record.as_ref())?;
let marker_environment = Arc::new(determine_marker_environment(
platform,
python_record.as_ref(),
)?);

// Determine the compatible tags
let compatible_tags =
project_platform_tags(platform, system_requirements, python_record.as_ref());

let wheel_builder = WheelBuilder::new(
package_db,
&marker_environment,
Some(&compatible_tags),
&ResolveOptions {
sdist_resolution,
python_location: PythonLocation::Custom(python_location),
..Default::default()
},
HashMap::default(),
)
.into_diagnostic()
.context("error in construction of WheelBuilder for `pypi-dependencies` installation")?;
let compatible_tags = Arc::new(project_platform_tags(
platform,
system_requirements,
python_record.as_ref(),
));

// Define the resolve options for local wheel building
let resolve_options = Arc::new(ResolveOptions {
sdist_resolution,
python_location: PythonLocation::Custom(python_location),
..Default::default()
});

// Start downloading the python packages that we want in the background.
let (package_stream, package_stream_pb) = stream_python_artifacts(
package_db,
marker_environment,
compatible_tags,
resolve_options,
python_distributions_to_install.clone(),
Some(&wheel_builder),
);

// Remove python packages that need to be removed
Expand Down Expand Up @@ -229,12 +233,14 @@ async fn install_python_distributions(

/// Creates a stream which downloads the specified python packages. The stream will download the
/// packages in parallel and yield them as soon as they become available.
fn stream_python_artifacts<'a>(
package_db: &'a PackageDb,
packages_to_download: Vec<&'a CombinedPypiPackageData>,
wheel_builder: Option<&'a WheelBuilder<'a, 'a>>,
fn stream_python_artifacts(
package_db: Arc<PackageDb>,
marker_environment: Arc<MarkerEnvironment>,
compatible_tags: Arc<WheelTags>,
resolve_options: Arc<ResolveOptions>,
packages_to_download: Vec<&CombinedPypiPackageData>,
) -> (
impl Stream<Item = miette::Result<(Option<String>, HashSet<Extra>, Wheel)>> + 'a,
impl Stream<Item = miette::Result<(Option<String>, HashSet<Extra>, Wheel)>> + '_,
Option<ProgressBar>,
) {
if packages_to_download.is_empty() {
Expand All @@ -261,6 +267,11 @@ fn stream_python_artifacts<'a>(
.map(move |(pkg_data, pkg_env_data)| {
let pb = stream_pb.clone();
let message_formatter = message_formatter.clone();
let marker_environment = marker_environment.clone();
let compatible_tags = compatible_tags.clone();
let resolve_options = resolve_options.clone();
let package_db = package_db.clone();

async move {
// Determine the filename from the
let filename = pkg_data
Expand Down Expand Up @@ -293,9 +304,31 @@ fn stream_python_artifacts<'a>(
yanked: Default::default(),
};

// TODO: Maybe we should have a cache of wheels separate from the package_db. Since a
// wheel can just be identified by its hash or url.
let wheel: Wheel = package_db.get_wheel(&artifact_info, wheel_builder).await?;
let wheel = tokio::spawn({
let marker_environment = marker_environment.clone();
let compatible_tags = compatible_tags.clone();
let resolve_options = resolve_options.clone();
let package_db = package_db.clone();
async move {
let wheel_builder = WheelBuilder::new(
&package_db,
&marker_environment,
Some(&compatible_tags),
&resolve_options,
HashMap::default(),
)
.into_diagnostic()
.context("error in construction of WheelBuilder for `pypi-dependencies` installation")?;

// TODO: Maybe we should have a cache of wheels separate from the package_db. Since a
// wheel can just be identified by its hash or url.
package_db.get_wheel(&artifact_info, Some(&wheel_builder)).await
}
})
.await.unwrap_or_else(|e| match e.try_into_panic() {
Ok(panic) => std::panic::resume_unwind(panic),
Err(_) => Err(miette::miette!("operation was cancelled"))
})?;

// Update the progress bar
pb_task.finish().await;
Expand All @@ -322,8 +355,7 @@ fn stream_python_artifacts<'a>(
))
}
})
// TODO: put this back on 20 when there is not deadlock anymore.
.buffer_unordered(1)
.buffer_unordered(20)
.right_stream();

(download_stream, Some(pb))
Expand Down
Loading