diff --git a/src/cargo/sources/git/utils.rs b/src/cargo/sources/git/utils.rs index 5b0d60a9887..434fc35300e 100644 --- a/src/cargo/sources/git/utils.rs +++ b/src/cargo/sources/git/utils.rs @@ -3,7 +3,7 @@ use crate::core::GitReference; use crate::util::errors::CargoResult; -use crate::util::{network, Config, IntoUrl, Progress}; +use crate::util::{network, Config, IntoUrl, MetricsCounter, Progress}; use anyhow::{anyhow, Context as _}; use cargo_util::{paths, ProcessBuilder}; use curl::easy::List; @@ -15,6 +15,7 @@ use std::env; use std::fmt; use std::path::{Path, PathBuf}; use std::process::Command; +use std::time::{Duration, Instant}; use url::Url; fn serialize_str(t: &T, s: S) -> Result @@ -677,7 +678,7 @@ fn reset(repo: &git2::Repository, obj: &git2::Object<'_>, config: &Config) -> Ca let mut pb = Progress::new("Checkout", config); let mut opts = git2::build::CheckoutBuilder::new(); opts.progress(|_, cur, max| { - drop(pb.tick(cur, max)); + drop(pb.tick(cur, max, "")); }); debug!("doing reset"); repo.reset(obj, git2::ResetType::Hard, Some(&mut opts))?; @@ -694,12 +695,49 @@ pub fn with_fetch_options( let mut progress = Progress::new("Fetch", config); network::with_retry(config, || { with_authentication(url, git_config, |f| { + let mut last_update = Instant::now(); let mut rcb = git2::RemoteCallbacks::new(); + // We choose `N=10` here to make a `300ms * 10slots ~= 3000ms` + // sliding window for tracking the data transfer rate (in bytes/s). + let mut counter = MetricsCounter::<10>::new(0, last_update); rcb.credentials(f); - rcb.transfer_progress(|stats| { + let indexed_deltas = stats.indexed_deltas(); + let msg = if indexed_deltas > 0 { + // Resolving deltas. + format!( + ", ({}/{}) resolving deltas", + indexed_deltas, + stats.total_deltas() + ) + } else { + // Receiving objects. + // + // # Caveat + // + // Progress bar relies on git2 calling `transfer_progress` + // to update its transfer rate, but we cannot guarantee a + // periodic call of that callback. Thus if we don't receive + // any data for, say, 10 seconds, the rate will get stuck + // and never go down to 0B/s. + // In the future, we need to find away to update the rate + // even when the callback is not called. + let now = Instant::now(); + // Scrape a `received_bytes` to the counter every 300ms. + if now - last_update > Duration::from_millis(300) { + counter.add(stats.received_bytes(), now); + last_update = now; + } + fn format_bytes(bytes: f32) -> (&'static str, f32) { + static UNITS: [&str; 5] = ["", "Ki", "Mi", "Gi", "Ti"]; + let i = (bytes.log2() / 10.0).min(4.0) as usize; + (UNITS[i], bytes / 1024_f32.powi(i as i32)) + } + let (unit, rate) = format_bytes(counter.rate()); + format!(", {:.2}{}B/s", rate, unit) + }; progress - .tick(stats.indexed_objects(), stats.total_objects()) + .tick(stats.indexed_objects(), stats.total_objects(), &msg) .is_ok() }); diff --git a/src/cargo/util/counter.rs b/src/cargo/util/counter.rs new file mode 100644 index 00000000000..26132afbaad --- /dev/null +++ b/src/cargo/util/counter.rs @@ -0,0 +1,67 @@ +use std::time::Instant; + +/// A metrics counter storing only latest `N` records. +pub struct MetricsCounter { + /// Slots to store metrics. + slots: [(usize, Instant); N], + /// The slot of the oldest record. + /// Also the next slot to store the new record. + index: usize, +} + +impl MetricsCounter { + /// Creates a new counter with an initial value. + pub fn new(init: usize, init_at: Instant) -> Self { + debug_assert!(N > 0, "number of slots must be greater than zero"); + Self { + slots: [(init, init_at); N], + index: 0, + } + } + + /// Adds record to the counter. + pub fn add(&mut self, data: usize, added_at: Instant) { + self.slots[self.index] = (data, added_at); + self.index = (self.index + 1) % N; + } + + /// Calculates per-second average rate of all slots. + pub fn rate(&self) -> f32 { + let latest = self.slots[self.index.checked_sub(1).unwrap_or(N - 1)]; + let oldest = self.slots[self.index]; + let duration = (latest.1 - oldest.1).as_secs_f32(); + let avg = (latest.0 - oldest.0) as f32 / duration; + if f32::is_nan(avg) { + 0f32 + } else { + avg + } + } +} + +#[cfg(test)] +mod tests { + use super::MetricsCounter; + use std::time::{Duration, Instant}; + + #[test] + fn counter() { + let now = Instant::now(); + let mut counter = MetricsCounter::<3>::new(0, now); + assert_eq!(counter.rate(), 0f32); + counter.add(1, now + Duration::from_secs(1)); + assert_eq!(counter.rate(), 1f32); + counter.add(4, now + Duration::from_secs(2)); + assert_eq!(counter.rate(), 2f32); + counter.add(7, now + Duration::from_secs(3)); + assert_eq!(counter.rate(), 3f32); + counter.add(12, now + Duration::from_secs(4)); + assert_eq!(counter.rate(), 4f32); + } + + #[test] + #[should_panic(expected = "number of slots must be greater than zero")] + fn counter_zero_slot() { + let _counter = MetricsCounter::<0>::new(0, Instant::now()); + } +} diff --git a/src/cargo/util/mod.rs b/src/cargo/util/mod.rs index f7759174a79..e00727c4968 100644 --- a/src/cargo/util/mod.rs +++ b/src/cargo/util/mod.rs @@ -3,6 +3,7 @@ use std::time::Duration; pub use self::canonical_url::CanonicalUrl; pub use self::config::{homedir, Config, ConfigValue}; +pub(crate) use self::counter::MetricsCounter; pub use self::dependency_queue::DependencyQueue; pub use self::diagnostic_server::RustfixDiagnosticServer; pub use self::errors::{internal, CargoResult, CliResult, Test}; @@ -29,6 +30,7 @@ pub use self::workspace::{ mod canonical_url; pub mod command_prelude; pub mod config; +mod counter; pub mod cpu; mod dependency_queue; pub mod diagnostic_server; diff --git a/src/cargo/util/progress.rs b/src/cargo/util/progress.rs index 7996d8a959d..45bb4d89eaa 100644 --- a/src/cargo/util/progress.rs +++ b/src/cargo/util/progress.rs @@ -96,7 +96,7 @@ impl<'cfg> Progress<'cfg> { Self::with_style(name, ProgressStyle::Percentage, cfg) } - pub fn tick(&mut self, cur: usize, max: usize) -> CargoResult<()> { + pub fn tick(&mut self, cur: usize, max: usize, msg: &str) -> CargoResult<()> { let s = match &mut self.state { Some(s) => s, None => return Ok(()), @@ -118,7 +118,7 @@ impl<'cfg> Progress<'cfg> { return Ok(()); } - s.tick(cur, max, "") + s.tick(cur, max, msg) } pub fn tick_now(&mut self, cur: usize, max: usize, msg: &str) -> CargoResult<()> {