Skip to content

Commit

Permalink
Stageless: add a method to scope to always run a task on the scope th…
Browse files Browse the repository at this point in the history
…read (#7415)

# Objective

- Currently exclusive systems and applying buffers run outside of the multithreaded executor and just calls the funtions on the thread the schedule is running on. Stageless changes this to run these using tasks in a scope. Specifically It uses `spawn_on_scope` to run these. For the render thread this is incorrect as calling `spawn_on_scope` there runs tasks on the main thread. It should instead run these on the render thread and only run nonsend systems on the main thread.
 
## Solution

- Add another executor to `Scope` for spawning tasks on the scope. `spawn_on_scope` now always runs the task on the thread the scope is running on. `spawn_on_external` spawns onto the external executor than is optionally passed in. If None is passed `spawn_on_external` will spawn onto the scope executor.
- Eventually this new machinery will be able to be removed. This will happen once a fix for removing NonSend resources from the world lands. So this is a temporary fix to support stageless.

---

## Changelog

- add a spawn_on_external method to allow spawning on the scope's thread or an external thread

## Migration Guide

> No migration guide. The main thread executor was introduced in pipelined rendering which was merged for 0.10. spawn_on_scope now behaves the same way as on 0.9.
  • Loading branch information
hymm committed Feb 5, 2023
1 parent 2e53f3b commit e1b0bbf
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 36 deletions.
4 changes: 2 additions & 2 deletions crates/bevy_ecs/src/schedule/executor_parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ impl ParallelExecutor {
if system_data.is_send {
scope.spawn(task);
} else {
scope.spawn_on_scope(task);
scope.spawn_on_external(task);
}

#[cfg(test)]
Expand Down Expand Up @@ -281,7 +281,7 @@ impl ParallelExecutor {
if system_data.is_send {
scope.spawn(task);
} else {
scope.spawn_on_scope(task);
scope.spawn_on_external(task);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ impl MultiThreadedExecutor {
scope.spawn(task);
} else {
self.local_thread_running = true;
scope.spawn_on_scope(task);
scope.spawn_on_external(task);
}
}

Expand Down
15 changes: 13 additions & 2 deletions crates/bevy_tasks/src/single_threaded_task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,28 @@ pub struct Scope<'scope, 'env: 'scope, T> {
}

impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
/// Spawns a scoped future onto the thread-local executor. The scope *must* outlive
/// Spawns a scoped future onto the executor. The scope *must* outlive
/// the provided future. The results of the future will be returned as a part of
/// [`TaskPool::scope`]'s return value.
///
/// On the single threaded task pool, it just calls [`Scope::spawn_local`].
/// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`].
///
/// For more information, see [`TaskPool::scope`].
pub fn spawn<Fut: Future<Output = T> + 'env>(&self, f: Fut) {
self.spawn_on_scope(f);
}

/// Spawns a scoped future onto the executor. The scope *must* outlive
/// the provided future. The results of the future will be returned as a part of
/// [`TaskPool::scope`]'s return value.
///
/// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`].
///
/// For more information, see [`TaskPool::scope`].
pub fn spawn_on_external<Fut: Future<Output = T> + 'env>(&self, f: Fut) {
self.spawn_on_scope(f);
}

/// Spawns a scoped future that runs on the thread the scope called from. The
/// scope *must* outlive the provided future. The results of the future will be
/// returned as a part of [`TaskPool::scope`]'s return value.
Expand Down
127 changes: 96 additions & 31 deletions crates/bevy_tasks/src/task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,9 @@ impl TaskPool {
F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, T>),
T: Send + 'static,
{
Self::THREAD_EXECUTOR
.with(|thread_executor| self.scope_with_executor_inner(true, thread_executor, f))
Self::THREAD_EXECUTOR.with(|scope_executor| {
self.scope_with_executor_inner(true, scope_executor, scope_executor, f)
})
}

