Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix timeouts firing while tarballs are extracted #6130

Merged
merged 3 commits into from
Nov 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ crates-io = { path = "src/crates-io", version = "0.20" }
crossbeam-utils = "0.5"
crypto-hash = "0.3.1"
curl = { version = "0.4.17", features = ['http2'] }
curl-sys = "0.4.12"
env_logger = "0.5.11"
failure = "0.1.2"
filetime = "0.2"
Expand Down
171 changes: 150 additions & 21 deletions src/cargo/core/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::path::{Path, PathBuf};
use std::time::{Instant, Duration};

use bytesize::ByteSize;
use curl;
use curl_sys;
use curl::easy::{Easy, HttpVersion};
use curl::multi::{Multi, EasyHandle};
use lazycell::LazyCell;
Expand Down Expand Up @@ -255,11 +257,10 @@ pub struct PackageSet<'cfg> {

pub struct Downloads<'a, 'cfg: 'a> {
set: &'a PackageSet<'cfg>,
pending: HashMap<usize, (Download, EasyHandle)>,
pending: HashMap<usize, (Download<'cfg>, EasyHandle)>,
pending_ids: HashSet<PackageId>,
results: Vec<(usize, CargoResult<()>)>,
results: Vec<(usize, Result<(), curl::Error>)>,
next: usize,
retry: Retry<'cfg>,
progress: RefCell<Option<Progress<'cfg>>>,
downloads_finished: usize,
downloaded_bytes: u64,
Expand All @@ -268,15 +269,51 @@ pub struct Downloads<'a, 'cfg: 'a> {
success: bool,
}

struct Download {
struct Download<'cfg> {
/// Token for this download, used as the key of the `Downloads::pending` map
/// and stored in `EasyHandle` as well.
token: usize,

/// Package that we're downloading
id: PackageId,

/// Actual downloaded data, updated throughout the lifetime of this download
data: RefCell<Vec<u8>>,

/// The URL that we're downloading from, cached here for error messages and
/// reenqueuing.
url: String,

/// A descriptive string to print when we've finished downloading this crate
descriptor: String,

/// Statistics updated from the progress callback in libcurl
total: Cell<u64>,
current: Cell<u64>,

/// The moment we started this transfer at
start: Instant,

/// Last time we noticed that we got some more data from libcurl
updated_at: Cell<Instant>,

/// Timeout management, both of timeout thresholds as well as whether or not
/// our connection has timed out (and accompanying message if it has).
///
/// Note that timeout management is done manually here because we have a
/// `Multi` with a lot of active transfers but between transfers finishing
/// we perform some possibly slow synchronous work (like grabbing file
/// locks, extracting tarballs, etc). The default timers on our `Multi` keep
/// running during this work, but we don't want them to count towards timing
/// everythig out. As a result, we manage this manually and take the time
/// for synchronous work into account manually.
timeout: ops::HttpTimeout,
timed_out: Cell<Option<String>>,
next_speed_check: Cell<Instant>,
next_speed_check_bytes_threshold: Cell<u64>,

/// Logic used to track retrying this download if it's a spurious failure.
retry: Retry<'cfg>,
}

