Skip to content
This repository has been archived by the owner on Jan 10, 2021. It is now read-only.

Commit

Permalink
feat(ignore): Allow post-processing at end-of-thread
Browse files Browse the repository at this point in the history
On top of the parallel-walk's closures, this provides a Visitor API.
- Clarifies the role of the two different closures.
- Allows implementing of `Drop` for post-processing at thread
  completion.

The closure API is maintained not just for compatibility but also
convinience for simple cases.

Fixes BurntSushi#469
  • Loading branch information
epage committed Nov 16, 2019
1 parent 8892bf6 commit 2c8f95f
Showing 1 changed file with 70 additions and 16 deletions.
86 changes: 70 additions & 16 deletions ignore/src/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,54 @@ impl WalkState {
}
}

/// Create per-thread `ParallelVisitor`s for `WalkParallel`.
pub trait ParallelVisitorBuilder {
/// Create per-thread `ParallelVisitor`s for `WalkParallel`.
fn build(&mut self) -> Box<dyn ParallelVisitor>;
}

/// Receives files and directories for the current thread/
///
/// - Setup can be implemented in `ParallelVisitorBuilder::build`
/// - Teardown can be implemented in `trait Drop`.
pub trait ParallelVisitor: Send {
/// Receives files and directories for the current thread/
fn visit(&mut self, entry: Result<DirEntry, Error>) -> WalkState;
}

struct FnBuilder<F> {
builder: F,
}

impl<F> FnBuilder<F>
where
F: FnMut() -> Box<dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static>,
{
fn new(mkf: F) -> Self {
Self { builder: mkf }
}
}

impl<F> ParallelVisitorBuilder for FnBuilder<F>
where
F: FnMut() -> Box<dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static>,
{
fn build(&mut self) -> Box<dyn ParallelVisitor> {
let visitor = (self.builder)();
Box::new(FnVisitor { visitor })
}
}

struct FnVisitor {
visitor: Box<dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static>,
}

impl ParallelVisitor for FnVisitor {
fn visit(&mut self, entry: Result<DirEntry, Error>) -> WalkState {
(self.visitor)(entry)
}
}

/// WalkParallel is a parallel recursive directory iterator over files paths
/// in one or more directories.
///
Expand All @@ -1091,11 +1139,17 @@ impl WalkParallel {
/// Execute the parallel recursive directory iterator. `mkf` is called
/// for each thread used for iteration. The function produced by `mkf`
/// is then in turn called for each visited file path.
pub fn run<F>(self, mut mkf: F)
pub fn run<F>(self, mkf: F)
where
F: FnMut() -> Box<dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static>,
{
let mut f = mkf();
let mut builder = FnBuilder::new(mkf);
self.visit(&mut builder)
}

/// Execute the parallel recursive directory iterator.
pub fn visit(self, builder: &mut dyn ParallelVisitorBuilder) {
let mut visitor = builder.build();
let threads = self.threads();
// TODO: Figure out how to use a bounded channel here. With an
// unbounded channel, the workers can run away and fill up memory
Expand All @@ -1121,7 +1175,7 @@ impl WalkParallel {
Ok(root_device) => Some(root_device),
Err(err) => {
let err = Error::Io(err).with_path(path);
if f(Err(err)).is_quit() {
if visitor.visit(Err(err)).is_quit() {
return;
}
continue;
Expand All @@ -1131,7 +1185,7 @@ impl WalkParallel {
match DirEntryRaw::from_path(0, path, false) {
Ok(dent) => (DirEntry::new_raw(dent, None), root_device),
Err(err) => {
if f(Err(err)).is_quit() {
if visitor.visit(Err(err)).is_quit() {
return;
}
continue;
Expand All @@ -1157,7 +1211,7 @@ impl WalkParallel {
let mut handles = vec![];
for _ in 0..threads {
let worker = Worker {
f: mkf(),
visitor: builder.build(),
tx: tx.clone(),
rx: rx.clone(),
quit_now: quit_now.clone(),
Expand Down Expand Up @@ -1269,7 +1323,7 @@ impl Work {
/// Note that a worker is *both* a producer and a consumer.
struct Worker {
/// The caller's callback.
f: Box<dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static>,
visitor: Box<dyn ParallelVisitor>,
/// The push side of our mpmc queue.
tx: channel::Sender<Message>,
/// The receive side of our mpmc queue.
Expand Down Expand Up @@ -1313,22 +1367,22 @@ impl Worker {
// If the work is not a directory, then we can just execute the
// caller's callback immediately and move on.
if work.is_symlink() || !work.is_dir() {
if (self.f)(Ok(work.dent)).is_quit() {
if self.visitor.visit(Ok(work.dent)).is_quit() {
self.quit_now();
return;
}
continue;
}
if let Some(err) = work.add_parents() {
if (self.f)(Err(err)).is_quit() {
if self.visitor.visit(Err(err)).is_quit() {
self.quit_now();
return;
}
}
let readdir = match work.read_dir() {
Ok(readdir) => readdir,
Err(err) => {
if (self.f)(Err(err)).is_quit() {
if self.visitor.visit(Err(err)).is_quit() {
self.quit_now();
return;
}
Expand All @@ -1340,7 +1394,7 @@ impl Worker {
Ok(true) => true,
Ok(false) => false,
Err(err) => {
if (self.f)(Err(err)).is_quit() {
if self.visitor.visit(Err(err)).is_quit() {
self.quit_now();
return;
}
Expand All @@ -1352,7 +1406,7 @@ impl Worker {
};

let depth = work.dent.depth();
match (self.f)(Ok(work.dent)) {
match self.visitor.visit(Ok(work.dent)) {
WalkState::Continue => {}
WalkState::Skip => continue,
WalkState::Quit => {
Expand Down Expand Up @@ -1398,13 +1452,13 @@ impl Worker {
let fs_dent = match result {
Ok(fs_dent) => fs_dent,
Err(err) => {
return (self.f)(Err(Error::from(err).with_depth(depth)));
return self.visitor.visit(Err(Error::from(err).with_depth(depth)));
}
};
let mut dent = match DirEntryRaw::from_entry(depth, &fs_dent) {
Ok(dent) => DirEntry::new_raw(dent, None),
Err(err) => {
return (self.f)(Err(err));
return self.visitor.visit(Err(err));
}
};
let is_symlink = dent.file_type().map_or(false, |ft| ft.is_symlink());
Expand All @@ -1413,19 +1467,19 @@ impl Worker {
dent = match DirEntryRaw::from_path(depth, path, true) {
Ok(dent) => DirEntry::new_raw(dent, None),
Err(err) => {
return (self.f)(Err(err));
return self.visitor.visit(Err(err));
}
};
if dent.is_dir() {
if let Err(err) = check_symlink_loop(ig, dent.path(), depth) {
return (self.f)(Err(err));
return self.visitor.visit(Err(err));
}
}
}
if let Some(ref stdout) = self.skip {
let is_stdout = match path_equals(&dent, stdout) {
Ok(is_stdout) => is_stdout,
Err(err) => return (self.f)(Err(err)),
Err(err) => return self.visitor.visit(Err(err)),
};
if is_stdout {
return WalkState::Continue;
Expand Down

0 comments on commit 2c8f95f

Please sign in to comment.