Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid blocking on CloseHandle #1850

Merged
merged 2 commits into from
May 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ scopeguard = "1"
semver = "0.9"
sha2 = "0.8"
strsim = "0.9.1"
tar = "0.4"
tar = "0.4.26"
tempdir = "0.3.4"
# FIXME(issue #1788)
term = "=0.5.1"
threadpool = "1"
time = "0.1.34"
toml = "0.5"
url = "1"
Expand Down
48 changes: 35 additions & 13 deletions src/cli/download_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub struct DownloadTracker {
/// If we have displayed progress, this is the number of characters we
/// rendered, so we can erase it cleanly.
displayed_charcount: Option<usize>,
/// What units to show progress in
units: Vec<String>,
}

impl DownloadTracker {
Expand All @@ -56,6 +58,7 @@ impl DownloadTracker {
last_sec: None,
term: term2::stdout(),
displayed_charcount: None,
units: vec!["B".into(); 1],
}
}

Expand All @@ -76,6 +79,15 @@ impl DownloadTracker {
self.download_finished();
true
}
Notification::Install(In::Utils(Un::DownloadPushUnits(units))) => {
self.push_units(units.into());
true
}
Notification::Install(In::Utils(Un::DownloadPopUnits)) => {
self.pop_units();
true
}

_ => false,
}
}
Expand Down Expand Up @@ -139,11 +151,13 @@ impl DownloadTracker {
}
/// Display the tracked download information to the terminal.
fn display(&mut self) {
let total_h = Size(self.total_downloaded);
// Panic if someone pops the default bytes unit...
let units = &self.units.last().unwrap();
let total_h = Size(self.total_downloaded, units);
let sum = self.downloaded_last_few_secs.iter().fold(0, |a, &v| a + v);
let len = self.downloaded_last_few_secs.len();
let speed = if len > 0 { sum / len } else { 0 };
let speed_h = Size(speed);
let speed_h = Size(speed, units);
let elapsed_h = Duration(precise_time_s() - self.start_sec);

// First, move to the start of the current line and clear it.
Expand All @@ -163,7 +177,7 @@ impl DownloadTracker {

let output = match self.content_len {
Some(content_len) => {
let content_len_h = Size(content_len);
let content_len_h = Size(content_len, units);
let content_len = content_len as f64;
let percent = (self.total_downloaded as f64 / content_len) * 100.;
let remaining = content_len - self.total_downloaded as f64;
Expand All @@ -184,6 +198,14 @@ impl DownloadTracker {
let _ = self.term.flush();
self.displayed_charcount = Some(output.chars().count());
}

pub fn push_units(&mut self, new_units: String) {
self.units.push(new_units);
}

pub fn pop_units(&mut self) {
self.units.pop();
}
}

/// Human readable representation of duration(seconds).
Expand All @@ -207,21 +229,21 @@ impl fmt::Display for Duration {
}
}

/// Human readable size (bytes)
struct Size(usize);
/// Human readable size (some units)
struct Size<'a>(usize, &'a str);

impl fmt::Display for Size {
impl<'a> fmt::Display for Size<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
const KIB: f64 = 1024.0;
const MIB: f64 = KIB * KIB;
const KI: f64 = 1024.0;
const MI: f64 = KI * KI;
let size = self.0 as f64;

if size >= MIB {
write!(f, "{:5.1} MiB", size / MIB) // XYZ.P MiB
} else if size >= KIB {
write!(f, "{:5.1} KiB", size / KIB)
if size >= MI {
write!(f, "{:5.1} Mi{}", size / MI, self.1) // XYZ.P Mi
} else if size >= KI {
write!(f, "{:5.1} Ki{}", size / KI, self.1)
} else {
write!(f, "{:3.0} B", size)
write!(f, "{:3.0} {}", size, self.1)
}
}
}
Expand Down
147 changes: 140 additions & 7 deletions src/dist/component/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::dist::component::transaction::*;

use crate::dist::temp;
use crate::errors::*;
use crate::utils::notifications::Notification;
use crate::utils::utils;

use std::collections::HashSet;
Expand Down Expand Up @@ -194,13 +195,17 @@ fn set_file_perms(_dest_path: &Path, _src_path: &Path) -> Result<()> {
pub struct TarPackage<'a>(DirectoryPackage, temp::Dir<'a>);

impl<'a> TarPackage<'a> {
pub fn new<R: Read>(stream: R, temp_cfg: &'a temp::Cfg) -> Result<Self> {
pub fn new<R: Read>(
stream: R,
temp_cfg: &'a temp::Cfg,
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
) -> Result<Self> {
let temp_dir = temp_cfg.new_directory()?;
let mut archive = tar::Archive::new(stream);
// The rust-installer packages unpack to a directory called
// $pkgname-$version-$target. Skip that directory when
// unpacking.
unpack_without_first_dir(&mut archive, &*temp_dir)?;
unpack_without_first_dir(&mut archive, &*temp_dir, notify_handler)?;

Ok(TarPackage(
DirectoryPackage::new(temp_dir.to_owned(), false)?,
Expand All @@ -209,11 +214,122 @@ impl<'a> TarPackage<'a> {
}
}

fn unpack_without_first_dir<R: Read>(archive: &mut tar::Archive<R>, path: &Path) -> Result<()> {
#[cfg(windows)]
mod unpacker {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use threadpool;

use crate::utils::notifications::Notification;

pub struct Unpacker<'a> {
n_files: Arc<AtomicUsize>,
pool: threadpool::ThreadPool,
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
}

impl<'a> Unpacker<'a> {
pub fn new(notify_handler: Option<&'a dyn Fn(Notification<'_>)>) -> Unpacker {
// Defaults to hardware thread count threads; this is suitable for
// our needs as IO bound operations tend to show up as write latencies
// rather than close latencies, so we don't need to look at
// more threads to get more IO dispatched at this stage in the process.
let pool = threadpool::Builder::new()
.thread_name("CloseHandle".into())
.build();
Unpacker {
n_files: Arc::new(AtomicUsize::new(0)),
pool: pool,
notify_handler: notify_handler,
}
}

pub fn handle(&mut self, unpacked: tar::Unpacked) {
if let tar::Unpacked::File(f) = unpacked {
self.n_files.fetch_add(1, Ordering::Relaxed);
let n_files = self.n_files.clone();
self.pool.execute(move || {
drop(f);
n_files.fetch_sub(1, Ordering::Relaxed);
});
}
}
}

impl<'a> Drop for Unpacker<'a> {
fn drop(&mut self) {
// Some explanation is in order. Even though the tar we are reading from (if
// any) will have had its FileWithProgress download tracking
// completed before we hit drop, that is not true if we are unwinding due to a
// failure, where the logical ownership of the progress bar is
// ambiguous, and as the tracker itself is abstracted out behind
// notifications etc we cannot just query for that. So: we assume no
// more reads of the underlying tar will take place: either the
// error unwinding will stop reads, or we completed; either way, we
// notify finished to the tracker to force a reset to zero; we set
// the units to files, show our progress, and set our units back
// afterwards. The largest archives today - rust docs - have ~20k
// items, and the download tracker's progress is confounded with
// actual handling of data today, we synthesis a data buffer and
// pretend to have bytes to deliver.
self.notify_handler
.map(|handler| handler(Notification::DownloadFinished));
self.notify_handler
.map(|handler| handler(Notification::DownloadPushUnits("handles")));
let mut prev_files = self.n_files.load(Ordering::Relaxed);
self.notify_handler.map(|handler| {
handler(Notification::DownloadContentLengthReceived(
prev_files as u64,
))
});
if prev_files > 50 {
println!("Closing {} deferred file handles", prev_files);
}
let buf: Vec<u8> = vec![0; prev_files];
assert!(32767 > prev_files);
let mut current_files = prev_files;
while current_files != 0 {
use std::thread::sleep;
sleep(std::time::Duration::from_millis(100));
prev_files = current_files;
current_files = self.n_files.load(Ordering::Relaxed);
let step_count = prev_files - current_files;
self.notify_handler.map(|handler| {
handler(Notification::DownloadDataReceived(&buf[0..step_count]))
});
}
self.pool.join();
self.notify_handler
.map(|handler| handler(Notification::DownloadFinished));
self.notify_handler
.map(|handler| handler(Notification::DownloadPopUnits));
}
}
}

#[cfg(not(windows))]
mod unpacker {
use crate::utils::notifications::Notification;
pub struct Unpacker {}
impl Unpacker {
pub fn new<'a>(_notify_handler: Option<&'a dyn Fn(Notification<'_>)>) -> Unpacker {
Unpacker {}
}
pub fn handle(&mut self, _unpacked: tar::Unpacked) {}
}
}

fn unpack_without_first_dir<'a, R: Read>(
archive: &mut tar::Archive<R>,
path: &Path,
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
) -> Result<()> {
let mut unpacker = unpacker::Unpacker::new(notify_handler);
let entries = archive
.entries()
.chain_err(|| ErrorKind::ExtractingPackage)?;
let mut checked_parents: HashSet<PathBuf> = HashSet::new();

for entry in entries {
let mut entry = entry.chain_err(|| ErrorKind::ExtractingPackage)?;
let relpath = {
Expand Down Expand Up @@ -249,6 +365,7 @@ fn unpack_without_first_dir<R: Read>(archive: &mut tar::Archive<R>, path: &Path)
entry.set_preserve_mtime(false);
entry
.unpack(&full_path)
.map(|unpacked| unpacker.handle(unpacked))
.chain_err(|| ErrorKind::ExtractingPackage)?;
}

Expand Down Expand Up @@ -277,9 +394,17 @@ impl<'a> Package for TarPackage<'a> {
pub struct TarGzPackage<'a>(TarPackage<'a>);

impl<'a> TarGzPackage<'a> {
pub fn new<R: Read>(stream: R, temp_cfg: &'a temp::Cfg) -> Result<Self> {
pub fn new<R: Read>(
stream: R,
temp_cfg: &'a temp::Cfg,
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
) -> Result<Self> {
let stream = flate2::read::GzDecoder::new(stream);
Ok(TarGzPackage(TarPackage::new(stream, temp_cfg)?))
Ok(TarGzPackage(TarPackage::new(
stream,
temp_cfg,
notify_handler,
)?))
}
}

Expand All @@ -305,9 +430,17 @@ impl<'a> Package for TarGzPackage<'a> {
pub struct TarXzPackage<'a>(TarPackage<'a>);

impl<'a> TarXzPackage<'a> {
pub fn new<R: Read>(stream: R, temp_cfg: &'a temp::Cfg) -> Result<Self> {
pub fn new<R: Read>(
stream: R,
temp_cfg: &'a temp::Cfg,
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
) -> Result<Self> {
let stream = xz2::read::XzDecoder::new(stream);
Ok(TarXzPackage(TarPackage::new(stream, temp_cfg)?))
Ok(TarXzPackage(TarPackage::new(
stream,
temp_cfg,
notify_handler,
)?))
}
}

Expand Down
Loading