ignore: use work-stealing stack instead of Arc<Mutex<Vec<_>>>
This represents yet another iteration on how `ignore` enqueues and
distributes work in parallel. The original implementation used a
multi-producer/multi-consumer thread-safe queue from crossbeam. At some
point, I migrated to a simple `Arc<Mutex<Vec<_>>>` and treated it as a
stack so that we did a depth-first traversal. This helped with memory
usage in very wide directories.

But it turns out that a naive stack-behind-a-mutex can be quite a bit
slower than something that's a little smarter, such as the work-stealing
stack used in this commit. My hypothesis for why this helps is that
without the stealing component, work distribution can get stuck in
sub-optimal configurations that depend on which directory entries get
assigned to a particular worker. It's likely that, just by chance, some
workers end up with "more" work than others while the rest sit idle.
The work-stealing approach heads that off.
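
As a rough illustration of the underlying pattern (this is a sketch of
the crossbeam-deque API, not the code in this commit; the names
`find_task`, `local` and `stealers` are invented for the example), each
worker pops from its own LIFO deque and only steals from a sibling when
its own deque is empty:

    use crossbeam_deque::{Stealer, Worker};

    fn find_task<T>(local: &Worker<T>, stealers: &[Stealer<T>]) -> Option<T> {
        // Prefer our own stack: LIFO order keeps each worker depth-first.
        local.pop().or_else(|| {
            // Only when we run dry, steal a batch from a sibling. Stealing
            // a whole batch amortizes the synchronization cost of stealing.
            stealers
                .iter()
                .map(|s| s.steal_batch_and_pop(local))
                .find_map(|s| s.success())
        })
    }

    fn main() {
        let local = Worker::new_lifo();
        let other = Worker::new_lifo();
        other.push("some-directory-entry");
        let stealers = vec![other.stealer()];
        assert_eq!(find_task(&local, &stealers), Some("some-directory-entry"));
    }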

This does re-introduce a dependency on parts of crossbeam, which is kind
of a bummer, but it's carrying its weight for now.

Closes #1823, Closes #2591
Ref sharkdp/fd#28
tavianator authored and BurntSushi committed Sep 20, 2023
1 parent cad1f5f commit d938e95
Showing 4 changed files with 132 additions and 22 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,11 @@ Unreleased changes. Release notes have not yet been written.
   `rg -B1 -A2`. That is, `-A` and `-B` no longer completely override `-C`.
   Instead, they only partially override `-C`.
 
+Performance improvements:
+
+* [PERF #2591](https://github.com/BurntSushi/ripgrep/pull/2591):
+  Parallel directory traversal now uses work stealing for faster searches.
+
 Feature enhancements:
 
 * Added or improved file type filtering for Ada, DITA, Elixir, Fuchsia, Gentoo,
40 changes: 40 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/ignore/Cargo.toml
@@ -19,6 +19,7 @@ name = "ignore"
 bench = false
 
 [dependencies]
+crossbeam-deque = "0.8.3"
 globset = { version = "0.4.10", path = "../globset" }
 lazy_static = "1.1"
 log = "0.4.5"
108 changes: 86 additions & 22 deletions crates/ignore/src/walk.rs
@@ -3,14 +3,15 @@ use std::ffi::OsStr;
 use std::fmt;
 use std::fs::{self, FileType, Metadata};
 use std::io;
-use std::iter::FusedIterator;
+use std::iter::{self, FusedIterator};
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::thread;
 use std::time::Duration;
 use std::vec;
 
+use crossbeam_deque::{Stealer, Worker as Deque};
 use same_file::Handle;
 use walkdir::{self, WalkDir};

@@ -1231,9 +1232,8 @@ impl WalkParallel {
     /// can be merged together into a single data structure.
     pub fn visit(mut self, builder: &mut dyn ParallelVisitorBuilder<'_>) {
         let threads = self.threads();
-        let stack = Arc::new(Mutex::new(vec![]));
+        let mut stack = vec![];
         {
-            let mut stack = stack.lock().unwrap();
             let mut visitor = builder.build();
             let mut paths = Vec::new().into_iter();
             std::mem::swap(&mut paths, &mut self.paths);
@@ -1283,24 +1283,24 @@
         }
         // Create the workers and then wait for them to finish.
         let quit_now = Arc::new(AtomicBool::new(false));
-        let num_pending =
-            Arc::new(AtomicUsize::new(stack.lock().unwrap().len()));
+        let num_pending = Arc::new(AtomicUsize::new(stack.len()));
+        let stacks = Stack::new_for_each_thread(threads, stack);
         std::thread::scope(|s| {
-            let mut handles = vec![];
-            for _ in 0..threads {
-                let worker = Worker {
+            let handles: Vec<_> = stacks
+                .into_iter()
+                .map(|stack| Worker {
                     visitor: builder.build(),
-                    stack: stack.clone(),
+                    stack,
                     quit_now: quit_now.clone(),
                     num_pending: num_pending.clone(),
                     max_depth: self.max_depth,
                     max_filesize: self.max_filesize,
                     follow_links: self.follow_links,
                     skip: self.skip.clone(),
                     filter: self.filter.clone(),
-                };
-                handles.push(s.spawn(|| worker.run()));
-            }
+                })
+                .map(|worker| s.spawn(|| worker.run()))
+                .collect();
             for handle in handles {
                 handle.join().unwrap();
             }
Expand Down Expand Up @@ -1390,20 +1390,87 @@ impl Work {
}
}

/// A work-stealing stack.
#[derive(Debug)]
struct Stack {
/// This thread's index.
index: usize,
/// The thread-local stack.
deque: Deque<Message>,
/// The work stealers.
stealers: Arc<[Stealer<Message>]>,
}

impl Stack {
/// Create a work-stealing stack for each thread. The given messages
/// correspond to the initial paths to start the search at. They will
/// be distributed automatically to each stack in a round-robin fashion.
fn new_for_each_thread(threads: usize, init: Vec<Message>) -> Vec<Stack> {
// Using new_lifo() ensures each worker operates depth-first, not
// breadth-first. We do depth-first because a breadth first traversal
// on wide directories with a lot of gitignores is disastrous (for
// example, searching a directory tree containing all of crates.io).
let deques: Vec<Deque<Message>> =
iter::repeat_with(Deque::new_lifo).take(threads).collect();
let stealers = Arc::<[Stealer<Message>]>::from(
deques.iter().map(Deque::stealer).collect::<Vec<_>>(),
);
let stacks: Vec<Stack> = deques
.into_iter()
.enumerate()
.map(|(index, deque)| Stack {
index,
deque,
stealers: stealers.clone(),
})
.collect();
// Distribute the initial messages.
init.into_iter()
.zip(stacks.iter().cycle())
.for_each(|(m, s)| s.push(m));
stacks
}

/// Push a message.
fn push(&self, msg: Message) {
self.deque.push(msg);
}

/// Pop a message.
fn pop(&self) -> Option<Message> {
self.deque.pop().or_else(|| self.steal())
}

/// Steal a message from another queue.
fn steal(&self) -> Option<Message> {
// For fairness, try to steal from index - 1, then index - 2, ... 0,
// then wrap around to len - 1, len - 2, ... index + 1.
let (left, right) = self.stealers.split_at(self.index);
// Don't steal from ourselves
let right = &right[1..];

left.iter()
.rev()
.chain(right.iter().rev())
.map(|s| s.steal_batch_and_pop(&self.deque))
.find_map(|s| s.success())
}
}

/// A worker is responsible for descending into directories, updating the
/// ignore matchers, producing new work and invoking the caller's callback.
///
/// Note that a worker is *both* a producer and a consumer.
struct Worker<'s> {
/// The caller's callback.
visitor: Box<dyn ParallelVisitor + 's>,
/// A stack of work to do.
/// A work-stealing stack of work to do.
///
/// We use a stack instead of a channel because a stack lets us visit
/// directories in depth first order. This can substantially reduce peak
/// memory usage by keeping both the number of files path and gitignore
/// memory usage by keeping both the number of file paths and gitignore
/// matchers in memory lower.
stack: Arc<Mutex<Vec<Message>>>,
stack: Stack,
/// Whether all workers should terminate at the next opportunity. Note
/// that we need this because we don't want other `Work` to be done after
/// we quit. We wouldn't need this if have a priority channel.
@@ -1668,20 +1735,17 @@ impl<'s> Worker<'s> {
     /// Send work.
     fn send(&self, work: Work) {
         self.num_pending.fetch_add(1, Ordering::SeqCst);
-        let mut stack = self.stack.lock().unwrap();
-        stack.push(Message::Work(work));
+        self.stack.push(Message::Work(work));
     }
 
     /// Send a quit message.
     fn send_quit(&self) {
-        let mut stack = self.stack.lock().unwrap();
-        stack.push(Message::Quit);
+        self.stack.push(Message::Quit);
     }
 
     /// Receive work.
     fn recv(&self) -> Option<Message> {
-        let mut stack = self.stack.lock().unwrap();
-        stack.pop()
+        self.stack.pop()
     }
 
     /// Signal that work has been finished.
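For context on how the `send`/`recv` helpers and the `num_pending` counter
fit together: with per-thread deques there is no single empty queue to
signal completion, so workers track outstanding work globally and broadcast
`Quit` once it drains. The sketch below is illustrative only (it is not the
actual `Worker::run` in this commit; `run_worker` and its closure parameters
are invented for the example):

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    enum Message {
        Work(String), // e.g. a directory entry to descend into
        Quit,
    }

    fn run_worker(
        pop: impl Fn() -> Option<Message>, // pop locally, else steal
        push: impl Fn(Message),            // push onto the local deque
        num_pending: Arc<AtomicUsize>,     // work items not yet finished
    ) {
        loop {
            match pop() {
                Some(Message::Work(entry)) => {
                    // ... descend into `entry`, pushing child work and
                    // incrementing `num_pending` for each item pushed ...
                    drop(entry);
                    // This unit of work is now finished.
                    num_pending.fetch_sub(1, Ordering::SeqCst);
                }
                Some(Message::Quit) => {
                    // Re-broadcast so the other workers see it too.
                    push(Message::Quit);
                    return;
                }
                None => {
                    // Nothing local and nothing to steal. If nothing is
                    // pending anywhere, the traversal is done.
                    if num_pending.load(Ordering::SeqCst) == 0 {
                        push(Message::Quit);
                        return;
                    }
                    // Otherwise another worker may still produce work.
                    std::thread::yield_now();
                }
            }
        }
    }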
