Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

std: implement the once_wait feature #127567

Merged
merged 2 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions library/std/src/sync/once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,47 @@ impl Once {
self.inner.is_completed()
}

/// Blocks the current thread until initialization has completed.
///
/// # Example
///
/// ```rust
/// #![feature(once_wait)]
///
/// use std::sync::Once;
/// use std::thread;
///
/// static READY: Once = Once::new();
///
/// let thread = thread::spawn(|| {
/// READY.wait();
/// println!("everything is ready");
/// });
///
/// READY.call_once(|| println!("performing setup"));
/// ```
///
/// # Panics
///
/// If this [`Once`] has been poisoned because an initialization closure has
/// panicked, this method will also panic. Use [`wait_force`](Self::wait_force)
/// if this behaviour is not desired.
#[unstable(feature = "once_wait", issue = "127527")]
pub fn wait(&self) {
if !self.inner.is_completed() {
self.inner.wait(false);
}
}

/// Blocks the current thread until initialization has completed, ignoring
/// poisoning.
#[unstable(feature = "once_wait", issue = "127527")]
pub fn wait_force(&self) {
if !self.inner.is_completed() {
self.inner.wait(true);
}
}

/// Returns the current state of the `Once` instance.
///
/// Since this takes a mutable reference, no initialization can currently
Expand Down
47 changes: 47 additions & 0 deletions library/std/src/sync/once/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use super::Once;
use crate::sync::atomic::AtomicBool;
use crate::sync::atomic::Ordering::Relaxed;
use crate::sync::mpsc::channel;
use crate::time::Duration;
use crate::{panic, thread};

#[test]
Expand Down Expand Up @@ -113,3 +116,47 @@ fn wait_for_force_to_finish() {
assert!(t1.join().is_ok());
assert!(t2.join().is_ok());
}

#[test]
fn wait() {
for _ in 0..50 {
let val = AtomicBool::new(false);
let once = Once::new();

thread::scope(|s| {
for _ in 0..4 {
s.spawn(|| {
once.wait();
assert!(val.load(Relaxed));
});
}

once.call_once(|| val.store(true, Relaxed));
});
}
}

#[test]
fn wait_on_poisoned() {
let once = Once::new();

panic::catch_unwind(|| once.call_once(|| panic!())).unwrap_err();
panic::catch_unwind(|| once.wait()).unwrap_err();
}

#[test]
fn wait_force_on_poisoned() {
let once = Once::new();

thread::scope(|s| {
panic::catch_unwind(|| once.call_once(|| panic!())).unwrap_err();

s.spawn(|| {
thread::sleep(Duration::from_millis(100));

once.call_once_force(|_| {});
});

once.wait_force();
})
}
28 changes: 28 additions & 0 deletions library/std/src/sync/once_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,34 @@ impl<T> OnceLock<T> {
}
}

/// Blocks the current thread until the cell is initialized.
///
/// # Example
///
/// Waiting for a computation on another thread to finish:
/// ```rust
/// #![feature(once_wait)]
///
/// use std::thread;
/// use std::sync::OnceLock;
///
/// let value = OnceLock::new();
///
/// thread::scope(|s| {
/// s.spawn(|| value.set(1 + 1));
///
/// let result = value.wait();
/// assert_eq!(result, &2);
/// })
/// ```
#[inline]
#[unstable(feature = "once_wait", issue = "127527")]
pub fn wait(&self) -> &T {
self.once.wait_force();

unsafe { self.get_unchecked() }
}

/// Sets the contents of this cell to `value`.
///
/// May block if another thread is currently attempting to initialize the cell. The cell is
Expand Down
124 changes: 88 additions & 36 deletions library/std/src/sys/sync/once/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::sync::once::ExclusiveState;
use crate::sys::futex::{futex_wait, futex_wake_all};

// On some platforms, the OS is very nice and handles the waiter queue for us.
// This means we only need one atomic value with 5 states:
// This means we only need one atomic value with 4 states:

/// No initialization has run yet, and no thread is currently using the Once.
const INCOMPLETE: u32 = 0;
Expand All @@ -17,16 +17,20 @@ const POISONED: u32 = 1;
/// Some thread is currently attempting to run initialization. It may succeed,
/// so all future threads need to wait for it to finish.
const RUNNING: u32 = 2;
/// Some thread is currently attempting to run initialization and there are threads
/// waiting for it to finish.
const QUEUED: u32 = 3;
/// Initialization has completed and all future calls should finish immediately.
const COMPLETE: u32 = 4;
const COMPLETE: u32 = 3;

// Threads wait by setting the state to QUEUED and calling `futex_wait` on the state
// An additional bit indicates whether there are waiting threads:

/// May only be set if the state is not COMPLETE.
const QUEUED: u32 = 4;

// Threads wait by setting the QUEUED bit and calling `futex_wait` on the state
// variable. When the running thread finishes, it will wake all waiting threads using
// `futex_wake_all`.

const STATE_MASK: u32 = 0b11;

pub struct OnceState {
poisoned: bool,
set_state_to: Cell<u32>,
Expand All @@ -45,7 +49,7 @@ impl OnceState {
}

struct CompletionGuard<'a> {
state: &'a AtomicU32,
state_and_queued: &'a AtomicU32,
set_state_on_drop_to: u32,
}

