Skip to content

Commit

Permalink
progress sketch
Browse files Browse the repository at this point in the history
  • Loading branch information
rbtcollins committed May 18, 2019
1 parent baaf6f6 commit 33c4cc7
Showing 1 changed file with 113 additions and 21 deletions.
134 changes: 113 additions & 21 deletions src/dist/component/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::dist::component::transaction::*;

use crate::dist::temp;
use crate::errors::*;
use crate::utils::notifications::Notification;
use crate::utils::utils;

use std::collections::HashSet;
Expand Down Expand Up @@ -200,7 +201,7 @@ impl<'a> TarPackage<'a> {
// The rust-installer packages unpack to a directory called
// $pkgname-$version-$target. Skip that directory when
// unpacking.
unpack_without_first_dir(&mut archive, &*temp_dir)?;
unpack_without_first_dir(&mut archive, &*temp_dir, None)?;

Ok(TarPackage(
DirectoryPackage::new(temp_dir.to_owned(), false)?,
Expand All @@ -209,17 +210,120 @@ impl<'a> TarPackage<'a> {
}
}

fn unpack_without_first_dir<R: Read>(archive: &mut tar::Archive<R>, path: &Path) -> Result<()> {
#[cfg(windows)]

// impl<'a> FileReaderWithProgress<'a> {
// pub fn new_file(path: &Path, notify_handler: &'a dyn Fn(Notification<'_>)) -> Result<Self> {
// let fh = match std::fs::File::open(path) {
// Ok(fh) => fh,
// Err(_) => Err(ErrorKind::ReadingFile {
// name: "downloaded",
// path: path.to_path_buf(),
// })?,
// };

// // Inform the tracker of the file size
// let flen = fh.metadata()?.len();
// (notify_handler)(Notification::DownloadContentLengthReceived(flen));

// let fh = BufReader::with_capacity(8 * 1024 * 1024, fh);

// Ok(FileReaderWithProgress {
// fh,
// notify_handler,
// nbytes: 0,
// flen: flen,
// })
// }
// }

// impl<'a> std::io::Read for FileReaderWithProgress<'a> {
// fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
// match self.fh.read(buf) {
// Ok(nbytes) => {
// self.nbytes += nbytes as u64;
// if nbytes != 0 {
// (self.notify_handler)(Notification::DownloadDataReceived(&buf[0..nbytes]));
// }
// if (nbytes == 0) || (self.flen == self.nbytes) {
// (self.notify_handler)(Notification::DownloadFinished);
// }
// Ok(nbytes)

mod unpacker {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use threadpool::ThreadPool;

use crate::utils::notifications::Notification;

pub struct Unpacker<'a> {
n_files: Arc<AtomicUsize>,
pool: threadpool::ThreadPool,
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
}

impl<'a> Unpacker<'a> {
pub fn new(notify_handler: Option<&'a dyn Fn(Notification<'_>)>) -> Unpacker {
let n_workers = 4;
let pool = ThreadPool::with_name("CloseHandle".into(), n_workers);
Unpacker {
n_files: Arc::new(AtomicUsize::new(0)),
pool: pool,
notify_handler: notify_handler,
}
}

pub fn handle(&mut self, unpacked: tar::Unpacked) {
if let tar::Unpacked::File(f) = unpacked {
self.n_files.fetch_add(1, Ordering::Relaxed);
let n_files = self.n_files.clone();
self.pool.execute(move || {
drop(f);
n_files.fetch_sub(1, Ordering::Relaxed);
});
}
}
}

impl<'a> Drop for Unpacker<'a> {
fn drop(&mut self) {
let mut n_files = self.n_files.load(Ordering::Relaxed);
while n_files != 0 {
use std::thread::sleep;
sleep(std::time::Duration::from_millis(100));
n_files = self.n_files.load(Ordering::Relaxed);

println!("pending close of {} files", n_files);
}
// (notify_handler)(Notification::DownloadContentLengthReceived(flen))
self.pool.join();
}
}
}

#[cfg(not(windows))]
mod unpacker {
use crate::utils::notifications::Notification;
pub struct Unpacker<'a> {}
impl<'a> Unpacker<'a> {
pub fn new(_notify_handler: Option<&'a dyn Fn(Notification<'_>)>) -> Unpacker {
Unpacker {}
}
pub fn handle(&mut self, _unpacked: tar::Unpacked) {}
}
}

fn unpack_without_first_dir<'a, R: Read>(
archive: &mut tar::Archive<R>,
path: &Path,
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
) -> Result<()> {
let mut unpacker = unpacker::Unpacker::new(notify_handler);
let entries = archive
.entries()
.chain_err(|| ErrorKind::ExtractingPackage)?;
let mut checked_parents: HashSet<PathBuf> = HashSet::new();
use threadpool::ThreadPool;
use std::sync::mpsc::channel;
let n_workers = 4;
let mut n_files = 0;
let pool = ThreadPool::new(n_workers);
let(tx, rx) = channel();

for entry in entries {
let mut entry = entry.chain_err(|| ErrorKind::ExtractingPackage)?;
Expand Down Expand Up @@ -256,22 +360,10 @@ fn unpack_without_first_dir<R: Read>(archive: &mut tar::Archive<R>, path: &Path)
entry.set_preserve_mtime(false);
entry
.unpack(&full_path)
.map(|unpacked| {
if let tar::Unpacked::File(f) = unpacked {
let tx = tx.clone();
pool.execute(move|| { drop(f);
tx.send(1).expect("channel should be open");

});
n_files += 1;
}
})
.map(|unpacked| unpacker.handle(unpacked))
.chain_err(|| ErrorKind::ExtractingPackage)?;
}

// ensure all files have been closed.
rx.iter().take(n_files).fold(0, |a, b| a + b);

Ok(())
}

Expand Down

0 comments on commit 33c4cc7

Please sign in to comment.