Skip to content

Commit

Permalink
feat: Add a loom implementation for event-listener
Browse files Browse the repository at this point in the history
  • Loading branch information
jbr authored and notgull committed Mar 30, 2024
1 parent 58dbfc8 commit f402b7e
Show file tree
Hide file tree
Showing 6 changed files with 347 additions and 47 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,14 @@ jobs:
- uses: rustsec/audit-check@master
with:
token: ${{ secrets.GITHUB_TOKEN }}

loom:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
- name: Loom tests
run: RUSTFLAGS="--cfg=loom" cargo test --release --test loom --features loom


6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ exclude = ["/.*"]
default = ["std"]
std = ["concurrent-queue/std", "parking"]
portable-atomic = ["portable-atomic-util", "portable_atomic_crate"]
loom = ["concurrent-queue/loom", "parking?/loom", "dep:loom"]

[dependencies]
concurrent-queue = { version = "2.2.0", default-features = false }
concurrent-queue = { version = "2.4.0", default-features = false }
pin-project-lite = "0.2.12"
portable-atomic-util = { version = "0.1.4", default-features = false, optional = true, features = ["alloc"] }

[target.'cfg(not(target_family = "wasm"))'.dependencies]
parking = { version = "2.0.0", optional = true }

[target.'cfg(loom)'.dependencies]
loom = { version = "0.7", optional = true }

[dependencies.portable_atomic_crate]
package = "portable-atomic"
version = "1.2.0"
Expand Down
79 changes: 74 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ use {
};

use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use sync::{Arc, WithMut};
use sync::Arc;

#[cfg(not(loom))]
use sync::WithMut;

use notify::{Internal, NotificationPrivate};
pub use notify::{IntoNotification, Notification};
Expand Down Expand Up @@ -216,13 +219,20 @@ impl<T> Event<T> {
///
/// let event = Event::<usize>::with_tag();
/// ```
#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(loom)))]
#[inline]
pub const fn with_tag() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
}
}
#[cfg(all(feature = "std", loom))]
#[inline]
pub fn with_tag() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
}
}

/// Tell whether any listeners are currently notified.
///
Expand Down Expand Up @@ -543,12 +553,21 @@ impl Event<()> {
/// let event = Event::new();
/// ```
#[inline]
#[cfg(not(loom))]
pub const fn new() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
}
}

#[inline]
#[cfg(loom)]
pub fn new() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
}
}

