Skip to content

Commit

Permalink
feat: Add a way to batch spawn tasks
Browse files Browse the repository at this point in the history
For some workloads many tasks are spawned at a time. This requires
locking and unlocking the executor's inner lock every time you spawn a
task. If you spawn many tasks this can be expensive.

This commit exposes a new "spawn_batch" method on both types. This
method allows the user to spawn an entire set of tasks at a time.

Closes #91

Signed-off-by: John Nunley <dev@notgull.net>
  • Loading branch information
notgull committed Mar 30, 2024
1 parent 17720b0 commit d319699
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 33 deletions.
15 changes: 15 additions & 0 deletions benches/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,21 @@ fn running_benches(c: &mut Criterion) {
);
});

group.bench_function("executor::spawn_batch", |b| {
run(
|| {
let mut handles = vec![];

b.iter(|| {
EX.spawn_many((0..250).map(|_| future::yield_now()), &mut handles);
});

handles.clear();
},
*multithread,
)
});

group.bench_function("executor::spawn_many_local", |b| {
run(
|| {
Expand Down
203 changes: 170 additions & 33 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,85 @@ impl<'a> Executor<'a> {
pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
let mut active = self.state().active.lock().unwrap();

// SAFETY: `T` and the future are `Send`.
unsafe { self.spawn_inner(future, &mut active) }
}

/// Spawns many tasks onto the executor.
///
/// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
/// spawns all of the tasks in one go. With large amounts of tasks this can improve
/// contention.
///
/// For very large numbers of tasks the lock is occasionally dropped and re-acquired to
/// prevent runner thread starvation. It is assumed that the iterator provided does not
/// block; blocking iterators can lock up the internal mutex and therefore the entire
/// executor.
///
/// ## Example
///
/// ```
/// use async_executor::Executor;
/// use futures_lite::{stream, prelude::*};
/// use std::future::ready;
///
/// # futures_lite::future::block_on(async {
/// let mut ex = Executor::new();
///
/// let futures = [
/// ready(1),
/// ready(2),
/// ready(3)
/// ];
///
/// // Spawn all of the futures onto the executor at once.
/// let mut tasks = vec![];
/// ex.spawn_many(futures, &mut tasks);
///
/// // Await all of them.
/// let results = ex.run(async move {
/// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
/// }).await;
/// assert_eq!(results, [1, 2, 3]);
/// # });
/// ```
///
/// [`spawn`]: Executor::spawn
pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
&self,
futures: impl IntoIterator<Item = F>,
handles: &mut impl Extend<Task<F::Output>>,
) {
let mut active = Some(self.state().active.lock().unwrap());

// Convert the futures into tasks.
let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
// SAFETY: `T` and the future are `Send`.
let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) };

// Yield the lock every once in a while to ease contention.
if i.wrapping_sub(1) % 500 == 0 {
drop(active.take());
active = Some(self.state().active.lock().unwrap());
}

task
});

// Push the tasks to the user's collection.
handles.extend(tasks);
}

