Skip to content

Commit

Permalink
split spawn_receiver(..) and spawn_senders(..) from scan(..).
Browse files Browse the repository at this point in the history
This is just a split commit, refraining from renaming too much.

The drop(tx) call is no longer necessary, as the first sender
is dropped at the end of spawn_senders(..)
  • Loading branch information
alexmaco committed Jan 26, 2019
1 parent 23d44fc commit a05312e
Showing 1 changed file with 38 additions and 17 deletions.
55 changes: 38 additions & 17 deletions src/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::io;
use std::path::PathBuf;
use std::process;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time;
Expand Down Expand Up @@ -54,7 +54,6 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<FdOptions>) {
.expect("Error: Path vector can not be empty");
let (tx, rx) = channel();
let threads = config.threads;
let show_filesystem_errors = config.show_filesystem_errors;

let mut override_builder = OverrideBuilder::new(first_path_buf.as_path());

Expand Down Expand Up @@ -121,8 +120,34 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<FdOptions>) {
}

// Spawn the thread that receives all results through the channel.
let rx_config = Arc::clone(&config);
let receiver_thread = thread::spawn(move || {
let receiver_thread = spawn_receiver(Arc::clone(&config), receiver_wtq, rx);

// Spawn the sender threads.
spawn_senders(
Arc::clone(&config),
pattern,
sender_wtq,
parallel_walker,
tx,
);

// Wait for the receiver thread to print out all results.
receiver_thread.join().unwrap();

if wants_to_quit.load(Ordering::Relaxed) {
process::exit(ExitCode::KilledBySigint.into());
}
}

fn spawn_receiver(
rx_config: Arc<FdOptions>,
receiver_wtq: Arc<AtomicBool>,
rx: Receiver<WorkerResult>,
) -> thread::JoinHandle<()> {
let show_filesystem_errors = rx_config.show_filesystem_errors;
let threads = rx_config.threads;

thread::spawn(move || {
// This will be set to `Some` if the `--exec` argument was supplied.
if let Some(ref cmd) = rx_config.command {
if cmd.in_batch_mode() {
Expand Down Expand Up @@ -222,9 +247,16 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<FdOptions>) {
}
}
}
});
})
}

// Spawn the sender threads.
fn spawn_senders(
config: Arc<FdOptions>,
pattern: Arc<Regex>,
sender_wtq: Arc<AtomicBool>,
parallel_walker: ignore::WalkParallel,
tx: Sender<WorkerResult>,
) {
parallel_walker.run(|| {
let config = Arc::clone(&config);
let pattern = Arc::clone(&pattern);
Expand Down Expand Up @@ -349,15 +381,4 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<FdOptions>) {
ignore::WalkState::Continue
})
});

// Drop the initial sender. If we don't do this, the receiver will block even
// if all threads have finished, since there is still one sender around.
drop(tx);

// Wait for the receiver thread to print out all results.
receiver_thread.join().unwrap();

if wants_to_quit.load(Ordering::Relaxed) {
process::exit(ExitCode::KilledBySigint.into());
}
}

0 comments on commit a05312e

Please sign in to comment.