From f402b7e24cd0558d174a4a51b7757e9109920084 Mon Sep 17 00:00:00 2001 From: Jacob Rothstein Date: Thu, 28 Mar 2024 15:24:35 -0700 Subject: [PATCH] feat: Add a loom implementation for event-listener --- .github/workflows/ci.yml | 11 ++ Cargo.toml | 6 +- src/lib.rs | 79 ++++++++++++++- src/no_std.rs | 17 +++- src/std.rs | 69 ++++++------- tests/loom.rs | 212 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 347 insertions(+), 47 deletions(-) create mode 100644 tests/loom.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9998336..a0113c2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -114,3 +114,14 @@ jobs: - uses: rustsec/audit-check@master with: token: ${{ secrets.GITHUB_TOKEN }} + + loom: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install Rust + run: rustup update stable + - name: Loom tests + run: RUSTFLAGS="--cfg=loom" cargo test --release --test loom --features loom + + diff --git a/Cargo.toml b/Cargo.toml index 185d6e0..ff118b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,15 +18,19 @@ exclude = ["/.*"] default = ["std"] std = ["concurrent-queue/std", "parking"] portable-atomic = ["portable-atomic-util", "portable_atomic_crate"] +loom = ["concurrent-queue/loom", "parking?/loom", "dep:loom"] [dependencies] -concurrent-queue = { version = "2.2.0", default-features = false } +concurrent-queue = { version = "2.4.0", default-features = false } pin-project-lite = "0.2.12" portable-atomic-util = { version = "0.1.4", default-features = false, optional = true, features = ["alloc"] } [target.'cfg(not(target_family = "wasm"))'.dependencies] parking = { version = "2.0.0", optional = true } +[target.'cfg(loom)'.dependencies] +loom = { version = "0.7", optional = true } + [dependencies.portable_atomic_crate] package = "portable-atomic" version = "1.2.0" diff --git a/src/lib.rs b/src/lib.rs index e6123a9..08d6037 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -105,7 +105,10 @@ use { }; use sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; -use sync::{Arc, WithMut}; +use sync::Arc; + +#[cfg(not(loom))] +use sync::WithMut; use notify::{Internal, NotificationPrivate}; pub use notify::{IntoNotification, Notification}; @@ -216,13 +219,20 @@ impl Event { /// /// let event = Event::::with_tag(); /// ``` - #[cfg(feature = "std")] + #[cfg(all(feature = "std", not(loom)))] #[inline] pub const fn with_tag() -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), } } + #[cfg(all(feature = "std", loom))] + #[inline] + pub fn with_tag() -> Self { + Self { + inner: AtomicPtr::new(ptr::null_mut()), + } + } /// Tell whether any listeners are currently notified. /// @@ -543,12 +553,21 @@ impl Event<()> { /// let event = Event::new(); /// ``` #[inline] + #[cfg(not(loom))] pub const fn new() -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), } } + #[inline] + #[cfg(loom)] + pub fn new() -> Self { + Self { + inner: AtomicPtr::new(ptr::null_mut()), + } + } + /// Notifies a number of active listeners without emitting a `SeqCst` fence. /// /// The number is allowed to be zero or exceed the current number of listeners. @@ -1119,6 +1138,12 @@ impl> + Unpin> InnerListener { match deadline { None => parker.park(), + #[cfg(loom)] + Some(_deadline) => { + panic!("parking does not support timeouts under loom"); + } + + #[cfg(not(loom))] Some(deadline) => { // Make sure we're not timed out already. let now = Instant::now(); @@ -1330,10 +1355,9 @@ const NEVER_INSERTED_PANIC: &str = "\ EventListener was not inserted into the linked list, make sure you're not polling \ EventListener/listener! after it has finished"; +#[cfg(not(loom))] /// Synchronization primitive implementation. mod sync { - pub(super) use core::cell; - #[cfg(not(feature = "portable-atomic"))] pub(super) use alloc::sync::Arc; #[cfg(not(feature = "portable-atomic"))] @@ -1344,7 +1368,7 @@ mod sync { #[cfg(feature = "portable-atomic")] pub(super) use portable_atomic_util::Arc; - #[cfg(feature = "std")] + #[cfg(all(feature = "std", not(loom)))] pub(super) use std::sync::{Mutex, MutexGuard}; pub(super) trait WithMut { @@ -1366,6 +1390,51 @@ mod sync { f(self.get_mut()) } } + + pub(crate) mod cell { + pub(crate) use core::cell::Cell; + + /// This newtype around *mut T exists for interoperability with loom::cell::ConstPtr, + /// which works as a guard and performs additional logic to track access scope. + pub(crate) struct ConstPtr(*mut T); + impl ConstPtr { + pub(crate) unsafe fn deref(&self) -> &T { + &*self.0 + } + + #[allow(unused)] // std code does not need this + pub(crate) unsafe fn deref_mut(&mut self) -> &mut T { + &mut *self.0 + } + } + + /// This UnsafeCell wrapper exists for interoperability with loom::cell::UnsafeCell, and + /// only contains the interface that is needed for this crate. + #[derive(Debug, Default)] + pub(crate) struct UnsafeCell(core::cell::UnsafeCell); + + impl UnsafeCell { + pub(crate) fn new(data: T) -> UnsafeCell { + UnsafeCell(core::cell::UnsafeCell::new(data)) + } + + pub(crate) fn get(&self) -> ConstPtr { + ConstPtr(self.0.get()) + } + + #[allow(dead_code)] // no_std does not need this + pub(crate) fn into_inner(self) -> T { + self.0.into_inner() + } + } + } +} + +#[cfg(loom)] +/// Synchronization primitive implementation. +mod sync { + pub(super) use loom::cell; + pub(super) use loom::sync::{atomic, Arc, Mutex, MutexGuard}; } fn __test_send_and_sync() { diff --git a/src/no_std.rs b/src/no_std.rs index f445ea6..a2e6dc0 100644 --- a/src/no_std.rs +++ b/src/no_std.rs @@ -16,7 +16,7 @@ use node::{Node, NothingProducer, TaskWaiting}; use crate::notify::{GenericNotify, Internal, Notification}; use crate::sync::atomic::{AtomicBool, Ordering}; -use crate::sync::cell::{Cell, UnsafeCell}; +use crate::sync::cell::{Cell, ConstPtr, UnsafeCell}; use crate::sync::Arc; use crate::{RegisterResult, State, Task, TaskRef}; @@ -771,7 +771,10 @@ impl Mutex { .is_ok() { // We have successfully locked the mutex. - Some(MutexGuard { mutex: self }) + Some(MutexGuard { + mutex: self, + guard: self.value.get(), + }) } else { self.try_lock_slow() } @@ -790,7 +793,10 @@ impl Mutex { .is_ok() { // We have successfully locked the mutex. - return Some(MutexGuard { mutex: self }); + return Some(MutexGuard { + mutex: self, + guard: self.value.get(), + }); } // Use atomic loads instead of compare-exchange. @@ -804,6 +810,7 @@ impl Mutex { pub(crate) struct MutexGuard<'a, T> { mutex: &'a Mutex, + guard: ConstPtr, } impl<'a, T> Drop for MutexGuard<'a, T> { @@ -816,13 +823,13 @@ impl<'a, T> ops::Deref for MutexGuard<'a, T> { type Target = T; fn deref(&self) -> &T { - unsafe { &*self.mutex.value.get() } + unsafe { self.guard.deref() } } } impl<'a, T> ops::DerefMut for MutexGuard<'a, T> { fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.mutex.value.get() } + unsafe { self.guard.deref_mut() } } } diff --git a/src/std.rs b/src/std.rs index 239dc74..3e7f76a 100644 --- a/src/std.rs +++ b/src/std.rs @@ -73,27 +73,27 @@ impl crate::Inner { pub(crate) fn insert(&self, mut listener: Pin<&mut Option>>) { let mut inner = self.lock(); - // SAFETY: We are locked, so we can access the inner `link`. - let entry = unsafe { - listener.as_mut().set(Some(Listener { - link: UnsafeCell::new(Link { - state: Cell::new(State::Created), - prev: Cell::new(inner.tail), - next: Cell::new(None), - }), - _pin: PhantomPinned, - })); - let listener = listener.as_pin_mut().unwrap(); - - // Get the inner pointer. - &*listener.link.get() - }; - - // Replace the tail with the new entry. - match mem::replace(&mut inner.tail, Some(entry.into())) { - None => inner.head = Some(entry.into()), - Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) }, - }; + listener.as_mut().set(Some(Listener { + link: UnsafeCell::new(Link { + state: Cell::new(State::Created), + prev: Cell::new(inner.tail), + next: Cell::new(None), + }), + _pin: PhantomPinned, + })); + let listener = listener.as_pin_mut().unwrap(); + + { + let entry_guard = listener.link.get(); + // SAFETY: We are locked, so we can access the inner `link`. + let entry = unsafe { entry_guard.deref() }; + + // Replace the tail with the new entry. + match mem::replace(&mut inner.tail, Some(entry.into())) { + None => inner.head = Some(entry.into()), + Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) }, + }; + } // If there are no unnotified entries, this is the first one. if inner.next.is_none() { @@ -129,15 +129,12 @@ impl crate::Inner { task: TaskRef<'_>, ) -> RegisterResult { let mut inner = self.lock(); - - // SAFETY: We are locked, so we can access the inner `link`. - let entry = unsafe { - let listener = match listener.as_mut().as_pin_mut() { - Some(listener) => listener, - None => return RegisterResult::NeverInserted, - }; - &*listener.link.get() + let entry_guard = match listener.as_mut().as_pin_mut() { + Some(listener) => listener.link.get(), + None => return RegisterResult::NeverInserted, }; + // SAFETY: We are locked, so we can access the inner `link`. + let entry = unsafe { entry_guard.deref() }; // Take out the state and check it. match entry.state.replace(State::NotifiedTaken) { @@ -175,12 +172,8 @@ impl Inner { mut listener: Pin<&mut Option>>, propagate: bool, ) -> Option> { - let entry = unsafe { - let listener = listener.as_mut().as_pin_mut()?; - - // Get the inner pointer. - &*listener.link.get() - }; + let entry_guard = listener.as_mut().as_pin_mut()?.link.get(); + let entry = unsafe { entry_guard.deref() }; let prev = entry.prev.get(); let next = entry.next.get(); @@ -216,7 +209,11 @@ impl Inner { .into_inner() }; - let mut state = entry.state.into_inner(); + // This State::Created is immediately dropped and exists as a workaround for the absence of + // loom::cell::Cell::into_inner. The intent is `let mut state = entry.state.into_inner();` + // + // refs: https://github.com/tokio-rs/loom/pull/341 + let mut state = entry.state.replace(State::Created); // Update the notified count. if state.is_notified() { diff --git a/tests/loom.rs b/tests/loom.rs new file mode 100644 index 0000000..6ef1d05 --- /dev/null +++ b/tests/loom.rs @@ -0,0 +1,212 @@ +#![cfg(loom)] +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::Context; +use std::usize; + +use event_listener::{Event, EventListener}; +use waker_fn::waker_fn; + +#[cfg(target_family = "wasm")] +use wasm_bindgen_test::wasm_bindgen_test as test; + +fn is_notified(listener: &mut EventListener) -> bool { + let waker = waker_fn(|| ()); + Pin::new(listener) + .poll(&mut Context::from_waker(&waker)) + .is_ready() +} + +#[test] +fn notify() { + loom::model(|| { + let event = Event::new(); + + let mut l1 = event.listen(); + let mut l2 = event.listen(); + let mut l3 = event.listen(); + + assert!(!is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); + + assert_eq!(event.notify(2), 2); + assert_eq!(event.notify(1), 0); + + assert!(is_notified(&mut l1)); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); + }); +} + +#[test] +fn notify_additional() { + loom::model(|| { + let event = Event::new(); + + let mut l1 = event.listen(); + let mut l2 = event.listen(); + let mut l3 = event.listen(); + + assert_eq!(event.notify_additional(1), 1); + assert_eq!(event.notify(1), 0); + assert_eq!(event.notify_additional(1), 1); + + assert!(is_notified(&mut l1)); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); + }) +} + +#[test] +fn notify_one() { + loom::model(|| { + let event = Event::new(); + + let mut l1 = event.listen(); + let mut l2 = event.listen(); + + assert!(!is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); + + assert_eq!(event.notify(1), 1); + assert!(is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); + + assert_eq!(event.notify(1), 1); + assert!(is_notified(&mut l2)); + }); +} + +#[test] +fn notify_all() { + loom::model(|| { + let event = Event::new(); + + let mut l1 = event.listen(); + let mut l2 = event.listen(); + + assert!(!is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); + + assert_eq!(event.notify(usize::MAX), 2); + assert!(is_notified(&mut l1)); + assert!(is_notified(&mut l2)); + }); +} + +#[test] +fn drop_notified() { + loom::model(|| { + let event = Event::new(); + + let l1 = event.listen(); + let mut l2 = event.listen(); + let mut l3 = event.listen(); + + assert_eq!(event.notify(1), 1); + drop(l1); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); + }); +} + +#[test] +fn drop_notified2() { + loom::model(|| { + let event = Event::new(); + + let l1 = event.listen(); + let mut l2 = event.listen(); + let mut l3 = event.listen(); + + assert_eq!(event.notify(2), 2); + drop(l1); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); + }); +} + +#[test] +fn drop_notified_additional() { + loom::model(|| { + let event = Event::new(); + + let l1 = event.listen(); + let mut l2 = event.listen(); + let mut l3 = event.listen(); + let mut l4 = event.listen(); + + assert_eq!(event.notify_additional(1), 1); + assert_eq!(event.notify(2), 1); + drop(l1); + assert!(is_notified(&mut l2)); + assert!(is_notified(&mut l3)); + assert!(!is_notified(&mut l4)); + }); +} + +#[test] +fn drop_non_notified() { + loom::model(|| { + let event = Event::new(); + + let mut l1 = event.listen(); + let mut l2 = event.listen(); + let l3 = event.listen(); + + assert_eq!(event.notify(1), 1); + drop(l3); + assert!(is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); + }) +} + +#[test] +fn notify_all_fair() { + loom::model(|| { + let event = Event::new(); + let v = Arc::new(Mutex::new(vec![])); + + let mut l1 = event.listen(); + let mut l2 = event.listen(); + let mut l3 = event.listen(); + + let waker1 = { + let v = v.clone(); + waker_fn(move || v.lock().unwrap().push(1)) + }; + let waker2 = { + let v = v.clone(); + waker_fn(move || v.lock().unwrap().push(2)) + }; + let waker3 = { + let v = v.clone(); + waker_fn(move || v.lock().unwrap().push(3)) + }; + + assert!(Pin::new(&mut l1) + .poll(&mut Context::from_waker(&waker1)) + .is_pending()); + assert!(Pin::new(&mut l2) + .poll(&mut Context::from_waker(&waker2)) + .is_pending()); + assert!(Pin::new(&mut l3) + .poll(&mut Context::from_waker(&waker3)) + .is_pending()); + + assert_eq!(event.notify(usize::MAX), 3); + assert_eq!(&*v.lock().unwrap(), &[1, 2, 3]); + + assert!(Pin::new(&mut l1) + .poll(&mut Context::from_waker(&waker1)) + .is_ready()); + assert!(Pin::new(&mut l2) + .poll(&mut Context::from_waker(&waker2)) + .is_ready()); + assert!(Pin::new(&mut l3) + .poll(&mut Context::from_waker(&waker3)) + .is_ready()); + }) +}