/// Spawn a future while holding the inner lock.
///
/// # Safety
///
/// If this is an `Executor`, `F` and `T` must be `Send`.
unsafe fn spawn_inner<T: 'a>(
&self,
future: impl Future<Output = T> + 'a,
active: &mut Slab<Waker>,
) -> Task<T> {
// Remove the task from the set of active tasks when the future finishes.
let entry = active.vacant_entry();
let index = entry.key();
Expand All @@ -159,11 +238,30 @@ impl<'a> Executor<'a> {
};

// Create the task and register it in the set of active tasks.
let (runnable, task) = unsafe {
Builder::new()
.propagate_panic(true)
.spawn_unchecked(|()| future, self.schedule())
};
//
// SAFETY:
//
// If `future` is not `Send`, this must be a `LocalExecutor` as per this
// function's unsafe precondition. Since `LocalExecutor` is `!Sync`,
// `try_tick`, `tick` and `run` can only be called from the origin
// thread of the `LocalExecutor`. Similarly, `spawn` can only be called
// from the origin thread, ensuring that `future` and the executor share
// the same origin thread. The `Runnable` can be scheduled from other
// threads, but because of the above `Runnable` can only be called or
// dropped on the origin thread.
//
// `future` is not `'static`, but we make sure that the `Runnable` does
// not outlive `'a`. When the executor is dropped, the `active` field is
// drained and all of the `Waker`s are woken. Then, the queue inside of
// the `Executor` is drained of all of its runnables. This ensures that
// runnables are dropped and this precondition is satisfied.
//
// `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
// Therefore we do not need to worry about what is done with the
// `Waker`.
let (runnable, task) = Builder::new()
.propagate_panic(true)
.spawn_unchecked(|()| future, self.schedule());
entry.insert(runnable.waker());

runnable.schedule();
Expand Down Expand Up @@ -292,7 +390,7 @@ impl<'a> Executor<'a> {
impl Drop for Executor<'_> {
fn drop(&mut self) {
if let Some(state) = self.state.get() {
let mut active = state.active.lock().unwrap();
let mut active = state.active.lock().unwrap_or_else(|e| e.into_inner());
for w in active.drain() {
w.wake();
}
Expand Down Expand Up @@ -397,25 +495,70 @@ impl<'a> LocalExecutor<'a> {
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
let mut active = self.inner().state().active.lock().unwrap();

// Remove the task from the set of active tasks when the future finishes.
let entry = active.vacant_entry();
let index = entry.key();
let state = self.inner().state().clone();
let future = async move {
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
future.await
};
// SAFETY: This executor is not thread safe, so the future and its result
// cannot be sent to another thread.
unsafe { self.inner().spawn_inner(future, &mut active) }
}

// Create the task and register it in the set of active tasks.
let (runnable, task) = unsafe {
Builder::new()
.propagate_panic(true)
.spawn_unchecked(|()| future, self.schedule())
};
entry.insert(runnable.waker());
/// Spawns many tasks onto the executor.
///
/// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
/// spawns all of the tasks in one go. With large amounts of tasks this can improve
/// contention.
///
/// It is assumed that the iterator provided does not block; blocking iterators can lock up
/// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the
/// mutex is not released, as there are no other threads that can poll this executor.
///
/// ## Example
///
/// ```
/// use async_executor::LocalExecutor;
/// use futures_lite::{stream, prelude::*};
/// use std::future::ready;
///
/// # futures_lite::future::block_on(async {
/// let mut ex = LocalExecutor::new();
///
/// let futures = [
/// ready(1),
/// ready(2),
/// ready(3)
/// ];
///
/// // Spawn all of the futures onto the executor at once.
/// let mut tasks = vec![];
/// ex.spawn_many(futures, &mut tasks);
///
/// // Await all of them.
/// let results = ex.run(async move {
/// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
/// }).await;
/// assert_eq!(results, [1, 2, 3]);
/// # });
/// ```
///
/// [`spawn`]: LocalExecutor::spawn
/// [`Executor::spawn_many`]: Executor::spawn_many
pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
&self,
futures: impl IntoIterator<Item = F>,
handles: &mut impl Extend<Task<F::Output>>,
) {
let mut active = self.inner().state().active.lock().unwrap();

runnable.schedule();
task
// Convert all of the futures to tasks.
let tasks = futures.into_iter().map(|future| {
// SAFETY: This executor is not thread safe, so the future and its result
// cannot be sent to another thread.
unsafe { self.inner().spawn_inner(future, &mut active) }

// As only one thread can spawn or poll tasks at a time, there is no need
// to release lock contention here.
});

// Push them to the user's collection.
handles.extend(tasks);
}

/// Attempts to run a task if at least one is scheduled.
Expand Down Expand Up @@ -481,16 +624,6 @@ impl<'a> LocalExecutor<'a> {
self.inner().run(future).await
}

/// Returns a function that schedules a runnable task when it gets woken up.
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
let state = self.inner().state().clone();

move |runnable| {
state.queue.push(runnable).unwrap();
state.notify();
}
}

/// Returns a reference to the inner executor.
fn inner(&self) -> &Executor<'a> {
&self.inner
Expand Down Expand Up @@ -953,6 +1086,7 @@ fn _ensure_send_and_sync() {

fn is_send<T: Send>(_: T) {}
fn is_sync<T: Sync>(_: T) {}
fn is_static<T: 'static>(_: T) {}

is_send::<Executor<'_>>(Executor::new());
is_sync::<Executor<'_>>(Executor::new());
Expand All @@ -962,6 +1096,9 @@ fn _ensure_send_and_sync() {
is_sync(ex.run(pending::<()>()));
is_send(ex.tick());
is_sync(ex.tick());
is_send(ex.schedule());
is_sync(ex.schedule());
is_static(ex.schedule());

/// ```compile_fail
/// use async_executor::LocalExecutor;
Expand Down
14 changes: 14 additions & 0 deletions tests/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,20 @@ fn drop_finished_task_and_then_drop_executor() {
assert_eq!(DROP.load(Ordering::SeqCst), 1);
}

#[test]
fn iterator_panics_mid_run() {
let ex = Executor::new();

let panic = std::panic::catch_unwind(|| {
let mut handles = vec![];
ex.spawn_many(
(0..50).map(|i| if i == 25 { panic!() } else { future::ready(i) }),
&mut handles,
)
});
assert!(panic.is_err());
}

struct CallOnDrop<F: Fn()>(F);

impl<F: Fn()> Drop for CallOnDrop<F> {
Expand Down
45 changes: 45 additions & 0 deletions tests/spawn_many.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use async_executor::{Executor, LocalExecutor};
use futures_lite::future;

#[cfg(not(miri))]
const READY_COUNT: usize = 50_000;
#[cfg(miri)]
const READY_COUNT: usize = 505;

#[test]
fn spawn_many() {
future::block_on(async {
let ex = Executor::new();

// Spawn a lot of tasks.
let mut tasks = vec![];
ex.spawn_many((0..READY_COUNT).map(future::ready), &mut tasks);

// Run all of the tasks in parallel.
ex.run(async move {
for (i, task) in tasks.into_iter().enumerate() {
assert_eq!(task.await, i);
}
})
.await;
});
}

#[test]
fn spawn_many_local() {
future::block_on(async {
let ex = LocalExecutor::new();

// Spawn a lot of tasks.
let mut tasks = vec![];
ex.spawn_many((0..READY_COUNT).map(future::ready), &mut tasks);

// Run all of the tasks in parallel.
ex.run(async move {
for (i, task) in tasks.into_iter().enumerate() {
assert_eq!(task.await, i);
}
})
.await;
});
}

0 comments on commit d319699

Please sign in to comment.