From 99778fe7f58159672504209ac6861208c27bb557 Mon Sep 17 00:00:00 2001 From: konstin Date: Mon, 18 Mar 2024 13:43:57 +0100 Subject: [PATCH] Faster boto3, first half --- crates/uv-resolver/src/candidate_selector.rs | 8 +- .../uv-resolver/src/pubgrub/dependencies.rs | 3 +- crates/uv-resolver/src/pubgrub/package.rs | 2 +- crates/uv-resolver/src/resolver/mod.rs | 101 ++++++++++++++++-- 4 files changed, 104 insertions(+), 10 deletions(-) diff --git a/crates/uv-resolver/src/candidate_selector.rs b/crates/uv-resolver/src/candidate_selector.rs index de78926450a7..d22e8e9e6549 100644 --- a/crates/uv-resolver/src/candidate_selector.rs +++ b/crates/uv-resolver/src/candidate_selector.rs @@ -103,7 +103,7 @@ impl CandidateSelector { pub(crate) fn select<'a>( &'a self, package_name: &'a PackageName, - range: &'a Range, + range: &Range, version_map: &'a VersionMap, ) -> Option> { // If the package has a preference (e.g., an existing version from an existing lockfile), @@ -327,7 +327,11 @@ pub(crate) struct Candidate<'a> { } impl<'a> Candidate<'a> { - fn new(name: &'a PackageName, version: &'a Version, dist: &'a PrioritizedDist) -> Self { + pub(crate) fn new( + name: &'a PackageName, + version: &'a Version, + dist: &'a PrioritizedDist, + ) -> Self { Self { name, version, diff --git a/crates/uv-resolver/src/pubgrub/dependencies.rs b/crates/uv-resolver/src/pubgrub/dependencies.rs index 97de05b40e5a..58b37707ee17 100644 --- a/crates/uv-resolver/src/pubgrub/dependencies.rs +++ b/crates/uv-resolver/src/pubgrub/dependencies.rs @@ -11,7 +11,8 @@ use crate::constraints::Constraints; use crate::overrides::Overrides; use crate::pubgrub::specifier::PubGrubSpecifier; use crate::pubgrub::PubGrubPackage; -use crate::resolver::{Locals, Urls}; +use crate::resolver::urls::Urls; +use crate::resolver::Locals; use crate::ResolveError; #[derive(Debug)] diff --git a/crates/uv-resolver/src/pubgrub/package.rs b/crates/uv-resolver/src/pubgrub/package.rs index 922b2b54bcb5..fe20d6aadfb8 100644 --- a/crates/uv-resolver/src/pubgrub/package.rs +++ b/crates/uv-resolver/src/pubgrub/package.rs @@ -3,7 +3,7 @@ use derivative::Derivative; use pep508_rs::VerbatimUrl; use uv_normalize::{ExtraName, PackageName}; -use crate::resolver::Urls; +use crate::resolver::urls::Urls; /// A PubGrub-compatible wrapper around a "Python package", with two notable characteristics: /// diff --git a/crates/uv-resolver/src/resolver/mod.rs b/crates/uv-resolver/src/resolver/mod.rs index 3d56d0caebc4..355e46c0fb13 100644 --- a/crates/uv-resolver/src/resolver/mod.rs +++ b/crates/uv-resolver/src/resolver/mod.rs @@ -1,6 +1,7 @@ //! Given a set of requirements, find a set of compatible packages. use std::borrow::Cow; +use std::cmp::min; use std::fmt::{Display, Formatter}; use std::ops::Deref; use std::sync::Arc; @@ -13,6 +14,7 @@ use pubgrub::error::PubGrubError; use pubgrub::range::Range; use pubgrub::solver::{Incompatibility, State}; use rustc_hash::{FxHashMap, FxHashSet}; +use tokio::sync::mpsc::Sender; use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, info_span, instrument, trace, Instrument}; use url::Url; @@ -25,14 +27,14 @@ use pep440_rs::{Version, MIN_VERSION}; use pep508_rs::{MarkerEnvironment, Requirement}; use platform_tags::{IncompatibleTag, Tags}; use pypi_types::{Metadata23, Yanked}; -pub(crate) use urls::Urls; +use urls::Urls; use uv_client::{FlatIndex, RegistryClient}; use uv_distribution::DistributionDatabase; use uv_interpreter::Interpreter; use uv_normalize::PackageName; use uv_traits::BuildContext; -use crate::candidate_selector::{CandidateDist, CandidateSelector}; +use crate::candidate_selector::{Candidate, CandidateDist, CandidateSelector}; use crate::constraints::Constraints; use crate::editables::Editables; use crate::error::ResolveError; @@ -52,7 +54,6 @@ pub use crate::resolver::provider::{ }; use crate::resolver::reporter::Facade; pub use crate::resolver::reporter::{BuildId, Reporter}; - use crate::yanks::AllowedYanks; use crate::{DependencyMode, Options}; pub(crate) use locals::Locals; @@ -61,7 +62,7 @@ mod index; mod locals; mod provider; mod reporter; -mod urls; +pub(crate) mod urls; /// The package version is unavailable and cannot be used /// Unlike [`PackageUnavailable`] this applies to a single version of the package @@ -194,13 +195,13 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { // A channel to fetch package metadata (e.g., given `flask`, fetch all versions) and version // metadata (e.g., given `flask==1.0.0`, fetch the metadata for that version). // Channel size is set to the same size as the task buffer for simplicity. - let (request_sink, request_stream) = tokio::sync::mpsc::channel(50); + let (request_sink, request_stream) = tokio::sync::mpsc::channel(300); // Run the fetcher. let requests_fut = self.fetch(request_stream).fuse(); // Run the solver. - let resolve_fut = self.solve(request_sink).fuse(); + let resolve_fut = self.solve(request_sink).boxed().fuse(); // Wait for both to complete. match tokio::try_join!(requests_fut, resolve_fut) { @@ -236,6 +237,8 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { request_sink: tokio::sync::mpsc::Sender, ) -> Result { let root = PubGrubPackage::Root(self.project.clone()); + let mut tried_versions = FxHashMap::default(); + let mut last_prefetch = FxHashMap::default(); // Keep track of the packages for which we've requested metadata. let mut pins = FilePins::default(); @@ -284,6 +287,8 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { }; next = highest_priority_pkg; + *tried_versions.entry(next.clone()).or_default() += 1; + let term_intersection = state .partial_solution .term_intersection_for_package(&next) @@ -425,6 +430,16 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { } }; + self.prefetch_batches( + &next, + &version, + term_intersection.unwrap_positive(), + &mut tried_versions, + &mut last_prefetch, + &request_sink, + ) + .await?; + self.on_progress(&next, &version); if added_dependencies @@ -489,6 +504,80 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { } } + async fn prefetch_batches( + &self, + next: &PubGrubPackage, + version: &Version, + current_range: &Range, + tried_versions: &mut FxHashMap, + last_prefetch: &mut FxHashMap, + request_sink: &Sender, + ) -> Result<(), ResolveError> { + let PubGrubPackage::Package(package_name, _, _) = &next else { + return Ok(()); + }; + + // Figure out if we should prefetch. + let num_tried = tried_versions.get(next).copied().unwrap_or_default(); + let previous_prefetch = last_prefetch.get(next).copied().unwrap_or_default(); + // After 5, 10, 20, 40 tried versions prefetch that many versions, then every + // 20 versions prefetch 50 versions. + // We can skip versions early so we have to `num_tried - previous_prefetch` with greater + // equal not equal. + let do_prefetch = (num_tried >= 5 && previous_prefetch < 5) + || (num_tried >= 10 && previous_prefetch < 10) + || (num_tried >= 20 && previous_prefetch < 20) + || (num_tried >= 20 && num_tried - previous_prefetch >= 20); + if !do_prefetch { + return Ok(()); + } + let total_prefetch = min(num_tried, 50); + let mut counter = total_prefetch; + + // This is immediate, we already fetched the version map. + let versions_response = self + .index + .packages + .wait(package_name) + .await + .ok_or(ResolveError::Unregistered)?; + + let VersionsResponse::Found(ref version_map) = *versions_response else { + return Ok(()); + }; + + let mut last_version = version.clone(); + let mut range = current_range.clone(); + while counter > 0 { + let Some(candidate) = self.selector.select(package_name, &range, version_map) else { + break; + }; + range = range.intersection(&Range::singleton(candidate.version().clone()).complement()); + counter -= 1; + last_version = candidate.version().clone(); + + let CandidateDist::Compatible(dist) = candidate.dist() else { + break; + }; + let dist = dist.for_resolution(); + + // Emit a request to fetch the metadata for this version. + trace!("Prefetching {}", dist); + if self.index.distributions.register(candidate.package_id()) { + request_sink.send(Request::Dist(dist.clone())).await?; + } + } + + debug!( + "Prefetching {} {} versions", + total_prefetch - counter, + package_name + ); + + last_prefetch.insert(next.clone(), num_tried); + Ok(()) + } + /// Visit a [`PubGrubPackage`] prior to selection. This should be called on a [`PubGrubPackage`] /// before it is selected, to allow metadata to be fetched in parallel. async fn visit_package(