Skip to content

Commit

Permalink
Merge #785 #790 #791
Browse files Browse the repository at this point in the history
785: Remove the lifetime constraint from the scope OP r=cuviper a=cuviper

We already know the `OP` lifetime must outlive the `scope` call itself,
and we'll execute it synchronously, but we don't need to tie that to the
`'scope` lifetime. By relaxing this, we can create scopes for different
lifetimes than `OP` can satisfy, even fully `'static`. They will still
await completion by the end of the `scope` call, but this can be useful
for collecting the invariant scopes from different contexts, as the new
nested tests demonstrate (adapted from @pkolaczk's code in #782).

790: Micro-optimize the WorkerThread::steal loop r=cuviper a=cuviper

This lifts up the loop for `Steal::Retry`, so we continue with other
threads before trying again. For most benchmarks, the performance change
is in the noise, but this gives a consistent improvement in most of the
microbenches -- up to 8% faster on `join_microbench::increment_all_max`.

791: Use crossbeam_deque::Injector instead of crossbeam_queue::SegQueue r=cuviper a=cuviper

`Injector` and `SegQueue` are _almost_ identical, down to the very same
comments in their implementations. One difference is that `Injector`
allocates its first block as soon as it's created, but `SegQueue` waits
until its first `push`, which complicates it to allow being `null`.
`Injector` also has methods to steal batches into a deque `Worker`,
which might be useful to us.

At the very least, this lets us trim a dependency.

Co-authored-by: Josh Stone <cuviper@gmail.com>
  • Loading branch information
bors[bot] and cuviper authored Aug 24, 2020
4 parents 9f7357b + 66559fe + 2e88929 + c7d963a commit a0e5833
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 51 deletions.
12 changes: 0 additions & 12 deletions ci/compat-Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion rayon-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ num_cpus = "1.2"
lazy_static = "1"
crossbeam-channel = "0.4.2"
crossbeam-deque = "0.7.2"
crossbeam-queue = "0.2"
crossbeam-utils = "0.7"

[dev-dependencies]
Expand Down
14 changes: 10 additions & 4 deletions rayon-core/src/job.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::latch::Latch;
use crate::unwind;
use crossbeam_queue::SegQueue;
use crossbeam_deque::{Injector, Steal};
use std::any::Any;
use std::cell::UnsafeCell;
use std::mem;
Expand Down Expand Up @@ -184,13 +184,13 @@ impl<T> JobResult<T> {

/// Indirect queue to provide FIFO job priority.
pub(super) struct JobFifo {
inner: SegQueue<JobRef>,
inner: Injector<JobRef>,
}

impl JobFifo {
pub(super) fn new() -> Self {
JobFifo {
inner: SegQueue::new(),
inner: Injector::new(),
}
}

Expand All @@ -206,6 +206,12 @@ impl JobFifo {
impl Job for JobFifo {
unsafe fn execute(this: *const Self) {
// We "execute" a queue by executing its first job, FIFO.
(*this).inner.pop().expect("job in fifo queue").execute()
loop {
match (*this).inner.steal() {
Steal::Success(job_ref) => break job_ref.execute(),
Steal::Empty => panic!("FIFO is empty"),
Steal::Retry => {}
}
}
}
}
61 changes: 36 additions & 25 deletions rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use crate::util::leak;
use crate::{
ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
};
use crossbeam_deque::{Steal, Stealer, Worker};
use crossbeam_queue::SegQueue;
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use std::any::Any;
use std::cell::Cell;
use std::collections::hash_map::DefaultHasher;
Expand Down Expand Up @@ -136,7 +135,7 @@ pub(super) struct Registry {
logger: Logger,
thread_infos: Vec<ThreadInfo>,
sleep: Sleep,
injected_jobs: SegQueue<JobRef>,
injected_jobs: Injector<JobRef>,
panic_handler: Option<Box<PanicHandler>>,
start_handler: Option<Box<StartHandler>>,
exit_handler: Option<Box<ExitHandler>>,
Expand Down Expand Up @@ -240,7 +239,7 @@ impl Registry {
logger: logger.clone(),
thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
sleep: Sleep::new(logger, n_threads),
injected_jobs: SegQueue::new(),
injected_jobs: Injector::new(),
terminate_count: AtomicUsize::new(1),
panic_handler: builder.take_panic_handler(),
start_handler: builder.take_start_handler(),
Expand Down Expand Up @@ -413,13 +412,18 @@ impl Registry {
}

fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
let job = self.injected_jobs.pop().ok();
if job.is_some() {
self.log(|| JobUninjected {
worker: worker_index,
});
loop {
match self.injected_jobs.steal() {
Steal::Success(job) => {
self.log(|| JobUninjected {
worker: worker_index,
});
return Some(job);
}
Steal::Empty => return None,
Steal::Retry => {}
}
}
job
}

/// If already in a worker-thread of this registry, just execute `op`.
Expand Down Expand Up @@ -758,32 +762,39 @@ impl WorkerThread {
debug_assert!(self.local_deque_is_empty());

// otherwise, try to steal
let num_threads = self.registry.thread_infos.len();
let thread_infos = &self.registry.thread_infos.as_slice();
let num_threads = thread_infos.len();
if num_threads <= 1 {
return None;
}

let start = self.rng.next_usize(num_threads);
(start..num_threads)
.chain(0..start)
.filter(|&i| i != self.index)
.filter_map(|victim_index| {
let victim = &self.registry.thread_infos[victim_index];
loop {
loop {
let mut retry = false;
let start = self.rng.next_usize(num_threads);
let job = (start..num_threads)
.chain(0..start)
.filter(move |&i| i != self.index)
.find_map(|victim_index| {
let victim = &thread_infos[victim_index];
match victim.stealer.steal() {
Steal::Empty => return None,
Steal::Success(d) => {
Steal::Success(job) => {
self.log(|| JobStolen {
worker: self.index,
victim: victim_index,
});
return Some(d);
Some(job)
}
Steal::Empty => None,
Steal::Retry => {
retry = true;
None
}
Steal::Retry => {}
}
}
})
.next()
});
if job.is_some() || !retry {
return job;
}
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions rayon-core/src/scope/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ struct ScopeBase<'scope> {
/// propagated at that point.
pub fn scope<'scope, OP, R>(op: OP) -> R
where
OP: for<'s> FnOnce(&'s Scope<'scope>) -> R + 'scope + Send,
OP: FnOnce(&Scope<'scope>) -> R + Send,
R: Send,
{
in_worker(|owner_thread, _| {
Expand Down Expand Up @@ -376,7 +376,7 @@ where
/// panics are propagated at that point.
pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
where
OP: for<'s> FnOnce(&'s ScopeFifo<'scope>) -> R + 'scope + Send,
OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
R: Send,
{
in_worker(|owner_thread, _| {
Expand Down
82 changes: 81 additions & 1 deletion rayon-core/src/scope/test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::unwind;
use crate::ThreadPoolBuilder;
use crate::{scope, scope_fifo, Scope};
use crate::{scope, scope_fifo, Scope, ScopeFifo};
use rand::{Rng, SeedableRng};
use rand_xorshift::XorShiftRng;
use std::cmp;
Expand Down Expand Up @@ -433,3 +433,83 @@ fn mixed_fifo_lifo_order() {
let expected = vec![-3, 0, -2, 1, -1, 2, 3];
assert_eq!(vec, expected);
}

#[test]
fn static_scope() {
static COUNTER: AtomicUsize = AtomicUsize::new(0);

let mut range = 0..100;
let sum = range.clone().sum();
let iter = &mut range;

COUNTER.store(0, Ordering::Relaxed);
scope(|s: &Scope<'static>| {
// While we're allowed the locally borrowed iterator,
// the spawns must be static.
for i in iter {
s.spawn(move |_| {
COUNTER.fetch_add(i, Ordering::Relaxed);
});
}
});

assert_eq!(COUNTER.load(Ordering::Relaxed), sum);
}

#[test]
fn static_scope_fifo() {
static COUNTER: AtomicUsize = AtomicUsize::new(0);

let mut range = 0..100;
let sum = range.clone().sum();
let iter = &mut range;

COUNTER.store(0, Ordering::Relaxed);
scope_fifo(|s: &ScopeFifo<'static>| {
// While we're allowed the locally borrowed iterator,
// the spawns must be static.
for i in iter {
s.spawn_fifo(move |_| {
COUNTER.fetch_add(i, Ordering::Relaxed);
});
}
});

assert_eq!(COUNTER.load(Ordering::Relaxed), sum);
}

#[test]
fn mixed_lifetime_scope() {
fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) {
scope(move |s: &Scope<'counter>| {
// We can borrow 'slice here, but the spawns can only borrow 'counter.
for &c in counters {
s.spawn(move |_| {
c.fetch_add(1, Ordering::Relaxed);
});
}
});
}

let counter = AtomicUsize::new(0);
increment(&[&counter; 100]);
assert_eq!(counter.into_inner(), 100);
}

#[test]
fn mixed_lifetime_scope_fifo() {
fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) {
scope_fifo(move |s: &ScopeFifo<'counter>| {
// We can borrow 'slice here, but the spawns can only borrow 'counter.
for &c in counters {
s.spawn_fifo(move |_| {
c.fetch_add(1, Ordering::Relaxed);
});
}
});
}

let counter = AtomicUsize::new(0);
increment(&[&counter; 100]);
assert_eq!(counter.into_inner(), 100);
}
4 changes: 2 additions & 2 deletions rayon-core/src/thread_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl ThreadPool {
/// [scope]: fn.scope.html
pub fn scope<'scope, OP, R>(&self, op: OP) -> R
where
OP: for<'s> FnOnce(&'s Scope<'scope>) -> R + 'scope + Send,
OP: FnOnce(&Scope<'scope>) -> R + Send,
R: Send,
{
self.install(|| scope(op))
Expand All @@ -215,7 +215,7 @@ impl ThreadPool {
/// [scope_fifo]: fn.scope_fifo.html
pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R
where
OP: for<'s> FnOnce(&'s ScopeFifo<'scope>) -> R + 'scope + Send,
OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
R: Send,
{
self.install(|| scope_fifo(op))
Expand Down
Loading

0 comments on commit a0e5833

Please sign in to comment.