Skip to content

Commit

Permalink
Abstract the state type for futexes
Browse files Browse the repository at this point in the history
In the same way that we expose SmallAtomic and SmallPrimitive to allow
Windows to use a value other than an AtomicU32 for its futex state, this
patch switches the primary futex state type from AtomicU32 to
futex::Atomic.  The futex::Atomic type should be usable as an atomic
value with underlying primitive type equal to futex::Primitive.

This allows supporting the futex API on systems where the underlying
kernel futex implementation requires more state than simply an
AtomicU32.

All in-tree futex implementations simply define {Atomic,Primitive}
directly as {AtomicU32,u32}.
  • Loading branch information
paulmenage committed Oct 17, 2024
1 parent 3a85d3f commit cf7ff15
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 66 deletions.
9 changes: 7 additions & 2 deletions library/std/src/sys/pal/hermit/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ use crate::ptr::null;
use crate::sync::atomic::AtomicU32;
use crate::time::Duration;

/// An atomic for use as a futex that is at least 32-bits but may be larger
pub type Futex = AtomicU32;
/// Must be the underlying type of Futex
pub type Primitive = u32;

/// An atomic for use as a futex that is at least 8-bits but may be larger.
pub type SmallAtomic = AtomicU32;
/// Must be the underlying type of SmallAtomic
pub type SmallFutex = AtomicU32;
/// Must be the underlying type of SmallFutex
pub type SmallPrimitive = u32;

pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option<Duration>) -> bool {
Expand Down
9 changes: 7 additions & 2 deletions library/std/src/sys/pal/unix/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@
use crate::sync::atomic::AtomicU32;
use crate::time::Duration;

/// An atomic for use as a futex that is at least 32-bits but may be larger
pub type Futex = AtomicU32;
/// Must be the underlying type of Futex
pub type Primitive = u32;

/// An atomic for use as a futex that is at least 8-bits but may be larger.
pub type SmallAtomic = AtomicU32;
/// Must be the underlying type of SmallAtomic
pub type SmallFutex = AtomicU32;
/// Must be the underlying type of SmallFutex
pub type SmallPrimitive = u32;

/// Waits for a `futex_wake` operation to wake us.
Expand Down
9 changes: 7 additions & 2 deletions library/std/src/sys/pal/wasm/atomics/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@ use core::arch::wasm64 as wasm;
use crate::sync::atomic::AtomicU32;
use crate::time::Duration;

/// An atomic for use as a futex that is at least 32-bits but may be larger
pub type Futex = AtomicU32;
/// Must be the underlying type of Futex
pub type Primitive = u32;

/// An atomic for use as a futex that is at least 8-bits but may be larger.
pub type SmallAtomic = AtomicU32;
/// Must be the underlying type of SmallAtomic
pub type SmallFutex = AtomicU32;
/// Must be the underlying type of SmallFutex
pub type SmallPrimitive = u32;

/// Wait for a futex_wake operation to wake us.
Expand Down
35 changes: 20 additions & 15 deletions library/std/src/sys/pal/windows/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,27 @@ use core::{mem, ptr};
use super::api::{self, WinError};
use crate::sys::{c, dur2timeout};

/// An atomic for use as a futex that is at least 32-bits but may be larger
pub type Futex = AtomicU32;
/// Must be the underlying type of Futex
pub type Primitive = u32;

/// An atomic for use as a futex that is at least 8-bits but may be larger.
pub type SmallAtomic = AtomicU8;
/// Must be the underlying type of SmallAtomic
pub type SmallFutex = AtomicU8;
/// Must be the underlying type of SmallFutex
pub type SmallPrimitive = u8;

pub unsafe trait Futex {}
pub unsafe trait Futexable {}
pub unsafe trait Waitable {
type Atomic;
type Futex;
}
macro_rules! unsafe_waitable_int {
($(($int:ty, $atomic:ty)),*$(,)?) => {
$(
unsafe impl Waitable for $int {
type Atomic = $atomic;
type Futex = $atomic;
}
unsafe impl Futex for $atomic {}
unsafe impl Futexable for $atomic {}
)*
};
}
Expand All @@ -42,15 +47,15 @@ unsafe_waitable_int! {
(usize, AtomicUsize),
}
unsafe impl<T> Waitable for *const T {
type Atomic = AtomicPtr<T>;
type Futex = AtomicPtr<T>;
}
unsafe impl<T> Waitable for *mut T {
type Atomic = AtomicPtr<T>;
type Futex = AtomicPtr<T>;
}
unsafe impl<T> Futex for AtomicPtr<T> {}
unsafe impl<T> Futexable for AtomicPtr<T> {}

pub fn wait_on_address<W: Waitable>(
address: &W::Atomic,
address: &W::Futex,
compare: W,
timeout: Option<Duration>,
) -> bool {
Expand All @@ -63,30 +68,30 @@ pub fn wait_on_address<W: Waitable>(
}
}

pub fn wake_by_address_single<T: Futex>(address: &T) {
pub fn wake_by_address_single<T: Futexable>(address: &T) {
unsafe {
let addr = ptr::from_ref(address).cast::<c_void>();
c::WakeByAddressSingle(addr);
}
}

pub fn wake_by_address_all<T: Futex>(address: &T) {
pub fn wake_by_address_all<T: Futexable>(address: &T) {
unsafe {
let addr = ptr::from_ref(address).cast::<c_void>();
c::WakeByAddressAll(addr);
}
}

pub fn futex_wait<W: Waitable>(futex: &W::Atomic, expected: W, timeout: Option<Duration>) -> bool {
pub fn futex_wait<W: Waitable>(futex: &W::Futex, expected: W, timeout: Option<Duration>) -> bool {
// return false only on timeout
wait_on_address(futex, expected, timeout) || api::get_last_error() != WinError::TIMEOUT
}

pub fn futex_wake<T: Futex>(futex: &T) -> bool {
pub fn futex_wake<T: Futexable>(futex: &T) -> bool {
wake_by_address_single(futex);
false
}

pub fn futex_wake_all<T: Futex>(futex: &T) {
pub fn futex_wake_all<T: Futexable>(futex: &T) {
wake_by_address_all(futex)
}
7 changes: 3 additions & 4 deletions library/std/src/sys/sync/condvar/futex.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
use crate::sync::atomic::AtomicU32;
use crate::sync::atomic::Ordering::Relaxed;
use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all};
use crate::sys::futex::{Futex, futex_wait, futex_wake, futex_wake_all};
use crate::sys::sync::Mutex;
use crate::time::Duration;

pub struct Condvar {
// The value of this atomic is simply incremented on every notification.
// This is used by `.wait()` to not miss any notifications after
// unlocking the mutex and before waiting for notifications.
futex: AtomicU32,
futex: Futex,
}

impl Condvar {
#[inline]
pub const fn new() -> Self {
Self { futex: AtomicU32::new(0) }
Self { futex: Futex::new(0) }
}

// All the memory orderings here are `Relaxed`,
Expand Down
6 changes: 3 additions & 3 deletions library/std/src/sys/sync/mutex/futex.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crate::sys::futex::{self, futex_wait, futex_wake};

type Atomic = futex::SmallAtomic;
type Futex = futex::SmallFutex;
type State = futex::SmallPrimitive;

pub struct Mutex {
futex: Atomic,
futex: Futex,
}

const UNLOCKED: State = 0;
Expand All @@ -15,7 +15,7 @@ const CONTENDED: State = 2; // locked, and other threads waiting (contended)
impl Mutex {
#[inline]
pub const fn new() -> Self {
Self { futex: Atomic::new(UNLOCKED) }
Self { futex: Futex::new(UNLOCKED) }
}

#[inline]
Expand Down
25 changes: 12 additions & 13 deletions library/std/src/sys/sync/once/futex.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,38 @@
use crate::cell::Cell;
use crate::sync as public;
use crate::sync::atomic::AtomicU32;
use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crate::sync::once::ExclusiveState;
use crate::sys::futex::{futex_wait, futex_wake_all};
use crate::sys::futex::{Futex, Primitive, futex_wait, futex_wake_all};

// On some platforms, the OS is very nice and handles the waiter queue for us.
// This means we only need one atomic value with 4 states:

/// No initialization has run yet, and no thread is currently using the Once.
const INCOMPLETE: u32 = 0;
const INCOMPLETE: Primitive = 0;
/// Some thread has previously attempted to initialize the Once, but it panicked,
/// so the Once is now poisoned. There are no other threads currently accessing
/// this Once.
const POISONED: u32 = 1;
const POISONED: Primitive = 1;
/// Some thread is currently attempting to run initialization. It may succeed,
/// so all future threads need to wait for it to finish.
const RUNNING: u32 = 2;
const RUNNING: Primitive = 2;
/// Initialization has completed and all future calls should finish immediately.
const COMPLETE: u32 = 3;
const COMPLETE: Primitive = 3;

// An additional bit indicates whether there are waiting threads:

/// May only be set if the state is not COMPLETE.
const QUEUED: u32 = 4;
const QUEUED: Primitive = 4;

// Threads wait by setting the QUEUED bit and calling `futex_wait` on the state
// variable. When the running thread finishes, it will wake all waiting threads using
// `futex_wake_all`.

const STATE_MASK: u32 = 0b11;
const STATE_MASK: Primitive = 0b11;

pub struct OnceState {
poisoned: bool,
set_state_to: Cell<u32>,
set_state_to: Cell<Primitive>,
}

impl OnceState {
Expand All @@ -49,8 +48,8 @@ impl OnceState {
}

struct CompletionGuard<'a> {
state_and_queued: &'a AtomicU32,
set_state_on_drop_to: u32,
state_and_queued: &'a Futex,
set_state_on_drop_to: Primitive,
}

impl<'a> Drop for CompletionGuard<'a> {
Expand All @@ -65,13 +64,13 @@ impl<'a> Drop for CompletionGuard<'a> {
}

pub struct Once {
state_and_queued: AtomicU32,
state_and_queued: Futex,
}

impl Once {
#[inline]
pub const fn new() -> Once {
Once { state_and_queued: AtomicU32::new(INCOMPLETE) }
Once { state_and_queued: Futex::new(INCOMPLETE) }
}

#[inline]
Expand Down
2 changes: 1 addition & 1 deletion library/std/src/sys/sync/once/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
// You'll find a few more details in the implementation, but that's the gist of
// it!
//
// Atomic orderings:
// Futex orderings:
// When running `Once` we deal with multiple atomics:
// `Once.state_and_queue` and an unknown number of `Waiter.signaled`.
// * `state_and_queue` is used (1) as a state flag, (2) for synchronizing the
Expand Down
41 changes: 20 additions & 21 deletions library/std/src/sys/sync/rwlock/futex.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::sync::atomic::AtomicU32;
use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all};
use crate::sys::futex::{Futex, Primitive, futex_wait, futex_wake, futex_wake_all};

pub struct RwLock {
// The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag.
Expand All @@ -10,41 +9,41 @@ pub struct RwLock {
// 0x3FFF_FFFF: Write locked
// Bit 30: Readers are waiting on this futex.
// Bit 31: Writers are waiting on the writer_notify futex.
state: AtomicU32,
state: Futex,
// The 'condition variable' to notify writers through.
// Incremented on every signal.
writer_notify: AtomicU32,
writer_notify: Futex,
}

const READ_LOCKED: u32 = 1;
const MASK: u32 = (1 << 30) - 1;
const WRITE_LOCKED: u32 = MASK;
const MAX_READERS: u32 = MASK - 1;
const READERS_WAITING: u32 = 1 << 30;
const WRITERS_WAITING: u32 = 1 << 31;
const READ_LOCKED: Primitive = 1;
const MASK: Primitive = (1 << 30) - 1;
const WRITE_LOCKED: Primitive = MASK;
const MAX_READERS: Primitive = MASK - 1;
const READERS_WAITING: Primitive = 1 << 30;
const WRITERS_WAITING: Primitive = 1 << 31;

#[inline]
fn is_unlocked(state: u32) -> bool {
fn is_unlocked(state: Primitive) -> bool {
state & MASK == 0
}

#[inline]
fn is_write_locked(state: u32) -> bool {
fn is_write_locked(state: Primitive) -> bool {
state & MASK == WRITE_LOCKED
}

#[inline]
fn has_readers_waiting(state: u32) -> bool {
fn has_readers_waiting(state: Primitive) -> bool {
state & READERS_WAITING != 0
}

#[inline]
fn has_writers_waiting(state: u32) -> bool {
fn has_writers_waiting(state: Primitive) -> bool {
state & WRITERS_WAITING != 0
}

#[inline]
fn is_read_lockable(state: u32) -> bool {
fn is_read_lockable(state: Primitive) -> bool {
// This also returns false if the counter could overflow if we tried to read lock it.
//
// We don't allow read-locking if there's readers waiting, even if the lock is unlocked
Expand All @@ -55,14 +54,14 @@ fn is_read_lockable(state: u32) -> bool {
}

#[inline]
fn has_reached_max_readers(state: u32) -> bool {
fn has_reached_max_readers(state: Primitive) -> bool {
state & MASK == MAX_READERS
}

impl RwLock {
#[inline]
pub const fn new() -> Self {
Self { state: AtomicU32::new(0), writer_notify: AtomicU32::new(0) }
Self { state: Futex::new(0), writer_notify: Futex::new(0) }
}

#[inline]
Expand Down Expand Up @@ -225,7 +224,7 @@ impl RwLock {
/// If both are waiting, this will wake up only one writer, but will fall
/// back to waking up readers if there was no writer to wake up.
#[cold]
fn wake_writer_or_readers(&self, mut state: u32) {
fn wake_writer_or_readers(&self, mut state: Primitive) {
assert!(is_unlocked(state));

// The readers waiting bit might be turned on at any point now,
Expand Down Expand Up @@ -290,7 +289,7 @@ impl RwLock {

/// Spin for a while, but stop directly at the given condition.
#[inline]
fn spin_until(&self, f: impl Fn(u32) -> bool) -> u32 {
fn spin_until(&self, f: impl Fn(Primitive) -> bool) -> Primitive {
let mut spin = 100; // Chosen by fair dice roll.
loop {
let state = self.state.load(Relaxed);
Expand All @@ -303,13 +302,13 @@ impl RwLock {
}

#[inline]
fn spin_write(&self) -> u32 {
fn spin_write(&self) -> Primitive {
// Stop spinning when it's unlocked or when there's waiting writers, to keep things somewhat fair.
self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state))
}

#[inline]
fn spin_read(&self) -> u32 {
fn spin_read(&self) -> Primitive {
// Stop spinning when it's unlocked or read locked, or when there's waiting threads.
self.spin_until(|state| {
!is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state)
Expand Down
Loading

0 comments on commit cf7ff15

Please sign in to comment.