/// This allows passing an external executor to spawn tasks on. When you pass an external executor
Expand All @@ -291,28 +292,39 @@ impl TaskPool {
pub fn scope_with_executor<'env, F, T>(
&self,
tick_task_pool_executor: bool,
thread_executor: Option<&ThreadExecutor>,
external_executor: Option<&ThreadExecutor>,
f: F,
) -> Vec<T>
where
F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, T>),
T: Send + 'static,
{
// If a `thread_executor` is passed use that. Otherwise get the `thread_executor` stored
// in the `THREAD_EXECUTOR` thread local.
if let Some(thread_executor) = thread_executor {
self.scope_with_executor_inner(tick_task_pool_executor, thread_executor, f)
} else {
Self::THREAD_EXECUTOR.with(|thread_executor| {
self.scope_with_executor_inner(tick_task_pool_executor, thread_executor, f)
})
}
Self::THREAD_EXECUTOR.with(|scope_executor| {
// If a `external_executor` is passed use that. Otherwise get the executor stored
// in the `THREAD_EXECUTOR` thread local.
if let Some(external_executor) = external_executor {
self.scope_with_executor_inner(
tick_task_pool_executor,
external_executor,
scope_executor,
f,
)
} else {
self.scope_with_executor_inner(
tick_task_pool_executor,
scope_executor,
scope_executor,
f,
)
}
})
}

fn scope_with_executor_inner<'env, F, T>(
&self,
tick_task_pool_executor: bool,
thread_executor: &ThreadExecutor,
external_executor: &ThreadExecutor,
scope_executor: &ThreadExecutor,
f: F,
) -> Vec<T>
where
Expand All @@ -326,15 +338,17 @@ impl TaskPool {
// transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety.
let executor: &async_executor::Executor = &self.executor;
let executor: &'env async_executor::Executor = unsafe { mem::transmute(executor) };
let thread_executor: &'env ThreadExecutor<'env> =
unsafe { mem::transmute(thread_executor) };
let external_executor: &'env ThreadExecutor<'env> =
unsafe { mem::transmute(external_executor) };
let scope_executor: &'env ThreadExecutor<'env> = unsafe { mem::transmute(scope_executor) };
let spawned: ConcurrentQueue<FallibleTask<T>> = ConcurrentQueue::unbounded();
let spawned_ref: &'env ConcurrentQueue<FallibleTask<T>> =
unsafe { mem::transmute(&spawned) };

let scope = Scope {
executor,
thread_executor,
external_executor,
scope_executor,
spawned: spawned_ref,
scope: PhantomData,
env: PhantomData,
Expand All @@ -357,25 +371,36 @@ impl TaskPool {
};

let tick_task_pool_executor = tick_task_pool_executor || self.threads.is_empty();
if let Some(thread_ticker) = thread_executor.ticker() {

// we get this from a thread local so we should always be on the scope executors thread.
let scope_ticker = scope_executor.ticker().unwrap();
if let Some(external_ticker) = external_executor.ticker() {
if tick_task_pool_executor {
Self::execute_local_global(thread_ticker, executor, get_results).await
Self::execute_global_external_scope(
executor,
external_ticker,
scope_ticker,
get_results,
)
.await
} else {
Self::execute_local(thread_ticker, get_results).await
Self::execute_external_scope(external_ticker, scope_ticker, get_results)
.await
}
} else if tick_task_pool_executor {
Self::execute_global(executor, get_results).await
Self::execute_global_scope(executor, scope_ticker, get_results).await
} else {
get_results.await
Self::execute_scope(scope_ticker, get_results).await
}
})
}
}

