-
Notifications
You must be signed in to change notification settings - Fork 751
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
269 additions
and
46 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
use std::cmp::min; | ||
|
||
use pubgrub::range::Range; | ||
use rustc_hash::FxHashMap; | ||
use tokio::sync::mpsc::Sender; | ||
use tracing::{debug, trace}; | ||
|
||
use distribution_types::{DistributionMetadata, ResolvedDistRef}; | ||
use pep440_rs::Version; | ||
|
||
use crate::candidate_selector::{CandidateDist, CandidateSelector}; | ||
use crate::pubgrub::PubGrubPackage; | ||
use crate::resolver::Request; | ||
use crate::{InMemoryIndex, ResolveError, VersionsResponse}; | ||
|
||
enum BatchPrefetchStrategy { | ||
/// Go through the next versions assuming the existing selection and its constraints | ||
/// remain. | ||
Compatible(Range<Version>, Version), | ||
/// We encounter cases (botocore) where the above doesn't work: Say we previously selected | ||
/// a==x.y.z, which depends on b==x.y.z. a==x.y.z is incompatible, but we don't know that | ||
/// yet. We just select b==x.y.z and want to prefetch, since for all versions of a we try, | ||
/// we have to wait for the matching version of b. The selector gives us only one version of | ||
/// b, so we're now here 0 versions prefetched. Instead, we guess that the next version of b | ||
/// will be x.y.(z-1) and so forth. | ||
InOrder(Version), | ||
} | ||
|
||
/// Prefetch a large number of versions if we already unsuccessfully tried many versions. | ||
/// | ||
/// This is an optimization specifically targeted at cold cache urllib3/boto3/botocore, where we | ||
/// have to fetch the metadata for a lot of versions. | ||
#[derive(Default)] | ||
pub(crate) struct BatchPrefetcher { | ||
tried_versions: FxHashMap<PubGrubPackage, usize>, | ||
last_prefetch: FxHashMap<PubGrubPackage, usize>, | ||
} | ||
|
||
impl BatchPrefetcher { | ||
/// Prefetch a large number of versions if we already unsuccessfully tried many versions. | ||
pub(crate) async fn prefetch_batches( | ||
&mut self, | ||
next: &PubGrubPackage, | ||
version: &Version, | ||
current_range: &Range<Version>, | ||
request_sink: &Sender<Request>, | ||
index: &InMemoryIndex, | ||
selector: &CandidateSelector, | ||
) -> anyhow::Result<(), ResolveError> { | ||
let PubGrubPackage::Package(package_name, _, _) = &next else { | ||
return Ok(()); | ||
}; | ||
|
||
let (num_tried, do_prefetch) = self.should_prefetch(next); | ||
if !do_prefetch { | ||
return Ok(()); | ||
} | ||
let total_prefetch = min(num_tried, 50); | ||
let mut counter = total_prefetch; | ||
|
||
// This is immediate, we already fetched the version map. | ||
let versions_response = index | ||
.packages | ||
.wait(package_name) | ||
.await | ||
.ok_or(ResolveError::Unregistered)?; | ||
|
||
let VersionsResponse::Found(ref version_map) = *versions_response else { | ||
return Ok(()); | ||
}; | ||
|
||
let mut phase = BatchPrefetchStrategy::Compatible(current_range.clone(), version.clone()); | ||
while counter > 0 { | ||
counter -= 1; | ||
let candidate = match phase { | ||
BatchPrefetchStrategy::Compatible(range, last_version) => { | ||
if let Some(candidate) = | ||
selector.select_no_preference(package_name, &range, version_map) | ||
{ | ||
let range = range.intersection( | ||
&Range::singleton(candidate.version().clone()).complement(), | ||
); | ||
phase = | ||
BatchPrefetchStrategy::Compatible(range, candidate.version().clone()); | ||
candidate | ||
} else { | ||
// We exhausted the compatible version, switch to ignoring the existing | ||
// constraints on the package and instead going through versions in order. | ||
phase = BatchPrefetchStrategy::InOrder(last_version); | ||
continue; | ||
} | ||
} | ||
BatchPrefetchStrategy::InOrder(last_version) => { | ||
let range = if selector.use_highest_version(package_name) { | ||
Range::strictly_lower_than(last_version) | ||
} else { | ||
Range::strictly_higher_than(last_version) | ||
}; | ||
if let Some(candidate) = | ||
selector.select_no_preference(package_name, &range, version_map) | ||
{ | ||
phase = BatchPrefetchStrategy::InOrder(candidate.version().clone()); | ||
candidate | ||
} else { | ||
// Both strategies exhausted their candidates. | ||
break; | ||
} | ||
} | ||
}; | ||
|
||
let CandidateDist::Compatible(dist) = candidate.dist() else { | ||
continue; | ||
}; | ||
let dist = dist.for_resolution(); | ||
|
||
// Emit a request to fetch the metadata for this version. | ||
trace!( | ||
"Prefetching ({}) {}", | ||
match phase { | ||
BatchPrefetchStrategy::Compatible(_, _) => "compatible", | ||
BatchPrefetchStrategy::InOrder(_) => "in order", | ||
}, | ||
dist | ||
); | ||
if index.distributions.register(candidate.package_id()) { | ||
let request = match dist { | ||
ResolvedDistRef::Installable(dist) => Request::Dist(dist.clone()), | ||
ResolvedDistRef::Installed(dist) => Request::Installed(dist.clone()), | ||
}; | ||
request_sink.send(request).await?; | ||
} | ||
} | ||
|
||
debug!( | ||
"Prefetching {} {} versions", | ||
total_prefetch - counter, | ||
package_name | ||
); | ||
|
||
self.last_prefetch.insert(next.clone(), num_tried); | ||
Ok(()) | ||
} | ||
|
||
/// Each time we tried a version for a package, we register that here. | ||
pub(crate) fn version_tried(&mut self, package: PubGrubPackage) { | ||
*self.tried_versions.entry(package).or_default() += 1; | ||
} | ||
|
||
/// After 5, 10, 20, 40 tried versions, prefetch that many versions to start early but not | ||
/// too aggressive. Later we schedule the prefetch of 50 versions every 20 versions, this gives | ||
/// us a good buffer until we see prefetch again and is high enough to saturate the task pool. | ||
fn should_prefetch(&self, next: &PubGrubPackage) -> (usize, bool) { | ||
let num_tried = self.tried_versions.get(next).copied().unwrap_or_default(); | ||
let previous_prefetch = self.last_prefetch.get(next).copied().unwrap_or_default(); | ||
let do_prefetch = (num_tried >= 5 && previous_prefetch < 5) | ||
|| (num_tried >= 10 && previous_prefetch < 10) | ||
|| (num_tried >= 20 && previous_prefetch < 20) | ||
|| (num_tried >= 20 && num_tried - previous_prefetch >= 20); | ||
(num_tried, do_prefetch) | ||
} | ||
} |
Oops, something went wrong.