Skip to content

Commit

Permalink
crates/ignore: switch to depth first traversal
Browse files Browse the repository at this point in the history
This replaces the use of channels in the parallel directory traversal
with a simple stack. The primary motivation for this change is to reduce
peak memory usage. In particular, when using a channel (which is a
queue), we wind up visiting files in a breadth first fashion. Using a
stack switches us to a depth first traversal. While there are no real
intrinsic differences, depth first traversal generally tends to use less
memory because directory trees are more commonly wide than they are
deep.

In particular, the queue/stack size itself is not the only concern. In
one recent case documented in #1550, a user wanted to search all Rust
crates. The directory structure was shallow but extremely wide, with a
single directory containing all crates. This in turn results is in
descending into each of those directories and building a gitignore
matcher for each (since most crates have `.gitignore` files) before ever
searching a single file. This means that ripgrep has all such matchers
in memory simultaneously, which winds up using quite a bit of memory.

In a depth first traversal, peak memory usage is much lower because
gitignore matches are built and discarded more quickly. In the case of
searching all crates, the peak memory usage decrease is dramatic. On my
system, it shrinks by an order magnitude, from almost 1GB to 50MB. The
decline in peak memory usage is consistent across other use cases as
well, but is typically more modest. For example, searching the Linux
repo has a 50% decrease in peak memory usage and searching the Chromium
repo has a 25% decrease in peak memory usage.

Search times generally remain unchanged, although some ad hoc benchmarks
that I typically run have gotten a bit slower. As far as I can tell,
this appears to be result of scheduling changes. Namely, the depth first
traversal seems to result in searching some very large files towards the
end of the search, which reduces the effectiveness of parallelism and
makes the overall search take longer. This seems to suggest that a stack
isn't optimal. It would instead perhaps be better to prioritize
searching larger files first, but it's not quite clear how to do this
without introducing more overhead (getting the file size for each file
requires a stat call).

Fixes #1550
  • Loading branch information
BurntSushi committed Apr 18, 2020
1 parent afb325f commit b1c2964
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 43 deletions.
4 changes: 3 additions & 1 deletion crates/ignore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ name = "ignore"
bench = false

[dependencies]
crossbeam-channel = "0.4.0"
crossbeam-utils = "0.7.0"
globset = { version = "0.4.3", path = "../globset" }
lazy_static = "1.1"
Expand All @@ -32,5 +31,8 @@ walkdir = "2.2.7"
[target.'cfg(windows)'.dependencies.winapi-util]
version = "0.1.2"

[dev-dependencies]
crossbeam-channel = "0.4.0"

[features]
simd-accel = ["globset/simd-accel"]
1 change: 0 additions & 1 deletion crates/ignore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ See the documentation for `WalkBuilder` for many other options.

#![deny(missing_docs)]

extern crate crossbeam_channel as channel;
extern crate globset;
#[macro_use]
extern crate lazy_static;
Expand Down
95 changes: 54 additions & 41 deletions crates/ignore/src/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use std::fs::{self, FileType, Metadata};
use std::io;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use std::vec;

use channel::{self, TryRecvError};
use same_file::Handle;
use walkdir::{self, WalkDir};

Expand Down Expand Up @@ -364,7 +365,8 @@ impl DirEntryRaw {
})
}

// Placeholder implementation to allow compiling on non-standard platforms (e.g. wasm32).
// Placeholder implementation to allow compiling on non-standard platforms
// (e.g. wasm32).
#[cfg(not(any(windows, unix)))]
fn from_entry_os(
depth: usize,
Expand Down Expand Up @@ -413,7 +415,8 @@ impl DirEntryRaw {
})
}

