From 77d993cb86b928de664c24e66fa92dec0b8c6144 Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Fri, 23 Apr 2021 21:29:24 +0800 Subject: [PATCH 01/10] refactor: progress.tick support attaching message --- src/cargo/sources/git/utils.rs | 4 ++-- src/cargo/util/progress.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/cargo/sources/git/utils.rs b/src/cargo/sources/git/utils.rs index 5b0d60a9887..ab7849ddc39 100644 --- a/src/cargo/sources/git/utils.rs +++ b/src/cargo/sources/git/utils.rs @@ -677,7 +677,7 @@ fn reset(repo: &git2::Repository, obj: &git2::Object<'_>, config: &Config) -> Ca let mut pb = Progress::new("Checkout", config); let mut opts = git2::build::CheckoutBuilder::new(); opts.progress(|_, cur, max| { - drop(pb.tick(cur, max)); + drop(pb.tick(cur, max, "")); }); debug!("doing reset"); repo.reset(obj, git2::ResetType::Hard, Some(&mut opts))?; @@ -699,7 +699,7 @@ pub fn with_fetch_options( rcb.transfer_progress(|stats| { progress - .tick(stats.indexed_objects(), stats.total_objects()) + .tick(stats.indexed_objects(), stats.total_objects(), "") .is_ok() }); diff --git a/src/cargo/util/progress.rs b/src/cargo/util/progress.rs index 7996d8a959d..45bb4d89eaa 100644 --- a/src/cargo/util/progress.rs +++ b/src/cargo/util/progress.rs @@ -96,7 +96,7 @@ impl<'cfg> Progress<'cfg> { Self::with_style(name, ProgressStyle::Percentage, cfg) } - pub fn tick(&mut self, cur: usize, max: usize) -> CargoResult<()> { + pub fn tick(&mut self, cur: usize, max: usize, msg: &str) -> CargoResult<()> { let s = match &mut self.state { Some(s) => s, None => return Ok(()), @@ -118,7 +118,7 @@ impl<'cfg> Progress<'cfg> { return Ok(()); } - s.tick(cur, max, "") + s.tick(cur, max, msg) } pub fn tick_now(&mut self, cur: usize, max: usize, msg: &str) -> CargoResult<()> { From d58aa8ab4034fe96253666167bb02df9def4bbe2 Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Fri, 23 Apr 2021 21:32:27 +0800 Subject: [PATCH 02/10] feat: show transfer rate and total bytes received --- src/cargo/sources/git/utils.rs | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/src/cargo/sources/git/utils.rs b/src/cargo/sources/git/utils.rs index ab7849ddc39..49b3b3946ed 100644 --- a/src/cargo/sources/git/utils.rs +++ b/src/cargo/sources/git/utils.rs @@ -15,6 +15,7 @@ use std::env; use std::fmt; use std::path::{Path, PathBuf}; use std::process::Command; +use std::time::{Duration, Instant}; use url::Url; fn serialize_str(t: &T, s: S) -> Result @@ -694,12 +695,40 @@ pub fn with_fetch_options( let mut progress = Progress::new("Fetch", config); network::with_retry(config, || { with_authentication(url, git_config, |f| { + let mut last_recv = 0.0; // in Byte + let mut last_rate = 0.0; // in Byte/s + let mut last_update = Instant::now(); let mut rcb = git2::RemoteCallbacks::new(); rcb.credentials(f); - rcb.transfer_progress(|stats| { + let indexed_deltas = stats.indexed_deltas(); + let msg = if indexed_deltas > 0 { + // Resolving deltas. + format!(" ({}/{})", indexed_deltas, stats.total_deltas()) + } else { + // Receiving objects. + let duration = last_update.elapsed(); + let (recv, rate) = if duration > Duration::from_secs(1) { + let recv = stats.received_bytes() as f32; + let rate = (recv - last_recv) / duration.as_secs_f32(); + last_recv = recv; + last_rate = rate; + last_update = Instant::now(); + (recv, rate) + } else { + (last_recv, last_rate) + }; + fn format_bytes(bytes: f32) -> (&'static str, f32) { + static UNITS: [&str; 5] = ["", "K", "M", "G", "T"]; + let i = (bytes.log2() / 10.0).min(4.0) as usize; + (UNITS[i], bytes / 1024_f32.powi(i as i32)) + } + let (rate_unit, rate) = format_bytes(rate); + let (unit, recv) = format_bytes(recv); + format!(" | {:.2}{}iB | {:.2}{}iB/s", recv, unit, rate, rate_unit) + }; progress - .tick(stats.indexed_objects(), stats.total_objects(), "") + .tick(stats.indexed_objects(), stats.total_objects(), &msg) .is_ok() }); From 7e3f7d64b016445f078f448eb62d75c49286fa81 Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Sat, 24 Apr 2021 00:42:45 +0800 Subject: [PATCH 03/10] fix: remove unnecessary progress tick rate limiting --- src/cargo/sources/git/utils.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/cargo/sources/git/utils.rs b/src/cargo/sources/git/utils.rs index 49b3b3946ed..81a08d71412 100644 --- a/src/cargo/sources/git/utils.rs +++ b/src/cargo/sources/git/utils.rs @@ -696,7 +696,6 @@ pub fn with_fetch_options( network::with_retry(config, || { with_authentication(url, git_config, |f| { let mut last_recv = 0.0; // in Byte - let mut last_rate = 0.0; // in Byte/s let mut last_update = Instant::now(); let mut rcb = git2::RemoteCallbacks::new(); rcb.credentials(f); @@ -708,16 +707,12 @@ pub fn with_fetch_options( } else { // Receiving objects. let duration = last_update.elapsed(); - let (recv, rate) = if duration > Duration::from_secs(1) { - let recv = stats.received_bytes() as f32; - let rate = (recv - last_recv) / duration.as_secs_f32(); + let recv = stats.received_bytes() as f32; + let rate = (recv - last_recv) / duration.as_secs_f32(); + if duration > Duration::from_secs(3) { last_recv = recv; - last_rate = rate; last_update = Instant::now(); - (recv, rate) - } else { - (last_recv, last_rate) - }; + } fn format_bytes(bytes: f32) -> (&'static str, f32) { static UNITS: [&str; 5] = ["", "K", "M", "G", "T"]; let i = (bytes.log2() / 10.0).min(4.0) as usize; @@ -725,7 +720,7 @@ pub fn with_fetch_options( } let (rate_unit, rate) = format_bytes(rate); let (unit, recv) = format_bytes(recv); - format!(" | {:.2}{}iB | {:.2}{}iB/s", recv, unit, rate, rate_unit) + format!(", {:.2}{}iB | {:.2}{}iB/s", recv, unit, rate, rate_unit) }; progress .tick(stats.indexed_objects(), stats.total_objects(), &msg) From 507d7e42b064cd7a10e1f9b0957031e3a2da456f Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Sat, 24 Apr 2021 00:56:34 +0800 Subject: [PATCH 04/10] fix: remove total bytes received from progress message --- src/cargo/sources/git/utils.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/cargo/sources/git/utils.rs b/src/cargo/sources/git/utils.rs index 81a08d71412..5dcdc3df53a 100644 --- a/src/cargo/sources/git/utils.rs +++ b/src/cargo/sources/git/utils.rs @@ -695,7 +695,7 @@ pub fn with_fetch_options( let mut progress = Progress::new("Fetch", config); network::with_retry(config, || { with_authentication(url, git_config, |f| { - let mut last_recv = 0.0; // in Byte + let mut last_recv = 0; // in Byte let mut last_update = Instant::now(); let mut rcb = git2::RemoteCallbacks::new(); rcb.credentials(f); @@ -707,8 +707,8 @@ pub fn with_fetch_options( } else { // Receiving objects. let duration = last_update.elapsed(); - let recv = stats.received_bytes() as f32; - let rate = (recv - last_recv) / duration.as_secs_f32(); + let recv = stats.received_bytes(); + let rate = (recv - last_recv) as f32 / duration.as_secs_f32(); if duration > Duration::from_secs(3) { last_recv = recv; last_update = Instant::now(); @@ -718,9 +718,8 @@ pub fn with_fetch_options( let i = (bytes.log2() / 10.0).min(4.0) as usize; (UNITS[i], bytes / 1024_f32.powi(i as i32)) } - let (rate_unit, rate) = format_bytes(rate); - let (unit, recv) = format_bytes(recv); - format!(", {:.2}{}iB | {:.2}{}iB/s", recv, unit, rate, rate_unit) + let (unit, rate) = format_bytes(rate); + format!(", {:.2}{}iB/s", rate, unit) }; progress .tick(stats.indexed_objects(), stats.total_objects(), &msg) From f2172db6da85e0b42d727ae4089c93c123256936 Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Sat, 24 Apr 2021 03:18:30 +0800 Subject: [PATCH 05/10] fix: message of progress when resolving deltas --- src/cargo/sources/git/utils.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/cargo/sources/git/utils.rs b/src/cargo/sources/git/utils.rs index 5dcdc3df53a..e2462528d1a 100644 --- a/src/cargo/sources/git/utils.rs +++ b/src/cargo/sources/git/utils.rs @@ -703,7 +703,11 @@ pub fn with_fetch_options( let indexed_deltas = stats.indexed_deltas(); let msg = if indexed_deltas > 0 { // Resolving deltas. - format!(" ({}/{})", indexed_deltas, stats.total_deltas()) + format!( + ", ({}/{}) resolving deltas", + indexed_deltas, + stats.total_deltas() + ) } else { // Receiving objects. let duration = last_update.elapsed(); From a813ba0070171cfdf0495f07889a3a808892b3ff Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Tue, 27 Apr 2021 13:47:36 +0800 Subject: [PATCH 06/10] feat: add metrics counter utility --- src/cargo/util/counter.rs | 61 +++++++++++++++++++++++++++++++++++++++ src/cargo/util/mod.rs | 2 ++ 2 files changed, 63 insertions(+) create mode 100644 src/cargo/util/counter.rs diff --git a/src/cargo/util/counter.rs b/src/cargo/util/counter.rs new file mode 100644 index 00000000000..04dd1647006 --- /dev/null +++ b/src/cargo/util/counter.rs @@ -0,0 +1,61 @@ +use std::time::Instant; + +/// A metrics counter storing only latest `N` records. +pub struct MetricsCounter { + /// Slots to store metrics. + slots: [(usize, Instant); N], + /// The slot of the oldest record. + /// Also the next slot to store the new record. + index: usize, +} + +impl MetricsCounter { + /// Creates a new counter with an initial value. + pub fn new(init: usize) -> Self { + debug_assert!(N > 0, "number of slots must be greater than zero"); + Self { + slots: [(init, Instant::now()); N], + index: 0, + } + } + + /// Adds record to the counter. + pub fn add(&mut self, data: usize) { + self.slots[self.index] = (data, Instant::now()); + self.index = (self.index + 1) % N; + } + + /// Calculates per-second average rate of all slots. + pub fn rate(&self) -> f32 { + let latest = self.slots[self.index.checked_sub(1).unwrap_or(N - 1)]; + let oldest = self.slots[self.index]; + let duration = (latest.1 - oldest.1).as_secs_f32(); + let avg = (latest.0 - oldest.0) as f32 / duration; + if f32::is_nan(avg) { + 0f32 + } else { + avg + } + } +} + +#[cfg(test)] +mod tests { + use super::MetricsCounter; + + #[test] + fn counter() { + let mut counter = MetricsCounter::<3>::new(0); + assert_eq!(counter.rate(), 0f32); + for i in 1..=5 { + counter.add(i); + assert!(counter.rate() > 0f32); + } + } + + #[test] + #[should_panic(expected = "number of slots must be greater than zero")] + fn counter_zero_slot() { + let _counter = MetricsCounter::<0>::new(0); + } +} diff --git a/src/cargo/util/mod.rs b/src/cargo/util/mod.rs index f7759174a79..e00727c4968 100644 --- a/src/cargo/util/mod.rs +++ b/src/cargo/util/mod.rs @@ -3,6 +3,7 @@ use std::time::Duration; pub use self::canonical_url::CanonicalUrl; pub use self::config::{homedir, Config, ConfigValue}; +pub(crate) use self::counter::MetricsCounter; pub use self::dependency_queue::DependencyQueue; pub use self::diagnostic_server::RustfixDiagnosticServer; pub use self::errors::{internal, CargoResult, CliResult, Test}; @@ -29,6 +30,7 @@ pub use self::workspace::{ mod canonical_url; pub mod command_prelude; pub mod config; +mod counter; pub mod cpu; mod dependency_queue; pub mod diagnostic_server; From de84f352e42f58b4dff43ead13e8412c8923e685 Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Tue, 27 Apr 2021 13:48:36 +0800 Subject: [PATCH 07/10] refactor: use metrics counter as an abstaction --- src/cargo/sources/git/utils.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/cargo/sources/git/utils.rs b/src/cargo/sources/git/utils.rs index e2462528d1a..de2c7f70dbb 100644 --- a/src/cargo/sources/git/utils.rs +++ b/src/cargo/sources/git/utils.rs @@ -3,7 +3,7 @@ use crate::core::GitReference; use crate::util::errors::CargoResult; -use crate::util::{network, Config, IntoUrl, Progress}; +use crate::util::{network, Config, IntoUrl, MetricsCounter, Progress}; use anyhow::{anyhow, Context as _}; use cargo_util::{paths, ProcessBuilder}; use curl::easy::List; @@ -695,9 +695,9 @@ pub fn with_fetch_options( let mut progress = Progress::new("Fetch", config); network::with_retry(config, || { with_authentication(url, git_config, |f| { - let mut last_recv = 0; // in Byte let mut last_update = Instant::now(); let mut rcb = git2::RemoteCallbacks::new(); + let mut counter = MetricsCounter::<10>::new(0); rcb.credentials(f); rcb.transfer_progress(|stats| { let indexed_deltas = stats.indexed_deltas(); @@ -711,10 +711,8 @@ pub fn with_fetch_options( } else { // Receiving objects. let duration = last_update.elapsed(); - let recv = stats.received_bytes(); - let rate = (recv - last_recv) as f32 / duration.as_secs_f32(); - if duration > Duration::from_secs(3) { - last_recv = recv; + if duration > Duration::from_millis(300) { + counter.add(stats.received_bytes()); last_update = Instant::now(); } fn format_bytes(bytes: f32) -> (&'static str, f32) { @@ -722,7 +720,7 @@ pub fn with_fetch_options( let i = (bytes.log2() / 10.0).min(4.0) as usize; (UNITS[i], bytes / 1024_f32.powi(i as i32)) } - let (unit, rate) = format_bytes(rate); + let (unit, rate) = format_bytes(counter.rate()); format!(", {:.2}{}iB/s", rate, unit) }; progress From a89b1e8a9f4ebc3490fd518bd8488929275f083c Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Tue, 27 Apr 2021 22:46:46 +0800 Subject: [PATCH 08/10] refactor: pass Instant to MetricsCounter --- src/cargo/sources/git/utils.rs | 13 ++++++++----- src/cargo/util/counter.rs | 26 ++++++++++++++++---------- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/src/cargo/sources/git/utils.rs b/src/cargo/sources/git/utils.rs index de2c7f70dbb..54ff436a19f 100644 --- a/src/cargo/sources/git/utils.rs +++ b/src/cargo/sources/git/utils.rs @@ -697,7 +697,9 @@ pub fn with_fetch_options( with_authentication(url, git_config, |f| { let mut last_update = Instant::now(); let mut rcb = git2::RemoteCallbacks::new(); - let mut counter = MetricsCounter::<10>::new(0); + // We choose `N=10` here to make a `300ms * 10slots ~= 3000ms` + // sliding window for tracking the data transfer rate (in bytes/s). + let mut counter = MetricsCounter::<10>::new(0, last_update); rcb.credentials(f); rcb.transfer_progress(|stats| { let indexed_deltas = stats.indexed_deltas(); @@ -710,10 +712,11 @@ pub fn with_fetch_options( ) } else { // Receiving objects. - let duration = last_update.elapsed(); - if duration > Duration::from_millis(300) { - counter.add(stats.received_bytes()); - last_update = Instant::now(); + let now = Instant::now(); + // Scrape a `received_bytes` to the counter every 300ms. + if now - last_update > Duration::from_millis(300) { + counter.add(stats.received_bytes(), now); + last_update = now; } fn format_bytes(bytes: f32) -> (&'static str, f32) { static UNITS: [&str; 5] = ["", "K", "M", "G", "T"]; diff --git a/src/cargo/util/counter.rs b/src/cargo/util/counter.rs index 04dd1647006..26132afbaad 100644 --- a/src/cargo/util/counter.rs +++ b/src/cargo/util/counter.rs @@ -11,17 +11,17 @@ pub struct MetricsCounter { impl MetricsCounter { /// Creates a new counter with an initial value. - pub fn new(init: usize) -> Self { + pub fn new(init: usize, init_at: Instant) -> Self { debug_assert!(N > 0, "number of slots must be greater than zero"); Self { - slots: [(init, Instant::now()); N], + slots: [(init, init_at); N], index: 0, } } /// Adds record to the counter. - pub fn add(&mut self, data: usize) { - self.slots[self.index] = (data, Instant::now()); + pub fn add(&mut self, data: usize, added_at: Instant) { + self.slots[self.index] = (data, added_at); self.index = (self.index + 1) % N; } @@ -42,20 +42,26 @@ impl MetricsCounter { #[cfg(test)] mod tests { use super::MetricsCounter; + use std::time::{Duration, Instant}; #[test] fn counter() { - let mut counter = MetricsCounter::<3>::new(0); + let now = Instant::now(); + let mut counter = MetricsCounter::<3>::new(0, now); assert_eq!(counter.rate(), 0f32); - for i in 1..=5 { - counter.add(i); - assert!(counter.rate() > 0f32); - } + counter.add(1, now + Duration::from_secs(1)); + assert_eq!(counter.rate(), 1f32); + counter.add(4, now + Duration::from_secs(2)); + assert_eq!(counter.rate(), 2f32); + counter.add(7, now + Duration::from_secs(3)); + assert_eq!(counter.rate(), 3f32); + counter.add(12, now + Duration::from_secs(4)); + assert_eq!(counter.rate(), 4f32); } #[test] #[should_panic(expected = "number of slots must be greater than zero")] fn counter_zero_slot() { - let _counter = MetricsCounter::<0>::new(0); + let _counter = MetricsCounter::<0>::new(0, Instant::now()); } } From 9df531b223e169c6f4b46a7188ca5cae341a5e03 Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Thu, 29 Apr 2021 00:58:58 +0800 Subject: [PATCH 09/10] fix: bytes per second should not prefix with `i` --- src/cargo/sources/git/utils.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cargo/sources/git/utils.rs b/src/cargo/sources/git/utils.rs index 54ff436a19f..c72fb8e1dac 100644 --- a/src/cargo/sources/git/utils.rs +++ b/src/cargo/sources/git/utils.rs @@ -719,12 +719,12 @@ pub fn with_fetch_options( last_update = now; } fn format_bytes(bytes: f32) -> (&'static str, f32) { - static UNITS: [&str; 5] = ["", "K", "M", "G", "T"]; + static UNITS: [&str; 5] = ["", "Ki", "Mi", "Gi", "Ti"]; let i = (bytes.log2() / 10.0).min(4.0) as usize; (UNITS[i], bytes / 1024_f32.powi(i as i32)) } let (unit, rate) = format_bytes(counter.rate()); - format!(", {:.2}{}iB/s", rate, unit) + format!(", {:.2}{}B/s", rate, unit) }; progress .tick(stats.indexed_objects(), stats.total_objects(), &msg) From e4d4347223cfef58b7a2820c97dfcaebfe800535 Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Thu, 29 Apr 2021 01:00:14 +0800 Subject: [PATCH 10/10] comment about caveat of current transfer rate refresh --- src/cargo/sources/git/utils.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/cargo/sources/git/utils.rs b/src/cargo/sources/git/utils.rs index c72fb8e1dac..434fc35300e 100644 --- a/src/cargo/sources/git/utils.rs +++ b/src/cargo/sources/git/utils.rs @@ -712,6 +712,16 @@ pub fn with_fetch_options( ) } else { // Receiving objects. + // + // # Caveat + // + // Progress bar relies on git2 calling `transfer_progress` + // to update its transfer rate, but we cannot guarantee a + // periodic call of that callback. Thus if we don't receive + // any data for, say, 10 seconds, the rate will get stuck + // and never go down to 0B/s. + // In the future, we need to find away to update the rate + // even when the callback is not called. let now = Instant::now(); // Scrape a `received_bytes` to the counter every 300ms. if now - last_update > Duration::from_millis(300) {