Add preliminary support for loom (#487)

This patch only adds loom support to parts of `utils` and to `epoch`. Some
parts of `utils` had to be left out, since they rely on
`AtomicUsize::new` being `const` (which it is not in `loom`). Other
parts had to be left out due to the lack of `thread::Thread` in `loom`.
All the parts needed for `epoch` were successfully moved to loom.
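
The hunks below import atomics, `Arc`, and `UnsafeCell` through `crate::primitive`, a small alias module that resolves to loom's model-checked types under `--cfg loom_crossbeam` and to the standard ones otherwise. A minimal sketch of that idea (contents assumed for illustration, not copied from the patch):

```rust
// Sketch only: a cfg-switched alias module in the spirit of the
// `crate::primitive` paths imported below; exact contents are assumed.
// (crossbeam itself routes through `alloc`/`core` for no_std support.)
#[cfg(loom_crossbeam)]
pub(crate) mod primitive {
    pub(crate) mod sync {
        pub(crate) use loom::sync::Arc;
        pub(crate) mod atomic {
            pub(crate) use loom::sync::atomic::AtomicUsize;
        }
    }
    pub(crate) mod cell {
        // loom's UnsafeCell exposes `with`/`with_mut` closures instead of `get`.
        pub(crate) use loom::cell::UnsafeCell;
    }
}

#[cfg(not(loom_crossbeam))]
pub(crate) mod primitive {
    pub(crate) mod sync {
        pub(crate) use std::sync::Arc;
        pub(crate) mod atomic {
            pub(crate) use core::sync::atomic::AtomicUsize;
        }
    }
    pub(crate) mod cell {
        // Outside loom, a thin wrapper over `core::cell::UnsafeCell` provides
        // the same `with`/`with_mut` surface (see the sketch further down).
    }
}
```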

For this initial patch, there are two loom tests, both in `epoch`. One
is a simple test of defer_destroy while a pin is held, and the other is
the Treiber stack example. They both pass under loom with
`LOOM_MAX_PREEMPTIONS=3` and `LOOM_MAX_PREEMPTIONS=2`. The latter tests
fewer possible interleavings, but completes in 13 minutes on my laptop
rather than ~2 hours. I have added loom testing of `epoch` to CI as
well.
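
Neither test is reproduced on this page. As a rough sketch of the general shape (illustrative only; names and structure are assumed, not taken from the actual tests), a loom test exercising `defer_destroy` under an active pin can look like this:

```rust
// Rough sketch of a loom test for deferred destruction under a pin.
// Illustrative only; this is not the test added by this patch.
use crossbeam_epoch::{self as epoch, Owned};
use loom::sync::Arc;
use loom::thread;

#[test]
fn defer_destroy_while_pinned() {
    // loom::model re-runs the closure across the thread interleavings loom
    // explores (bounded by LOOM_MAX_PREEMPTIONS).
    loom::model(|| {
        let collector = Arc::new(epoch::Collector::new());

        let c = Arc::clone(&collector);
        let t = thread::spawn(move || {
            let handle = c.register();
            let guard = handle.pin();
            let shared = Owned::new(42usize).into_shared(&guard);
            // Safe here: no other thread can still reach `shared`.
            unsafe { guard.defer_destroy(shared) };
        });

        // Pin concurrently from another participant so loom explores the
        // interleavings between pinning, deferral, and collection.
        let handle = collector.register();
        drop(handle.pin());

        t.join().unwrap();
    });
}
```

Such tests are run under the loom cfg, e.g. `RUSTFLAGS="--cfg loom_crossbeam" LOOM_MAX_PREEMPTIONS=2 cargo test --test loom --release`, mirroring the CI script below.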

Note that the uses of `UnsafeCell` in `utils` have not been moved to
`loom::cell::UnsafeCell`, as loom's `UnsafeCell` does not support `T:
?Sized`, which `AtomicCell` depends on.
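
The `.with(..)` and `.with_mut(..)` call sites in the hunks below follow loom's closure-based `UnsafeCell` API. For non-loom builds, a thin wrapper of roughly this shape (an assumption, not the patch's exact code) gives `core::cell::UnsafeCell` the same surface:

```rust
// Assumed shape of a non-loom UnsafeCell shim mirroring loom's
// `with`/`with_mut` closure API; not copied from the patch.
#[derive(Debug)]
pub(crate) struct UnsafeCell<T>(core::cell::UnsafeCell<T>);

impl<T> UnsafeCell<T> {
    pub(crate) fn new(data: T) -> UnsafeCell<T> {
        UnsafeCell(core::cell::UnsafeCell::new(data))
    }

    // Hand the closure a const raw pointer to the contents.
    pub(crate) fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
        f(self.0.get())
    }

    // Hand the closure a mutable raw pointer to the contents.
    pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
        f(self.0.get())
    }
}
```

A wrapper like this, and loom's own `UnsafeCell`, stores a sized field, so it cannot cover the `T: ?Sized` uses that `AtomicCell` needs, which is why `utils` keeps plain `core::cell::UnsafeCell` for now.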

Fixes #486.
jonhoo authored Dec 30, 2020
1 parent 469b27f commit e4c6650
Showing 24 changed files with 405 additions and 143 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/ci.yml
@@ -105,6 +105,17 @@ jobs:
- name: clippy
run: ./ci/clippy.sh

# Run loom tests.
loom:
name: loom
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- name: Install Rust
run: rustup update stable && rustup default stable
- name: loom
run: ./ci/crossbeam-epoch-loom.sh

# This job doesn't actually test anything, but it's used to tell bors the
# build completed, as there is no practical way to detect when a workflow is
# successful by listening to webhooks only.
@@ -120,6 +131,7 @@
- dependencies
- rustfmt
- clippy
- loom
runs-on: ubuntu-latest
steps:
- name: Mark the job as a success
11 changes: 11 additions & 0 deletions ci/crossbeam-epoch-loom.sh
@@ -0,0 +1,11 @@
#!/bin/bash

cd "$(dirname "$0")"/../crossbeam-epoch
set -ex

export RUSTFLAGS="-D warnings --cfg=loom_crossbeam"

# With MAX_PREEMPTIONS=2 the loom tests (currently) take around 11m.
# If we were to run with =3, they would take several times that,
# which is probably too costly for CI.
env LOOM_MAX_PREEMPTIONS=2 cargo test --test loom --features sanitize --release -- --nocapture
7 changes: 7 additions & 0 deletions crossbeam-epoch/Cargo.toml
@@ -41,6 +41,13 @@ cfg-if = "1"
const_fn = { version = "0.4.4", optional = true }
memoffset = "0.6"

# Enable the use of loom for concurrency testing.
#
# This configuration option is outside of the normal semver guarantees: minor
# versions of crossbeam may make breaking changes to it at any time.
[target.'cfg(loom_crossbeam)'.dependencies]
loom = "0.4"

[dependencies.crossbeam-utils]
version = "0.8"
path = "../crossbeam-utils"
107 changes: 0 additions & 107 deletions crossbeam-epoch/examples/treiber_stack.rs

This file was deleted.

19 changes: 15 additions & 4 deletions crossbeam-epoch/src/atomic.rs
@@ -5,10 +5,11 @@ use core::marker::PhantomData;
use core::mem::{self, MaybeUninit};
use core::ops::{Deref, DerefMut};
use core::slice;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::sync::atomic::Ordering;

use crate::alloc::alloc;
use crate::alloc::boxed::Box;
use crate::primitive::sync::atomic::AtomicUsize;
use crate::guard::Guard;
use crossbeam_utils::atomic::AtomicConsume;

@@ -326,7 +327,7 @@ impl<T: ?Sized + Pointable> Atomic<T> {
/// let a = Atomic::<i32>::null();
/// ```
///
#[cfg_attr(feature = "nightly", const_fn::const_fn)]
#[cfg_attr(all(feature = "nightly", not(loom_crossbeam)), const_fn::const_fn)]
pub fn null() -> Atomic<T> {
Self {
data: AtomicUsize::new(0),
@@ -637,7 +638,17 @@ impl<T: ?Sized + Pointable> Atomic<T> {
/// }
/// ```
pub unsafe fn into_owned(self) -> Owned<T> {
Owned::from_usize(self.data.into_inner())
#[cfg(loom_crossbeam)]
{
// FIXME: loom does not yet support into_inner, so we use unsync_load for now,
// which should have the same synchronization properties:
// https://github.com/tokio-rs/loom/issues/117
Owned::from_usize(self.data.unsync_load())
}
#[cfg(not(loom_crossbeam))]
{
Owned::from_usize(self.data.into_inner())
}
}
}

@@ -1357,7 +1368,7 @@ impl<T: ?Sized + Pointable> Default for Shared<'_, T> {
}
}

#[cfg(test)]
#[cfg(all(test, not(loom_crossbeam)))]
mod tests {
use super::Shared;

10 changes: 5 additions & 5 deletions crossbeam-epoch/src/collector.rs
@@ -14,7 +14,7 @@
/// ```
use core::fmt;

use crate::alloc::sync::Arc;
use crate::primitive::sync::Arc;
use crate::guard::Guard;
use crate::internal::{Global, Local};

@@ -109,7 +109,7 @@ impl fmt::Debug for LocalHandle {
}
}

#[cfg(test)]
#[cfg(all(test, not(loom_crossbeam)))]
mod tests {
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -151,9 +151,9 @@ mod tests {
let a = Owned::new(7).into_shared(guard);
guard.defer_destroy(a);

assert!(!(*(*guard.local).bag.get()).is_empty());
assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));

while !(*(*guard.local).bag.get()).is_empty() {
while !(*guard.local).bag.with(|b| (*b).is_empty()) {
guard.flush();
}
}
@@ -172,7 +172,7 @@ mod tests {
let a = Owned::new(7).into_shared(guard);
guard.defer_destroy(a);
}
assert!(!(*(*guard.local).bag.get()).is_empty());
assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
}
}

4 changes: 2 additions & 2 deletions crossbeam-epoch/src/default.rs
@@ -5,8 +5,8 @@
//! destructed on thread exit, which in turn unregisters the thread.
use crate::collector::{Collector, LocalHandle};
use crate::primitive::{lazy_static, thread_local};
use crate::guard::Guard;
use lazy_static::lazy_static;

lazy_static! {
/// The global data for the default garbage collector.
@@ -45,7 +45,7 @@ where
.unwrap_or_else(|_| f(&COLLECTOR.register()))
}

#[cfg(test)]
#[cfg(all(test, not(loom_crossbeam)))]
mod tests {
use crossbeam_utils::thread;

2 changes: 1 addition & 1 deletion crossbeam-epoch/src/deferred.rs
@@ -79,7 +79,7 @@ impl Deferred {
}
}

#[cfg(test)]
#[cfg(all(test, not(loom_crossbeam)))]
mod tests {
use super::Deferred;
use std::cell::Cell;
3 changes: 2 additions & 1 deletion crossbeam-epoch/src/epoch.rs
@@ -7,7 +7,8 @@
//! If an object became garbage in some epoch, then we can be sure that after two advancements no
//! participant will hold a reference to it. That is the crux of safe memory reclamation.
use core::sync::atomic::{AtomicUsize, Ordering};
use crate::primitive::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering;

/// An epoch that can be marked as pinned or unpinned.
///
18 changes: 10 additions & 8 deletions crossbeam-epoch/src/internal.rs
@@ -35,10 +35,11 @@
//! Ideally each instance of concurrent data structure may have its own queue that gets fully
//! destroyed as soon as the data structure gets dropped.
use core::cell::{Cell, UnsafeCell};
use crate::primitive::cell::UnsafeCell;
use crate::primitive::sync::atomic;
use core::cell::Cell;
use core::mem::{self, ManuallyDrop};
use core::num::Wrapping;
use core::sync::atomic;
use core::sync::atomic::Ordering;
use core::{fmt, ptr};

@@ -418,7 +419,7 @@ impl Local {
/// Returns a reference to the `Collector` in which this `Local` resides.
#[inline]
pub fn collector(&self) -> &Collector {
unsafe { &**self.collector.get() }
self.collector.with(|c| unsafe { &**c })
}

/// Returns `true` if the current participant is pinned.
@@ -433,7 +434,7 @@
///
/// It should be safe for another thread to execute the given function.
pub unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) {
let bag = &mut *self.bag.get();
let bag = self.bag.with_mut(|b| &mut *b);

while let Err(d) = bag.try_push(deferred) {
self.global().push_bag(bag, guard);
@@ -442,7 +443,7 @@
}

pub fn flush(&self, guard: &Guard) {
let bag = unsafe { &mut *self.bag.get() };
let bag = self.bag.with_mut(|b| unsafe { &mut *b });

if !bag.is_empty() {
self.global().push_bag(bag, guard);
@@ -583,7 +584,8 @@ impl Local {
// Pin and move the local bag into the global queue. It's important that `push_bag`
// doesn't defer destruction on any new garbage.
let guard = &self.pin();
self.global().push_bag(&mut *self.bag.get(), guard);
self.global()
.push_bag(self.bag.with_mut(|b| &mut *b), guard);
}
// Revert the handle count back to zero.
self.handle_count.set(0);
@@ -592,7 +594,7 @@
// Take the reference to the `Global` out of this `Local`. Since we're not protected
// by a guard at this time, it's crucial that the reference is read before marking the
// `Local` as deleted.
let collector: Collector = ptr::read(&*(*self.collector.get()));
let collector: Collector = ptr::read(self.collector.with(|c| &*(*c)));

// Mark this node in the linked list as deleted.
self.entry.delete(unprotected());
@@ -623,7 +625,7 @@ impl IsElement<Local> for Local {
}
}

#[cfg(test)]
#[cfg(all(test, not(loom_crossbeam)))]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};

(Diffs for the remaining changed files are not shown.)