// Placeholder implementation to allow compiling on non-standard platforms (e.g. wasm32).
// Placeholder implementation to allow compiling on non-standard platforms
// (e.g. wasm32).
#[cfg(not(any(windows, unix)))]
fn from_path(
depth: usize,
Expand Down Expand Up @@ -1186,16 +1189,9 @@ impl WalkParallel {
/// can be merged together into a single data structure.
pub fn visit(mut self, builder: &mut dyn ParallelVisitorBuilder) {
let threads = self.threads();
// TODO: Figure out how to use a bounded channel here. With an
// unbounded channel, the workers can run away and fill up memory
// with all of the file paths. But a bounded channel doesn't work since
// our producers are also are consumers, so they end up getting stuck.
//
// We probably need to rethink parallel traversal completely to fix
// this. The best case scenario would be finding a way to use rayon
// to do this.
let (tx, rx) = channel::unbounded();
let stack = Arc::new(Mutex::new(vec![]));
{
let mut stack = stack.lock().unwrap();
let mut visitor = builder.build();
let mut paths = Vec::new().into_iter();
std::mem::swap(&mut paths, &mut self.paths);
Expand Down Expand Up @@ -1232,28 +1228,27 @@ impl WalkParallel {
}
}
};
tx.send(Message::Work(Work {
stack.push(Message::Work(Work {
dent: dent,
ignore: self.ig_root.clone(),
root_device: root_device,
}))
.unwrap();
}));
}
// ... but there's no need to start workers if we don't need them.
if tx.is_empty() {
if stack.is_empty() {
return;
}
}
// Create the workers and then wait for them to finish.
let quit_now = Arc::new(AtomicBool::new(false));
let num_pending = Arc::new(AtomicUsize::new(tx.len()));
let num_pending =
Arc::new(AtomicUsize::new(stack.lock().unwrap().len()));
crossbeam_utils::thread::scope(|s| {
let mut handles = vec![];
for _ in 0..threads {
let worker = Worker {
visitor: builder.build(),
tx: tx.clone(),
rx: rx.clone(),
stack: stack.clone(),
quit_now: quit_now.clone(),
num_pending: num_pending.clone(),
max_depth: self.max_depth,
Expand All @@ -1263,8 +1258,6 @@ impl WalkParallel {
};
handles.push(s.spawn(|_| worker.run()));
}
drop(tx);
drop(rx);
for handle in handles {
handle.join().unwrap();
}
Expand Down Expand Up @@ -1362,10 +1355,13 @@ impl Work {
struct Worker<'s> {
/// The caller's callback.
visitor: Box<dyn ParallelVisitor + 's>,
/// The push side of our mpmc queue.
tx: channel::Sender<Message>,
/// The receive side of our mpmc queue.
rx: channel::Receiver<Message>,
/// A stack of work to do.
///
/// We use a stack instead of a channel because a stack lets us visit
/// directories in depth first order. This can substantially reduce peak
/// memory usage by keeping both the number of files path and gitignore
/// matchers in memory lower.
stack: Arc<Mutex<Vec<Message>>>,
/// Whether all workers should terminate at the next opportunity. Note
/// that we need this because we don't want other `Work` to be done after
/// we quit. We wouldn't need this if have a priority channel.
Expand Down Expand Up @@ -1550,25 +1546,25 @@ impl<'s> Worker<'s> {
/// If all work has been exhausted, then this returns None. The worker
/// should then subsequently quit.
fn get_work(&mut self) -> Option<Work> {
let mut value = self.rx.try_recv();
let mut value = self.recv();
loop {
// Simulate a priority channel: If quit_now flag is set, we can
// receive only quit messages.
if self.is_quit_now() {
value = Ok(Message::Quit)
value = Some(Message::Quit)
}
match value {
Ok(Message::Work(work)) => {
Some(Message::Work(work)) => {
return Some(work);
}
Ok(Message::Quit) => {
Some(Message::Quit) => {
// Repeat quit message to wake up sleeping threads, if
// any. The domino effect will ensure that every thread
// will quit.
self.tx.send(Message::Quit).unwrap();
self.send_quit();
return None;
}
Err(TryRecvError::Empty) => {
None => {
// Once num_pending reaches 0, it is impossible for it to
// ever increase again. Namely, it only reaches 0 once
// all jobs have run such that no jobs have produced more
Expand All @@ -1580,17 +1576,21 @@ impl<'s> Worker<'s> {
if self.num_pending() == 0 {
// Every other thread is blocked at the next recv().
// Send the initial quit message and quit.
self.tx.send(Message::Quit).unwrap();
self.send_quit();
return None;
}
// Wait for next `Work` or `Quit` message.
value = Ok(self
.rx
.recv()
.expect("channel disconnected while worker is alive"));
}
Err(TryRecvError::Disconnected) => {
unreachable!("channel disconnected while worker is alive");
loop {
if let Some(v) = self.recv() {
value = Some(v);
break;
}
// Our stack isn't blocking. Instead of burning the
// CPU waiting, we let the thread sleep for a bit. In
// general, this tends to only occur once the search is
// approaching termination.
thread::sleep(Duration::from_millis(1));
}
}
}
}
Expand All @@ -1614,7 +1614,20 @@ impl<'s> Worker<'s> {
/// Send work.
fn send(&self, work: Work) {
self.num_pending.fetch_add(1, Ordering::SeqCst);
self.tx.send(Message::Work(work)).unwrap();
let mut stack = self.stack.lock().unwrap();
stack.push(Message::Work(work));
}

/// Send a quit message.
fn send_quit(&self) {
let mut stack = self.stack.lock().unwrap();
stack.push(Message::Quit);
}

/// Receive work.
fn recv(&self) -> Option<Message> {
let mut stack = self.stack.lock().unwrap();
stack.pop()
}

/// Signal that work has been received.
Expand Down

0 comments on commit b1c2964

Please sign in to comment.