Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relax SeqCst atomic ordering #625

Merged
merged 3 commits into from
Apr 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions inbox/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ fn try_send<T>(channel: &Channel<T>, value: T) -> Result<(), SendError<T>> {
/// enough for most practical use cases.
impl<T> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
// For the reasoning behind this relaxed ordering see `Arc::clone`.
// SAFETY: for the reasoning behind this relaxed ordering see `Arc::clone`.
let old_ref_count = self.channel().ref_count.fetch_add(1, Ordering::Relaxed);
debug_assert!(old_ref_count & SENDER_ACCESS != 0);
Sender {
Expand Down Expand Up @@ -452,7 +452,7 @@ impl<T> Drop for Sender<T> {
return;
}

// For the reasoning behind this ordering see `Arc::drop`.
// SAFETY: for the reasoning behind this ordering see `Arc::drop`.
fence!(self.channel().ref_count, Ordering::Acquire);

// Drop the memory.
Expand Down
6 changes: 3 additions & 3 deletions inbox/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ struct AwokenCount {

impl PartialEq<usize> for AwokenCount {
fn eq(&self, other: &usize) -> bool {
self.inner.count.load(Ordering::SeqCst) == *other
self.inner.count.load(Ordering::Acquire) == *other
}
}

Expand All @@ -33,11 +33,11 @@ struct WakerInner {

impl Wake for WakerInner {
fn wake(self: Arc<Self>) {
let _ = self.count.fetch_add(1, Ordering::SeqCst);
let _ = self.count.fetch_add(1, Ordering::AcqRel);
}

fn wake_by_ref(self: &Arc<Self>) {
let _ = self.count.fetch_add(1, Ordering::SeqCst);
let _ = self.count.fetch_add(1, Ordering::AcqRel);
}
}

Expand Down
10 changes: 5 additions & 5 deletions inbox/src/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl WakerRegistration {
let stored_waker = self.waker.read().unwrap();
if let Some(stored_waker) = &*stored_waker {
if stored_waker.will_wake(waker) {
self.needs_wakeup.store(true, Ordering::SeqCst);
self.needs_wakeup.store(true, Ordering::Release);
return false;
}
}
Expand All @@ -36,26 +36,26 @@ impl WakerRegistration {
// again.
if let Some(stored_waker) = &*stored_waker {
if stored_waker.will_wake(waker) {
self.needs_wakeup.store(true, Ordering::SeqCst);
self.needs_wakeup.store(true, Ordering::Release);
return false;
}
}
*stored_waker = Some(waker.clone());
drop(stored_waker);

self.needs_wakeup.store(true, Ordering::SeqCst);
self.needs_wakeup.store(true, Ordering::Release);
true
}

/// Wake the waker registered, if required.
pub(crate) fn wake(&self) {
if !self.needs_wakeup.load(Ordering::SeqCst) {
if !self.needs_wakeup.load(Ordering::Acquire) {
// Receiver doesn't need a wake-up.
return;
}

// Mark that we've woken and after actually do the waking.
if self.needs_wakeup.swap(false, Ordering::SeqCst) {
if self.needs_wakeup.swap(false, Ordering::AcqRel) {
if let Some(waker) = &*self.waker.read().unwrap() {
waker.wake_by_ref();
}
Expand Down
6 changes: 3 additions & 3 deletions inbox/tests/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct AwokenCount {
impl AwokenCount {
/// Get the current count.
pub fn get(&self) -> usize {
self.inner.count.load(Ordering::SeqCst)
self.inner.count.load(Ordering::Acquire)
}
}

Expand All @@ -39,11 +39,11 @@ struct WakerInner {

impl Wake for WakerInner {
fn wake(self: Arc<Self>) {
let _ = self.count.fetch_add(1, Ordering::SeqCst);
let _ = self.count.fetch_add(1, Ordering::AcqRel);
}

fn wake_by_ref(self: &Arc<Self>) {
let _ = self.count.fetch_add(1, Ordering::SeqCst);
let _ = self.count.fetch_add(1, Ordering::AcqRel);
}
}

Expand Down
2 changes: 1 addition & 1 deletion rt/src/scheduler/inactive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ mod tests {

impl Drop for DropTest {
fn drop(&mut self) {
let _ = self.0.fetch_add(1, Ordering::SeqCst);
let _ = self.0.fetch_add(1, Ordering::AcqRel);
}
}

Expand Down
2 changes: 1 addition & 1 deletion rt/src/scheduler/shared/inactive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ mod tests {

impl Drop for DropTest {
fn drop(&mut self) {
let _ = self.0.fetch_add(1, Ordering::SeqCst);
let _ = self.0.fetch_add(1, Ordering::AcqRel);
}
}

Expand Down
2 changes: 1 addition & 1 deletion rt/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ where
M: Send + 'static,
{
static SYNC_WORKER_TEST_ID: AtomicUsize = AtomicUsize::new(10_000);
let id = SYNC_WORKER_TEST_ID.fetch_add(1, Ordering::SeqCst);
let id = SYNC_WORKER_TEST_ID.fetch_add(1, Ordering::AcqRel);

let shared = shared_internals();
sync_worker::start(id, supervisor, actor, arg, options, shared, None).map(
Expand Down
4 changes: 2 additions & 2 deletions rt/src/timers/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl<const N: usize> WakerBuilder<N> {
}

fn is_awoken(&self, n: usize) -> bool {
self.awoken[n].load(Ordering::SeqCst)
self.awoken[n].load(Ordering::Acquire)
}
}

Expand All @@ -45,7 +45,7 @@ struct TaskWaker<const N: usize> {

impl<const N: usize> Wake for TaskWaker<N> {
fn wake(self: Arc<Self>) {
self.awoken[self.n].store(true, Ordering::SeqCst)
self.awoken[self.n].store(true, Ordering::Release)
}
}

Expand Down
2 changes: 1 addition & 1 deletion rt/src/wakers/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl Wakers {
/// Static used to determine unique indices into `RUNTIMES`.
static IDS: AtomicU8 = AtomicU8::new(0);

let id = IDS.fetch_add(1, Ordering::SeqCst);
let id = IDS.fetch_add(1, Ordering::AcqRel);
assert!(
(id as usize) < MAX_RUNTIMES,
"Created too many Heph `Runtime`s, maximum of {MAX_RUNTIMES}",
Expand Down
46 changes: 23 additions & 23 deletions rt/tests/functional/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,11 @@ fn running_actors() {
}

fn get(value: &AtomicUsize) -> usize {
value.load(Ordering::SeqCst)
value.load(Ordering::Acquire)
}

fn incr(value: &AtomicUsize) {
let _ = value.fetch_add(1, Ordering::SeqCst);
let _ = value.fetch_add(1, Ordering::AcqRel);
}

impl<NA> Supervisor<NA> for RunningSupervisor<NA::Argument>
Expand Down Expand Up @@ -513,12 +513,12 @@ fn external_thread_wakes_sync_actor() {
}

async fn panic_actor<RT>(_: actor::Context<!, RT>, mark: &'static AtomicBool) {
mark.store(true, Ordering::SeqCst);
mark.store(true, Ordering::Release);
panic!("on purpose panic");
}

async fn ok_actor<RT>(_: actor::Context<!, RT>, mark: &'static AtomicBool) {
mark.store(true, Ordering::SeqCst);
mark.store(true, Ordering::Release);
}

fn actor_drop_panic<RT>(_: actor::Context<!, RT>, mark: &'static AtomicBool) -> PanicOnDropFuture {
Expand All @@ -531,7 +531,7 @@ impl Future for PanicOnDropFuture {
type Output = ();

fn poll(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> {
self.0.store(true, Ordering::SeqCst);
self.0.store(true, Ordering::Release);
Poll::Ready(())
}
}
Expand All @@ -543,12 +543,12 @@ impl Drop for PanicOnDropFuture {
}

async fn panic_future(mark: &'static AtomicBool) {
mark.store(true, Ordering::SeqCst);
mark.store(true, Ordering::Release);
panic!("on purpose panic");
}

async fn ok_future(mark: &'static AtomicBool) {
mark.store(true, Ordering::SeqCst);
mark.store(true, Ordering::Release);
}

#[test]
Expand All @@ -571,8 +571,8 @@ fn catches_actor_panics() {
);
runtime.start().unwrap();

assert!(PANIC_RAN.load(Ordering::SeqCst));
assert!(OK_RAN.load(Ordering::SeqCst));
assert!(PANIC_RAN.load(Ordering::Acquire));
assert!(OK_RAN.load(Ordering::Acquire));
}

#[test]
Expand Down Expand Up @@ -600,8 +600,8 @@ fn catches_local_actor_panics() {
.unwrap();
runtime.start().unwrap();

assert!(PANIC_RAN.load(Ordering::SeqCst));
assert!(OK_RAN.load(Ordering::SeqCst));
assert!(PANIC_RAN.load(Ordering::Acquire));
assert!(OK_RAN.load(Ordering::Acquire));
}

#[test]
Expand All @@ -624,8 +624,8 @@ fn catches_actor_panics_on_drop() {
);
runtime.start().unwrap();

assert!(PANIC_RAN.load(Ordering::SeqCst));
assert!(OK_RAN.load(Ordering::SeqCst));
assert!(PANIC_RAN.load(Ordering::Acquire));
assert!(OK_RAN.load(Ordering::Acquire));
}

#[test]
Expand Down Expand Up @@ -653,8 +653,8 @@ fn catches_local_actor_panics_on_drop() {
.unwrap();
runtime.start().unwrap();

assert!(PANIC_RAN.load(Ordering::SeqCst));
assert!(OK_RAN.load(Ordering::SeqCst));
assert!(PANIC_RAN.load(Ordering::Acquire));
assert!(OK_RAN.load(Ordering::Acquire));
}

#[test]
Expand All @@ -673,8 +673,8 @@ fn catches_future_panics() {
);
runtime.start().unwrap();

assert!(PANIC_RAN.load(Ordering::SeqCst));
assert!(OK_RAN.load(Ordering::SeqCst));
assert!(PANIC_RAN.load(Ordering::Acquire));
assert!(OK_RAN.load(Ordering::Acquire));
}

#[test]
Expand All @@ -698,8 +698,8 @@ fn catches_local_future_panics() {
.unwrap();
runtime.start().unwrap();

assert!(PANIC_RAN.load(Ordering::SeqCst));
assert!(OK_RAN.load(Ordering::SeqCst));
assert!(PANIC_RAN.load(Ordering::Acquire));
assert!(OK_RAN.load(Ordering::Acquire));
}

#[test]
Expand All @@ -718,8 +718,8 @@ fn catches_future_panics_on_drop() {
);
runtime.start().unwrap();

assert!(PANIC_RAN.load(Ordering::SeqCst));
assert!(OK_RAN.load(Ordering::SeqCst));
assert!(PANIC_RAN.load(Ordering::Acquire));
assert!(OK_RAN.load(Ordering::Acquire));
}

#[test]
Expand All @@ -743,6 +743,6 @@ fn catches_local_future_panics_on_drop() {
.unwrap();
runtime.start().unwrap();

assert!(PANIC_RAN.load(Ordering::SeqCst));
assert!(OK_RAN.load(Ordering::SeqCst));
assert!(PANIC_RAN.load(Ordering::Acquire));
assert!(OK_RAN.load(Ordering::Acquire));
}
12 changes: 6 additions & 6 deletions rt/tests/process_signals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,18 @@ fn with_signal_handles() {
runtime.start().unwrap();

// Make sure that all the actor received the signal once.
assert_eq!(thread_local.load(Ordering::SeqCst), 1);
assert_eq!(thread_safe1.load(Ordering::SeqCst), 1);
assert_eq!(thread_safe2.load(Ordering::SeqCst), 1);
assert_eq!(sync.load(Ordering::SeqCst), 1);
assert_eq!(thread_local.load(Ordering::Acquire), 1);
assert_eq!(thread_safe1.load(Ordering::Acquire), 1);
assert_eq!(thread_safe2.load(Ordering::Acquire), 1);
assert_eq!(sync.load(Ordering::Acquire), 1);
}

async fn actor<RT>(mut ctx: actor::Context<Signal, RT>, got_signal: Arc<AtomicUsize>) {
let _msg = ctx.receive_next().await.unwrap();
got_signal.fetch_add(1, Ordering::SeqCst);
got_signal.fetch_add(1, Ordering::AcqRel);
}

fn sync_actor<RT>(mut ctx: sync::Context<Signal, RT>, got_signal: Arc<AtomicUsize>) {
let _msg = ctx.receive_next().unwrap();
got_signal.fetch_add(1, Ordering::SeqCst);
got_signal.fetch_add(1, Ordering::AcqRel);
}
16 changes: 8 additions & 8 deletions src/actor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ fn actor_future() {

// Send a message and the actor should return Ok.
actor_ref.try_send(()).unwrap();
assert_eq!(count.load(Ordering::SeqCst), 1);
assert_eq!(count.load(Ordering::Acquire), 1);
let res = actor.as_mut().poll(&mut ctx);
assert_eq!(res, Poll::Ready(()));
}
Expand Down Expand Up @@ -150,7 +150,7 @@ fn erroneous_actor_process() {
let res = actor.as_mut().poll(&mut ctx);
assert_eq!(res, Poll::Ready(()));
assert_eq!(supervisor_called_count, 1);
assert_eq!(count.load(Ordering::SeqCst), 0);
assert_eq!(count.load(Ordering::Acquire), 0);
}

#[test]
Expand All @@ -170,7 +170,7 @@ fn restarting_erroneous_actor_process() {
assert_eq!(res, Poll::Pending);
assert_eq!(supervisor_called_count.get(), 1);
// The future to wake itself after a restart to ensure it gets run again.
assert_eq!(count.load(Ordering::SeqCst), 1);
assert_eq!(count.load(Ordering::Acquire), 1);

// After a restart the actor should continue without issues.
let res = actor.as_mut().poll(&mut ctx);
Expand All @@ -179,7 +179,7 @@ fn restarting_erroneous_actor_process() {

// Finally after sending it a message it should complete.
actor_ref.try_send(()).unwrap();
assert_eq!(count.load(Ordering::SeqCst), 2);
assert_eq!(count.load(Ordering::Acquire), 2);
let res = actor.as_mut().poll(&mut ctx);
assert_eq!(res, Poll::Ready(()));
assert_eq!(supervisor_called_count.get(), 1);
Expand Down Expand Up @@ -237,7 +237,7 @@ fn panicking_actor_process() {
let res = actor.as_mut().poll(&mut ctx);
assert_eq!(res, Poll::Ready(()));
assert_eq!(supervisor_called_count, 1);
assert_eq!(count.load(Ordering::SeqCst), 0);
assert_eq!(count.load(Ordering::Acquire), 0);
}

#[test]
Expand Down Expand Up @@ -284,7 +284,7 @@ fn restarting_panicking_actor_process() {
assert_eq!(res, Poll::Pending);
assert_eq!(supervisor_called_count.get(), 1);
// The future to wake itself after a restart to ensure it gets run again.
assert_eq!(count.load(Ordering::SeqCst), 1);
assert_eq!(count.load(Ordering::Acquire), 1);

// After a restart the actor should continue without issues.
let res = actor.as_mut().poll(&mut ctx);
Expand All @@ -293,7 +293,7 @@ fn restarting_panicking_actor_process() {

// Finally after sending it a message it should complete.
actor_ref.try_send(()).unwrap();
assert_eq!(count.load(Ordering::SeqCst), 2);
assert_eq!(count.load(Ordering::Acquire), 2);
let res = actor.as_mut().poll(&mut ctx);
assert_eq!(res, Poll::Ready(()));
assert_eq!(supervisor_called_count.get(), 1);
Expand All @@ -306,7 +306,7 @@ pub(crate) fn task_wake_counter() -> (task::Waker, Arc<AtomicUsize>) {

impl task::Wake for WakeCounter {
fn wake(self: Arc<Self>) {
_ = self.0.fetch_add(1, Ordering::SeqCst);
_ = self.0.fetch_add(1, Ordering::AcqRel);
}
}

Expand Down
Loading
Loading