diff --git a/library/std/src/sync/once.rs b/library/std/src/sync/once.rs
index 64260990824b8..f6e038fec9ede 100644
--- a/library/std/src/sync/once.rs
+++ b/library/std/src/sync/once.rs
@@ -26,8 +26,8 @@
 // out when the mutex needs to be deallocated because it's not after the closure
 // finishes, but after the first successful closure finishes.
 //
-// All in all, this is instead implemented with atomics and lock-free
-// operations! Whee! Each `Once` has one word of atomic state, and this state is
+// All in all, this is instead implemented with atomic operations and
+// spin-locks! Whee! Each `Once` has one word of atomic state, and this state is
 // CAS'd on to determine what to do. There are four possible state of a `Once`:
 //
 // * Incomplete - no initialization has run yet, and no thread is currently
@@ -43,11 +43,16 @@
 // immediately.
 //
 // With 4 states we need 2 bits to encode this, and we use the remaining bits
-// in the word we have allocated as a queue of threads waiting for the thread
-// responsible for entering the RUNNING state. This queue is just a linked list
-// of Waiter nodes which is monotonically increasing in size. Each node is
-// allocated on the stack, and whenever the running closure finishes it will
-// consume the entire queue and notify all waiters they should try again.
+// in the word we have allocated to point to a queue of threads waiting for the
+// thread responsible for entering the RUNNING state. These bits are also used
+// to ensure that at most one thread is dealing with the queue at a time. If all
+// payload bits are set to zero, it means the queue is currently being worked on
+// and the current thread should spin.
+//
+// The queue itself is a linked list of Waiter nodes which is monotonically
+// increasing in size. Each node is allocated on the stack, and whenever the
+// running closure finishes it will consume the entire queue and notify all
+// waiters that they should try again.
 //
 // You'll find a few more details in the implementation, but that's the gist of
 // it!
@@ -57,21 +62,20 @@
 // `Once.state_and_queue` and an unknown number of `Waiter.signaled`.
 // * `state_and_queue` is used (1) as a state flag, (2) for synchronizing the
 //   result of the `Once`, and (3) for synchronizing `Waiter` nodes.
-//   - At the end of the `call_inner` function we have to make sure the result
+//   - At the end of the `try_call_inner` function we have to make sure the result
 //     of the `Once` is acquired. So every load which can be the only one to
 //     load COMPLETED must have at least Acquire ordering, which means all
 //     three of them.
-//   - `WaiterQueue::Drop` is the only place that may store COMPLETED, and
+//   - `WaiterQueueGuard::Drop` is the only place that may store COMPLETED, and
 //     must do so with Release ordering to make the result available.
-//   - `wait` inserts `Waiter` nodes as a pointer in `state_and_queue`, and
-//     needs to make the nodes available with Release ordering. The load in
-//     its `compare_and_swap` can be Relaxed because it only has to compare
-//     the atomic, not to read other data.
-//   - `WaiterQueue::Drop` must see the `Waiter` nodes, so it must load
+//   - `wait` must acquire the spin-lock with Acquire ordering and release it
+//     with Release ordering. The load before spinning can be Relaxed because
+//     it only has to check the atomic, not to read other data.
+//   - `WaiterQueueGuard::Drop` also needs to obtain the spin-lock, so it must load
 //     `state_and_queue` with Acquire ordering.
 //   - There is just one store where `state_and_queue` is used only as a
 //     state flag, without having to synchronize data: switching the state
-//     from INCOMPLETE to RUNNING in `call_inner`. This store can be Relaxed,
+//     from INCOMPLETE to RUNNING in `try_call_inner`. This store can be Relaxed,
 //     but the read has to be Acquire because of the requirements mentioned
 //     above.
 // * `Waiter.signaled` is both used as a flag, and to protect a field with
@@ -87,8 +91,9 @@
 use crate::cell::Cell;
 use crate::fmt;
 use crate::marker;
-use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use crate::thread::{self, Thread};
+use crate::ptr;
+use crate::sync::atomic::{spin_loop_hint, AtomicBool, AtomicUsize, Ordering};
+use crate::thread::{self, Thread, ThreadId};
 
 /// A synchronization primitive which can be used to run a one-time global
 /// initialization. Useful for one-time initialization for FFI or related
@@ -156,10 +161,10 @@ pub const ONCE_INIT: Once = Once::new();
 
 // Four states that a Once can be in, encoded into the lower bits of
 // `state_and_queue` in the Once structure.
-const INCOMPLETE: usize = 0x0;
+const COMPLETE: usize = 0x0;
 const POISONED: usize = 0x1;
 const RUNNING: usize = 0x2;
-const COMPLETE: usize = 0x3;
+const INCOMPLETE: usize = 0x3;
 
 // Mask to learn about the state. All other bits are the queue of waiters if
 // this is in the RUNNING state.
@@ -171,21 +176,40 @@ const STATE_MASK: usize = 0x3;
 // `wait` would both hand out a mutable reference to its `Waiter` node, and keep
 // a shared reference to check `signaled`. Instead we hold shared references and
 // use interior mutability.
-#[repr(align(4))] // Ensure the two lower bits are free to use as state bits.
 struct Waiter {
     thread: Cell<Option<Thread>>,
     signaled: AtomicBool,
-    next: *const Waiter,
+    next: Cell<*const Waiter>,
 }
 
 // Head of a linked list of waiters.
 // Every node is a struct on the stack of a waiting thread.
-// Will wake up the waiters when it gets dropped, i.e. also on panic.
-struct WaiterQueue<'a> {
+// Note: Similar to `Waiter`, because a shared reference to `WaiterQueue` can be
+// obtained by other threads, we cannot hold a mutable reference to it.
+// This is also why `Drop` cannot be implemented on it.
+#[repr(align(4))] // Ensure the two lower bits are free to use as state bits.
+struct WaiterQueue {
+    head: Cell<*const Waiter>,
+    id: ThreadId,
+}
+
+// A guard that will wake up the waiters when it gets dropped, i.e. also on panic.
+// A separate guard is used rather than implementing Drop on WaiterQueue, to
+// avoid a mutable reference to WaiterQueue being implicitly created during
+// drop.
+struct WaiterQueueGuard<'a> {
     state_and_queue: &'a AtomicUsize,
+    queue: &'a WaiterQueue,
     set_state_on_drop_to: usize,
 }
 
+// Potential outcomes of calling try_call_inner.
+enum CallResult {
+    Complete,
+    Poisoned,
+    Reentrance,
+}
+
 impl Once {
     /// Creates a new `Once` value.
     #[stable(feature = "once_new", since = "1.2.0")]
@@ -387,16 +411,31 @@
     // without some allocation overhead.
     #[cold]
     fn call_inner(&self, ignore_poisoning: bool, init: &mut dyn FnMut(&OnceState)) {
+        match self.try_call_inner(ignore_poisoning, init) {
+            CallResult::Complete => (),
+            // Panic to propagate the poison.
+            CallResult::Poisoned => panic!("Once instance has previously been poisoned"),
+            CallResult::Reentrance => panic!("Once instance cannot be recursively initialized"),
+        }
+    }
+
+    fn try_call_inner(
+        &self,
+        ignore_poisoning: bool,
+        init: &mut dyn FnMut(&OnceState),
+    ) -> CallResult {
         let mut state_and_queue = self.state_and_queue.load(Ordering::Acquire);
         loop {
            match state_and_queue {
-                COMPLETE => break,
+                COMPLETE => {
+                    return CallResult::Complete;
+                }
                 POISONED if !ignore_poisoning => {
-                    // Panic to propagate the poison.
-                    panic!("Once instance has previously been poisoned");
+                    return CallResult::Poisoned;
                 }
                 POISONED | INCOMPLETE => {
                     // Try to register this thread as the one RUNNING.
+                    // On success this also acquires the queue spin-lock.
                     let old = self.state_and_queue.compare_and_swap(
                         state_and_queue,
                         RUNNING,
@@ -406,12 +445,21 @@
                         state_and_queue = old;
                         continue;
                     }
-                    // `waiter_queue` will manage other waiting threads, and
-                    // wake them up on drop.
-                    let mut waiter_queue = WaiterQueue {
+
+                    // `waiter_queue` will manage other waiting threads, and `queue_guard`
+                    // will wake them up on drop.
+                    let waiter_queue =
+                        WaiterQueue { head: Cell::new(ptr::null()), id: thread::current().id() };
+                    let mut queue_guard = WaiterQueueGuard {
                         state_and_queue: &self.state_and_queue,
+                        queue: &waiter_queue,
                         set_state_on_drop_to: POISONED,
                     };
+                    let queue = &waiter_queue as *const WaiterQueue as usize;
+                    // Release the lock to make the WaiterQueue available for
+                    // other threads to join.
+                    self.state_and_queue.store(queue | RUNNING, Ordering::Release);
+
                     // Run the initialization function, letting it know if we're
                     // poisoned or not.
                     let init_state = OnceState {
@@ -419,14 +467,16 @@
                         set_state_on_drop_to: Cell::new(COMPLETE),
                     };
                     init(&init_state);
-                    waiter_queue.set_state_on_drop_to = init_state.set_state_on_drop_to.get();
-                    break;
+                    queue_guard.set_state_on_drop_to = init_state.set_state_on_drop_to.get();
+                    return CallResult::Complete;
                 }
                 _ => {
                     // All other values must be RUNNING with possibly a
                     // pointer to the waiter queue in the more significant bits.
                     assert!(state_and_queue & STATE_MASK == RUNNING);
-                    wait(&self.state_and_queue, state_and_queue);
+                    if wait(&self.state_and_queue, state_and_queue) {
+                        return CallResult::Reentrance;
+                    }
                     state_and_queue = self.state_and_queue.load(Ordering::Acquire);
                 }
             }
@@ -434,46 +484,82 @@
-fn wait(state_and_queue: &AtomicUsize, mut current_state: usize) {
+// Returns whether reentrance has been detected.
+fn wait(state_and_queue: &AtomicUsize, mut current_state: usize) -> bool {
     // Note: the following code was carefully written to avoid creating a
     // mutable reference to `node` that gets aliased.
+
+    // Create a node upfront to reduce the time spent inside the spin-lock.
+    let thread = thread::current();
+    let id = thread.id();
+    let node = Waiter {
+        thread: Cell::new(Some(thread)),
+        signaled: AtomicBool::new(false),
+        next: Cell::new(ptr::null()),
+    };
+
+    // Use the spin-lock to lock the waiter queue.
     loop {
         // Don't queue this thread if the status is no longer running,
         // otherwise we will not be woken up.
         if current_state & STATE_MASK != RUNNING {
-            return;
+            return false;
+        }
+
+        // Currently locked, spin.
+        if current_state & !STATE_MASK == 0 {
+            current_state = state_and_queue.load(Ordering::Relaxed);
+            spin_loop_hint();
+            continue;
         }
 
-        // Create the node for our current thread.
-        let node = Waiter {
-            thread: Cell::new(Some(thread::current())),
-            signaled: AtomicBool::new(false),
-            next: (current_state & !STATE_MASK) as *const Waiter,
-        };
-        let me = &node as *const Waiter as usize;
-
-        // Try to slide in the node at the head of the linked list, making sure
-        // that another thread didn't just replace the head of the linked list.
-        let old = state_and_queue.compare_and_swap(current_state, me | RUNNING, Ordering::Release);
+        // Try to lock the WaiterQueue.
+        let old = state_and_queue.compare_and_swap(current_state, RUNNING, Ordering::Acquire);
         if old != current_state {
             current_state = old;
             continue;
         }
 
-        // We have enqueued ourselves, now lets wait.
-        // It is important not to return before being signaled, otherwise we
-        // would drop our `Waiter` node and leave a hole in the linked list
-        // (and a dangling reference). Guard against spurious wakeups by
-        // reparking ourselves until we are signaled.
-        while !node.signaled.load(Ordering::Acquire) {
-            // If the managing thread happens to signal and unpark us before we
-            // can park ourselves, the result could be this thread never gets
-            // unparked. Luckily `park` comes with the guarantee that if it got
-            // an `unpark` just before on an unparked thread is does not park.
-            thread::park();
-        }
         break;
     }
+
+    // Insert our node into the linked list.
+    let reentry = {
+        // SAFETY: This is okay because we have just "lock"ed it. Even the thread
+        // that created this WaiterQueue would need to lock it before dropping it,
+        // so the reference is definitely not dangling.
+        let queue = unsafe { &*((current_state & !STATE_MASK) as *const WaiterQueue) };
+        if queue.id != id {
+            node.next.set(queue.head.get());
+            queue.head.set(&node as *const Waiter);
+            false
+        } else {
+            // If the thread id matches, this is a reentrant call to try_call_inner.
+            true
+        }
+    };
+
+    // Unlock the WaiterQueue.
+    state_and_queue.store(current_state, Ordering::Release);
+
+    if reentry {
+        return true;
+    }
+
+    // We have enqueued ourselves, now let's wait.
+    // It is important not to return before being signaled, otherwise we
+    // would drop our `Waiter` node and leave a hole in the linked list
+    // (and a dangling reference). Guard against spurious wakeups by
+    // reparking ourselves until we are signaled.
+    while !node.signaled.load(Ordering::Acquire) {
+        // If the managing thread happens to signal and unpark us before we
+        // can park ourselves, the result could be this thread never gets
+        // unparked. Luckily `park` comes with the guarantee that if it got
+        // an `unpark` just before on an unparked thread, it does not park.
+        thread::park();
+    }
+
+    false
 }
 
 #[stable(feature = "std_debug", since = "1.16.0")]
@@ -483,14 +569,21 @@ impl fmt::Debug for Once {
     }
 }
 
-impl Drop for WaiterQueue<'_> {
+impl Drop for WaiterQueueGuard<'_> {
     fn drop(&mut self) {
-        // Swap out our state with however we finished.
-        let state_and_queue =
-            self.state_and_queue.swap(self.set_state_on_drop_to, Ordering::AcqRel);
+        // Lock the queue before we can access it.
+        loop {
+            let state_and_queue = self.state_and_queue.swap(RUNNING, Ordering::Acquire);
+            if state_and_queue != RUNNING {
+                // Sanity check: We should get back the queue we originally put in.
+                assert_eq!(state_and_queue, self.queue as *const WaiterQueue as usize | RUNNING);
+                break;
+            }
+            spin_loop_hint();
+        }
 
-        // We should only ever see an old state which was RUNNING.
-        assert_eq!(state_and_queue & STATE_MASK, RUNNING);
+        // Set the state to however we finished.
+        self.state_and_queue.store(self.set_state_on_drop_to, Ordering::Release);
 
         // Walk the entire linked list of waiters and wake them up (in lifo
         // order, last to register is first to wake up).
@@ -499,9 +592,9 @@ impl Drop for WaiterQueue<'_> {
             // free `node` if there happens to be has a spurious wakeup.
             // So we have to take out the `thread` field and copy the pointer to
             // `next` first.
-            let mut queue = (state_and_queue & !STATE_MASK) as *const Waiter;
+            let mut queue = self.queue.head.get();
             while !queue.is_null() {
-                let next = (*queue).next;
+                let next = (*queue).next.get();
                 let thread = (*queue).thread.take().unwrap();
                 (*queue).signaled.store(true, Ordering::Release);
                 // ^- FIXME (maybe): This is another case of issue #55005
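
To make the state-word layout described in the patch comments concrete, here is a minimal standalone sketch (not part of the patch; all names here are invented for illustration). It shows the two low bits carrying the state, the remaining bits carrying a pointer to a 4-byte-aligned queue, and the fact that a RUNNING word with zero payload bits doubles as the "queue locked" value:

use std::sync::atomic::{AtomicUsize, Ordering};

const STATE_MASK: usize = 0b11; // the two low bits encode the state
const RUNNING: usize = 0x2;

// align(4) guarantees the two low bits of any pointer to this struct are
// zero, leaving them free for state bits, as with WaiterQueue in the patch.
#[repr(align(4))]
struct QueueStub(u8);

fn main() {
    let queue = QueueStub(0);
    let word = AtomicUsize::new(&queue as *const QueueStub as usize | RUNNING);

    let packed = word.load(Ordering::Acquire);
    assert_eq!(packed & STATE_MASK, RUNNING); // state bits
    assert_eq!((packed & !STATE_MASK) as *const QueueStub, &queue as *const QueueStub);

    // A RUNNING word whose payload bits are all zero is the "locked" value
    // that other threads spin on.
    assert_eq!(RUNNING & !STATE_MASK, 0);
}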
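
The lock/unlock discipline that `WaiterQueueGuard::drop` follows can be sketched in the same invented terms (again an illustration, not the patch's code; note that `wait` instead locks with `compare_and_swap`, since it must not take the lock once the state is no longer RUNNING):

use std::sync::atomic::{spin_loop_hint, AtomicUsize, Ordering};

const RUNNING: usize = 0x2;

// Take the lock: swap in the locked value (RUNNING, payload bits zero).
// Getting RUNNING back means someone else holds the lock, so spin.
fn lock_queue(word: &AtomicUsize) -> usize {
    loop {
        let old = word.swap(RUNNING, Ordering::Acquire);
        if old != RUNNING {
            return old; // the previous contents, e.g. queue-pointer | RUNNING
        }
        spin_loop_hint();
    }
}

// Release the lock by publishing a new word with Release ordering.
fn unlock_queue(word: &AtomicUsize, new_word: usize) {
    word.store(new_word, Ordering::Release);
}

fn main() {
    let word = AtomicUsize::new(0x1000 | RUNNING); // stand-in queue pointer
    let owned = lock_queue(&word);
    assert_eq!(owned, 0x1000 | RUNNING);
    unlock_queue(&word, owned);
}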
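
Finally, a small program exercising the reentrance case the patch detects. As I read the change, before it the inner call would enqueue itself behind its own initializer and park forever (deadlock); with the patch, the queue's ThreadId matches the caller's, so `wait` reports reentrance and `call_inner` panics with "Once instance cannot be recursively initialized":

use std::sync::Once;

static INIT: Once = Once::new();

fn main() {
    INIT.call_once(|| {
        // Re-entering the same Once from inside its own initializer.
        INIT.call_once(|| unreachable!());
    });
}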