/// Notifies a number of active listeners without emitting a `SeqCst` fence.
///
/// The number is allowed to be zero or exceed the current number of listeners.
Expand Down Expand Up @@ -1119,6 +1138,12 @@ impl<T, B: Borrow<Inner<T>> + Unpin> InnerListener<T, B> {
match deadline {
None => parker.park(),

#[cfg(loom)]
Some(_deadline) => {
panic!("parking does not support timeouts under loom");
}

#[cfg(not(loom))]
Some(deadline) => {
// Make sure we're not timed out already.
let now = Instant::now();
Expand Down Expand Up @@ -1330,10 +1355,9 @@ const NEVER_INSERTED_PANIC: &str = "\
EventListener was not inserted into the linked list, make sure you're not polling \
EventListener/listener! after it has finished";

#[cfg(not(loom))]
/// Synchronization primitive implementation.
mod sync {
pub(super) use core::cell;

#[cfg(not(feature = "portable-atomic"))]
pub(super) use alloc::sync::Arc;
#[cfg(not(feature = "portable-atomic"))]
Expand All @@ -1344,7 +1368,7 @@ mod sync {
#[cfg(feature = "portable-atomic")]
pub(super) use portable_atomic_util::Arc;

#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(loom)))]
pub(super) use std::sync::{Mutex, MutexGuard};

pub(super) trait WithMut {
Expand All @@ -1366,6 +1390,51 @@ mod sync {
f(self.get_mut())
}
}

pub(crate) mod cell {
pub(crate) use core::cell::Cell;

/// This newtype around *mut T exists for interoperability with loom::cell::ConstPtr,
/// which works as a guard and performs additional logic to track access scope.
pub(crate) struct ConstPtr<T>(*mut T);
impl<T> ConstPtr<T> {
pub(crate) unsafe fn deref(&self) -> &T {
&*self.0
}

#[allow(unused)] // std code does not need this
pub(crate) unsafe fn deref_mut(&mut self) -> &mut T {
&mut *self.0
}
}

/// This UnsafeCell wrapper exists for interoperability with loom::cell::UnsafeCell, and
/// only contains the interface that is needed for this crate.
#[derive(Debug, Default)]
pub(crate) struct UnsafeCell<T>(core::cell::UnsafeCell<T>);

impl<T> UnsafeCell<T> {
pub(crate) fn new(data: T) -> UnsafeCell<T> {
UnsafeCell(core::cell::UnsafeCell::new(data))
}

pub(crate) fn get(&self) -> ConstPtr<T> {
ConstPtr(self.0.get())
}

#[allow(dead_code)] // no_std does not need this
pub(crate) fn into_inner(self) -> T {
self.0.into_inner()
}
}
}
}

#[cfg(loom)]
/// Synchronization primitive implementation.
mod sync {
pub(super) use loom::cell;
pub(super) use loom::sync::{atomic, Arc, Mutex, MutexGuard};
}

fn __test_send_and_sync() {
Expand Down
17 changes: 12 additions & 5 deletions src/no_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use node::{Node, NothingProducer, TaskWaiting};

use crate::notify::{GenericNotify, Internal, Notification};
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::sync::cell::{Cell, UnsafeCell};
use crate::sync::cell::{Cell, ConstPtr, UnsafeCell};
use crate::sync::Arc;
use crate::{RegisterResult, State, Task, TaskRef};

Expand Down Expand Up @@ -771,7 +771,10 @@ impl<T> Mutex<T> {
.is_ok()
{
// We have successfully locked the mutex.
Some(MutexGuard { mutex: self })
Some(MutexGuard {
mutex: self,
guard: self.value.get(),
})
} else {
self.try_lock_slow()
}
Expand All @@ -790,7 +793,10 @@ impl<T> Mutex<T> {
.is_ok()
{
// We have successfully locked the mutex.
return Some(MutexGuard { mutex: self });
return Some(MutexGuard {
mutex: self,
guard: self.value.get(),
});
}

// Use atomic loads instead of compare-exchange.
Expand All @@ -804,6 +810,7 @@ impl<T> Mutex<T> {

pub(crate) struct MutexGuard<'a, T> {
mutex: &'a Mutex<T>,
guard: ConstPtr<T>,
}

impl<'a, T> Drop for MutexGuard<'a, T> {
Expand All @@ -816,13 +823,13 @@ impl<'a, T> ops::Deref for MutexGuard<'a, T> {
type Target = T;

fn deref(&self) -> &T {
unsafe { &*self.mutex.value.get() }
unsafe { self.guard.deref() }
}
}

impl<'a, T> ops::DerefMut for MutexGuard<'a, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.mutex.value.get() }
unsafe { self.guard.deref_mut() }
}
}

Expand Down
69 changes: 33 additions & 36 deletions src/std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,27 +73,27 @@ impl<T> crate::Inner<T> {
pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
let mut inner = self.lock();

// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe {
listener.as_mut().set(Some(Listener {
link: UnsafeCell::new(Link {
state: Cell::new(State::Created),
prev: Cell::new(inner.tail),
next: Cell::new(None),
}),
_pin: PhantomPinned,
}));
let listener = listener.as_pin_mut().unwrap();

// Get the inner pointer.
&*listener.link.get()
};

// Replace the tail with the new entry.
match mem::replace(&mut inner.tail, Some(entry.into())) {
None => inner.head = Some(entry.into()),
Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
};
listener.as_mut().set(Some(Listener {
link: UnsafeCell::new(Link {
state: Cell::new(State::Created),
prev: Cell::new(inner.tail),
next: Cell::new(None),
}),
_pin: PhantomPinned,
}));
let listener = listener.as_pin_mut().unwrap();

{
let entry_guard = listener.link.get();
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe { entry_guard.deref() };

// Replace the tail with the new entry.
match mem::replace(&mut inner.tail, Some(entry.into())) {
None => inner.head = Some(entry.into()),
Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
};
}

// If there are no unnotified entries, this is the first one.
if inner.next.is_none() {
Expand Down Expand Up @@ -129,15 +129,12 @@ impl<T> crate::Inner<T> {
task: TaskRef<'_>,
) -> RegisterResult<T> {
let mut inner = self.lock();

// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe {
let listener = match listener.as_mut().as_pin_mut() {
Some(listener) => listener,
None => return RegisterResult::NeverInserted,
};
&*listener.link.get()
let entry_guard = match listener.as_mut().as_pin_mut() {
Some(listener) => listener.link.get(),
None => return RegisterResult::NeverInserted,
};
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe { entry_guard.deref() };

// Take out the state and check it.
match entry.state.replace(State::NotifiedTaken) {
Expand Down Expand Up @@ -175,12 +172,8 @@ impl<T> Inner<T> {
mut listener: Pin<&mut Option<Listener<T>>>,
propagate: bool,
) -> Option<State<T>> {
let entry = unsafe {
let listener = listener.as_mut().as_pin_mut()?;

// Get the inner pointer.
&*listener.link.get()
};
let entry_guard = listener.as_mut().as_pin_mut()?.link.get();
let entry = unsafe { entry_guard.deref() };

let prev = entry.prev.get();
let next = entry.next.get();
Expand Down Expand Up @@ -216,7 +209,11 @@ impl<T> Inner<T> {
.into_inner()
};

let mut state = entry.state.into_inner();
// This State::Created is immediately dropped and exists as a workaround for the absence of
// loom::cell::Cell::into_inner. The intent is `let mut state = entry.state.into_inner();`
//
// refs: https://github.com/tokio-rs/loom/pull/341
let mut state = entry.state.replace(State::Created);

// Update the notified count.
if state.is_notified() {
Expand Down
Loading

0 comments on commit f402b7e

Please sign in to comment.