diff --git a/CHANGELOG.md b/CHANGELOG.md
index ca8ebc0..61928a7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+# Version 1.12.0
+
+- Add static executors, which are an optimization over executors that are kept
+  around forever. (#112)
+
 # Version 1.11.0
 
 - Re-export the `async_task::FallibleTask` primitive. (#113)
@@ -58,7 +63,7 @@
 
 # Version 1.5.1
 
-- Implement a better form of debug output for Executor and LocalExecutor. (#33) 
+- Implement a better form of debug output for Executor and LocalExecutor. (#33)
 
 # Version 1.5.0
diff --git a/Cargo.toml b/Cargo.toml
index 16d33bc..d93727c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,7 +3,7 @@ name = "async-executor"
 # When publishing a new version:
 # - Update CHANGELOG.md
 # - Create "v1.x.y" git tag
-version = "1.11.0"
+version = "1.12.0"
 authors = ["Stjepan Glavina <stjepang@gmail.com>", "John Nunley <dev@notgull.net>"]
 edition = "2021"
 rust-version = "1.63"
diff --git a/src/lib.rs b/src/lib.rs
index 7c5d49d..dc5ea82 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -39,11 +39,14 @@
 )]
 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
 
+use std::cell::{Cell, RefCell};
+use std::cmp::Reverse;
+use std::collections::VecDeque;
 use std::fmt;
 use std::marker::PhantomData;
 use std::panic::{RefUnwindSafe, UnwindSafe};
 use std::rc::Rc;
-use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex, RwLock, TryLockError};
 use std::task::{Poll, Waker};
 
@@ -347,8 +350,32 @@ impl<'a> Executor<'a> {
     fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
         let state = self.state_as_arc();
 
-        // TODO: If possible, push into the current local queue and notify the ticker.
-        move |runnable| {
+        move |mut runnable| {
+            // If possible, push into the current local queue and notify the ticker.
+            if let Some(local_queue) = state
+                .local_queues
+                .read()
+                .unwrap()
+                .get(thread_id())
+                .and_then(|list| list.as_ref())
+            {
+                match local_queue.queue.push(runnable) {
+                    Ok(()) => {
+                        // Wake the runner that owns this local queue, if it is sleeping.
+                        if let Some(waker) = state
+                            .sleepers
+                            .lock()
+                            .unwrap()
+                            .notify_runner(local_queue.runner_id)
+                        {
+                            waker.wake();
+                        }
+                        return;
+                    }
+
+                    // The local queue is full; fall through to the global queue.
+                    Err(r) => runnable = r.into_inner(),
+                }
+            }
+
             state.queue.push(runnable).unwrap();
             state.notify();
         }
@@ -665,7 +692,10 @@ struct State {
     queue: ConcurrentQueue<Runnable>,
 
     /// Local queues created by runners.
-    local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,
+    ///
+    /// These are keyed by the thread that the runner originated in. See the `thread_id` function
+    /// for more information.
+    local_queues: RwLock<Vec<Option<Arc<LocalQueue>>>>,
 
     /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
     notified: AtomicBool,
@@ -756,36 +786,57 @@ struct Sleepers {
     /// IDs and wakers of sleeping unnotified tickers.
     ///
     /// A sleeping ticker is notified when its waker is missing from this list.
-    wakers: Vec<(usize, Waker)>,
+    wakers: Vec<Sleeper>,
 
     /// Reclaimed IDs.
     free_ids: Vec<usize>,
 }
 
+/// A single sleeping ticker.
+struct Sleeper {
+    /// ID of the sleeping ticker.
+    id: usize,
+
+    /// Waker associated with this ticker.
+    waker: Waker,
+
+    /// Specific runner ID for targeted wakeups.
+    runner: Option<usize>,
+}
+
 impl Sleepers {
     /// Inserts a new sleeping ticker.
-    fn insert(&mut self, waker: &Waker) -> usize {
+    fn insert(&mut self, waker: &Waker, runner: Option<usize>) -> usize {
         let id = match self.free_ids.pop() {
             Some(id) => id,
             None => self.count + 1,
         };
         self.count += 1;
-        self.wakers.push((id, waker.clone()));
+        self.wakers.push(Sleeper {
+            id,
+            waker: waker.clone(),
+            runner,
+        });
         id
     }
 
     /// Re-inserts a sleeping ticker's waker if it was notified.
     ///
     /// Returns `true` if the ticker was notified.
-    fn update(&mut self, id: usize, waker: &Waker) -> bool {
+    fn update(&mut self, id: usize, waker: &Waker, runner: Option<usize>) -> bool {
         for item in &mut self.wakers {
-            if item.0 == id {
-                item.1.clone_from(waker);
+            if item.id == id {
+                debug_assert_eq!(item.runner, runner);
+                item.waker.clone_from(waker);
                 return false;
             }
         }
 
-        self.wakers.push((id, waker.clone()));
+        self.wakers.push(Sleeper {
+            id,
+            waker: waker.clone(),
+            runner,
+        });
         true
     }
 
@@ -797,7 +848,7 @@ impl Sleepers {
         self.free_ids.push(id);
 
         for i in (0..self.wakers.len()).rev() {
-            if self.wakers[i].0 == id {
+            if self.wakers[i].id == id {
                 self.wakers.remove(i);
                 return false;
             }
@@ -815,7 +866,20 @@ impl Sleepers {
     /// If a ticker was notified already or there are no tickers, `None` will be returned.
     fn notify(&mut self) -> Option<Waker> {
         if self.wakers.len() == self.count {
-            self.wakers.pop().map(|item| item.1)
+            self.wakers.pop().map(|item| item.waker)
+        } else {
+            None
+        }
+    }
+
+    /// Notifies the waker of a specific sleeping runner, if it is present.
+    fn notify_runner(&mut self, runner: usize) -> Option<Waker> {
+        if let Some(posn) = self
+            .wakers
+            .iter()
+            .position(|sleeper| sleeper.runner == Some(runner))
+        {
+            Some(self.wakers.swap_remove(posn).waker)
         } else {
             None
         }
@@ -834,12 +898,28 @@ struct Ticker<'a> {
     /// 2a) Sleeping and unnotified.
     /// 2b) Sleeping and notified.
     sleeping: usize,
+
+    /// Unique runner ID, if this ticker belongs to a runner.
+    runner: Option<usize>,
 }
 
 impl Ticker<'_> {
     /// Creates a ticker.
     fn new(state: &State) -> Ticker<'_> {
-        Ticker { state, sleeping: 0 }
+        Ticker {
+            state,
+            sleeping: 0,
+            runner: None,
+        }
+    }
+
+    /// Creates a ticker for a runner.
+    fn for_runner(state: &State, runner: usize) -> Ticker<'_> {
+        Ticker {
+            state,
+            sleeping: 0,
+            runner: Some(runner),
+        }
     }
 
     /// Moves the ticker into sleeping and unnotified state.
@@ -851,12 +931,12 @@ impl Ticker<'_> {
         match self.sleeping {
             // Move to sleeping state.
             0 => {
-                self.sleeping = sleepers.insert(waker);
+                self.sleeping = sleepers.insert(waker, self.runner);
             }
 
             // Already sleeping, check if notified.
             id => {
-                if !sleepers.update(id, waker) {
+                if !sleepers.update(id, waker, self.runner) {
                     return false;
                 }
             }
@@ -946,8 +1026,13 @@ struct Runner<'a> {
     /// Inner ticker.
     ticker: Ticker<'a>,
 
+    /// The ID of the thread we originated from.
+    ///
+    /// This is `None` if we don't own the local queue for this thread.
+    origin_id: Option<usize>,
+
     /// The local queue.
-    local: Arc<ConcurrentQueue<Runnable>>,
+    local: Arc<LocalQueue>,
 
     /// Bumped every time a runnable task is found.
     ticks: usize,
@@ -956,17 +1041,46 @@ struct Runner<'a> {
 impl Runner<'_> {
     /// Creates a runner and registers it in the executor state.
     fn new(state: &State) -> Runner<'_> {
-        let runner = Runner {
+        static ID_GENERATOR: AtomicUsize = AtomicUsize::new(0);
+        let runner_id = ID_GENERATOR.fetch_add(1, Ordering::SeqCst);
+
+        let origin_id = thread_id();
+        let mut runner = Runner {
             state,
-            ticker: Ticker::new(state),
-            local: Arc::new(ConcurrentQueue::bounded(512)),
+            ticker: Ticker::for_runner(state, runner_id),
+            local: Arc::new(LocalQueue {
+                queue: ConcurrentQueue::bounded(512),
+                runner_id,
+            }),
             ticks: 0,
+            origin_id: Some(origin_id),
         };
-        state
-            .local_queues
-            .write()
-            .unwrap()
-            .push(runner.local.clone());
+
+        // If this is the highest thread ID this executor has seen, make more slots.
+        let mut local_queues = state.local_queues.write().unwrap();
+        if local_queues.len() <= origin_id {
+            local_queues.resize_with(origin_id + 1, || None);
+        }
+
+        // Try to reserve the thread-local slot. Note that we reuse the write guard
+        // acquired above; taking `state.local_queues` a second time while still
+        // holding the guard would deadlock, as `RwLock` is not reentrant.
+        match local_queues.get_mut(origin_id).unwrap() {
+            slot @ None => {
+                // We won the race, insert our queue.
+                *slot = Some(runner.local.clone());
+            }
+
+            Some(_) => {
+                // We lost the race, indicate we don't own this ID.
+                runner.origin_id = None;
+            }
+        }
+
         runner
     }
 
@@ -976,13 +1090,13 @@ impl Runner<'_> {
             .ticker
             .runnable_with(|| {
                 // Try the local queue.
-                if let Ok(r) = self.local.pop() {
+                if let Ok(r) = self.local.queue.pop() {
                     return Some(r);
                 }
 
                 // Try stealing from the global queue.
                 if let Ok(r) = self.state.queue.pop() {
-                    steal(&self.state.queue, &self.local);
+                    steal(&self.state.queue, &self.local.queue);
                     return Some(r);
                 }
 
@@ -994,7 +1108,8 @@ impl Runner<'_> {
                 let start = rng.usize(..n);
                 let iter = local_queues
                     .iter()
-                    .chain(local_queues.iter())
+                    .filter_map(|list| list.as_ref())
+                    .chain(local_queues.iter().filter_map(|list| list.as_ref()))
                     .skip(start)
                     .take(n);
 
@@ -1003,8 +1118,8 @@ impl Runner<'_> {
 
                 // Try stealing from each local queue in the list.
                 for local in iter {
-                    steal(local, &self.local);
-                    if let Ok(r) = self.local.pop() {
+                    steal(&local.queue, &self.local.queue);
+                    if let Ok(r) = self.local.queue.pop() {
                         return Some(r);
                     }
                 }
@@ -1018,7 +1133,7 @@ impl Runner<'_> {
 
         if self.ticks % 64 == 0 {
             // Steal tasks from the global queue to ensure fair task scheduling.
-            steal(&self.state.queue, &self.local);
+            steal(&self.state.queue, &self.local.queue);
         }
 
         runnable
@@ -1028,19 +1143,32 @@ impl Runner<'_> {
 impl Drop for Runner<'_> {
     fn drop(&mut self) {
         // Remove the local queue.
-        self.state
-            .local_queues
-            .write()
-            .unwrap()
-            .retain(|local| !Arc::ptr_eq(local, &self.local));
+        if let Some(origin_id) = self.origin_id {
+            *self
+                .state
+                .local_queues
+                .write()
+                .unwrap()
+                .get_mut(origin_id)
+                .unwrap() = None;
+        }
 
         // Re-schedule remaining tasks in the local queue.
-        while let Ok(r) = self.local.pop() {
+        while let Ok(r) = self.local.queue.pop() {
             r.schedule();
         }
     }
 }
 
+/// Data associated with a local queue.
+struct LocalQueue {
+    /// Concurrent queue of active tasks.
+    queue: ConcurrentQueue<Runnable>,
+
+    /// Unique ID of the runner associated with this queue.
+    runner_id: usize,
+}
+
 /// Steals some items from one queue into another.
 fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
     // Half of `src`'s length rounded up.
@@ -1103,21 +1231,7 @@ fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Re
         }
     }
 
-    /// Debug wrapper for the local runners.
-    struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>);
-
-    impl fmt::Debug for LocalRunners<'_> {
-        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-            match self.0.try_read() {
-                Ok(lock) => f
-                    .debug_list()
-                    .entries(lock.iter().map(|queue| queue.len()))
-                    .finish(),
-                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
-                Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
-            }
-        }
-    }
+    // TODO: Add wrapper for thread-local queues.
 
     /// Debug wrapper for the sleepers.
     struct SleepCount<'a>(&'a Mutex<Sleepers>);
@@ -1135,11 +1249,133 @@ fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Re
     f.debug_struct(name)
         .field("active", &ActiveTasks(&state.active))
         .field("global_tasks", &state.queue.len())
-        .field("local_runners", &LocalRunners(&state.local_queues))
         .field("sleepers", &SleepCount(&state.sleepers))
         .finish()
 }
 
+/// Returns a unique ID for the current thread, used to key the executor's local queues.
+fn thread_id() -> usize {
+    // TODO: This strategy does not work for WASM, figure out a better way!
+
+    /// An allocator for thread IDs.
+    struct Allocator {
+        /// The next thread ID to yield.
+        free_id: usize,
+
+        /// The list of thread IDs that have been released.
+        ///
+        /// This exists to defend against the case where a user spawns a million threads, calls
+        /// this function on each, and then drops them all. Repeated a few times, that pattern
+        /// would exhaust the available thread-ID space, so we reuse thread IDs after their
+        /// threads have been dropped.
+        ///
+        /// We prefer lower thread IDs, as larger thread IDs require more memory in the
+        /// constant-time addressing strategy we use for thread-specific storage.
+        ///
+        /// This is only `None` at program startup; it is an `Option` purely so the `Mutex`
+        /// can be const-initialized.
+        ///
+        /// TODO(notgull): make an entry in the "useful features" table for this
+        released_ids: Option<VecDeque<Reverse<usize>>>,
+    }
+
+    impl Allocator {
+        /// Run a closure with the global thread-ID allocator.
+        fn with<R>(f: impl FnOnce(&mut Allocator) -> R) -> R {
+            static ALLOCATOR: Mutex<Allocator> = Mutex::new(Allocator {
+                free_id: 0,
+                released_ids: None,
+            });
+
+            f(&mut ALLOCATOR.lock().unwrap_or_else(|x| x.into_inner()))
+        }
+
+        /// Get the queue of released IDs, initializing it on first use.
+        fn released_ids(&mut self) -> &mut VecDeque<Reverse<usize>> {
+            self.released_ids.get_or_insert_with(VecDeque::default)
+        }
+
+        /// Allocate a new ID.
+        fn alloc(&mut self) -> usize {
+            // If we can, reuse an existing ID.
+            if let Some(Reverse(id)) = self.released_ids().pop_front() {
+                return id;
+            }
+
+            // Increment our ID counter.
+            let id = self.free_id;
+            self.free_id = self
+                .free_id
+                .checked_add(1)
+                .expect("took up all available thread-ID space");
+            id
+        }
+
+        /// Free an ID that was previously allocated.
+        fn free(&mut self, id: usize) {
+            self.released_ids().push_front(Reverse(id));
+        }
+    }
+
+    thread_local! {
+        /// The unique ID for this thread.
+        static THREAD_ID: Cell<Option<usize>> = const { Cell::new(None) };
+    }
+
+    thread_local! {
+        /// A destructor that frees this ID before the thread exits.
+        ///
+        /// This is separate from `THREAD_ID` so that accessing it does not involve a thread-local
+        /// destructor.
+        static THREAD_GUARD: RefCell<Option<ThreadGuard>> = const { RefCell::new(None) };
+    }
+
+    struct ThreadGuard(usize);
+
+    impl Drop for ThreadGuard {
+        fn drop(&mut self) {
+            // DEADLOCK: Allocator is only ever held by this and the first call to "thread_id".
+            Allocator::with(|alloc| {
+                // De-allocate the ID.
+                alloc.free(self.0);
+            });
+        }
+    }
+
+    /// Fast path for getting the thread ID.
+    #[inline]
+    fn get() -> usize {
+        // Try to use the cached thread ID.
+        THREAD_ID.with(|thread_id| {
+            if let Some(thread_id) = thread_id.get() {
+                return thread_id;
+            }
+
+            // Use the slow path.
+            get_slow(thread_id)
+        })
+    }
+
+    /// Slow path for getting the thread ID.
+    #[cold]
+    fn get_slow(slot: &Cell<Option<usize>>) -> usize {
+        // Allocate a new thread ID.
+        let id = Allocator::with(|alloc| alloc.alloc());
+
+        // Store the thread ID.
+        let old = slot.replace(Some(id));
+        debug_assert!(old.is_none());
+
+        // Store the destructor.
+        THREAD_GUARD.with(|guard| {
+            *guard.borrow_mut() = Some(ThreadGuard(id));
+        });
+
+        // Return the ID.
+        id
+    }
+
+    get()
+}
+
 /// Runs a closure when dropped.
 struct CallOnDrop<F: FnMut()>(F);
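A note on the 1.12.0 changelog entry: the static-executor API referenced by #112 is not part of the hunks shown here. Assuming the shape discussed in that PR (a `leak` method that trades the executor's destructor for a cheaper `&'static` handle), usage would look roughly like the sketch below. The `leak` method and `StaticExecutor` type are assumptions taken from #112, not confirmed by this diff:

```rust
use async_executor::Executor;
use futures_lite::future;

fn main() {
    // Hypothetical, based on #112: leaking the executor yields a `'static`
    // handle that can skip the bookkeeping a droppable executor needs.
    let ex /* : &'static StaticExecutor */ = Executor::new().leak();

    future::block_on(ex.run(async {
        let task = ex.spawn(async { 2 + 2 });
        assert_eq!(task.await, 4);
    }));
}
```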
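For context on the `schedule` change: when a task is woken from a thread that currently hosts a runner, the runnable is now pushed onto that runner's thread-local queue and only that runner is woken, bypassing the global queue. The fast path is entirely internal, so the public API is unchanged; a minimal sketch of code that exercises it (assuming `futures-lite` for `block_on`, as this crate's own examples do):

```rust
use async_executor::Executor;
use futures_lite::future;

fn main() {
    let ex = Executor::new();

    // `run` registers a runner (and thus a local queue slot) for this thread,
    // so tasks spawned and woken here go through the thread-local fast path
    // in `schedule` rather than the global injector queue.
    future::block_on(ex.run(async {
        let task = ex.spawn(async { 1 + 2 });
        assert_eq!(task.await, 3);
    }));
}
```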
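The body of `steal` falls outside the hunk context above. For reference, the policy its comment states (move half of `src`, rounded up, capped by the destination's free capacity) can be sketched standalone against the `concurrent-queue` crate; this is a sketch of the policy, not necessarily the crate's exact internals:

```rust
use concurrent_queue::ConcurrentQueue;

/// Move half of `src` (rounded up), capped by `dest`'s free capacity, into `dest`.
fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
    // Half of `src`'s length rounded up.
    let mut count = (src.len() + 1) / 2;

    if count > 0 {
        // Don't steal more than fits into the destination queue.
        if let Some(cap) = dest.capacity() {
            count = count.min(cap - dest.len());
        }

        // Move the items one by one, stopping early if `src` runs dry.
        for _ in 0..count {
            match src.pop() {
                Ok(t) => assert!(dest.push(t).is_ok()),
                Err(_) => break,
            }
        }
    }
}

fn main() {
    let src = ConcurrentQueue::unbounded();
    let dest = ConcurrentQueue::bounded(4);
    for i in 0..8 {
        src.push(i).unwrap();
    }
    steal(&src, &dest);
    assert_eq!(dest.len(), 4); // half of 8, which also fills capacity 4
    assert_eq!(src.len(), 4);
}
```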