impl<'cfg> PackageSet<'cfg> {
Expand Down Expand Up @@ -329,7 +366,6 @@ impl<'cfg> PackageSet<'cfg> {
pending: HashMap::new(),
pending_ids: HashSet::new(),
results: Vec::new(),
retry: Retry::new(self.config)?,
progress: RefCell::new(Some(Progress::with_style(
"Downloading",
ProgressStyle::Ratio,
Expand Down Expand Up @@ -410,7 +446,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
debug!("downloading {} as {}", id, token);
assert!(self.pending_ids.insert(id.clone()));

let mut handle = ops::http_handle(self.set.config)?;
let (mut handle, timeout) = ops::http_handle_and_timeout(self.set.config)?;
handle.get(true)?;
handle.url(&url)?;
handle.follow_location(true)?; // follow redirects
Expand Down Expand Up @@ -448,14 +484,10 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
handle.progress(true)?;
handle.progress_function(move |dl_total, dl_cur, _, _| {
tls::with(|downloads| {
let downloads = match downloads {
Some(d) => d,
None => return false,
};
let dl = &downloads.pending[&token].0;
dl.total.set(dl_total as u64);
dl.current.set(dl_cur as u64);
downloads.tick(WhyTick::DownloadUpdate).is_ok()
match downloads {
Some(d) => d.progress(token, dl_total as u64, dl_cur as u64),
None => false,
}
})
})?;

Expand All @@ -469,6 +501,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
self.set.config.shell().status("Downloading", "crates ...")?;
}

let now = Instant::now();
let dl = Download {
token,
data: RefCell::new(Vec::new()),
Expand All @@ -478,6 +511,12 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
total: Cell::new(0),
current: Cell::new(0),
start: Instant::now(),
updated_at: Cell::new(now),
timeout,
timed_out: Cell::new(None),
next_speed_check: Cell::new(now),
next_speed_check_bytes_threshold: Cell::new(0),
retry: Retry::new(self.set.config)?,
};
self.enqueue(dl, handle)?;
self.tick(WhyTick::DownloadStarted)?;
Expand Down Expand Up @@ -514,12 +553,35 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
// then we want to re-enqueue our request for another attempt and
// then we wait for another request to finish.
let ret = {
self.retry.try(|| {
result?;
let timed_out = &dl.timed_out;
let url = &dl.url;
dl.retry.try(|| {
if let Err(e) = result {
// If this error is "aborted by callback" then that's
// probably because our progress callback aborted due to
// a timeout. We'll find out by looking at the
// `timed_out` field, looking for a descriptive message.
// If one is found we switch the error code (to ensure
// it's flagged as spurious) and then attach our extra
// information to the error.
if !e.is_aborted_by_callback() {
return Err(e.into())
}

return Err(match timed_out.replace(None) {
Some(msg) => {
let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
let mut err = curl::Error::new(code);
err.set_extra(msg);
err
}
None => e,
}.into())
}

let code = handle.response_code()?;
if code != 200 && code != 0 {
let url = handle.effective_url()?.unwrap_or(&dl.url);
let url = handle.effective_url()?.unwrap_or(url);
return Err(HttpNot200 {
code,
url: url.to_string(),
Expand Down Expand Up @@ -568,20 +630,39 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
let source = sources
.get_mut(dl.id.source_id())
.ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?;
let start = Instant::now();
let pkg = source.finish_download(&dl.id, data)?;

// Assume that no time has passed while we were calling
// `finish_download`, update all speed checks and timeout limits of all
// active downloads to make sure they don't fire because of a slowly
// extracted tarball.
let finish_dur = start.elapsed();
for (dl, _) in self.pending.values_mut() {
dl.updated_at.set(dl.updated_at.get() + finish_dur);
dl.next_speed_check.set(dl.next_speed_check.get() + finish_dur);
}

let slot = &self.set.packages[&dl.id];
assert!(slot.fill(pkg).is_ok());
Ok(slot.borrow().unwrap())
}

fn enqueue(&mut self, dl: Download, handle: Easy) -> CargoResult<()> {
fn enqueue(&mut self, dl: Download<'cfg>, handle: Easy) -> CargoResult<()> {
let mut handle = self.set.multi.add(handle)?;
let now = Instant::now();
handle.set_token(dl.token)?;
dl.timed_out.set(None);
dl.updated_at.set(now);
dl.current.set(0);
dl.total.set(0);
dl.next_speed_check.set(now + dl.timeout.dur);
dl.next_speed_check_bytes_threshold.set(dl.timeout.low_speed_limit as u64);
self.pending.insert(dl.token, (dl, handle));
Ok(())
}

fn wait_for_curl(&mut self) -> CargoResult<(usize, CargoResult<()>)> {
fn wait_for_curl(&mut self) -> CargoResult<(usize, Result<(), curl::Error>)> {
// This is the main workhorse loop. We use libcurl's portable `wait`
// method to actually perform blocking. This isn't necessarily too
// efficient in terms of fd management, but we should only be juggling
Expand Down Expand Up @@ -609,7 +690,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
let token = msg.token().expect("failed to read token");
let handle = &pending[&token].1;
if let Some(result) = msg.result_for(&handle) {
results.push((token, result.map_err(|e| e.into())));
results.push((token, result));
} else {
debug!("message without a result (?)");
}
Expand All @@ -619,11 +700,59 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
break Ok(pair)
}
assert!(self.pending.len() > 0);
self.set.multi.wait(&mut [], Duration::new(60, 0))
let timeout = self.set.multi.get_timeout()?
.unwrap_or(Duration::new(5, 0));
self.set.multi.wait(&mut [], timeout)
.chain_err(|| "failed to wait on curl `Multi`")?;
}
}

fn progress(&self, token: usize, total: u64, cur: u64) -> bool {
let dl = &self.pending[&token].0;
dl.total.set(total);
let now = Instant::now();
if cur != dl.current.get() {
dl.current.set(cur);
dl.updated_at.set(now);

if dl.current.get() >= dl.next_speed_check_bytes_threshold.get() {
dl.next_speed_check.set(now + dl.timeout.dur);
dl.next_speed_check_bytes_threshold.set(
dl.current.get() + dl.timeout.low_speed_limit as u64,
);
}
}
if !self.tick(WhyTick::DownloadUpdate).is_ok() {
return false
}

// If we've spent too long not actually receiving any data we time out.
if now - dl.updated_at.get() > dl.timeout.dur {
let msg = format!("failed to download any data for `{}` within {}s",
dl.id,
dl.timeout.dur.as_secs());
dl.timed_out.set(Some(msg));
return false
}

// If we reached the point in time that we need to check our speed
// limit, see if we've transferred enough data during this threshold. If
// it fails this check then we fail because the download is going too
// slowly.
if now >= dl.next_speed_check.get() {
assert!(dl.current.get() < dl.next_speed_check_bytes_threshold.get());
let msg = format!("download of `{}` failed to transfer more \
than {} bytes in {}s",
dl.id,
dl.timeout.low_speed_limit,
dl.timeout.dur.as_secs());
dl.timed_out.set(Some(msg));
return false
}

true
}

fn tick(&self, why: WhyTick) -> CargoResult<()> {
let mut progress = self.progress.borrow_mut();
let progress = progress.as_mut().unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/cargo/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ extern crate core_foundation;
extern crate crates_io as registry;
extern crate crossbeam_utils;
extern crate curl;
extern crate curl_sys;
#[macro_use]
extern crate failure;
extern crate filetime;
Expand Down
3 changes: 2 additions & 1 deletion src/cargo/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ pub use self::cargo_package::{package, PackageOpts};
pub use self::registry::{publish, registry_configuration, RegistryConfig};
pub use self::registry::{http_handle, needs_custom_http_transport, registry_login, search};
pub use self::registry::{modify_owners, yank, OwnersOptions, PublishOpts};
pub use self::registry::configure_http_handle;
pub use self::registry::{configure_http_handle, http_handle_and_timeout};
pub use self::registry::HttpTimeout;
pub use self::cargo_fetch::{fetch, FetchOptions};
pub use self::cargo_pkgid::pkgid;
pub use self::resolve::{add_overrides, get_resolved_packages, resolve_with_previous, resolve_ws,
Expand Down
Loading