#[inline]
async fn execute_local_global<'scope, 'ticker, T>(
thread_ticker: ThreadExecutorTicker<'scope, 'ticker>,
async fn execute_global_external_scope<'scope, 'ticker, T>(
executor: &'scope async_executor::Executor<'scope>,
external_ticker: ThreadExecutorTicker<'scope, 'ticker>,
scope_ticker: ThreadExecutorTicker<'scope, 'ticker>,
get_results: impl Future<Output = Vec<T>>,
) -> Vec<T> {
// we restart the executors if a task errors. if a scoped
Expand All @@ -384,7 +409,7 @@ impl TaskPool {
loop {
let tick_forever = async {
loop {
thread_ticker.tick().await;
external_ticker.tick().or(scope_ticker.tick()).await;
}
};
// we don't care if it errors. If a scoped task errors it will propagate
Expand All @@ -399,15 +424,16 @@ impl TaskPool {
}

#[inline]
async fn execute_local<'scope, 'ticker, T>(
thread_ticker: ThreadExecutorTicker<'scope, 'ticker>,
async fn execute_external_scope<'scope, 'ticker, T>(
external_ticker: ThreadExecutorTicker<'scope, 'ticker>,
scope_ticker: ThreadExecutorTicker<'scope, 'ticker>,
get_results: impl Future<Output = Vec<T>>,
) -> Vec<T> {
let execute_forever = async {
loop {
let tick_forever = async {
loop {
thread_ticker.tick().await;
external_ticker.tick().or(scope_ticker.tick()).await;
}
};
let _result = AssertUnwindSafe(tick_forever).catch_unwind().await.is_ok();
Expand All @@ -417,13 +443,19 @@ impl TaskPool {
}

#[inline]
async fn execute_global<'scope, T>(
async fn execute_global_scope<'scope, 'ticker, T>(
executor: &'scope async_executor::Executor<'scope>,
scope_ticker: ThreadExecutorTicker<'scope, 'ticker>,
get_results: impl Future<Output = Vec<T>>,
) -> Vec<T> {
let execute_forever = async {
loop {
let _result = AssertUnwindSafe(executor.run(std::future::pending::<()>()))
let tick_forever = async {
loop {
scope_ticker.tick().await;
}
};
let _result = AssertUnwindSafe(executor.run(tick_forever))
.catch_unwind()
.await
.is_ok();
Expand All @@ -432,6 +464,24 @@ impl TaskPool {
execute_forever.or(get_results).await
}

#[inline]
async fn execute_scope<'scope, 'ticker, T>(
scope_ticker: ThreadExecutorTicker<'scope, 'ticker>,
get_results: impl Future<Output = Vec<T>>,
) -> Vec<T> {
let execute_forever = async {
loop {
let tick_forever = async {
loop {
scope_ticker.tick().await;
}
};
let _result = AssertUnwindSafe(tick_forever).catch_unwind().await.is_ok();
}
};
execute_forever.or(get_results).await
}

/// Spawns a static future onto the thread pool. The returned Task is a future. It can also be
/// cancelled and "detached" allowing it to continue running without having to be polled by the
/// end-user.
Expand Down Expand Up @@ -501,7 +551,8 @@ impl Drop for TaskPool {
#[derive(Debug)]
pub struct Scope<'scope, 'env: 'scope, T> {
executor: &'scope async_executor::Executor<'scope>,
thread_executor: &'scope ThreadExecutor<'scope>,
external_executor: &'scope ThreadExecutor<'scope>,
scope_executor: &'scope ThreadExecutor<'scope>,
spawned: &'scope ConcurrentQueue<FallibleTask<T>>,
// make `Scope` invariant over 'scope and 'env
scope: PhantomData<&'scope mut &'scope ()>,
Expand Down Expand Up @@ -531,7 +582,21 @@ impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> {
///
/// For more information, see [`TaskPool::scope`].
pub fn spawn_on_scope<Fut: Future<Output = T> + 'scope + Send>(&self, f: Fut) {
let task = self.thread_executor.spawn(f).fallible();
let task = self.scope_executor.spawn(f).fallible();
// ConcurrentQueue only errors when closed or full, but we never
// close and use an unbounded queue, so it is safe to unwrap
self.spawned.push(task).unwrap();
}

/// Spawns a scoped future onto the thread of the external thread executor.
/// This is typically the main thread. The scope *must* outlive
/// the provided future. The results of the future will be returned as a part of
/// [`TaskPool::scope`]'s return value. Users should generally prefer to use
/// [`Scope::spawn`] instead, unless the provided future needs to run on the external thread.
///
/// For more information, see [`TaskPool::scope`].
pub fn spawn_on_external<Fut: Future<Output = T> + 'scope + Send>(&self, f: Fut) {
let task = self.external_executor.spawn(f).fallible();
// ConcurrentQueue only errors when closed or full, but we never
// close and use an unbounded queue, so it is safe to unwrap
self.spawned.push(task).unwrap();
Expand Down

0 comments on commit e1b0bbf

Please sign in to comment.