Skip to content

Commit

Permalink
Avoid blocking on CloseHandle
Browse files Browse the repository at this point in the history
On Windows closing a file involves CloseHandle, which can be quite
slow (6+ms) for various reasons, including circumstances outside our
control such as virus scanners, content indexing, device driver cache
write-back synchronisation and so forth. Rather than having a super
long list of all the things users need to do to optimise the performance
of CloseHandle, use a small threadpool to avoid blocking package
extraction when closing file handles.

This does run a risk of resource exhaustion, but as we only have 20k
files in the largest package today that should not be a problem in
practice.

https://www.mercurial-scm.org/pipermail/mercurial-devel/2016-January/078404.html
provided inspiration for this.

My benchmark system went from 21/22s to 11s with this change, with either
4 or 8 threads.
  • Loading branch information
rbtcollins committed May 18, 2019
1 parent fd6c31f commit ff0dc0d
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 20 deletions.
23 changes: 17 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ scopeguard = "1"
semver = "0.9"
sha2 = "0.8"
strsim = "0.9.1"
tar = "0.4"
tar = "0.4.26"
tempdir = "0.3.4"
# FIXME(issue #1788)
term = "=0.5.1"
threadpool = "1"
time = "0.1.34"
toml = "0.5"
url = "1"
Expand Down
147 changes: 140 additions & 7 deletions src/dist/component/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::dist::component::transaction::*;

use crate::dist::temp;
use crate::errors::*;
use crate::utils::notifications::Notification;
use crate::utils::utils;

use std::collections::HashSet;
Expand Down Expand Up @@ -194,13 +195,17 @@ fn set_file_perms(_dest_path: &Path, _src_path: &Path) -> Result<()> {
pub struct TarPackage<'a>(DirectoryPackage, temp::Dir<'a>);

