Show transfer rate when fetching/updating registry index #9395

Merged
merged 10 commits into from
Apr 30, 2021
46 changes: 42 additions & 4 deletions src/cargo/sources/git/utils.rs
@@ -3,7 +3,7 @@

use crate::core::GitReference;
use crate::util::errors::CargoResult;
-use crate::util::{network, Config, IntoUrl, Progress};
+use crate::util::{network, Config, IntoUrl, MetricsCounter, Progress};
use anyhow::{anyhow, Context as _};
use cargo_util::{paths, ProcessBuilder};
use curl::easy::List;
@@ -15,6 +15,7 @@ use std::env;
use std::fmt;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{Duration, Instant};
use url::Url;

fn serialize_str<T, S>(t: &T, s: S) -> Result<S::Ok, S::Error>
@@ -677,7 +678,7 @@ fn reset(repo: &git2::Repository, obj: &git2::Object<'_>, config: &Config) -> Ca
    let mut pb = Progress::new("Checkout", config);
    let mut opts = git2::build::CheckoutBuilder::new();
    opts.progress(|_, cur, max| {
-        drop(pb.tick(cur, max));
+        drop(pb.tick(cur, max, ""));
    });
    debug!("doing reset");
    repo.reset(obj, git2::ResetType::Hard, Some(&mut opts))?;
@@ -694,12 +695,49 @@ pub fn with_fetch_options(
    let mut progress = Progress::new("Fetch", config);
    network::with_retry(config, || {
        with_authentication(url, git_config, |f| {
            let mut last_update = Instant::now();
            let mut rcb = git2::RemoteCallbacks::new();
            // We choose `N=10` here to make a `300ms * 10slots ~= 3000ms`
            // sliding window for tracking the data transfer rate (in bytes/s).
            let mut counter = MetricsCounter::<10>::new(0, last_update);
            rcb.credentials(f);

            rcb.transfer_progress(|stats| {
                let indexed_deltas = stats.indexed_deltas();
                let msg = if indexed_deltas > 0 {
                    // Resolving deltas.
                    format!(
                        ", ({}/{}) resolving deltas",
                        indexed_deltas,
                        stats.total_deltas()
                    )
                } else {
                    // Receiving objects.
                    //
                    // # Caveat
                    //
                    // Progress bar relies on git2 calling `transfer_progress`
                    // to update its transfer rate, but we cannot guarantee a
                    // periodic call of that callback. Thus if we don't receive
                    // any data for, say, 10 seconds, the rate will get stuck
                    // and never go down to 0B/s.
                    // In the future, we need to find a way to update the rate
                    // even when the callback is not called.
                    let now = Instant::now();
                    // Scrape a `received_bytes` to the counter every 300ms.
                    if now - last_update > Duration::from_millis(300) {
Member

One thought I had while reading this: do you know if libgit2 is guaranteed to call this callback periodically? If we receive data and then don't receive anything else for 10s, does that mean the transfer rate stays "constant" for those 10s even though we aren't actually receiving anything?

(Basically, I think for the transfer rate to be accurate we would have to guarantee that this transfer_progress callback is called periodically even if the network is idle.)

Member Author

It seems that libgit2 does not call the callback periodically, so yes, the rate may get stuck. The Git CLI has a similar issue. IIRC all Git operations share the same timeout config from the same curl handle, so it's nearly impossible to reuse it. Therefore we have some options:

  1. Should we implement another timeout logic to update the transfer rate?
  2. Or do we accept a "constant" transfer rate while encountering network issue?
  3. Is there any other possible solution to handle it?

Member

I think our options are pretty limited here. One option is to remove the download rate (boo) and the other one would be to add some sort of threading that calls the update on a tick. I don't think those are really worth it for now though. Perhaps you can leave a comment here to this effect and we can tackle this in the future if it's actually an issue?

Member Author

Comment added. Also fixed a unit display issue 😂
Thanks for your suggestion!

                        counter.add(stats.received_bytes(), now);
                        last_update = now;
                    }
                    fn format_bytes(bytes: f32) -> (&'static str, f32) {
                        static UNITS: [&str; 5] = ["", "Ki", "Mi", "Gi", "Ti"];
                        let i = (bytes.log2() / 10.0).min(4.0) as usize;
                        (UNITS[i], bytes / 1024_f32.powi(i as i32))
                    }
                    let (unit, rate) = format_bytes(counter.rate());
                    format!(", {:.2}{}B/s", rate, unit)
                };
                progress
-                    .tick(stats.indexed_objects(), stats.total_objects())
+                    .tick(stats.indexed_objects(), stats.total_objects(), &msg)
                    .is_ok()
            });
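
To make the unit selection in `format_bytes` easier to check by hand, here is a minimal standalone sketch. `format_bytes` is copied from the hunk above; the `format_rate` wrapper is a hypothetical helper added only for illustration (the PR formats the string inline with a leading `", "`), and the comments are my reading of the code rather than the author's.

```rust
/// Copied from the diff: returns the binary prefix and the value scaled to it.
fn format_bytes(bytes: f32) -> (&'static str, f32) {
    static UNITS: [&str; 5] = ["", "Ki", "Mi", "Gi", "Ti"];
    // Each prefix covers 10 bits (a factor of 1024), so log2(bytes) / 10
    // selects the prefix. `.min(4.0)` caps the index at "Ti".
    // For rates below 1 B/s (including 0) the log is negative or -inf,
    // and the float-to-usize cast saturates to 0.
    let i = (bytes.log2() / 10.0).min(4.0) as usize;
    (UNITS[i], bytes / 1024_f32.powi(i as i32))
}

/// Hypothetical wrapper (not part of the PR): renders a rate the way the
/// progress message does, without the leading ", ".
fn format_rate(bytes_per_sec: f32) -> String {
    let (unit, rate) = format_bytes(bytes_per_sec);
    format!("{:.2}{}B/s", rate, unit)
}
```

With these definitions, `format_rate(1536.0)` gives `1.50KiB/s` and `format_rate(0.0)` gives `0.00B/s`, so an idle counter still renders a well-formed message.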

67 changes: 67 additions & 0 deletions src/cargo/util/counter.rs
@@ -0,0 +1,67 @@
use std::time::Instant;

/// A metrics counter storing only latest `N` records.
pub struct MetricsCounter<const N: usize> {
    /// Slots to store metrics.
    slots: [(usize, Instant); N],
    /// The slot of the oldest record.
    /// Also the next slot to store the new record.
    index: usize,
}

impl<const N: usize> MetricsCounter<N> {
    /// Creates a new counter with an initial value.
    pub fn new(init: usize, init_at: Instant) -> Self {
        debug_assert!(N > 0, "number of slots must be greater than zero");
        Self {
            slots: [(init, init_at); N],
            index: 0,
        }
    }

    /// Adds record to the counter.
    pub fn add(&mut self, data: usize, added_at: Instant) {
        self.slots[self.index] = (data, added_at);
        self.index = (self.index + 1) % N;
    }

    /// Calculates per-second average rate of all slots.
    pub fn rate(&self) -> f32 {
        let latest = self.slots[self.index.checked_sub(1).unwrap_or(N - 1)];
        let oldest = self.slots[self.index];
        let duration = (latest.1 - oldest.1).as_secs_f32();
        let avg = (latest.0 - oldest.0) as f32 / duration;
        if f32::is_nan(avg) {
            0f32
        } else {
            avg
        }
    }
}

#[cfg(test)]
mod tests {
    use super::MetricsCounter;
    use std::time::{Duration, Instant};

    #[test]
    fn counter() {
        let now = Instant::now();
        let mut counter = MetricsCounter::<3>::new(0, now);
        assert_eq!(counter.rate(), 0f32);
        counter.add(1, now + Duration::from_secs(1));
        assert_eq!(counter.rate(), 1f32);
        counter.add(4, now + Duration::from_secs(2));
        assert_eq!(counter.rate(), 2f32);
        counter.add(7, now + Duration::from_secs(3));
        assert_eq!(counter.rate(), 3f32);
        counter.add(12, now + Duration::from_secs(4));
        assert_eq!(counter.rate(), 4f32);
    }

    #[test]
    #[should_panic(expected = "number of slots must be greater than zero")]
    fn counter_zero_slot() {
        let _counter = MetricsCounter::<0>::new(0, Instant::now());
    }
}
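
The caveat raised in the review (the rate getting stuck when `transfer_progress` stops being called) can be reproduced with the counter alone. The sketch below copies `MetricsCounter` from this file so that it compiles on its own (the `debug_assert!` and doc comments are dropped for brevity); `stalled_rate` is a hypothetical helper and the byte totals are made up for illustration.

```rust
use std::time::{Duration, Instant};

/// Copy of the counter from this diff, reproduced so the sketch is standalone.
pub struct MetricsCounter<const N: usize> {
    slots: [(usize, Instant); N],
    index: usize,
}

impl<const N: usize> MetricsCounter<N> {
    pub fn new(init: usize, init_at: Instant) -> Self {
        Self { slots: [(init, init_at); N], index: 0 }
    }

    pub fn add(&mut self, data: usize, added_at: Instant) {
        self.slots[self.index] = (data, added_at);
        self.index = (self.index + 1) % N;
    }

    pub fn rate(&self) -> f32 {
        let latest = self.slots[self.index.checked_sub(1).unwrap_or(N - 1)];
        let oldest = self.slots[self.index];
        let duration = (latest.1 - oldest.1).as_secs_f32();
        let avg = (latest.0 - oldest.0) as f32 / duration;
        if f32::is_nan(avg) { 0f32 } else { avg }
    }
}

/// Hypothetical helper: records three made-up cumulative byte totals one
/// second apart, then reads the rate twice with no `add` in between.
fn stalled_rate() -> (f32, f32) {
    let start = Instant::now();
    let mut counter = MetricsCounter::<3>::new(0, start);
    for (secs, total) in [(1u64, 1000usize), (2, 2000), (3, 3000)] {
        counter.add(total, start + Duration::from_secs(secs));
    }
    let before = counter.rate();
    // Simulated stall: the callback is never invoked again, so nothing is
    // added. `rate()` only looks at the stored slots, not at the current
    // time, so it keeps reporting the last computed value.
    let after = counter.rate();
    (before, after)
}
```

Both values come out as 1000 B/s: the window spans the samples at 1 s and 3 s, a difference of 2000 bytes over 2 seconds. Because `rate()` takes no notion of "now", the figure can only change when `add` is called again, which is the behaviour the code comment in `with_fetch_options` documents.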
2 changes: 2 additions & 0 deletions src/cargo/util/mod.rs
@@ -3,6 +3,7 @@ use std::time::Duration;

pub use self::canonical_url::CanonicalUrl;
pub use self::config::{homedir, Config, ConfigValue};
pub(crate) use self::counter::MetricsCounter;
pub use self::dependency_queue::DependencyQueue;
pub use self::diagnostic_server::RustfixDiagnosticServer;
pub use self::errors::{internal, CargoResult, CliResult, Test};
@@ -29,6 +30,7 @@ pub use self::workspace::{
mod canonical_url;
pub mod command_prelude;
pub mod config;
mod counter;
pub mod cpu;
mod dependency_queue;
pub mod diagnostic_server;
4 changes: 2 additions & 2 deletions src/cargo/util/progress.rs
@@ -96,7 +96,7 @@ impl<'cfg> Progress<'cfg> {
        Self::with_style(name, ProgressStyle::Percentage, cfg)
    }

-    pub fn tick(&mut self, cur: usize, max: usize) -> CargoResult<()> {
+    pub fn tick(&mut self, cur: usize, max: usize, msg: &str) -> CargoResult<()> {
        let s = match &mut self.state {
            Some(s) => s,
            None => return Ok(()),
@@ -118,7 +118,7 @@ impl<'cfg> Progress<'cfg> {
            return Ok(());
        }

-        s.tick(cur, max, "")
+        s.tick(cur, max, msg)
    }

    pub fn tick_now(&mut self, cur: usize, max: usize, msg: &str) -> CargoResult<()> {