Skip to content

Commit

Permalink
Merge pull request #1850 from rbtcollins/new-tar-rs
Browse files Browse the repository at this point in the history
Avoid blocking on CloseHandle
  • Loading branch information
kinnison authored May 18, 2019
2 parents d78ce04 + ff0dc0d commit 5580890
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 33 deletions.
23 changes: 17 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ scopeguard = "1"
semver = "0.9"
sha2 = "0.8"
strsim = "0.9.1"
tar = "0.4"
tar = "0.4.26"
tempdir = "0.3.4"
# FIXME(issue #1788)
term = "=0.5.1"
threadpool = "1"
time = "0.1.34"
toml = "0.5"
url = "1"
Expand Down
48 changes: 35 additions & 13 deletions src/cli/download_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub struct DownloadTracker {
/// If we have displayed progress, this is the number of characters we
/// rendered, so we can erase it cleanly.
displayed_charcount: Option<usize>,
/// What units to show progress in
units: Vec<String>,
}

impl DownloadTracker {
Expand All @@ -56,6 +58,7 @@ impl DownloadTracker {
last_sec: None,
term: term2::stdout(),
displayed_charcount: None,
units: vec!["B".into(); 1],
}
}

Expand All @@ -76,6 +79,15 @@ impl DownloadTracker {
self.download_finished();
true
}
Notification::Install(In::Utils(Un::DownloadPushUnits(units))) => {
self.push_units(units.into());
true
}
Notification::Install(In::Utils(Un::DownloadPopUnits)) => {
self.pop_units();
true
}

