diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 872d53e0891..9b43404f918 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -19,6 +19,7 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Mutex, MutexGuard}; use crate::util::linked_list::{self, LinkedList}; +use crate::util::WakeList; use std::future::Future; use std::marker::PhantomPinned; @@ -239,12 +240,12 @@ impl Semaphore { /// If `rem` exceeds the number of permits needed by the wait list, the /// remainder are assigned back to the semaphore. fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) { - let mut wakers: [Option; 8] = Default::default(); + let mut wakers = WakeList::new(); let mut lock = Some(waiters); let mut is_empty = false; while rem > 0 { let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock()); - 'inner: for slot in &mut wakers[..] { + 'inner: while wakers.can_push() { // Was the waiter assigned enough permits to wake it? match waiters.queue.last() { Some(waiter) => { @@ -260,7 +261,11 @@ impl Semaphore { } }; let mut waiter = waiters.queue.pop_back().unwrap(); - *slot = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }; + if let Some(waker) = + unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) } + { + wakers.push(waker); + } } if rem > 0 && is_empty { @@ -283,10 +288,7 @@ impl Semaphore { drop(waiters); // release the lock - wakers - .iter_mut() - .filter_map(Option::take) - .for_each(Waker::wake); + wakers.wake_all(); } assert_eq!(rem, 0); diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 2ea63591481..74b97cc481c 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -8,6 +8,7 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; use crate::util::linked_list::{self, LinkedList}; +use crate::util::WakeList; use std::cell::UnsafeCell; use std::future::Future; @@ -391,10 +392,7 @@ impl Notify { /// } /// ``` pub fn notify_waiters(&self) { - const NUM_WAKERS: usize = 32; - - let mut wakers: [Option; NUM_WAKERS] = Default::default(); - let mut curr_waker = 0; + let mut wakers = WakeList::new(); // There are waiters, the lock must be acquired to notify. let mut waiters = self.waiters.lock(); @@ -414,7 +412,7 @@ impl Notify { // concurrently change, as holding the lock is required to // transition **out** of `WAITING`. 'outer: loop { - while curr_waker < NUM_WAKERS { + while wakers.can_push() { match waiters.pop_back() { Some(mut waiter) => { // Safety: `waiters` lock is still held. @@ -425,8 +423,7 @@ impl Notify { waiter.notified = Some(NotificationType::AllWaiters); if let Some(waker) = waiter.waker.take() { - wakers[curr_waker] = Some(waker); - curr_waker += 1; + wakers.push(waker); } } None => { @@ -437,11 +434,7 @@ impl Notify { drop(waiters); - for waker in wakers.iter_mut().take(curr_waker) { - waker.take().unwrap().wake(); - } - - curr_waker = 0; + wakers.wake_all(); // Acquire the lock again. waiters = self.waiters.lock(); @@ -456,9 +449,7 @@ impl Notify { // Release the lock before notifying drop(waiters); - for waker in wakers.iter_mut().take(curr_waker) { - waker.take().unwrap().wake(); - } + wakers.wake_all(); } } diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 300e06bc49e..df30f2b86a9 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -1,11 +1,31 @@ cfg_io_driver! { pub(crate) mod bit; pub(crate) mod slab; - - mod wake_list; - pub(crate) use wake_list::WakeList; } +#[cfg(any( + // io driver uses `WakeList` directly + feature = "net", + feature = "process", + // `sync` enables `Notify` and `batch_semaphore`, which require `WakeList`. + feature = "sync", + // `fs` uses `batch_semaphore`, which requires `WakeList`. + feature = "fs", + // rt and signal use `Notify`, which requires `WakeList`. + feature = "rt", + feature = "signal", +))] +mod wake_list; +#[cfg(any( + feature = "net", + feature = "process", + feature = "sync", + feature = "fs", + feature = "rt", + feature = "signal", +))] +pub(crate) use wake_list::WakeList; + #[cfg(any( feature = "fs", feature = "net",