impl<'a> TarPackage<'a> {
pub fn new<R: Read>(stream: R, temp_cfg: &'a temp::Cfg) -> Result<Self> {
pub fn new<R: Read>(
stream: R,
temp_cfg: &'a temp::Cfg,
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
) -> Result<Self> {
let temp_dir = temp_cfg.new_directory()?;
let mut archive = tar::Archive::new(stream);
// The rust-installer packages unpack to a directory called
// $pkgname-$version-$target. Skip that directory when
// unpacking.
unpack_without_first_dir(&mut archive, &*temp_dir)?;
unpack_without_first_dir(&mut archive, &*temp_dir, notify_handler)?;

Ok(TarPackage(
DirectoryPackage::new(temp_dir.to_owned(), false)?,
Expand All @@ -209,11 +214,122 @@ impl<'a> TarPackage<'a> {
}
}

fn unpack_without_first_dir<R: Read>(archive: &mut tar::Archive<R>, path: &Path) -> Result<()> {
#[cfg(windows)]
mod unpacker {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use threadpool;

    use crate::utils::notifications::Notification;

    /// Defers the closing of unpacked file handles to a small thread pool.
    ///
    /// On Windows, closing a file involves `CloseHandle`, which can be slow
    /// (6+ms) for reasons outside our control: virus scanners, content
    /// indexing, device-driver cache write-back and so on. Handing the
    /// handles to worker threads keeps package extraction from blocking on
    /// each close.
    pub struct Unpacker<'a> {
        /// Count of handles queued to the pool but not yet closed.
        n_files: Arc<AtomicUsize>,
        /// Worker pool on which the deferred `CloseHandle` calls run.
        pool: threadpool::ThreadPool,
        /// Optional progress sink; used in `drop` to report close progress.
        notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
    }

    impl<'a> Unpacker<'a> {
        /// Creates an unpacker with a pool sized to the hardware thread count.
        pub fn new(notify_handler: Option<&'a dyn Fn(Notification<'_>)>) -> Unpacker<'a> {
            // Defaults to hardware thread count threads; this is suitable for
            // our needs as IO bound operations tend to show up as write
            // latencies rather than close latencies, so we don't need to look
            // at more threads to get more IO dispatched at this stage in the
            // process.
            let pool = threadpool::Builder::new()
                .thread_name("CloseHandle".into())
                .build();
            Unpacker {
                n_files: Arc::new(AtomicUsize::new(0)),
                pool,
                notify_handler,
            }
        }

        /// Takes ownership of the result of `Entry::unpack`; if it carries an
        /// open file handle, queues the close onto the pool instead of
        /// performing it inline.
        pub fn handle(&mut self, unpacked: tar::Unpacked) {
            if let tar::Unpacked::File(f) = unpacked {
                self.n_files.fetch_add(1, Ordering::Relaxed);
                let n_files = self.n_files.clone();
                self.pool.execute(move || {
                    // Dropping the File is what invokes CloseHandle, now on a
                    // worker thread instead of the extraction thread.
                    drop(f);
                    n_files.fetch_sub(1, Ordering::Relaxed);
                });
            }
        }
    }

    impl<'a> Drop for Unpacker<'a> {
        fn drop(&mut self) {
            // Some explanation is in order. Even though the tar we are reading
            // from (if any) will have had its FileWithProgress download
            // tracking completed before we hit drop, that is not true if we
            // are unwinding due to a failure, where the logical ownership of
            // the progress bar is ambiguous, and as the tracker itself is
            // abstracted out behind notifications etc we cannot just query for
            // that. So: we assume no more reads of the underlying tar will
            // take place: either the error unwinding will stop reads, or we
            // completed; either way, we notify finished to the tracker to
            // force a reset to zero; we set the units to files, show our
            // progress, and set our units back afterwards. The largest
            // archives today - rust docs - have ~20k items, and the download
            // tracker's progress is confounded with actual handling of data
            // today, so we synthesize a data buffer and pretend to have bytes
            // to deliver.
            if let Some(handler) = self.notify_handler {
                handler(Notification::DownloadFinished);
                handler(Notification::DownloadPushUnits("handles"));
            }
            let mut prev_files = self.n_files.load(Ordering::Relaxed);
            if let Some(handler) = self.notify_handler {
                handler(Notification::DownloadContentLengthReceived(
                    prev_files as u64,
                ));
            }
            if prev_files > 50 {
                println!("Closing {} deferred file handles", prev_files);
            }
            // Sanity-check the outstanding-handle count before allocating a
            // buffer of that size; no package today has anywhere near this
            // many files.
            assert!(32767 > prev_files);
            let buf: Vec<u8> = vec![0; prev_files];
            let mut current_files = prev_files;
            // Poll the counter, reporting each batch of completed closes as
            // synthetic "bytes" of progress.
            // NOTE(review): if a worker ever failed to run its decrement this
            // loop would spin forever — presumably drop(f) cannot panic, but
            // worth confirming against the threadpool crate's panic handling.
            while current_files != 0 {
                use std::thread::sleep;
                sleep(std::time::Duration::from_millis(100));
                prev_files = current_files;
                current_files = self.n_files.load(Ordering::Relaxed);
                // Closes completed since the last poll; counter only ever
                // decreases here, so this cannot underflow.
                let step_count = prev_files - current_files;
                if let Some(handler) = self.notify_handler {
                    handler(Notification::DownloadDataReceived(&buf[0..step_count]));
                }
            }
            // Wait for the pool's threads to fully retire before restoring
            // the tracker's units.
            self.pool.join();
            if let Some(handler) = self.notify_handler {
                handler(Notification::DownloadFinished);
                handler(Notification::DownloadPopUnits);
            }
        }
    }
}

#[cfg(not(windows))]
mod unpacker {
    use crate::utils::notifications::Notification;

    /// No-op counterpart of the Windows close-handle pool.
    ///
    /// On non-Windows platforms closing a file is cheap, so nothing is
    /// deferred; this type exists only so callers can be written once for
    /// both configurations.
    pub struct Unpacker {}

    impl Unpacker {
        /// Creates the stub unpacker; the notify handler is unused here.
        pub fn new(_notify_handler: Option<&dyn Fn(Notification<'_>)>) -> Unpacker {
            Unpacker {}
        }

        /// Accepts (and immediately drops) the unpack result, closing any
        /// file handle inline.
        pub fn handle(&mut self, _unpacked: tar::Unpacked) {}
    }
}

fn unpack_without_first_dir<'a, R: Read>(
archive: &mut tar::Archive<R>,
path: &Path,
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
) -> Result<()> {
let mut unpacker = unpacker::Unpacker::new(notify_handler);
let entries = archive
.entries()
.chain_err(|| ErrorKind::ExtractingPackage)?;
let mut checked_parents: HashSet<PathBuf> = HashSet::new();

for entry in entries {
let mut entry = entry.chain_err(|| ErrorKind::ExtractingPackage)?;
let relpath = {
Expand Down Expand Up @@ -249,6 +365,7 @@ fn unpack_without_first_dir<R: Read>(archive: &mut tar::Archive<R>, path: &Path)
entry.set_preserve_mtime(false);
entry
.unpack(&full_path)
.map(|unpacked| unpacker.handle(unpacked))
.chain_err(|| ErrorKind::ExtractingPackage)?;
}

Expand Down Expand Up @@ -277,9 +394,17 @@ impl<'a> Package for TarPackage<'a> {
pub struct TarGzPackage<'a>(TarPackage<'a>);

impl<'a> TarGzPackage<'a> {
pub fn new<R: Read>(stream: R, temp_cfg: &'a temp::Cfg) -> Result<Self> {
pub fn new<R: Read>(
stream: R,
temp_cfg: &'a temp::Cfg,
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
) -> Result<Self> {
let stream = flate2::read::GzDecoder::new(stream);
Ok(TarGzPackage(TarPackage::new(stream, temp_cfg)?))
Ok(TarGzPackage(TarPackage::new(
stream,
temp_cfg,
notify_handler,
)?))
}
}

Expand All @@ -305,9 +430,17 @@ impl<'a> Package for TarGzPackage<'a> {
pub struct TarXzPackage<'a>(TarPackage<'a>);

impl<'a> TarXzPackage<'a> {
pub fn new<R: Read>(stream: R, temp_cfg: &'a temp::Cfg) -> Result<Self> {
pub fn new<R: Read>(
stream: R,
temp_cfg: &'a temp::Cfg,
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
) -> Result<Self> {
let stream = xz2::read::XzDecoder::new(stream);
Ok(TarXzPackage(TarPackage::new(stream, temp_cfg)?))
Ok(TarXzPackage(TarPackage::new(
stream,
temp_cfg,
notify_handler,
)?))
}
}

Expand Down
11 changes: 6 additions & 5 deletions src/dist/manifestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,20 +204,20 @@ impl Manifestation {
component.target.as_ref(),
));

let gz;
let xz;
let notification_converter = |notification: crate::utils::Notification<'_>| {
notify_handler(notification.into());
};
let gz;
let xz;
let reader =
utils::FileReaderWithProgress::new_file(&installer_file, &notification_converter)?;
let package: &dyn Package = match format {
Format::Gz => {
gz = TarGzPackage::new(reader, temp_cfg)?;
gz = TarGzPackage::new(reader, temp_cfg, Some(&notification_converter))?;
&gz
}
Format::Xz => {
xz = TarXzPackage::new(reader, temp_cfg)?;
xz = TarXzPackage::new(reader, temp_cfg, Some(&notification_converter))?;
&xz
}
};
Expand Down Expand Up @@ -407,7 +407,8 @@ impl Manifestation {
};
let reader =
utils::FileReaderWithProgress::new_file(&installer_file, &notification_converter)?;
let package: &dyn Package = &TarGzPackage::new(reader, temp_cfg)?;
let package: &dyn Package =
&TarGzPackage::new(reader, temp_cfg, Some(&notification_converter))?;

for component in package.components() {
tx = package.install(&self.installation, &component, None, tx)?;
Expand Down
3 changes: 2 additions & 1 deletion src/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ impl<'a> InstallMethod<'a> {
notify_handler(notification.into());
};
let reader = utils::FileReaderWithProgress::new_file(&src, &notification_converter)?;
let package: &dyn Package = &TarGzPackage::new(reader, temp_cfg)?;
let package: &dyn Package =
&TarGzPackage::new(reader, temp_cfg, Some(&notification_converter))?;

let mut tx = Transaction::new(prefix.clone(), temp_cfg, notify_handler);

Expand Down

0 comments on commit ff0dc0d

Please sign in to comment.