Implement reentrance detection for std::sync::Once #72311

Closed
wants to merge 3 commits into from
223 changes: 158 additions & 65 deletions library/std/src/sync/once.rs
@@ -26,8 +26,8 @@
// out when the mutex needs to be deallocated because it's not after the closure
// finishes, but after the first successful closure finishes.
//
// All in all, this is instead implemented with atomics and lock-free
// operations! Whee! Each `Once` has one word of atomic state, and this state is
// All in all, this is instead implemented with atomic operations and
// spin-locks! Whee! Each `Once` has one word of atomic state, and this state is
// CAS'd on to determine what to do. There are four possible states of a `Once`:
//
// * Incomplete - no initialization has run yet, and no thread is currently
@@ -43,11 +43,16 @@
// immediately.
//
// With 4 states we need 2 bits to encode this, and we use the remaining bits
// in the word we have allocated as a queue of threads waiting for the thread
// responsible for entering the RUNNING state. This queue is just a linked list
// of Waiter nodes which is monotonically increasing in size. Each node is
// allocated on the stack, and whenever the running closure finishes it will
// consume the entire queue and notify all waiters they should try again.
// in the word we have allocated to point to a queue of threads waiting for the
// thread responsible for entering the RUNNING state. These bits are also used
// to ensure that at most one thread can be dealing with the queue. If all payload
// bits are set to zero, it means the queue is being worked on and the current
// thread should spin.
//
// This queue simply contains a linked list of Waiter nodes which is monotonically
// increasing in size. Each node is allocated on the stack, and whenever the
// running closure finishes it will consume the entire queue and notify all waiters
// they should try again.
//
// You'll find a few more details in the implementation, but that's the gist of
// it!
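
As a side note for readers of this diff, the layout described above can be pictured with a tiny standalone sketch (not part of the patch; the constant values follow the new scheme introduced further down, where COMPLETE is 0x0 and INCOMPLETE is 0x3):

```rust
// Illustration only: the two low bits of the state word hold the state,
// the remaining bits hold a pointer to the (4-byte-aligned) WaiterQueue.
// A RUNNING word with zero payload bits means "queue locked, spin".
const STATE_MASK: usize = 0b11;
const COMPLETE: usize = 0x0;
const POISONED: usize = 0x1;
const RUNNING: usize = 0x2;
const INCOMPLETE: usize = 0x3;

fn decode(word: usize) -> (usize, usize) {
    (word & STATE_MASK, word & !STATE_MASK) // (state bits, queue pointer bits)
}

fn main() {
    let fake_queue_addr: usize = 0x1000; // stands in for &WaiterQueue
    assert_eq!(decode(fake_queue_addr | RUNNING), (RUNNING, fake_queue_addr));
    assert_eq!(decode(RUNNING), (RUNNING, 0)); // locked: a waiter must spin
    let _ = (COMPLETE, POISONED, INCOMPLETE); // plain states carry no queue
}
```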
@@ -57,21 +62,20 @@
// `Once.state_and_queue` and an unknown number of `Waiter.signaled`.
// * `state_and_queue` is used (1) as a state flag, (2) for synchronizing the
// result of the `Once`, and (3) for synchronizing `Waiter` nodes.
// - At the end of the `call_inner` function we have to make sure the result
// - At the end of the `try_call_inner` function we have to make sure the result
// of the `Once` is acquired. So every load which can be the only one to
// load COMPLETED must have at least Acquire ordering, which means all
// three of them.
// - `WaiterQueue::Drop` is the only place that may store COMPLETED, and
// - `WaiterQueueGuard::Drop` is the only place that may store COMPLETED, and
// must do so with Release ordering to make the result available.
// - `wait` inserts `Waiter` nodes as a pointer in `state_and_queue`, and
// needs to make the nodes available with Release ordering. The load in
// its `compare_and_swap` can be Relaxed because it only has to compare
// the atomic, not to read other data.
// - `WaiterQueue::Drop` must see the `Waiter` nodes, so it must load
// - `wait` must acquire the spin-lock with Acquire ordering and release it
// with Release ordering. The load before spinning can be Relaxed because
// it only has to check the atomic, not to read other data.
// - `WaiterQueueGuard::Drop` also needs to obtain the spin-lock, so it must load
// `state_and_queue` with Acquire ordering.
// - There is just one store where `state_and_queue` is used only as a
// state flag, without having to synchronize data: switching the state
// from INCOMPLETE to RUNNING in `call_inner`. This store can be Relaxed,
// from INCOMPLETE to RUNNING in `try_call_inner`. This store can be Relaxed,
// but the read has to be Acquire because of the requirements mentioned
// above.
// * `Waiter.signaled` is both used as a flag, and to protect a field with
@@ -87,8 +91,9 @@
use crate::cell::Cell;
use crate::fmt;
use crate::marker;
use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use crate::thread::{self, Thread};
use crate::ptr;
use crate::sync::atomic::{spin_loop_hint, AtomicBool, AtomicUsize, Ordering};
use crate::thread::{self, Thread, ThreadId};

/// A synchronization primitive which can be used to run a one-time global
/// initialization. Useful for one-time initialization for FFI or related
@@ -156,10 +161,10 @@ pub const ONCE_INIT: Once = Once::new();

// Four states that a Once can be in, encoded into the lower bits of
// `state_and_queue` in the Once structure.
const INCOMPLETE: usize = 0x0;
const COMPLETE: usize = 0x0;
const POISONED: usize = 0x1;
const RUNNING: usize = 0x2;
const COMPLETE: usize = 0x3;
const INCOMPLETE: usize = 0x3;

// Mask to learn about the state. All other bits are the queue of waiters if
// this is in the RUNNING state.
@@ -171,21 +176,40 @@ const STATE_MASK: usize = 0x3;
// `wait` would both hand out a mutable reference to its `Waiter` node, and keep
// a shared reference to check `signaled`. Instead we hold shared references and
// use interior mutability.
#[repr(align(4))] // Ensure the two lower bits are free to use as state bits.
struct Waiter {
thread: Cell<Option<Thread>>,
signaled: AtomicBool,
next: *const Waiter,
next: Cell<*const Waiter>,
}

// Head of a linked list of waiters.
// Every node is a struct on the stack of a waiting thread.
// Will wake up the waiters when it gets dropped, i.e. also on panic.
struct WaiterQueue<'a> {
// Note: Similar to `Waiter`, because a shared reference to `WaiterQueue` can be
// obtained by other threads, we cannot hold a mutable reference to it.
// The same reason also disallows implementing Drop on it.
#[repr(align(4))] // Ensure the two lower bits are free to use as state bits.
struct WaiterQueue {
head: Cell<*const Waiter>,
id: ThreadId,
}

// A guard that will wake up the waiters when it gets dropped, i.e. also on panic.
// A separate guard is used rather than implementing Drop on WaiterQueue to
// avoid a mutable reference to WaiterQueue being implicitly created during drop.
struct WaiterQueueGuard<'a> {
state_and_queue: &'a AtomicUsize,
queue: &'a WaiterQueue,
set_state_on_drop_to: usize,
}

// Potential outcomes of calling try_call_inner
enum CallResult {
Complete,
Poisoned,
Reentrance,
}
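
The `id: ThreadId` field above is what makes reentrance detectable: a thread that finds the `Once` RUNNING can compare its own id against the id recorded by the initializing thread. A minimal, hypothetical sketch of just that comparison (the names here are illustrative, not from the patch):

```rust
use std::thread::{self, ThreadId};

// Stand-in for what a waiter can read out of the locked WaiterQueue.
struct RunningInfo {
    runner: ThreadId, // id of the thread currently executing `init`
}

fn classify(info: &RunningInfo) -> &'static str {
    if thread::current().id() == info.runner {
        "reentrance" // same thread called back into the Once from its own closure
    } else {
        "wait" // a different thread: enqueue a Waiter node and park
    }
}

fn main() {
    let info = RunningInfo { runner: thread::current().id() };
    assert_eq!(classify(&info), "reentrance");

    let other = thread::spawn(|| thread::current().id()).join().unwrap();
    assert_eq!(classify(&RunningInfo { runner: other }), "wait");
}
```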

impl Once {
/// Creates a new `Once` value.
#[stable(feature = "once_new", since = "1.2.0")]
@@ -387,16 +411,31 @@ impl Once {
// without some allocation overhead.
#[cold]
fn call_inner(&self, ignore_poisoning: bool, init: &mut dyn FnMut(&OnceState)) {
match self.try_call_inner(ignore_poisoning, init) {
CallResult::Complete => (),
// Panic to propagate the poison.
CallResult::Poisoned => panic!("Once instance has previously been poisoned"),
CallResult::Reentrance => panic!("Once instance cannot be recursively initialized"),
}
}
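
For context, the user-visible effect of the two panics above can be sketched as follows (illustrative only; before this change the inner call would add the thread to its own waiter queue and park it forever instead of panicking):

```rust
use std::sync::Once;

static INIT: Once = Once::new();

fn main() {
    // A reentrant call: `call_once` is invoked again from inside its own
    // initialization closure. With this change it is detected and turned
    // into the "cannot be recursively initialized" panic (which in turn
    // poisons the Once), rather than hanging.
    let result = std::panic::catch_unwind(|| {
        INIT.call_once(|| {
            INIT.call_once(|| unreachable!());
        });
    });
    assert!(result.is_err());
}
```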

fn try_call_inner(
&self,
ignore_poisoning: bool,
init: &mut dyn FnMut(&OnceState),
) -> CallResult {
let mut state_and_queue = self.state_and_queue.load(Ordering::Acquire);
loop {
match state_and_queue {
COMPLETE => break,
COMPLETE => {
return CallResult::Complete;
}
POISONED if !ignore_poisoning => {
// Panic to propagate the poison.
panic!("Once instance has previously been poisoned");
return CallResult::Poisoned;
}
POISONED | INCOMPLETE => {
// Try to register this thread as the one RUNNING.
// This simultaneously acquires the queue lock, since the payload bits are zero.
let old = self.state_and_queue.compare_and_swap(
state_and_queue,
RUNNING,
@@ -406,74 +445,121 @@ impl Once {
state_and_queue = old;
continue;
}
// `waiter_queue` will manage other waiting threads, and
// wake them up on drop.
let mut waiter_queue = WaiterQueue {

// `waiter_queue` will manage other waiting threads, and `queue_guard`
// will wake them up on drop.
let waiter_queue =
WaiterQueue { head: Cell::new(ptr::null()), id: thread::current().id() };
let mut queue_guard = WaiterQueueGuard {
state_and_queue: &self.state_and_queue,
queue: &waiter_queue,
set_state_on_drop_to: POISONED,
};
let queue = &waiter_queue as *const WaiterQueue as usize;
// Release the lock to make the WaiterQueue available for
// other threads to join.
self.state_and_queue.store(queue | RUNNING, Ordering::Release);

// Run the initialization function, letting it know if we're
// poisoned or not.
let init_state = OnceState {
poisoned: state_and_queue == POISONED,
set_state_on_drop_to: Cell::new(COMPLETE),
};
init(&init_state);
waiter_queue.set_state_on_drop_to = init_state.set_state_on_drop_to.get();
break;
queue_guard.set_state_on_drop_to = init_state.set_state_on_drop_to.get();
return CallResult::Complete;
}
_ => {
// All other values must be RUNNING with possibly a
// pointer to the waiter queue in the more significant bits.
assert!(state_and_queue & STATE_MASK == RUNNING);
wait(&self.state_and_queue, state_and_queue);
if wait(&self.state_and_queue, state_and_queue) {
return CallResult::Reentrance;
}
state_and_queue = self.state_and_queue.load(Ordering::Acquire);
}
}
}
}
}
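
The `set_state_on_drop_to` handling above is the usual drop-guard pattern for panic safety: the guard defaults to POISONED and is only upgraded to COMPLETE once `init` returns normally. A simplified, hypothetical sketch of that idiom in isolation (not the patch's actual types):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const COMPLETE: usize = 0x0;
const POISONED: usize = 0x1;
const INCOMPLETE: usize = 0x3;

// Miniature of the guard idiom: whatever happens, Drop publishes a final state.
struct Guard<'a> {
    state: &'a AtomicUsize,
    set_state_on_drop_to: usize,
}

impl Drop for Guard<'_> {
    fn drop(&mut self) {
        self.state.store(self.set_state_on_drop_to, Ordering::Release);
    }
}

fn run(state: &AtomicUsize, init: impl FnOnce()) {
    let mut guard = Guard { state, set_state_on_drop_to: POISONED };
    init(); // if this panics, the guard still runs and publishes POISONED
    guard.set_state_on_drop_to = COMPLETE;
}

fn main() {
    let ok = AtomicUsize::new(INCOMPLETE);
    run(&ok, || {});
    assert_eq!(ok.load(Ordering::Relaxed), COMPLETE);

    let poisoned = AtomicUsize::new(INCOMPLETE);
    let _ = std::panic::catch_unwind(|| run(&poisoned, || panic!("boom")));
    assert_eq!(poisoned.load(Ordering::Relaxed), POISONED);
}
```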

fn wait(state_and_queue: &AtomicUsize, mut current_state: usize) {
// Returns whether reentrance has been detected.
fn wait(state_and_queue: &AtomicUsize, mut current_state: usize) -> bool {
// Note: the following code was carefully written to avoid creating a
// mutable reference to `node` that gets aliased.

// Create a node upfront to reduce time spent inside spin lock.
let thread = thread::current();
let id = thread.id();
let node = Waiter {
thread: Cell::new(Some(thread)),
signaled: AtomicBool::new(false),
next: Cell::new(ptr::null()),
};

// Use spin-lock to lock a waiter queue.
loop {
// Don't queue this thread if the status is no longer running,
// otherwise we will not be woken up.
if current_state & STATE_MASK != RUNNING {
return;
return false;
}

// Currently locked, spin.
if current_state & !STATE_MASK == 0 {
current_state = state_and_queue.load(Ordering::Relaxed);
spin_loop_hint();
continue;
}

// Create the node for our current thread.
let node = Waiter {
thread: Cell::new(Some(thread::current())),
signaled: AtomicBool::new(false),
next: (current_state & !STATE_MASK) as *const Waiter,
};
let me = &node as *const Waiter as usize;

// Try to slide in the node at the head of the linked list, making sure
// that another thread didn't just replace the head of the linked list.
let old = state_and_queue.compare_and_swap(current_state, me | RUNNING, Ordering::Release);
// Try to lock the WaiterQueue.
let old = state_and_queue.compare_and_swap(current_state, RUNNING, Ordering::Acquire);
if old != current_state {
current_state = old;
continue;
}

// We have enqueued ourselves, now let's wait.
// It is important not to return before being signaled, otherwise we
// would drop our `Waiter` node and leave a hole in the linked list
// (and a dangling reference). Guard against spurious wakeups by
// reparking ourselves until we are signaled.
while !node.signaled.load(Ordering::Acquire) {
// If the managing thread happens to signal and unpark us before we
// can park ourselves, the result could be this thread never gets
// unparked. Luckily `park` comes with the guarantee that if it got
// an `unpark` just before on an unparked thread it does not park.
thread::park();
}
break;
}

// Insert our node into the linked list.
let reentry = {
// SAFETY: This is okay because we have just "lock"ed it. Even the thread
// that creates this WaiterQueue would need to lock it before dropping it, so
// the reference is definitely not dangling.
let queue = unsafe { &*((current_state & !STATE_MASK) as *const WaiterQueue) };
if queue.id != id {
node.next.set(queue.head.get());
queue.head.set(&node as *const Waiter);
false
} else {
// If the thread id matches, then this is a reentrant call to try_call_inner.
true
}
};

// Unlock the WaiterQueue.
state_and_queue.store(current_state, Ordering::Release);

if reentry {
return true;
}

// We have enqueued ourselves, now let's wait.
// It is important not to return before being signaled, otherwise we
// would drop our `Waiter` node and leave a hole in the linked list
// (and a dangling reference). Guard against spurious wakeups by
// reparking ourselves until we are signaled.
while !node.signaled.load(Ordering::Acquire) {
// If the managing thread happens to signal and unpark us before we
// can park ourselves, the result could be this thread never gets
// unparked. Luckily `park` comes with the guarantee that if it got
// an `unpark` just before on an unparked thread it does not park.
thread::park();
}

false
}
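
The spin-lock idiom used in `wait` (and again in `WaiterQueueGuard::Drop` below) can be isolated like this. This is a hypothetical sketch that assumes the word is currently RUNNING with a queue attached, uses the modern `compare_exchange`/`std::hint::spin_loop` equivalents of the patch's `compare_and_swap`/`spin_loop_hint`, and omits the bail-out for a state that is no longer RUNNING:

```rust
use std::hint::spin_loop;
use std::sync::atomic::{AtomicUsize, Ordering};

const STATE_MASK: usize = 0b11;
const RUNNING: usize = 0x2;

// Take the queue lock: succeed once the payload bits are non-zero and we
// manage to swap them out for zero. Acquire pairs with the Release in unlock.
fn lock_queue(word: &AtomicUsize) -> usize {
    loop {
        let cur = word.load(Ordering::Relaxed);
        if cur & !STATE_MASK == 0 {
            spin_loop(); // someone else holds the lock
            continue;
        }
        if word.compare_exchange(cur, RUNNING, Ordering::Acquire, Ordering::Relaxed).is_ok() {
            return cur & !STATE_MASK; // the queue pointer bits we now own
        }
    }
}

// Release the lock by putting the queue pointer back.
fn unlock_queue(word: &AtomicUsize, queue_bits: usize) {
    word.store(queue_bits | RUNNING, Ordering::Release);
}

fn main() {
    let word = AtomicUsize::new(0x1000 | RUNNING); // pretend 0x1000 is &WaiterQueue
    let q = lock_queue(&word);
    assert_eq!(q, 0x1000);
    unlock_queue(&word, q);
    assert_eq!(word.load(Ordering::Relaxed), 0x1000 | RUNNING);
}
```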

#[stable(feature = "std_debug", since = "1.16.0")]
Expand All @@ -483,14 +569,21 @@ impl fmt::Debug for Once {
}
}

impl Drop for WaiterQueue<'_> {
impl Drop for WaiterQueueGuard<'_> {
fn drop(&mut self) {
// Swap out our state with however we finished.
let state_and_queue =
self.state_and_queue.swap(self.set_state_on_drop_to, Ordering::AcqRel);
// Lock the queue before we can access it.
loop {
let state_and_queue = self.state_and_queue.swap(RUNNING, Ordering::Acquire);
if state_and_queue != RUNNING {
// Sanity check: We should get back the queue we originally put in.
assert_eq!(state_and_queue, self.queue as *const WaiterQueue as usize | RUNNING);
break;
}
spin_loop_hint();
}

// We should only ever see an old state which was RUNNING.
assert_eq!(state_and_queue & STATE_MASK, RUNNING);
// Set the state however we finished.
self.state_and_queue.store(self.set_state_on_drop_to, Ordering::Release);

// Walk the entire linked list of waiters and wake them up (in lifo
// order, last to register is first to wake up).
@@ -499,9 +592,9 @@ impl Drop for WaiterQueue<'_> {
// free `node` if there happens to be a spurious wakeup.
// So we have to take out the `thread` field and copy the pointer to
// `next` first.
let mut queue = (state_and_queue & !STATE_MASK) as *const Waiter;
let mut queue = self.queue.head.get();
while !queue.is_null() {
let next = (*queue).next;
let next = (*queue).next.get();
let thread = (*queue).thread.take().unwrap();
(*queue).signaled.store(true, Ordering::Release);
// ^- FIXME (maybe): This is another case of issue #55005
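
Since the diff is truncated here, a small safe stand-in (heap nodes instead of the stack-allocated `Waiter`s above) may help show the LIFO wake-up order that this walk produces:

```rust
// Safe, heap-based stand-in (the real Waiter nodes live on waiter stacks):
// pushing each new waiter at the head and draining from the head gives the
// LIFO wake-up order described above.
struct Node {
    name: &'static str,
    next: Option<Box<Node>>,
}

fn main() {
    // Threads A, B and C register in that order; each becomes the new head.
    let mut head: Option<Box<Node>> = None;
    for name in ["A", "B", "C"] {
        head = Some(Box::new(Node { name, next: head.take() }));
    }

    // The drop-time walk consumes the list from the head: C, then B, then A.
    let mut order = Vec::new();
    let mut cur = head;
    while let Some(node) = cur {
        order.push(node.name);
        cur = node.next;
    }
    assert_eq!(order, ["C", "B", "A"]);
}
```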