From b683e1dc13475238bf74a3998de841de1af5fc2f Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Wed, 16 Oct 2024 09:30:12 -0400 Subject: [PATCH 01/10] add `RwLock` `downgrade` tests --- library/std/src/sync/rwlock/tests.rs | 124 ++++++++++++++++++++++++++- 1 file changed, 122 insertions(+), 2 deletions(-) diff --git a/library/std/src/sync/rwlock/tests.rs b/library/std/src/sync/rwlock/tests.rs index 37a2e41641ac1..a4af49dc82cce 100644 --- a/library/std/src/sync/rwlock/tests.rs +++ b/library/std/src/sync/rwlock/tests.rs @@ -3,8 +3,8 @@ use rand::Rng; use crate::sync::atomic::{AtomicUsize, Ordering}; use crate::sync::mpsc::channel; use crate::sync::{ - Arc, MappedRwLockReadGuard, MappedRwLockWriteGuard, RwLock, RwLockReadGuard, RwLockWriteGuard, - TryLockError, + Arc, Barrier, MappedRwLockReadGuard, MappedRwLockWriteGuard, RwLock, RwLockReadGuard, + RwLockWriteGuard, TryLockError, }; use crate::thread; @@ -501,3 +501,123 @@ fn panic_while_mapping_write_unlocked_poison() { drop(lock); } + +#[test] +fn test_downgrade_basic() { + let r = RwLock::new(()); + + let write_guard = r.write().unwrap(); + let _read_guard = RwLockWriteGuard::downgrade(write_guard); +} + +#[test] +fn test_downgrade_readers() { + // This test creates 1 writing thread and `R` reader threads doing `N` iterations. + const R: usize = 10; + const N: usize = if cfg!(target_pointer_width = "64") { 100 } else { 20 }; + + // The writer thread will constantly update the value inside the `RwLock`, and this test will + // only pass if every reader observes all values between 0 and `N`. + let rwlock = Arc::new(RwLock::new(0)); + let barrier = Arc::new(Barrier::new(R + 1)); + + // Create the writing thread. + let r_writer = rwlock.clone(); + let b_writer = barrier.clone(); + thread::spawn(move || { + for i in 0..N { + let mut write_guard = r_writer.write().unwrap(); + *write_guard = i; + + let read_guard = RwLockWriteGuard::downgrade(write_guard); + assert_eq!(*read_guard, i); + + // Wait for all readers to observe the new value. + b_writer.wait(); + } + }); + + for _ in 0..R { + let rwlock = rwlock.clone(); + let barrier = barrier.clone(); + thread::spawn(move || { + // Every reader thread needs to observe every value up to `N`. + for i in 0..N { + let read_guard = rwlock.read().unwrap(); + assert_eq!(*read_guard, i); + drop(read_guard); + + // Wait for everyone to read and for the writer to change the value again. + barrier.wait(); + + // Spin until the writer has changed the value. + loop { + let read_guard = rwlock.read().unwrap(); + assert!(*read_guard >= i); + + if *read_guard > i { + break; + } + } + } + }); + } +} + +#[test] +fn test_downgrade_atomic() { + const NEW_VALUE: i32 = -1; + + // This test checks that `downgrade` is atomic, meaning as soon as a write lock has been + // downgraded, the lock must be in read mode and no other threads can take the write lock to + // modify the protected value. + + // `W` is the number of evil writer threads. + const W: usize = if cfg!(target_pointer_width = "64") { 100 } else { 20 }; + let rwlock = Arc::new(RwLock::new(0)); + + // Spawns many evil writer threads that will try and write to the locked value before the + // initial writer (who has the exclusive lock) can read after it downgrades. + // If the `RwLock` behaves correctly, then the initial writer should read the value it wrote + // itself as no other thread should be able to mutate the protected value. + + // Put the lock in write mode, causing all future threads trying to access this go to sleep. 
+ let mut main_write_guard = rwlock.write().unwrap(); + + // Spawn all of the evil writer threads. They will each increment the protected value by 1. + let handles: Vec<_> = (0..W) + .map(|_| { + let rwlock = rwlock.clone(); + thread::spawn(move || { + // Will go to sleep since the main thread initially has the write lock. + let mut evil_guard = rwlock.write().unwrap(); + *evil_guard += 1; + }) + }) + .collect(); + + // Wait for a good amount of time so that evil threads go to sleep. + // Note: this is not strictly necessary... + let eternity = crate::time::Duration::from_millis(42); + thread::sleep(eternity); + + // Once everyone is asleep, set the value to `NEW_VALUE`. + *main_write_guard = NEW_VALUE; + + // Atomically downgrade the write guard into a read guard. + let main_read_guard = RwLockWriteGuard::downgrade(main_write_guard); + + // If the above is not atomic, then it would be possible for an evil thread to get in front of + // this read and change the value to be non-negative. + assert_eq!(*main_read_guard, NEW_VALUE, "`downgrade` was not atomic"); + + // Drop the main read guard and allow the evil writer threads to start incrementing. + drop(main_read_guard); + + for handle in handles { + handle.join().unwrap(); + } + + let final_check = rwlock.read().unwrap(); + assert_eq!(*final_check, W as i32 + NEW_VALUE); +} From f71ecc48cc130b8e056063062d69338e2bc4e1b2 Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Wed, 16 Oct 2024 09:31:30 -0400 Subject: [PATCH 02/10] add `downgrade` method onto `RwLockWriteGuard` --- library/std/src/sync/rwlock.rs | 74 ++++++++++++++++++++++++++++++++-- 1 file changed, 70 insertions(+), 4 deletions(-) diff --git a/library/std/src/sync/rwlock.rs b/library/std/src/sync/rwlock.rs index da2da6f9dfc53..d55d1c80dcae0 100644 --- a/library/std/src/sync/rwlock.rs +++ b/library/std/src/sync/rwlock.rs @@ -4,10 +4,10 @@ mod tests; use crate::cell::UnsafeCell; use crate::fmt; use crate::marker::PhantomData; -use crate::mem::ManuallyDrop; +use crate::mem::{ManuallyDrop, forget}; use crate::ops::{Deref, DerefMut}; use crate::ptr::NonNull; -use crate::sync::{LockResult, TryLockError, TryLockResult, poison}; +use crate::sync::{LockResult, PoisonError, TryLockError, TryLockResult, poison}; use crate::sys::sync as sys; /// A reader-writer lock @@ -574,8 +574,12 @@ impl From for RwLock { impl<'rwlock, T: ?Sized> RwLockReadGuard<'rwlock, T> { /// Creates a new instance of `RwLockReadGuard` from a `RwLock`. - // SAFETY: if and only if `lock.inner.read()` (or `lock.inner.try_read()`) has been - // successfully called from the same thread before instantiating this object. + /// + /// # Safety + /// + /// This function is safe if and only if the same thread has successfully and safely called + /// `lock.inner.read()`, `lock.inner.try_read()`, or `lock.inner.downgrade()` before + /// instantiating this object. unsafe fn new(lock: &'rwlock RwLock) -> LockResult> { poison::map_result(lock.poison.borrow(), |()| RwLockReadGuard { data: unsafe { NonNull::new_unchecked(lock.data.get()) }, @@ -957,6 +961,68 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { None => Err(orig), } } + + /// Downgrades a write-locked `RwLockWriteGuard` into a read-locked [`RwLockReadGuard`]. + /// + /// This method will atomically change the state of the [`RwLock`] from exclusive mode into + /// shared mode. This means that it is impossible for a writing thread to get in between a + /// thread calling `downgrade` and the same thread reading whatever it wrote while it had the + /// [`RwLock`] in write mode. 
+ /// + /// Note that since we have the `RwLockWriteGuard`, we know that the [`RwLock`] is already + /// locked for writing, so this method cannot fail. + /// + /// # Example + /// + /// ``` + /// #![feature(rwlock_downgrade)] + /// use std::sync::{Arc, RwLock, RwLockWriteGuard}; + /// + /// // The inner value starts as 0. + /// let rw = Arc::new(RwLock::new(0)); + /// + /// // Put the lock in write mode. + /// let mut main_write_guard = rw.write().unwrap(); + /// + /// let evil = rw.clone(); + /// let handle = std::thread::spawn(move || { + /// // This will not return until the main thread drops the `main_read_guard`. + /// let mut evil_guard = evil.write().unwrap(); + /// + /// assert_eq!(*evil_guard, 1); + /// *evil_guard = 2; + /// }); + /// + /// // After spawning the writer thread, set the inner value to 1. + /// *main_write_guard = 1; + /// + /// // Atomically downgrade the write guard into a read guard. + /// let main_read_guard = RwLockWriteGuard::downgrade(main_write_guard); + /// + /// // Since `downgrade` is atomic, the writer thread cannot have set the inner value to 2. + /// assert_eq!(*main_read_guard, 1, "`downgrade` was not atomic"); + /// + /// // Clean up everything now + /// drop(main_read_guard); + /// handle.join().unwrap(); + /// + /// let final_check = rw.read().unwrap(); + /// assert_eq!(*final_check, 2); + /// ``` + #[unstable(feature = "rwlock_downgrade", issue = "128203")] + pub fn downgrade(s: Self) -> RwLockReadGuard<'a, T> { + let lock = s.lock; + + // We don't want to call the destructor since that calls `write_unlock`. + forget(s); + + // SAFETY: We take ownership of a write guard, so we must already have the `RwLock` in write + // mode, satisfying the `downgrade` contract. + unsafe { lock.inner.downgrade() }; + + // SAFETY: We have just successfully called `downgrade`, so we fulfill the safety contract. + unsafe { RwLockReadGuard::new(lock).unwrap_or_else(PoisonError::into_inner) } + } } impl<'a, T: ?Sized> MappedRwLockWriteGuard<'a, T> { From 3336ae0838a1bc0f77854b7b40127d2e6bdca696 Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Wed, 16 Oct 2024 09:32:01 -0400 Subject: [PATCH 03/10] add simple `downgrade` implementations --- library/std/src/sys/sync/rwlock/no_threads.rs | 5 +++++ library/std/src/sys/sync/rwlock/solid.rs | 6 ++++++ library/std/src/sys/sync/rwlock/teeos.rs | 6 ++++++ 3 files changed, 17 insertions(+) diff --git a/library/std/src/sys/sync/rwlock/no_threads.rs b/library/std/src/sys/sync/rwlock/no_threads.rs index 6965e2e2cabe5..c11e59f719e93 100644 --- a/library/std/src/sys/sync/rwlock/no_threads.rs +++ b/library/std/src/sys/sync/rwlock/no_threads.rs @@ -62,4 +62,9 @@ impl RwLock { pub unsafe fn write_unlock(&self) { assert_eq!(self.mode.replace(0), -1); } + + #[inline] + pub unsafe fn downgrade(&self) { + assert_eq!(self.mode.replace(1), -1); + } } diff --git a/library/std/src/sys/sync/rwlock/solid.rs b/library/std/src/sys/sync/rwlock/solid.rs index 7703082f95116..f664fef907404 100644 --- a/library/std/src/sys/sync/rwlock/solid.rs +++ b/library/std/src/sys/sync/rwlock/solid.rs @@ -79,6 +79,12 @@ impl RwLock { let rwl = self.raw(); expect_success_aborting(unsafe { abi::rwl_unl_rwl(rwl) }, &"rwl_unl_rwl"); } + + #[inline] + pub unsafe fn downgrade(&self) { + // The SOLID platform does not support the `downgrade` operation for reader writer locks, so + // this function is simply a no-op as only 1 reader can read: the original writer. 
+ } } impl Drop for RwLock { diff --git a/library/std/src/sys/sync/rwlock/teeos.rs b/library/std/src/sys/sync/rwlock/teeos.rs index 763430223834b..4a71a3abc2729 100644 --- a/library/std/src/sys/sync/rwlock/teeos.rs +++ b/library/std/src/sys/sync/rwlock/teeos.rs @@ -41,4 +41,10 @@ impl RwLock { pub unsafe fn write_unlock(&self) { unsafe { self.inner.unlock() }; } + + #[inline] + pub unsafe fn downgrade(&self) { + // Since there is no difference between read-locked and write-locked on this platform, this + // function is simply a no-op as only 1 reader can read: the original writer. + } } From fa9f04af5db346847295b10142fffde38dab6169 Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Wed, 16 Oct 2024 09:41:30 -0400 Subject: [PATCH 04/10] add `downgrade` to `futex` implementation --- library/std/src/sys/sync/rwlock/futex.rs | 52 +++++++++++++++++++++--- 1 file changed, 47 insertions(+), 5 deletions(-) diff --git a/library/std/src/sys/sync/rwlock/futex.rs b/library/std/src/sys/sync/rwlock/futex.rs index 447048edf7622..e0f4a91e0731b 100644 --- a/library/std/src/sys/sync/rwlock/futex.rs +++ b/library/std/src/sys/sync/rwlock/futex.rs @@ -18,6 +18,7 @@ pub struct RwLock { const READ_LOCKED: Primitive = 1; const MASK: Primitive = (1 << 30) - 1; const WRITE_LOCKED: Primitive = MASK; +const DOWNGRADE: Primitive = READ_LOCKED.wrapping_sub(WRITE_LOCKED); // READ_LOCKED - WRITE_LOCKED const MAX_READERS: Primitive = MASK - 1; const READERS_WAITING: Primitive = 1 << 30; const WRITERS_WAITING: Primitive = 1 << 31; @@ -53,6 +54,24 @@ fn is_read_lockable(state: Primitive) -> bool { state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state) } +#[inline] +fn is_read_lockable_after_wakeup(state: Primitive) -> bool { + // We make a special case for checking if we can read-lock _after_ a reader thread that went to + // sleep has been woken up by a call to `downgrade`. + // + // `downgrade` will wake up all readers and place the lock in read mode. Thus, there should be + // no readers waiting and the lock should be read-locked (not write-locked or unlocked). + // + // Note that we do not check if any writers are waiting. This is because a call to `downgrade` + // implies that the caller wants other readers to read the value protected by the lock. If we + // did not allow readers to acquire the lock before writers after a `downgrade`, then only the + // original writer would be able to read the value, thus defeating the purpose of `downgrade`. + state & MASK < MAX_READERS + && !has_readers_waiting(state) + && !is_write_locked(state) + && !is_unlocked(state) +} + #[inline] fn has_reached_max_readers(state: Primitive) -> bool { state & MASK == MAX_READERS @@ -84,6 +103,9 @@ impl RwLock { } } + /// # Safety + /// + /// The `RwLock` must be read-locked (N readers) in order to call this. #[inline] pub unsafe fn read_unlock(&self) { let state = self.state.fetch_sub(READ_LOCKED, Release) - READ_LOCKED; @@ -100,11 +122,13 @@ impl RwLock { #[cold] fn read_contended(&self) { + let mut has_slept = false; let mut state = self.spin_read(); loop { - // If we can lock it, lock it. - if is_read_lockable(state) { + // If we have just been woken up, first check for a `downgrade` call. + // Otherwise, if we can read-lock it, lock it. + if (has_slept && is_read_lockable_after_wakeup(state)) || is_read_lockable(state) { match self.state.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed) { Ok(_) => return, // Locked! @@ -116,9 +140,7 @@ impl RwLock { } // Check for overflow. 
- if has_reached_max_readers(state) { - panic!("too many active read locks on RwLock"); - } + assert!(!has_reached_max_readers(state), "too many active read locks on RwLock"); // Make sure the readers waiting bit is set before we go to sleep. if !has_readers_waiting(state) { @@ -132,6 +154,7 @@ impl RwLock { // Wait for the state to change. futex_wait(&self.state, state | READERS_WAITING, None); + has_slept = true; // Spin again after waking up. state = self.spin_read(); @@ -152,6 +175,9 @@ impl RwLock { } } + /// # Safety + /// + /// The `RwLock` must be write-locked (single writer) in order to call this. #[inline] pub unsafe fn write_unlock(&self) { let state = self.state.fetch_sub(WRITE_LOCKED, Release) - WRITE_LOCKED; @@ -163,6 +189,22 @@ impl RwLock { } } + /// # Safety + /// + /// The `RwLock` must be write-locked (single writer) in order to call this. + #[inline] + pub unsafe fn downgrade(&self) { + // Removes all write bits and adds a single read bit. + let state = self.state.fetch_add(DOWNGRADE, Relaxed); + debug_assert!(is_write_locked(state), "RwLock must be write locked to call `downgrade`"); + + if has_readers_waiting(state) { + // Since we had the exclusive lock, nobody else can unset this bit. + self.state.fetch_sub(READERS_WAITING, Relaxed); + futex_wake_all(&self.state); + } + } + #[cold] fn write_contended(&self) { let mut state = self.spin_write(); From 31e35c2131f0b71a7d2cdc3b515d1a22f5f3b61d Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Fri, 25 Oct 2024 14:54:43 -0400 Subject: [PATCH 05/10] modify queue implementation documentation This commit only has documentation changes and a few things moved around the file. The very few code changes are cosmetic: changes like turning a `match` statement into an `if let` statement or reducing indentation for long if statements. This commit also adds several safety comments on top of `unsafe` blocks that might not be immediately obvious to a first-time reader. Code "changes" are in: - `add_backlinks_and_find_tail` - `lock_contended` A majority of the changes are just expanding the comments from 80 columns to 100 columns. --- library/std/src/sys/sync/rwlock/queue.rs | 405 ++++++++++++----------- 1 file changed, 214 insertions(+), 191 deletions(-) diff --git a/library/std/src/sys/sync/rwlock/queue.rs b/library/std/src/sys/sync/rwlock/queue.rs index 889961915f4e6..5879d1f84154f 100644 --- a/library/std/src/sys/sync/rwlock/queue.rs +++ b/library/std/src/sys/sync/rwlock/queue.rs @@ -1,37 +1,38 @@ //! Efficient read-write locking without `pthread_rwlock_t`. //! -//! The readers-writer lock provided by the `pthread` library has a number of -//! problems which make it a suboptimal choice for `std`: +//! The readers-writer lock provided by the `pthread` library has a number of problems which make it +//! a suboptimal choice for `std`: //! -//! * It is non-movable, so it needs to be allocated (lazily, to make the -//! constructor `const`). -//! * `pthread` is an external library, meaning the fast path of acquiring an -//! uncontended lock cannot be inlined. -//! * Some platforms (at least glibc before version 2.25) have buggy implementations -//! that can easily lead to undefined behavior in safe Rust code when not properly -//! guarded against. +//! * It is non-movable, so it needs to be allocated (lazily, to make the constructor `const`). +//! * `pthread` is an external library, meaning the fast path of acquiring an uncontended lock +//! cannot be inlined. +//! 
* Some platforms (at least glibc before version 2.25) have buggy implementations that can easily +//! lead to undefined behaviour in safe Rust code when not properly guarded against. //! * On some platforms (e.g. macOS), the lock is very slow. //! -//! Therefore, we implement our own `RwLock`! Naively, one might reach for a -//! spinlock, but those [can be quite problematic] when the lock is contended. -//! Instead, this readers-writer lock copies its implementation strategy from -//! the Windows [SRWLOCK] and the [usync] library. Spinning is still used for the -//! fast path, but it is bounded: after spinning fails, threads will locklessly -//! add an information structure containing a [`Thread`] handle into a queue of -//! waiters associated with the lock. The lock owner, upon releasing the lock, -//! will scan through the queue and wake up threads as appropriate, which will -//! then again try to acquire the lock. The resulting [`RwLock`] is: +//! Therefore, we implement our own [`RwLock`]! Naively, one might reach for a spinlock, but those +//! can be quite [problematic] when the lock is contended. //! -//! * adaptive, since it spins before doing any heavywheight parking operations -//! * allocation-free, modulo the per-thread [`Thread`] handle, which is -//! allocated regardless when using threads created by `std` +//! Instead, this [`RwLock`] copies its implementation strategy from the Windows [SRWLOCK] and the +//! [usync] library implementations. +//! +//! Spinning is still used for the fast path, but it is bounded: after spinning fails, threads will +//! locklessly add an information structure ([`Node`]) containing a [`Thread`] handle into a queue +//! of waiters associated with the lock. The lock owner, upon releasing the lock, will scan through +//! the queue and wake up threads as appropriate, and the newly-awoken threads will then try to +//! acquire the lock themselves. +//! +//! The resulting [`RwLock`] is: +//! +//! * adaptive, since it spins before doing any heavyweight parking operations +//! * allocation-free, modulo the per-thread [`Thread`] handle, which is allocated anyways when +//! using threads created by `std` //! * writer-preferring, even if some readers may still slip through -//! * unfair, which reduces context-switching and thus drastically improves -//! performance +//! * unfair, which reduces context-switching and thus drastically improves performance //! //! and also quite fast in most cases. //! -//! [can be quite problematic]: https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html +//! [problematic]: https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html //! [SRWLOCK]: https://learn.microsoft.com/en-us/windows/win32/sync/slim-reader-writer--srw--locks //! [usync]: https://crates.io/crates/usync //! @@ -39,8 +40,8 @@ //! //! ## State //! -//! A single [`AtomicPtr`] is used as state variable. The lowest three bits are used -//! to indicate the meaning of the remaining bits: +//! A single [`AtomicPtr`] is used as state variable. The lowest three bits are used to indicate the +//! meaning of the remaining bits: //! //! | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | Remaining | | //! |:-----------|:-----------|:-----------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------| @@ -50,22 +51,26 @@ //! | 0 | 1 | * | `*mut Node` | The lock is unlocked, but some threads are waiting. Only writers may lock the lock | //! 
| 1 | 1 | * | `*mut Node` | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count | //! -//! ## Waiter queue +//! ## Waiter Queue //! -//! When threads are waiting on the lock (`QUEUE` is set), the lock state -//! points to a queue of waiters, which is implemented as a linked list of -//! nodes stored on the stack to avoid memory allocation. To enable lockless -//! enqueuing of new nodes to the queue, the linked list is single-linked upon -//! creation. Since when the lock is read-locked, the lock count is stored in -//! the last link of the queue, threads have to traverse the queue to find the -//! last element upon releasing the lock. To avoid having to traverse the whole -//! list again and again, a pointer to the found tail is cached in the (current) -//! first element of the queue. +//! When threads are waiting on the lock (the `QUEUE` bit is set), the lock state points to a queue +//! of waiters, which is implemented as a linked list of nodes stored on the stack to avoid memory +//! allocation. //! -//! Also, while the lock is unfair for performance reasons, it is still best to -//! wake the tail node first, which requires backlinks to previous nodes to be -//! created. This is done at the same time as finding the tail, and thus a set -//! tail field indicates the remaining portion of the queue is initialized. +//! To enable lock-free enqueuing of new nodes to the queue, the linked list is singly-linked upon +//! creation. +//! +//! When the lock is read-locked, the lock count (number of readers) is stored in the last link of +//! the queue. Threads have to traverse the queue to find the last element upon releasing the lock. +//! To avoid having to traverse the entire list every time we want to access the reader count, a +//! pointer to the found tail is cached in the (current) first element of the queue. +//! +//! Also, while the lock is unfair for performance reasons, it is still best to wake the tail node +//! first (FIFO ordering). Since we always pop nodes off the tail of the queue, we must store +//! backlinks to previous nodes so that we can update the `tail` field of the (current) first +//! element of the queue. Adding backlinks is done at the same time as finding the tail (via the +//! function [`find_tail_and_add_backlinks`]), and thus encountering a set tail field on a node +//! indicates that all following nodes in the queue are initialized. //! //! TLDR: Here's a diagram of what the queue looks like: //! @@ -89,21 +94,21 @@ //! 3. All nodes preceding this node must have a correct, non-null `next` field. //! 4. All nodes following this node must have a correct, non-null `prev` field. //! -//! Access to the queue is controlled by the `QUEUE_LOCKED` bit, which threads -//! try to set both after enqueuing themselves to eagerly add backlinks to the -//! queue, which drastically improves performance, and after unlocking the lock -//! to wake the next waiter(s). This is done atomically at the same time as the -//! enqueuing/unlocking operation. The thread releasing the `QUEUE_LOCK` bit -//! will check the state of the lock and wake up waiters as appropriate. This -//! guarantees forward-progress even if the unlocking thread could not acquire -//! the queue lock. +//! Access to the queue is controlled by the `QUEUE_LOCKED` bit. Threads will try to set this bit +//! in two cases: one is when a thread enqueues itself and eagerly adds backlinks to the queue +//! 
(which drastically improves performance), and the other is after a thread unlocks the lock to +//! wake up the next waiter(s). +//! +//! `QUEUE_LOCKED` is set atomically at the same time as the enqueuing/unlocking operations. The +//! thread releasing the `QUEUE_LOCK` bit will check the state of the lock and wake up waiters as +//! appropriate. This guarantees forward progress even if the unlocking thread could not acquire the +//! queue lock. //! -//! ## Memory orderings +//! ## Memory Orderings //! -//! To properly synchronize changes to the data protected by the lock, the lock -//! is acquired and released with [`Acquire`] and [`Release`] ordering, respectively. -//! To propagate the initialization of nodes, changes to the queue lock are also -//! performed using these orderings. +//! To properly synchronize changes to the data protected by the lock, the lock is acquired and +//! released with [`Acquire`] and [`Release`] ordering, respectively. To propagate the +//! initialization of nodes, changes to the queue lock are also performed using these orderings. #![forbid(unsafe_op_in_unsafe_fn)] @@ -115,20 +120,23 @@ use crate::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; use crate::sync::atomic::{AtomicBool, AtomicPtr}; use crate::thread::{self, Thread, ThreadId}; -// Locking uses exponential backoff. `SPIN_COUNT` indicates how many times the -// locking operation will be retried. -// `spin_loop` will be called `2.pow(SPIN_COUNT) - 1` times. -const SPIN_COUNT: usize = 7; - -type State = *mut (); +/// The atomic lock state. type AtomicState = AtomicPtr<()>; +/// The inner lock state. +type State = *mut (); const UNLOCKED: State = without_provenance_mut(0); -const LOCKED: usize = 1; -const QUEUED: usize = 2; -const QUEUE_LOCKED: usize = 4; -const SINGLE: usize = 8; -const MASK: usize = !(QUEUE_LOCKED | QUEUED | LOCKED); +const LOCKED: usize = 1 << 0; +const QUEUED: usize = 1 << 1; +const QUEUE_LOCKED: usize = 1 << 2; +const SINGLE: usize = 1 << 3; +const NODE_MASK: usize = !(QUEUE_LOCKED | QUEUED | LOCKED); + +/// Locking uses exponential backoff. `SPIN_COUNT` indicates how many times the locking operation +/// will be retried. +/// +/// In other words, `spin_loop` will be called `2.pow(SPIN_COUNT) - 1` times. +const SPIN_COUNT: usize = 7; /// Marks the state as write-locked, if possible. #[inline] @@ -147,13 +155,28 @@ fn read_lock(state: State) -> Option { } } -/// Masks the state, assuming it points to a queue node. +/// Converts a `State` into a `Node` by masking out the bottom bits of the state, assuming that the +/// state points to a queue node. /// /// # Safety +/// /// The state must contain a valid pointer to a queue node. #[inline] unsafe fn to_node(state: State) -> NonNull { - unsafe { NonNull::new_unchecked(state.mask(MASK)).cast() } + unsafe { NonNull::new_unchecked(state.mask(NODE_MASK)).cast() } +} + +/// The representation of a thread waiting on the lock queue. +/// +/// We initialize these `Node`s on thread execution stacks to avoid allocation. +#[repr(align(8))] +struct Node { + next: AtomicLink, + prev: AtomicLink, + tail: AtomicLink, + write: bool, + thread: OnceCell, + completed: AtomicBool, } /// An atomic node pointer with relaxed operations. @@ -173,16 +196,6 @@ impl AtomicLink { } } -#[repr(align(8))] -struct Node { - next: AtomicLink, - prev: AtomicLink, - tail: AtomicLink, - write: bool, - thread: OnceCell, - completed: AtomicBool, -} - impl Node { /// Creates a new queue node. 
fn new(write: bool) -> Node { @@ -198,17 +211,17 @@ impl Node { /// Prepare this node for waiting. fn prepare(&mut self) { - // Fall back to creating an unnamed `Thread` handle to allow locking in - // TLS destructors. + // Fall back to creating an unnamed `Thread` handle to allow locking in TLS destructors. self.thread.get_or_init(|| { thread::try_current().unwrap_or_else(|| Thread::new_unnamed(ThreadId::new())) }); self.completed = AtomicBool::new(false); } - /// Wait until this node is marked as completed. + /// Wait until this node is marked as [`complete`](Node::complete)d by another thread. /// /// # Safety + /// /// May only be called from the thread that created the node. unsafe fn wait(&self) { while !self.completed.load(Acquire) { @@ -218,51 +231,48 @@ impl Node { } } - /// Atomically mark this node as completed. The node may not outlive this call. - unsafe fn complete(this: NonNull) { - // Since the node may be destroyed immediately after the completed flag - // is set, clone the thread handle before that. - let thread = unsafe { this.as_ref().thread.get().unwrap().clone() }; + /// Atomically mark this node as completed. + /// + /// # Safety + /// + /// `node` must point to a valid `Node`, and the node may not outlive this call. + unsafe fn complete(node: NonNull) { + // Since the node may be destroyed immediately after the completed flag is set, clone the + // thread handle before that. + let thread = unsafe { node.as_ref().thread.get().unwrap().clone() }; unsafe { - this.as_ref().completed.store(true, Release); + node.as_ref().completed.store(true, Release); } thread.unpark(); } } -struct PanicGuard; - -impl Drop for PanicGuard { - fn drop(&mut self) { - rtabort!("tried to drop node in intrusive list."); - } -} - -/// Add backlinks to the queue, returning the tail. +/// Traverse the queue and find the tail, adding backlinks to the queue while traversing. /// -/// May be called from multiple threads at the same time, while the queue is not +/// This may be called from multiple threads at the same time as long as the queue is not being /// modified (this happens when unlocking multiple readers). /// /// # Safety +/// /// * `head` must point to a node in a valid queue. -/// * `head` must be or be in front of the head of the queue at the time of the -/// last removal. -/// * The part of the queue starting with `head` must not be modified during this -/// call. -unsafe fn add_backlinks_and_find_tail(head: NonNull) -> NonNull { +/// * `head` must be in front of the head of the queue at the time of the last removal. +/// * The part of the queue starting with `head` must not be modified during this call. +unsafe fn find_tail_and_add_backlinks(head: NonNull) -> NonNull { let mut current = head; + + // Traverse the queue until we find a node that has a set `tail`. let tail = loop { let c = unsafe { current.as_ref() }; - match c.tail.get() { - Some(tail) => break tail, - // SAFETY: - // All `next` fields before the first node with a `set` tail are - // non-null and valid (invariant 3). - None => unsafe { - let next = c.next.get().unwrap_unchecked(); - next.as_ref().prev.set(Some(current)); - current = next; - }, + if let Some(tail) = c.tail.get() { + break tail; + } + + // SAFETY: All `next` fields before the first node with a set `tail` are non-null and valid + // (by Invariant 3). 
+ unsafe { + let next = c.next.get().unwrap_unchecked(); + next.as_ref().prev.set(Some(current)); + current = next; } }; @@ -272,6 +282,16 @@ unsafe fn add_backlinks_and_find_tail(head: NonNull) -> NonNull { } } +/// A type to guard against the unwinds of stacks that nodes are located on due to panics. +struct PanicGuard; + +impl Drop for PanicGuard { + fn drop(&mut self) { + rtabort!("tried to drop node in intrusive list."); + } +} + +/// The public inner `RwLock` type. pub struct RwLock { state: AtomicState, } @@ -296,11 +316,10 @@ impl RwLock { #[inline] pub fn try_write(&self) -> bool { - // Atomically set the `LOCKED` bit. This is lowered to a single atomic - // instruction on most modern processors (e.g. "lock bts" on x86 and - // "ldseta" on modern AArch64), and therefore is more efficient than - // `fetch_update(lock(true))`, which can spuriously fail if a new node - // is appended to the queue. + // Atomically set the `LOCKED` bit. This is lowered to a single atomic instruction on most + // modern processors (e.g. "lock bts" on x86 and "ldseta" on modern AArch64), and therefore + // is more efficient than `fetch_update(lock(true))`, which can spuriously fail if a new + // node is appended to the queue. self.state.fetch_or(LOCKED, Acquire).addr() & LOCKED == 0 } @@ -313,88 +332,91 @@ impl RwLock { #[cold] fn lock_contended(&self, write: bool) { - let update = if write { write_lock } else { read_lock }; + let update_fn = if write { write_lock } else { read_lock }; let mut node = Node::new(write); let mut state = self.state.load(Relaxed); let mut count = 0; loop { - if let Some(next) = update(state) { + // Optimistically update the state. + if let Some(next) = update_fn(state) { // The lock is available, try locking it. match self.state.compare_exchange_weak(state, next, Acquire, Relaxed) { Ok(_) => return, Err(new) => state = new, } + continue; } else if state.addr() & QUEUED == 0 && count < SPIN_COUNT { - // If the lock is not available and no threads are queued, spin - // for a while, using exponential backoff to decrease cache - // contention. + // If the lock is not available and no threads are queued, optimistically spin for a + // while, using exponential backoff to decrease cache contention. for _ in 0..(1 << count) { spin_loop(); } state = self.state.load(Relaxed); count += 1; + continue; + } + // The optimistic paths did not succeed, so fall back to parking the thread. + + // First, prepare the node. + node.prepare(); + + // If there are threads queued, this will set the `next` field to be a pointer to the + // first node in the queue. + // If the state is read-locked, this will set `next` to the lock count. + // If it is write-locked, it will set `next` to zero. + node.next.0 = AtomicPtr::new(state.mask(NODE_MASK).cast()); + node.prev = AtomicLink::new(None); + + // Set the `QUEUED` bit and maintain the `LOCKED` bit. + let mut next = ptr::from_ref(&node) + .map_addr(|addr| addr | QUEUED | (state.addr() & LOCKED)) + as State; + + if state.addr() & QUEUED == 0 { + // If this is the first node in the queue, set the `tail` field to the node itself + // to ensure there is a valid `tail` field in the queue (Invariants 1 & 2). + // This needs to use `set` to avoid invalidating the new pointer. + node.tail.set(Some(NonNull::from(&node))); } else { - // Fall back to parking. First, prepare the node. - node.prepare(); - - // If there are threads queued, set the `next` field to a - // pointer to the next node in the queue. 
Otherwise set it to - // the lock count if the state is read-locked or to zero if it - // is write-locked. - node.next.0 = AtomicPtr::new(state.mask(MASK).cast()); - node.prev = AtomicLink::new(None); - let mut next = ptr::from_ref(&node) - .map_addr(|addr| addr | QUEUED | (state.addr() & LOCKED)) - as State; - - if state.addr() & QUEUED == 0 { - // If this is the first node in the queue, set the tail field to - // the node itself to ensure there is a current `tail` field in - // the queue (invariants 1 and 2). This needs to use `set` to - // avoid invalidating the new pointer. - node.tail.set(Some(NonNull::from(&node))); - } else { - // Otherwise, the tail of the queue is not known. - node.tail.set(None); - // Try locking the queue to eagerly add backlinks. - next = next.map_addr(|addr| addr | QUEUE_LOCKED); - } + // Otherwise, the tail of the queue is not known. + node.tail.set(None); - // Register the node, using release ordering to propagate our - // changes to the waking thread. - if let Err(new) = self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) { - // The state has changed, just try again. - state = new; - continue; - } + // Try locking the queue to eagerly add backlinks. + next = next.map_addr(|addr| addr | QUEUE_LOCKED); + } - // The node is registered, so the structure must not be - // mutably accessed or destroyed while other threads may - // be accessing it. Guard against unwinds using a panic - // guard that aborts when dropped. - let guard = PanicGuard; + // Register the node, using release ordering to propagate our changes to the waking + // thread. + if let Err(new) = self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) { + // The state has changed, just try again. + state = new; + continue; + } + // The node has been registered, so the structure must not be mutably accessed or + // destroyed while other threads may be accessing it. - // If the current thread locked the queue, unlock it again, - // linking it in the process. - if state.addr() & (QUEUE_LOCKED | QUEUED) == QUEUED { - unsafe { - self.unlock_queue(next); - } - } + // Guard against unwinds using a `PanicGuard` that aborts when dropped. + let guard = PanicGuard; - // Wait until the node is removed from the queue. - // SAFETY: the node was created by the current thread. + // If the current thread locked the queue, unlock it to eagerly add backlinks. + if state.addr() & (QUEUE_LOCKED | QUEUED) == QUEUED { unsafe { - node.wait(); + self.unlock_queue(next); } + } - // The node was removed from the queue, disarm the guard. - mem::forget(guard); - - // Reload the state and try again. - state = self.state.load(Relaxed); - count = 0; + // Wait until the node is removed from the queue. + // SAFETY: the node was created by the current thread. + unsafe { + node.wait(); } + + // The node was removed from the queue, disarm the guard. + mem::forget(guard); + + // Reload the state and try again. + state = self.state.load(Relaxed); + count = 0; } } @@ -402,6 +424,7 @@ impl RwLock { pub unsafe fn read_unlock(&self) { match self.state.fetch_update(Release, Acquire, |state| { if state.addr() & QUEUED == 0 { + // If there are no threads queued, simply decrement the reader count. let count = state.addr() - (SINGLE | LOCKED); Some(if count > 0 { without_provenance_mut(count | LOCKED) } else { UNLOCKED }) } else { @@ -409,8 +432,7 @@ impl RwLock { } }) { Ok(_) => {} - // There are waiters queued and the lock count was moved to the - // tail of the queue. 
+ // There are waiters queued and the lock count was moved to the tail of the queue. Err(state) => unsafe { self.read_unlock_contended(state) }, } } @@ -420,21 +442,21 @@ impl RwLock { // The state was observed with acquire ordering above, so the current // thread will observe all node initializations. - // SAFETY: - // Because new read-locks cannot be acquired while threads are queued, - // all queue-lock owners will observe the set `LOCKED` bit. Because they - // do not modify the queue while there is a lock owner, the queue will - // not be removed from here. - let tail = unsafe { add_backlinks_and_find_tail(to_node(state)).as_ref() }; + // FIXME this is a bit confusing + // SAFETY: Because new read-locks cannot be acquired while threads are queued, all + // queue-lock owners will observe the set `LOCKED` bit. And because no downgrade can be in + // progress (we checked above), they hence do not modify the queue, so the queue will not be + // removed from here. + let tail = unsafe { find_tail_and_add_backlinks(to_node(state)).as_ref() }; + // The lock count is stored in the `next` field of `tail`. - // Decrement it, making sure to observe all changes made to the queue - // by the other lock owners by using acquire-release ordering. + // Decrement it, making sure to observe all changes made to the queue by the other lock + // owners by using acquire-release ordering. let was_last = tail.next.0.fetch_byte_sub(SINGLE, AcqRel).addr() - SINGLE == 0; if was_last { - // SAFETY: - // Other threads cannot read-lock while threads are queued. Also, - // the `LOCKED` bit is still set, so there are no writers. Therefore, - // the current thread exclusively owns the lock. + // SAFETY: Other threads cannot read-lock while threads are queued. Also, the `LOCKED` + // bit is still set, so there are no writers. Thus the current thread exclusively owns + // this lock, even though it is a reader. unsafe { self.unlock_contended(state) } } } @@ -444,14 +466,14 @@ impl RwLock { if let Err(state) = self.state.compare_exchange(without_provenance_mut(LOCKED), UNLOCKED, Release, Relaxed) { - // SAFETY: - // Since other threads cannot acquire the lock, the state can only - // have changed because there are threads queued on the lock. + // SAFETY: Since other threads cannot acquire the lock, the state can only have changed + // because there are threads queued on the lock. unsafe { self.unlock_contended(state) } } } /// # Safety + /// /// * The lock must be exclusively owned by this thread. /// * There must be threads queued on the lock. #[cold] @@ -477,12 +499,13 @@ impl RwLock { /// thread(s). /// /// # Safety + /// /// The queue lock must be held by the current thread. unsafe fn unlock_queue(&self, mut state: State) { debug_assert_eq!(state.addr() & (QUEUED | QUEUE_LOCKED), QUEUED | QUEUE_LOCKED); loop { - let tail = unsafe { add_backlinks_and_find_tail(to_node(state)) }; + let tail = unsafe { find_tail_and_add_backlinks(to_node(state)) }; if state.addr() & LOCKED == LOCKED { // Another thread has locked the lock. Leave waking up waiters From 26b5a1485e60a1ea7fd7e638fd59ec6b25fbc3d0 Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Fri, 25 Oct 2024 15:01:57 -0400 Subject: [PATCH 06/10] add `downgrade` to `queue` implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit adds the `downgrade` method onto the inner `RwLock` queue implementation. There are also a few other style patches included in this commit. 
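For orientation before reading the diff, here is a deliberately simplified, hypothetical model of the fast path this commit adds: when the packed state says "write-locked, nobody queued", a single compare-exchange (with the same `Release`/`Relaxed` orderings the patch uses) turns it into "read-locked with exactly one reader"; anything else is handed to a slow path that must deal with the waiter queue and is elided here. `ToyLock`, its constants, and `downgrade_slow` are illustrative stand-ins, not the actual `std` internals, whose state is a packed pointer rather than a plain `usize`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Flag layout mirroring the patch: bit 0 = LOCKED, readers counted in units of SINGLE.
const LOCKED: usize = 1 << 0;
const SINGLE: usize = 1 << 4;

struct ToyLock {
    state: AtomicUsize,
}

impl ToyLock {
    fn downgrade(&self) {
        // Fast path: write-locked with no waiters becomes one reader in a single CAS.
        match self.state.compare_exchange(
            LOCKED,          // current: write-locked, queue empty
            SINGLE | LOCKED, // new: read-locked with exactly one reader
            Ordering::Release,
            Ordering::Relaxed,
        ) {
            Ok(_) => {}
            // Slow path: some thread is queued, so the queue must be handed off or woken.
            Err(observed) => self.downgrade_slow(observed),
        }
    }

    fn downgrade_slow(&self, _observed: usize) {
        // The real implementation either sets a DOWNGRADED request bit (if another
        // thread holds the queue lock) or takes the whole queue and wakes everyone.
        unimplemented!("queue hand-off elided in this sketch")
    }
}

fn main() {
    let lock = ToyLock { state: AtomicUsize::new(LOCKED) };
    lock.downgrade();
    assert_eq!(lock.state.load(Ordering::Relaxed), SINGLE | LOCKED);
}
```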
Co-authored-by: Jonas Böttiger --- library/std/src/sys/sync/rwlock/queue.rs | 276 +++++++++++++++++------ 1 file changed, 206 insertions(+), 70 deletions(-) diff --git a/library/std/src/sys/sync/rwlock/queue.rs b/library/std/src/sys/sync/rwlock/queue.rs index 5879d1f84154f..77a5ee23309be 100644 --- a/library/std/src/sys/sync/rwlock/queue.rs +++ b/library/std/src/sys/sync/rwlock/queue.rs @@ -40,16 +40,16 @@ //! //! ## State //! -//! A single [`AtomicPtr`] is used as state variable. The lowest three bits are used to indicate the +//! A single [`AtomicPtr`] is used as state variable. The lowest four bits are used to indicate the //! meaning of the remaining bits: //! -//! | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | Remaining | | -//! |:-----------|:-----------|:-----------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------| -//! | 0 | 0 | 0 | 0 | The lock is unlocked, no threads are waiting | -//! | 1 | 0 | 0 | 0 | The lock is write-locked, no threads waiting | -//! | 1 | 0 | 0 | n > 0 | The lock is read-locked with n readers | -//! | 0 | 1 | * | `*mut Node` | The lock is unlocked, but some threads are waiting. Only writers may lock the lock | -//! | 1 | 1 | * | `*mut Node` | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count | +//! | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | [`DOWNGRADED`] | Remaining | | +//! |------------|:-----------|:-----------------|:---------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------| +//! | 0 | 0 | 0 | 0 | 0 | The lock is unlocked, no threads are waiting | +//! | 1 | 0 | 0 | 0 | 0 | The lock is write-locked, no threads waiting | +//! | 1 | 0 | 0 | 0 | n > 0 | The lock is read-locked with n readers | +//! | 0 | 1 | * | 0 | `*mut Node` | The lock is unlocked, but some threads are waiting. Only writers may lock the lock | +//! | 1 | 1 | * | * | `*mut Node` | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count | //! //! ## Waiter Queue //! @@ -100,9 +100,9 @@ //! wake up the next waiter(s). //! //! `QUEUE_LOCKED` is set atomically at the same time as the enqueuing/unlocking operations. The -//! thread releasing the `QUEUE_LOCK` bit will check the state of the lock and wake up waiters as -//! appropriate. This guarantees forward progress even if the unlocking thread could not acquire the -//! queue lock. +//! thread releasing the `QUEUE_LOCKED` bit will check the state of the lock (in particular, whether +//! a downgrade was requested using the [`DOWNGRADED`] bit) and wake up waiters as appropriate. This +//! guarantees forward progress even if the unlocking thread could not acquire the queue lock. //! //! ## Memory Orderings //! @@ -129,8 +129,10 @@ const UNLOCKED: State = without_provenance_mut(0); const LOCKED: usize = 1 << 0; const QUEUED: usize = 1 << 1; const QUEUE_LOCKED: usize = 1 << 2; -const SINGLE: usize = 1 << 3; -const NODE_MASK: usize = !(QUEUE_LOCKED | QUEUED | LOCKED); +const DOWNGRADED: usize = 1 << 3; +const SINGLE: usize = 1 << 4; +const STATE: usize = DOWNGRADED | QUEUE_LOCKED | QUEUED | LOCKED; +const NODE_MASK: usize = !STATE; /// Locking uses exponential backoff. `SPIN_COUNT` indicates how many times the locking operation /// will be retried. 
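As a standalone sanity check of the packed-state encoding introduced by the constants above (a hypothetical sketch using plain integer casts; the real code manipulates the pointer with the strict-provenance `map_addr`/`mask` helpers instead), the snippet below shows why `#[repr(align(16))]` on `Node` leaves the low four bits free for the flags and how reader counts stay clear of them:

```rust
// Same bit layout as the patch: four flag bits, then either a reader count
// (in multiples of SINGLE) or a 16-byte-aligned pointer to the first queue node.
const LOCKED: usize = 1 << 0;
const QUEUED: usize = 1 << 1;
const QUEUE_LOCKED: usize = 1 << 2;
const DOWNGRADED: usize = 1 << 3;
const SINGLE: usize = 1 << 4;
const NODE_MASK: usize = !(DOWNGRADED | QUEUE_LOCKED | QUEUED | LOCKED);

#[repr(align(16))]
struct Node;

fn main() {
    // Read-locked with three readers and no waiters: the count never overlaps the flags.
    let read_locked = 3 * SINGLE | LOCKED;
    assert_eq!((read_locked & NODE_MASK) / SINGLE, 3);

    // Queued states store a node pointer instead. A 16-byte-aligned address has its
    // low four bits clear, so the flags can be OR-ed in and masked back out losslessly.
    let node = Node;
    let addr = &node as *const Node as usize;
    assert_eq!(addr & !NODE_MASK, 0);
    let queued = addr | QUEUED | LOCKED;
    assert_eq!(queued & NODE_MASK, addr);
}
```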
@@ -141,8 +143,7 @@ const SPIN_COUNT: usize = 7; /// Marks the state as write-locked, if possible. #[inline] fn write_lock(state: State) -> Option { - let state = state.wrapping_byte_add(LOCKED); - if state.addr() & LOCKED == LOCKED { Some(state) } else { None } + if state.addr() & LOCKED == 0 { Some(state.map_addr(|addr| addr | LOCKED)) } else { None } } /// Marks the state as read-locked, if possible. @@ -169,7 +170,11 @@ unsafe fn to_node(state: State) -> NonNull { /// The representation of a thread waiting on the lock queue. /// /// We initialize these `Node`s on thread execution stacks to avoid allocation. -#[repr(align(8))] +/// +/// Note that we need an alignment of 16 to ensure that the last 4 bits of any +/// pointers to `Node`s are always zeroed (for the bit flags described in the +/// module-level documentation). +#[repr(align(16))] struct Node { next: AtomicLink, prev: AtomicLink, @@ -255,7 +260,7 @@ impl Node { /// # Safety /// /// * `head` must point to a node in a valid queue. -/// * `head` must be in front of the head of the queue at the time of the last removal. +/// * `head` must be in front of the previous head node that was used to perform the last removal. /// * The part of the queue starting with `head` must not be modified during this call. unsafe fn find_tail_and_add_backlinks(head: NonNull) -> NonNull { let mut current = head; @@ -282,6 +287,28 @@ unsafe fn find_tail_and_add_backlinks(head: NonNull) -> NonNull { } } +/// [`complete`](Node::complete)s all threads in the queue ending with `tail`. +/// +/// # Safety +/// +/// * `tail` must be a valid tail of a fully linked queue. +/// * The current thread must have exclusive access to that queue. +unsafe fn complete_all(tail: NonNull) { + let mut current = tail; + + // Traverse backwards through the queue (FIFO) and `complete` all of the nodes. + loop { + let prev = unsafe { current.as_ref().prev.get() }; + unsafe { + Node::complete(current); + } + match prev { + Some(prev) => current = prev, + None => return, + } + } +} + /// A type to guard against the unwinds of stacks that nodes are located on due to panics. struct PanicGuard; @@ -332,10 +359,11 @@ impl RwLock { #[cold] fn lock_contended(&self, write: bool) { - let update_fn = if write { write_lock } else { read_lock }; let mut node = Node::new(write); let mut state = self.state.load(Relaxed); let mut count = 0; + let update_fn = if write { write_lock } else { read_lock }; + loop { // Optimistically update the state. if let Some(next) = update_fn(state) { @@ -372,6 +400,7 @@ impl RwLock { .map_addr(|addr| addr | QUEUED | (state.addr() & LOCKED)) as State; + let mut is_queue_locked = false; if state.addr() & QUEUED == 0 { // If this is the first node in the queue, set the `tail` field to the node itself // to ensure there is a valid `tail` field in the queue (Invariants 1 & 2). @@ -383,6 +412,9 @@ impl RwLock { // Try locking the queue to eagerly add backlinks. next = next.map_addr(|addr| addr | QUEUE_LOCKED); + + // Track if we changed the `QUEUE_LOCKED` bit from off to on. + is_queue_locked = state.addr() & QUEUE_LOCKED == 0; } // Register the node, using release ordering to propagate our changes to the waking @@ -398,8 +430,9 @@ impl RwLock { // Guard against unwinds using a `PanicGuard` that aborts when dropped. let guard = PanicGuard; - // If the current thread locked the queue, unlock it to eagerly add backlinks. - if state.addr() & (QUEUE_LOCKED | QUEUED) == QUEUED { + // If the current thread locked the queue, unlock it to eagerly adding backlinks. 
+ if is_queue_locked { + // SAFETY: This thread set the `QUEUE_LOCKED` bit above. unsafe { self.unlock_queue(next); } @@ -427,6 +460,12 @@ impl RwLock { // If there are no threads queued, simply decrement the reader count. let count = state.addr() - (SINGLE | LOCKED); Some(if count > 0 { without_provenance_mut(count | LOCKED) } else { UNLOCKED }) + } else if state.addr() & DOWNGRADED != 0 { + // This thread used to have exclusive access, but requested a downgrade. This has + // not been completed yet, so we still have exclusive access. + // Retract the downgrade request and unlock, but leave waking up new threads to the + // thread that already holds the queue lock. + Some(state.mask(!(DOWNGRADED | LOCKED))) } else { None } @@ -476,40 +515,127 @@ impl RwLock { /// /// * The lock must be exclusively owned by this thread. /// * There must be threads queued on the lock. + /// * There cannot be a `downgrade` in progress. #[cold] - unsafe fn unlock_contended(&self, mut state: State) { + unsafe fn unlock_contended(&self, state: State) { + debug_assert!(state.addr() & STATE == (QUEUED | LOCKED)); + + let mut current = state; + + // We want to atomically release the lock and try to acquire the queue lock. loop { + // First check if the queue lock is already held. + if current.addr() & QUEUE_LOCKED != 0 { + // Another thread holds the queue lock, so let them wake up waiters for us. + let next = current.mask(!LOCKED); + match self.state.compare_exchange_weak(current, next, Release, Relaxed) { + Ok(_) => return, + Err(new) => { + current = new; + continue; + } + } + } + // Atomically release the lock and try to acquire the queue lock. - let next = state.map_addr(|a| (a & !LOCKED) | QUEUE_LOCKED); - match self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) { - // The queue lock was acquired. Release it, waking up the next - // waiter in the process. - Ok(_) if state.addr() & QUEUE_LOCKED == 0 => unsafe { - return self.unlock_queue(next); - }, - // Another thread already holds the queue lock, leave waking up - // waiters to it. - Ok(_) => return, - Err(new) => state = new, + let next = current.map_addr(|addr| (addr & !LOCKED) | QUEUE_LOCKED); + match self.state.compare_exchange_weak(current, next, AcqRel, Relaxed) { + Ok(_) => { + // Now that we have the queue lock, we can wake up the next waiter. + // SAFETY: This thread is exclusively owned by this thread. + unsafe { self.unlock_queue(next) }; + return; + } + Err(new) => current = new, + } + } + } + + /// # Safety + /// + /// * The lock must be write-locked by this thread. + #[inline] + pub unsafe fn downgrade(&self) { + // Optimistically change the state from write-locked with a single writer and no waiters to + // read-locked with a single reader and no waiters. + if let Err(state) = self.state.compare_exchange( + without_provenance_mut(LOCKED), + without_provenance_mut(SINGLE | LOCKED), + Release, + Relaxed, + ) { + // SAFETY: The only way the state can have changed is if there are threads queued. + // Wake all of them up. + unsafe { self.downgrade_slow(state) } + } + } + + /// Downgrades the lock from write-locked to read-locked in the case that there are threads + /// waiting on the wait queue. + /// + /// This function will either wake up all of the waiters on the wait queue or designate the + /// current holder of the queue lock to wake up all of the waiters instead. Once the waiters + /// wake up, they will continue in the execution loop of `lock_contended`. 
+ /// + /// # Safety + /// + /// * The lock must be write-locked by this thread. + /// * There must be threads queued on the lock. + #[cold] + unsafe fn downgrade_slow(&self, mut state: State) { + debug_assert!(state.addr() & (DOWNGRADED | QUEUED | LOCKED) == (QUEUED | LOCKED)); + + // Attempt to wake up all waiters by taking ownership of the entire waiter queue. + loop { + if state.addr() & QUEUE_LOCKED != 0 { + // Another thread already holds the queue lock. Tell it to wake up all waiters. + // If the other thread succeeds in waking up waiters before we release our lock, the + // effect will be just the same as if we had changed the state below. + // Otherwise, the `DOWNGRADED` bit will still be set, meaning that when this thread + // calls `read_unlock` later (because it holds a read lock and must unlock + // eventually), it will realize that the lock is still exclusively locked and act + // accordingly. + let next = state.map_addr(|addr| addr | DOWNGRADED); + match self.state.compare_exchange_weak(state, next, Release, Relaxed) { + Ok(_) => return, + Err(new) => state = new, + } + } else { + // Grab the entire queue by swapping the `state` with a single reader. + let next = ptr::without_provenance_mut(SINGLE | LOCKED); + if let Err(new) = self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) { + state = new; + continue; + } + + // SAFETY: We have full ownership of this queue now, so nobody else can modify it. + let tail = unsafe { find_tail_and_add_backlinks(to_node(state)) }; + + // Wake up all waiters. + // SAFETY: `tail` was just computed, meaning the whole queue is linked. + unsafe { complete_all(tail) }; + + return; } } } - /// Unlocks the queue. If the lock is unlocked, wakes up the next eligible - /// thread(s). + /// Unlocks the queue. Wakes up all threads if a downgrade was requested, otherwise wakes up the + /// next eligible thread(s) if the lock is unlocked. /// /// # Safety /// - /// The queue lock must be held by the current thread. + /// * The queue lock must be held by the current thread. + /// * There must be threads queued on the lock. unsafe fn unlock_queue(&self, mut state: State) { debug_assert_eq!(state.addr() & (QUEUED | QUEUE_LOCKED), QUEUED | QUEUE_LOCKED); loop { let tail = unsafe { find_tail_and_add_backlinks(to_node(state)) }; - if state.addr() & LOCKED == LOCKED { - // Another thread has locked the lock. Leave waking up waiters - // to them by releasing the queue lock. + if state.addr() & (DOWNGRADED | LOCKED) == LOCKED { + // Another thread has locked the lock and no downgrade was requested. + // Leave waking up waiters to them by releasing the queue lock. match self.state.compare_exchange_weak( state, state.mask(!QUEUE_LOCKED), @@ -524,53 +650,63 @@ impl RwLock { } } + // Since we hold the queue lock and downgrades cannot be requested if the lock is + // already read-locked, we have exclusive control over the queue here and can make + // modifications. + + let downgrade = state.addr() & DOWNGRADED != 0; let is_writer = unsafe { tail.as_ref().write }; - if is_writer && let Some(prev) = unsafe { tail.as_ref().prev.get() } { - // `tail` is a writer and there is a node before `tail`. - // Split off `tail`. + if !downgrade + && is_writer + && let Some(prev) = unsafe { tail.as_ref().prev.get() } + { + // If we are not downgrading and the next thread is a writer, only wake up that + // writing thread. - // There are no set `tail` links before the node pointed to by - // `state`, so the first non-null tail field will be current - // (invariant 2). 
Invariant 4 is fullfilled since `find_tail` - // was called on this node, which ensures all backlinks are set. + // Split off `tail`. + // There are no set `tail` links before the node pointed to by `state`, so the first + // non-null tail field will be current (Invariant 2). + // We also fulfill Invariant 4 since `find_tail` was called on this node, which + // ensures all backlinks are set. unsafe { to_node(state).as_ref().tail.set(Some(prev)); } - // Release the queue lock. Doing this by subtraction is more - // efficient on modern processors since it is a single instruction - // instead of an update loop, which will fail if new threads are - // added to the list. - self.state.fetch_byte_sub(QUEUE_LOCKED, Release); + // Try to release the queue lock. We need to check the state again since another + // thread might have acquired the lock and requested a downgrade. + let next = state.mask(!QUEUE_LOCKED); + if let Err(new) = self.state.compare_exchange_weak(state, next, Release, Acquire) { + // Undo the tail modification above, so that we can find the tail again above. + // As mentioned above, we have exclusive control over the queue, so no other + // thread could have noticed the change. + unsafe { + to_node(state).as_ref().tail.set(Some(tail)); + } + state = new; + continue; + } - // The tail was split off and the lock released. Mark the node as - // completed. + // The tail was split off and the lock was released. Mark the node as completed. unsafe { return Node::complete(tail); } } else { - // The next waiter is a reader or the queue only consists of one - // waiter. Just wake all threads. - - // The lock cannot be locked (checked above), so mark it as - // unlocked to reset the queue. - if let Err(new) = - self.state.compare_exchange_weak(state, UNLOCKED, Release, Acquire) - { + // We are either downgrading, the next waiter is a reader, or the queue only + // consists of one waiter. In any case, just wake all threads. + + // Clear the queue. + let next = + if downgrade { ptr::without_provenance_mut(SINGLE | LOCKED) } else { UNLOCKED }; + if let Err(new) = self.state.compare_exchange_weak(state, next, Release, Acquire) { state = new; continue; } - let mut current = tail; - loop { - let prev = unsafe { current.as_ref().prev.get() }; - unsafe { - Node::complete(current); - } - match prev { - Some(prev) => current = prev, - None => return, - } + // SAFETY: we computed `tail` above, and no new nodes can have been added since + // (otherwise the CAS above would have failed). + // Thus we have complete control over the whole queue. + unsafe { + return complete_all(tail); } } } From 3d191b50d2b5a4b17eda2f8cccaa5088ef56b8af Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Mon, 28 Oct 2024 20:02:14 -0400 Subject: [PATCH 07/10] add safety comments for queue implementation --- library/std/src/sys/sync/rwlock/queue.rs | 38 ++++++++++++++++-------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/library/std/src/sys/sync/rwlock/queue.rs b/library/std/src/sys/sync/rwlock/queue.rs index 77a5ee23309be..c654fca0d6e96 100644 --- a/library/std/src/sys/sync/rwlock/queue.rs +++ b/library/std/src/sys/sync/rwlock/queue.rs @@ -476,16 +476,22 @@ impl RwLock { } } + /// # Safety + /// + /// * There must be threads queued on the lock. + /// * `state` must be a pointer to a node in a valid queue. + /// * There cannot be a `downgrade` in progress. 
#[cold] unsafe fn read_unlock_contended(&self, state: State) { - // The state was observed with acquire ordering above, so the current - // thread will observe all node initializations. - - // FIXME this is a bit confusing - // SAFETY: Because new read-locks cannot be acquired while threads are queued, all - // queue-lock owners will observe the set `LOCKED` bit. And because no downgrade can be in - // progress (we checked above), they hence do not modify the queue, so the queue will not be - // removed from here. + // SAFETY: + // The state was observed with acquire ordering above, so the current thread will have + // observed all node initializations. + // We also know that no threads can be modifying the queue starting at `state`: because new + // read-locks cannot be acquired while there are any threads queued on the lock, all + // queue-lock owners will observe a set `LOCKED` bit in `self.state` and will not modify + // the queue. The other case that a thread could modify the queue is if a downgrade is in + // progress (removal of the entire queue), but since that is part of this function's safety + // contract, we can guarantee that no other threads can modify the queue. let tail = unsafe { find_tail_and_add_backlinks(to_node(state)).as_ref() }; // The lock count is stored in the `next` field of `tail`. @@ -515,10 +521,11 @@ impl RwLock { /// /// * The lock must be exclusively owned by this thread. /// * There must be threads queued on the lock. + /// * `state` must be a pointer to a node in a valid queue. /// * There cannot be a `downgrade` in progress. #[cold] unsafe fn unlock_contended(&self, state: State) { - debug_assert!(state.addr() & STATE == (QUEUED | LOCKED)); + debug_assert_eq!(state.addr() & (DOWNGRADED | QUEUED | LOCKED), QUEUED | LOCKED); let mut current = state; @@ -540,9 +547,10 @@ impl RwLock { // Atomically release the lock and try to acquire the queue lock. let next = current.map_addr(|addr| (addr & !LOCKED) | QUEUE_LOCKED); match self.state.compare_exchange_weak(current, next, AcqRel, Relaxed) { + // Now that we have the queue lock, we can wake up the next waiter. Ok(_) => { - // Now that we have the queue lock, we can wake up the next waiter. - // SAFETY: This thread is exclusively owned by this thread. + // SAFETY: This thread just acquired the queue lock, and this function's safety + // contract requires that there are threads already queued on the lock. unsafe { self.unlock_queue(next) }; return; } @@ -580,10 +588,11 @@ impl RwLock { /// # Safety /// /// * The lock must be write-locked by this thread. + /// * `state` must be a pointer to a node in a valid queue. /// * There must be threads queued on the lock. #[cold] unsafe fn downgrade_slow(&self, mut state: State) { - debug_assert!(state.addr() & (DOWNGRADED | QUEUED | LOCKED) == (QUEUED | LOCKED)); + debug_assert_eq!(state.addr() & (DOWNGRADED | QUEUED | LOCKED), QUEUED | LOCKED); // Attempt to wake up all waiters by taking ownership of the entire waiter queue. loop { @@ -612,7 +621,8 @@ impl RwLock { let tail = unsafe { find_tail_and_add_backlinks(to_node(state)) }; // Wake up all waiters. - // SAFETY: `tail` was just computed, meaning the whole queue is linked. + // SAFETY: `tail` was just computed, meaning the whole queue is linked, and we have + // full ownership of the queue, so we have exclusive access. unsafe { complete_all(tail) }; return; @@ -626,11 +636,13 @@ impl RwLock { /// # Safety /// /// * The queue lock must be held by the current thread. 
+ /// * `state` must be a pointer to a node in a valid queue. /// * There must be threads queued on the lock. unsafe fn unlock_queue(&self, mut state: State) { debug_assert_eq!(state.addr() & (QUEUED | QUEUE_LOCKED), QUEUED | QUEUE_LOCKED); loop { + // SAFETY: Since we have the queue lock, nobody else can be modifying the queue. let tail = unsafe { find_tail_and_add_backlinks(to_node(state)) }; if state.addr() & (DOWNGRADED | LOCKED) == LOCKED { From 84fd95cbedf1e1c1826e378ffc4d1a3d335deff4 Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Fri, 8 Nov 2024 18:13:18 -0500 Subject: [PATCH 08/10] fix memory ordering bug + bad test This commit fixes a memory ordering bug in the futex implementation (`Relaxed` -> `Release` on `downgrade`). This commit also removes a badly written test that deadlocked and replaces it with a more reasonable test based on an already-tested `downgrade` test from the parking-lot crate. --- library/std/src/sync/rwlock/tests.rs | 79 ++++++++++-------------- library/std/src/sys/sync/rwlock/futex.rs | 2 +- 2 files changed, 33 insertions(+), 48 deletions(-) diff --git a/library/std/src/sync/rwlock/tests.rs b/library/std/src/sync/rwlock/tests.rs index a4af49dc82cce..4f30ef3cac398 100644 --- a/library/std/src/sync/rwlock/tests.rs +++ b/library/std/src/sync/rwlock/tests.rs @@ -3,8 +3,8 @@ use rand::Rng; use crate::sync::atomic::{AtomicUsize, Ordering}; use crate::sync::mpsc::channel; use crate::sync::{ - Arc, Barrier, MappedRwLockReadGuard, MappedRwLockWriteGuard, RwLock, RwLockReadGuard, - RwLockWriteGuard, TryLockError, + Arc, MappedRwLockReadGuard, MappedRwLockWriteGuard, RwLock, RwLockReadGuard, RwLockWriteGuard, + TryLockError, }; use crate::thread; @@ -511,57 +511,42 @@ fn test_downgrade_basic() { } #[test] -fn test_downgrade_readers() { - // This test creates 1 writing thread and `R` reader threads doing `N` iterations. - const R: usize = 10; - const N: usize = if cfg!(target_pointer_width = "64") { 100 } else { 20 }; +fn test_downgrade_observe() { + // Taken from the test `test_rwlock_downgrade` from: + // https://github.com/Amanieu/parking_lot/blob/master/src/rwlock.rs - // The writer thread will constantly update the value inside the `RwLock`, and this test will - // only pass if every reader observes all values between 0 and `N`. - let rwlock = Arc::new(RwLock::new(0)); - let barrier = Arc::new(Barrier::new(R + 1)); - - // Create the writing thread. - let r_writer = rwlock.clone(); - let b_writer = barrier.clone(); - thread::spawn(move || { - for i in 0..N { - let mut write_guard = r_writer.write().unwrap(); - *write_guard = i; + const W: usize = if cfg!(target_pointer_width = "64") { 100 } else { 20 }; + const N: usize = 100; - let read_guard = RwLockWriteGuard::downgrade(write_guard); - assert_eq!(*read_guard, i); + // This test spawns `W` writer threads, where each will increment a counter `N` times, ensuring + // that the value they wrote has not changed after downgrading. - // Wait for all readers to observe the new value. - b_writer.wait(); - } - }); + let rw = Arc::new(RwLock::new(0)); - for _ in 0..R { - let rwlock = rwlock.clone(); - let barrier = barrier.clone(); - thread::spawn(move || { - // Every reader thread needs to observe every value up to `N`. - for i in 0..N { - let read_guard = rwlock.read().unwrap(); - assert_eq!(*read_guard, i); - drop(read_guard); - - // Wait for everyone to read and for the writer to change the value again. - barrier.wait(); - - // Spin until the writer has changed the value. 
- loop { - let read_guard = rwlock.read().unwrap(); - assert!(*read_guard >= i); - - if *read_guard > i { - break; - } + // Spawn the writers that will do `W * N` operations and checks. + let handles: Vec<_> = (0..W) + .map(|_| { + let rw = rw.clone(); + thread::spawn(move || { + for _ in 0..N { + // Increment the counter. + let mut write_guard = rw.write().unwrap(); + *write_guard += 1; + let cur_val = *write_guard; + + // Downgrade the lock to read mode, where the value protected cannot be modified. + let read_guard = RwLockWriteGuard::downgrade(write_guard); + assert_eq!(cur_val, *read_guard); } - } - }); + }) + }) + .collect(); + + for handle in handles { + handle.join().unwrap(); } + + assert_eq!(*rw.read().unwrap(), W * N); } #[test] diff --git a/library/std/src/sys/sync/rwlock/futex.rs b/library/std/src/sys/sync/rwlock/futex.rs index e0f4a91e0731b..961819cae8d6e 100644 --- a/library/std/src/sys/sync/rwlock/futex.rs +++ b/library/std/src/sys/sync/rwlock/futex.rs @@ -195,7 +195,7 @@ impl RwLock { #[inline] pub unsafe fn downgrade(&self) { // Removes all write bits and adds a single read bit. - let state = self.state.fetch_add(DOWNGRADE, Relaxed); + let state = self.state.fetch_add(DOWNGRADE, Release); debug_assert!(is_write_locked(state), "RwLock must be write locked to call `downgrade`"); if has_readers_waiting(state) { From 782b07e1ff4a4f8ecf00d5e030dfead9222d2185 Mon Sep 17 00:00:00 2001 From: Connor Tsui <87130162+connortsui20@users.noreply.github.com> Date: Sat, 16 Nov 2024 12:22:35 -0500 Subject: [PATCH 09/10] fix `DOWNGRADED` bit unpreserved MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jonas Böttiger --- library/std/src/sync/rwlock/tests.rs | 2 +- library/std/src/sys/sync/rwlock/queue.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/library/std/src/sync/rwlock/tests.rs b/library/std/src/sync/rwlock/tests.rs index 4f30ef3cac398..5413a062cef35 100644 --- a/library/std/src/sync/rwlock/tests.rs +++ b/library/std/src/sync/rwlock/tests.rs @@ -558,7 +558,7 @@ fn test_downgrade_atomic() { // modify the protected value. // `W` is the number of evil writer threads. - const W: usize = if cfg!(target_pointer_width = "64") { 100 } else { 20 }; + const W: usize = 20; let rwlock = Arc::new(RwLock::new(0)); // Spawns many evil writer threads that will try and write to the locked value before the diff --git a/library/std/src/sys/sync/rwlock/queue.rs b/library/std/src/sys/sync/rwlock/queue.rs index c654fca0d6e96..51330f8fafe57 100644 --- a/library/std/src/sys/sync/rwlock/queue.rs +++ b/library/std/src/sys/sync/rwlock/queue.rs @@ -395,9 +395,9 @@ impl RwLock { node.next.0 = AtomicPtr::new(state.mask(NODE_MASK).cast()); node.prev = AtomicLink::new(None); - // Set the `QUEUED` bit and maintain the `LOCKED` bit. + // Set the `QUEUED` bit and preserve the `LOCKED` and `DOWNGRADED` bit. 
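// A self-contained sketch of the masking change just below, using assumed flag
// values; the real constants are the state bit flags defined near the top of
// queue.rs, and the real code works on tagged node pointers whose low bits are
// reserved for flags (they are masked off with `NODE_MASK` when dereferencing).
const LOCKED: usize = 0b0001; // assumed illustrative value
const QUEUED: usize = 0b0010; // assumed illustrative value
const DOWNGRADED: usize = 0b1000; // assumed illustrative value

fn main() {
    let node_addr: usize = 0x1000; // an assumed, suitably aligned node address
    // Illustrative state: the lock is write-locked and a downgrade was requested.
    let state = DOWNGRADED | LOCKED;

    // Old mask: the pending `DOWNGRADED` request is silently dropped.
    let old = node_addr | QUEUED | (state & LOCKED);
    // New mask: both `LOCKED` and `DOWNGRADED` are carried into the queued state.
    let new = node_addr | QUEUED | (state & (DOWNGRADED | LOCKED));

    assert_eq!(old & DOWNGRADED, 0);
    assert_eq!(new & DOWNGRADED, DOWNGRADED);
}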
let mut next = ptr::from_ref(&node) - .map_addr(|addr| addr | QUEUED | (state.addr() & LOCKED)) + .map_addr(|addr| addr | QUEUED | (state.addr() & (DOWNGRADED | LOCKED))) as State; let mut is_queue_locked = false; From fc52cdd9a8b9c2b2e21b0976373e6b54f4c3b5fa Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Sat, 16 Nov 2024 12:24:33 -0500 Subject: [PATCH 10/10] reduce threads in downgrade test --- library/std/src/sync/rwlock/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/library/std/src/sync/rwlock/tests.rs b/library/std/src/sync/rwlock/tests.rs index 5413a062cef35..02ac1c85b9124 100644 --- a/library/std/src/sync/rwlock/tests.rs +++ b/library/std/src/sync/rwlock/tests.rs @@ -515,7 +515,7 @@ fn test_downgrade_observe() { // Taken from the test `test_rwlock_downgrade` from: // https://github.com/Amanieu/parking_lot/blob/master/src/rwlock.rs - const W: usize = if cfg!(target_pointer_width = "64") { 100 } else { 20 }; + const W: usize = 20; const N: usize = 100; // This test spawns `W` writer threads, where each will increment a counter `N` times, ensuring
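
For readers following the series, a minimal usage sketch of the `downgrade` API that these patches implement and test is shown below; it assumes a nightly toolchain and that the unstable feature gate is named `rwlock_downgrade`:

    #![feature(rwlock_downgrade)]

    use std::sync::{RwLock, RwLockWriteGuard};

    fn main() {
        let lock = RwLock::new(String::new());

        // Take the lock in write mode and initialize the protected value.
        let mut writer = lock.write().unwrap();
        writer.push_str("initialized");

        // Atomically trade the exclusive lock for a shared one. Unlike dropping
        // `writer` and then calling `lock.read()`, no other writer can modify the
        // value in between, so the read below always observes this thread's write.
        let reader = RwLockWriteGuard::downgrade(writer);
        assert_eq!(*reader, "initialized");
    }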