_ => false,
}
}
Expand Down Expand Up @@ -139,11 +151,13 @@ impl DownloadTracker {
}
/// Display the tracked download information to the terminal.
fn display(&mut self) {
let total_h = Size(self.total_downloaded);
// Panic if someone pops the default bytes unit...
let units = &self.units.last().unwrap();
let total_h = Size(self.total_downloaded, units);
let sum = self.downloaded_last_few_secs.iter().fold(0, |a, &v| a + v);
let len = self.downloaded_last_few_secs.len();
let speed = if len > 0 { sum / len } else { 0 };
let speed_h = Size(speed);
let speed_h = Size(speed, units);
let elapsed_h = Duration(precise_time_s() - self.start_sec);

// First, move to the start of the current line and clear it.
Expand All @@ -163,7 +177,7 @@ impl DownloadTracker {

let output = match self.content_len {
Some(content_len) => {
let content_len_h = Size(content_len);
let content_len_h = Size(content_len, units);
let content_len = content_len as f64;
let percent = (self.total_downloaded as f64 / content_len) * 100.;
let remaining = content_len - self.total_downloaded as f64;
Expand All @@ -184,6 +198,14 @@ impl DownloadTracker {
let _ = self.term.flush();
self.displayed_charcount = Some(output.chars().count());
}

pub fn push_units(&mut self, new_units: String) {
self.units.push(new_units);
}

pub fn pop_units(&mut self) {
self.units.pop();
}
}

/// Human readable representation of duration(seconds).
Expand All @@ -207,21 +229,21 @@ impl fmt::Display for Duration {
}
}

/// Human readable size (bytes)
struct Size(usize);
/// Human readable size (some units)
struct Size<'a>(usize, &'a str);

impl fmt::Display for Size {
impl<'a> fmt::Display for Size<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
const KIB: f64 = 1024.0;
const MIB: f64 = KIB * KIB;
const KI: f64 = 1024.0;
const MI: f64 = KI * KI;
let size = self.0 as f64;

if size >= MIB {
write!(f, "{:5.1} MiB", size / MIB) // XYZ.P MiB
} else if size >= KIB {
write!(f, "{:5.1} KiB", size / KIB)
if size >= MI {
write!(f, "{:5.1} Mi{}", size / MI, self.1) // XYZ.P Mi
} else if size >= KI {
write!(f, "{:5.1} Ki{}", size / KI, self.1)
} else {
write!(f, "{:3.0} B", size)
write!(f, "{:3.0} {}", size, self.1)
}
}
}
Expand Down
147 changes: 140 additions & 7 deletions src/dist/component/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::dist::component::transaction::*;

use crate::dist::temp;
use crate::errors::*;
use crate::utils::notifications::Notification;
use crate::utils::utils;

use std::collections::HashSet;
Expand Down Expand Up @@ -194,13 +195,17 @@ fn set_file_perms(_dest_path: &Path, _src_path: &Path) -> Result<()> {
pub struct TarPackage<'a>(DirectoryPackage, temp::Dir<'a>);

impl<'a> TarPackage<'a> {
pub fn new<R: Read>(stream: R, temp_cfg: &'a temp::Cfg) -> Result<Self> {
pub fn new<R: Read>(
stream: R,
temp_cfg: &'a temp::Cfg,
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
) -> Result<Self> {
let temp_dir = temp_cfg.new_directory()?;
let mut archive = tar::Archive::new(stream);
// The rust-installer packages unpack to a directory called
// $pkgname-$version-$target. Skip that directory when
// unpacking.
unpack_without_first_dir(&mut archive, &*temp_dir)?;
unpack_without_first_dir(&mut archive, &*temp_dir, notify_handler)?;

Ok(TarPackage(
DirectoryPackage::new(temp_dir.to_owned(), false)?,
Expand All @@ -209,11 +214,122 @@ impl<'a> TarPackage<'a> {
}
}

fn unpack_without_first_dir<R: Read>(archive: &mut tar::Archive<R>, path: &Path) -> Result<()> {
#[cfg(windows)]
mod unpacker {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use threadpool;

use crate::utils::notifications::Notification;

pub struct Unpacker<'a> {
n_files: Arc<AtomicUsize>,
pool: threadpool::ThreadPool,
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
}

impl<'a> Unpacker<'a> {
pub fn new(notify_handler: Option<&'a dyn Fn(Notification<'_>)>) -> Unpacker {
// Defaults to hardware thread count threads; this is suitable for
// our needs as IO bound operations tend to show up as write latencies
// rather than close latencies, so we don't need to look at
// more threads to get more IO dispatched at this stage in the process.
let pool = threadpool::Builder::new()
.thread_name("CloseHandle".into())
.build();
Unpacker {
n_files: Arc::new(AtomicUsize::new(0)),
pool: pool,
notify_handler: notify_handler,
}
}

pub fn handle(&mut self, unpacked: tar::Unpacked) {
if let tar::Unpacked::File(f) = unpacked {
self.n_files.fetch_add(1, Ordering::Relaxed);
let n_files = self.n_files.clone();
self.pool.execute(move || {
drop(f);
n_files.fetch_sub(1, Ordering::Relaxed);
});
}
}
}

impl<'a> Drop for Unpacker<'a> {
fn drop(&mut self) {
// Some explanation is in order. Even though the tar we are reading from (if
// any) will have had its FileWithProgress download tracking
// completed before we hit drop, that is not true if we are unwinding due to a
// failure, where the logical ownership of the progress bar is
// ambiguous, and as the tracker itself is abstracted out behind
// notifications etc we cannot just query for that. So: we assume no
// more reads of the underlying tar will take place: either the
// error unwinding will stop reads, or we completed; either way, we
// notify finished to the tracker to force a reset to zero; we set
// the units to files, show our progress, and set our units back
// afterwards. The largest archives today - rust docs - have ~20k
// items, and the download tracker's progress is confounded with
// actual handling of data today, we synthesis a data buffer and
// pretend to have bytes to deliver.
self.notify_handler
.map(|handler| handler(Notification::DownloadFinished));
self.notify_handler
.map(|handler| handler(Notification::DownloadPushUnits("handles")));
let mut prev_files = self.n_files.load(Ordering::Relaxed);
self.notify_handler.map(|handler| {
handler(Notification::DownloadContentLengthReceived(
prev_files as u64,
))
});
if prev_files > 50 {
println!("Closing {} deferred file handles", prev_files);
}
let buf: Vec<u8> = vec![0; prev_files];
assert!(32767 > prev_files);
let mut current_files = prev_files;
while current_files != 0 {
use std::thread::sleep;
sleep(std::time::Duration::from_millis(100));
prev_files = current_files;
current_files = self.n_files.load(Ordering::Relaxed);
let step_count = prev_files - current_files;
self.notify_handler.map(|handler| {
handler(Notification::DownloadDataReceived(&buf[0..step_count]))
});
}
self.pool.join();
self.notify_handler
.map(|handler| handler(Notification::DownloadFinished));
self.notify_handler
.map(|handler| handler(Notification::DownloadPopUnits));
}
}
}

#[cfg(not(windows))]
mod unpacker {
use crate::utils::notifications::Notification;
pub struct Unpacker {}
impl Unpacker {
pub fn new<'a>(_notify_handler: Option<&'a dyn Fn(Notification<'_>)>) -> Unpacker {
Unpacker {}
}
pub fn handle(&mut self, _unpacked: tar::Unpacked) {}
}
}

fn unpack_without_first_dir<'a, R: Read>(
archive: &mut tar::Archive<R>,
path: &Path,
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
) -> Result<()> {
let mut unpacker = unpacker::Unpacker::new(notify_handler);
let entries = archive
.entries()
.chain_err(|| ErrorKind::ExtractingPackage)?;
let mut checked_parents: HashSet<PathBuf> = HashSet::new();

for entry in entries {
let mut entry = entry.chain_err(|| ErrorKind::ExtractingPackage)?;
let relpath = {
Expand Down Expand Up @@ -249,6 +365,7 @@ fn unpack_without_first_dir<R: Read>(archive: &mut tar::Archive<R>, path: &Path)
entry.set_preserve_mtime(false);
entry
.unpack(&full_path)
.map(|unpacked| unpacker.handle(unpacked))
.chain_err(|| ErrorKind::ExtractingPackage)?;
}

Expand Down Expand Up @@ -277,9 +394,17 @@ impl<'a> Package for TarPackage<'a> {
pub struct TarGzPackage<'a>(TarPackage<'a>);

impl<'a> TarGzPackage<'a> {
pub fn new<R: Read>(stream: R, temp_cfg: &'a temp::Cfg) -> Result<Self> {
pub fn new<R: Read>(
stream: R,
temp_cfg: &'a temp::Cfg,
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
) -> Result<Self> {
let stream = flate2::read::GzDecoder::new(stream);
Ok(TarGzPackage(TarPackage::new(stream, temp_cfg)?))
Ok(TarGzPackage(TarPackage::new(
stream,
temp_cfg,
notify_handler,
)?))
}
}

Expand All @@ -305,9 +430,17 @@ impl<'a> Package for TarGzPackage<'a> {
pub struct TarXzPackage<'a>(TarPackage<'a>);

impl<'a> TarXzPackage<'a> {
pub fn new<R: Read>(stream: R, temp_cfg: &'a temp::Cfg) -> Result<Self> {
pub fn new<R: Read>(
stream: R,
temp_cfg: &'a temp::Cfg,
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
) -> Result<Self> {
let stream = xz2::read::XzDecoder::new(stream);
Ok(TarXzPackage(TarPackage::new(stream, temp_cfg)?))
Ok(TarXzPackage(TarPackage::new(
stream,
temp_cfg,
notify_handler,
)?))
}
}

Expand Down
Loading

0 comments on commit 5580890

Please sign in to comment.