Expand All @@ -54,64 +58,106 @@ impl<'a> Drop for CompletionGuard<'a> {
// Use release ordering to propagate changes to all threads checking
// up on the Once. `futex_wake_all` does its own synchronization, hence
// we do not need `AcqRel`.
if self.state.swap(self.set_state_on_drop_to, Release) == QUEUED {
futex_wake_all(self.state);
if self.state_and_queued.swap(self.set_state_on_drop_to, Release) & QUEUED != 0 {
futex_wake_all(self.state_and_queued);
}
}
}

pub struct Once {
state: AtomicU32,
state_and_queued: AtomicU32,
}

impl Once {
#[inline]
pub const fn new() -> Once {
Once { state: AtomicU32::new(INCOMPLETE) }
Once { state_and_queued: AtomicU32::new(INCOMPLETE) }
}

#[inline]
pub fn is_completed(&self) -> bool {
// Use acquire ordering to make all initialization changes visible to the
// current thread.
self.state.load(Acquire) == COMPLETE
self.state_and_queued.load(Acquire) == COMPLETE
}

#[inline]
pub(crate) fn state(&mut self) -> ExclusiveState {
match *self.state.get_mut() {
match *self.state_and_queued.get_mut() {
INCOMPLETE => ExclusiveState::Incomplete,
POISONED => ExclusiveState::Poisoned,
COMPLETE => ExclusiveState::Complete,
_ => unreachable!("invalid Once state"),
}
}

// This uses FnMut to match the API of the generic implementation. As this
// implementation is quite light-weight, it is generic over the closure and
// so avoids the cost of dynamic dispatch.
#[cold]
#[track_caller]
pub fn call(&self, ignore_poisoning: bool, f: &mut impl FnMut(&public::OnceState)) {
let mut state = self.state.load(Acquire);
pub fn wait(&self, ignore_poisoning: bool) {
let mut state_and_queued = self.state_and_queued.load(Acquire);
loop {
let state = state_and_queued & STATE_MASK;
let queued = state_and_queued & QUEUED != 0;
match state {
COMPLETE => return,
POISONED if !ignore_poisoning => {
// Panic to propagate the poison.
panic!("Once instance has previously been poisoned");
}
_ => {
// Set the QUEUED bit if it has not already been set.
if !queued {
state_and_queued += QUEUED;
if let Err(new) = self.state_and_queued.compare_exchange_weak(
state,
state_and_queued,
Relaxed,
Acquire,
) {
state_and_queued = new;
continue;
}
}

futex_wait(&self.state_and_queued, state_and_queued, None);
state_and_queued = self.state_and_queued.load(Acquire);
}
}
}
}

#[cold]
#[track_caller]
pub fn call(&self, ignore_poisoning: bool, f: &mut dyn FnMut(&public::OnceState)) {
let mut state_and_queued = self.state_and_queued.load(Acquire);
loop {
let state = state_and_queued & STATE_MASK;
let queued = state_and_queued & QUEUED != 0;
match state {
COMPLETE => return,
POISONED if !ignore_poisoning => {
// Panic to propagate the poison.
panic!("Once instance has previously been poisoned");
}
INCOMPLETE | POISONED => {
// Try to register the current thread as the one running.
if let Err(new) =
self.state.compare_exchange_weak(state, RUNNING, Acquire, Acquire)
{
state = new;
let next = RUNNING + if queued { QUEUED } else { 0 };
if let Err(new) = self.state_and_queued.compare_exchange_weak(
state_and_queued,
next,
Acquire,
Acquire,
) {
state_and_queued = new;
continue;
}

// `waiter_queue` will manage other waiting threads, and
// wake them up on drop.
let mut waiter_queue =
CompletionGuard { state: &self.state, set_state_on_drop_to: POISONED };
let mut waiter_queue = CompletionGuard {
state_and_queued: &self.state_and_queued,
set_state_on_drop_to: POISONED,
};
// Run the function, letting it know if we're poisoned or not.
let f_state = public::OnceState {
inner: OnceState {
Expand All @@ -123,21 +169,27 @@ impl Once {
waiter_queue.set_state_on_drop_to = f_state.inner.set_state_to.get();
return;
}
RUNNING | QUEUED => {
// Set the state to QUEUED if it is not already.
if state == RUNNING
&& let Err(new) =
self.state.compare_exchange_weak(RUNNING, QUEUED, Relaxed, Acquire)
{
state = new;
continue;
_ => {
// All other values must be RUNNING.
assert!(state == RUNNING);

// Set the QUEUED bit if it is not already set.
if !queued {
state_and_queued += QUEUED;
if let Err(new) = self.state_and_queued.compare_exchange_weak(
state,
state_and_queued,
Relaxed,
Acquire,
) {
state_and_queued = new;
continue;
}
}

futex_wait(&self.state, QUEUED, None);
state = self.state.load(Acquire);
futex_wait(&self.state_and_queued, state_and_queued, None);
state_and_queued = self.state_and_queued.load(Acquire);
}
COMPLETE => return,
_ => unreachable!("state is never set to invalid values"),
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions library/std/src/sys/sync/once/no_threads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ impl Once {
}
}

#[cold]
#[track_caller]
pub fn wait(&self, _ignore_poisoning: bool) {
panic!("not implementable on this target");
}

#[cold]
#[track_caller]
pub fn call(&self, ignore_poisoning: bool, f: &mut impl FnMut(&public::OnceState)) {
Expand Down
Loading
Loading