diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index dd966ac3cf036..9ecde8cca4f91 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -246,7 +246,7 @@ impl ParallelExecutor { if system_data.is_send { scope.spawn(task); } else { - scope.spawn_on_scope(task); + scope.spawn_on_external(task); } #[cfg(test)] @@ -281,7 +281,7 @@ impl ParallelExecutor { if system_data.is_send { scope.spawn(task); } else { - scope.spawn_on_scope(task); + scope.spawn_on_external(task); } } } diff --git a/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs index 84738f4bbe9cb..33789b2a15474 100644 --- a/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs @@ -459,7 +459,7 @@ impl MultiThreadedExecutor { scope.spawn(task); } else { self.local_thread_running = true; - scope.spawn_on_scope(task); + scope.spawn_on_external(task); } } diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 9b77d8fd3bb2c..83e8084a6d3cd 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -178,17 +178,28 @@ pub struct Scope<'scope, 'env: 'scope, T> { } impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { - /// Spawns a scoped future onto the thread-local executor. The scope *must* outlive + /// Spawns a scoped future onto the executor. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of /// [`TaskPool::scope`]'s return value. /// - /// On the single threaded task pool, it just calls [`Scope::spawn_local`]. + /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`]. /// /// For more information, see [`TaskPool::scope`]. pub fn spawn + 'env>(&self, f: Fut) { self.spawn_on_scope(f); } + /// Spawns a scoped future onto the executor. The scope *must* outlive + /// the provided future. The results of the future will be returned as a part of + /// [`TaskPool::scope`]'s return value. + /// + /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`]. + /// + /// For more information, see [`TaskPool::scope`]. + pub fn spawn_on_external + 'env>(&self, f: Fut) { + self.spawn_on_scope(f); + } + /// Spawns a scoped future that runs on the thread the scope called from. The /// scope *must* outlive the provided future. The results of the future will be /// returned as a part of [`TaskPool::scope`]'s return value. diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 5ded04818694f..254e482da5ed6 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -275,8 +275,9 @@ impl TaskPool { F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, T>), T: Send + 'static, { - Self::THREAD_EXECUTOR - .with(|thread_executor| self.scope_with_executor_inner(true, thread_executor, f)) + Self::THREAD_EXECUTOR.with(|scope_executor| { + self.scope_with_executor_inner(true, scope_executor, scope_executor, f) + }) } /// This allows passing an external executor to spawn tasks on. When you pass an external executor @@ -291,28 +292,39 @@ impl TaskPool { pub fn scope_with_executor<'env, F, T>( &self, tick_task_pool_executor: bool, - thread_executor: Option<&ThreadExecutor>, + external_executor: Option<&ThreadExecutor>, f: F, ) -> Vec where F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, T>), T: Send + 'static, { - // If a `thread_executor` is passed use that. Otherwise get the `thread_executor` stored - // in the `THREAD_EXECUTOR` thread local. - if let Some(thread_executor) = thread_executor { - self.scope_with_executor_inner(tick_task_pool_executor, thread_executor, f) - } else { - Self::THREAD_EXECUTOR.with(|thread_executor| { - self.scope_with_executor_inner(tick_task_pool_executor, thread_executor, f) - }) - } + Self::THREAD_EXECUTOR.with(|scope_executor| { + // If a `external_executor` is passed use that. Otherwise get the executor stored + // in the `THREAD_EXECUTOR` thread local. + if let Some(external_executor) = external_executor { + self.scope_with_executor_inner( + tick_task_pool_executor, + external_executor, + scope_executor, + f, + ) + } else { + self.scope_with_executor_inner( + tick_task_pool_executor, + scope_executor, + scope_executor, + f, + ) + } + }) } fn scope_with_executor_inner<'env, F, T>( &self, tick_task_pool_executor: bool, - thread_executor: &ThreadExecutor, + external_executor: &ThreadExecutor, + scope_executor: &ThreadExecutor, f: F, ) -> Vec where @@ -326,15 +338,17 @@ impl TaskPool { // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety. let executor: &async_executor::Executor = &self.executor; let executor: &'env async_executor::Executor = unsafe { mem::transmute(executor) }; - let thread_executor: &'env ThreadExecutor<'env> = - unsafe { mem::transmute(thread_executor) }; + let external_executor: &'env ThreadExecutor<'env> = + unsafe { mem::transmute(external_executor) }; + let scope_executor: &'env ThreadExecutor<'env> = unsafe { mem::transmute(scope_executor) }; let spawned: ConcurrentQueue> = ConcurrentQueue::unbounded(); let spawned_ref: &'env ConcurrentQueue> = unsafe { mem::transmute(&spawned) }; let scope = Scope { executor, - thread_executor, + external_executor, + scope_executor, spawned: spawned_ref, scope: PhantomData, env: PhantomData, @@ -357,25 +371,36 @@ impl TaskPool { }; let tick_task_pool_executor = tick_task_pool_executor || self.threads.is_empty(); - if let Some(thread_ticker) = thread_executor.ticker() { + + // we get this from a thread local so we should always be on the scope executors thread. + let scope_ticker = scope_executor.ticker().unwrap(); + if let Some(external_ticker) = external_executor.ticker() { if tick_task_pool_executor { - Self::execute_local_global(thread_ticker, executor, get_results).await + Self::execute_global_external_scope( + executor, + external_ticker, + scope_ticker, + get_results, + ) + .await } else { - Self::execute_local(thread_ticker, get_results).await + Self::execute_external_scope(external_ticker, scope_ticker, get_results) + .await } } else if tick_task_pool_executor { - Self::execute_global(executor, get_results).await + Self::execute_global_scope(executor, scope_ticker, get_results).await } else { - get_results.await + Self::execute_scope(scope_ticker, get_results).await } }) } } #[inline] - async fn execute_local_global<'scope, 'ticker, T>( - thread_ticker: ThreadExecutorTicker<'scope, 'ticker>, + async fn execute_global_external_scope<'scope, 'ticker, T>( executor: &'scope async_executor::Executor<'scope>, + external_ticker: ThreadExecutorTicker<'scope, 'ticker>, + scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, get_results: impl Future>, ) -> Vec { // we restart the executors if a task errors. if a scoped @@ -384,7 +409,7 @@ impl TaskPool { loop { let tick_forever = async { loop { - thread_ticker.tick().await; + external_ticker.tick().or(scope_ticker.tick()).await; } }; // we don't care if it errors. If a scoped task errors it will propagate @@ -399,15 +424,16 @@ impl TaskPool { } #[inline] - async fn execute_local<'scope, 'ticker, T>( - thread_ticker: ThreadExecutorTicker<'scope, 'ticker>, + async fn execute_external_scope<'scope, 'ticker, T>( + external_ticker: ThreadExecutorTicker<'scope, 'ticker>, + scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, get_results: impl Future>, ) -> Vec { let execute_forever = async { loop { let tick_forever = async { loop { - thread_ticker.tick().await; + external_ticker.tick().or(scope_ticker.tick()).await; } }; let _result = AssertUnwindSafe(tick_forever).catch_unwind().await.is_ok(); @@ -417,13 +443,19 @@ impl TaskPool { } #[inline] - async fn execute_global<'scope, T>( + async fn execute_global_scope<'scope, 'ticker, T>( executor: &'scope async_executor::Executor<'scope>, + scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, get_results: impl Future>, ) -> Vec { let execute_forever = async { loop { - let _result = AssertUnwindSafe(executor.run(std::future::pending::<()>())) + let tick_forever = async { + loop { + scope_ticker.tick().await; + } + }; + let _result = AssertUnwindSafe(executor.run(tick_forever)) .catch_unwind() .await .is_ok(); @@ -432,6 +464,24 @@ impl TaskPool { execute_forever.or(get_results).await } + #[inline] + async fn execute_scope<'scope, 'ticker, T>( + scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, + get_results: impl Future>, + ) -> Vec { + let execute_forever = async { + loop { + let tick_forever = async { + loop { + scope_ticker.tick().await; + } + }; + let _result = AssertUnwindSafe(tick_forever).catch_unwind().await.is_ok(); + } + }; + execute_forever.or(get_results).await + } + /// Spawns a static future onto the thread pool. The returned Task is a future. It can also be /// cancelled and "detached" allowing it to continue running without having to be polled by the /// end-user. @@ -501,7 +551,8 @@ impl Drop for TaskPool { #[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { executor: &'scope async_executor::Executor<'scope>, - thread_executor: &'scope ThreadExecutor<'scope>, + external_executor: &'scope ThreadExecutor<'scope>, + scope_executor: &'scope ThreadExecutor<'scope>, spawned: &'scope ConcurrentQueue>, // make `Scope` invariant over 'scope and 'env scope: PhantomData<&'scope mut &'scope ()>, @@ -531,7 +582,21 @@ impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { /// /// For more information, see [`TaskPool::scope`]. pub fn spawn_on_scope + 'scope + Send>(&self, f: Fut) { - let task = self.thread_executor.spawn(f).fallible(); + let task = self.scope_executor.spawn(f).fallible(); + // ConcurrentQueue only errors when closed or full, but we never + // close and use an unbounded queue, so it is safe to unwrap + self.spawned.push(task).unwrap(); + } + + /// Spawns a scoped future onto the thread of the external thread executor. + /// This is typically the main thread. The scope *must* outlive + /// the provided future. The results of the future will be returned as a part of + /// [`TaskPool::scope`]'s return value. Users should generally prefer to use + /// [`Scope::spawn`] instead, unless the provided future needs to run on the external thread. + /// + /// For more information, see [`TaskPool::scope`]. + pub fn spawn_on_external + 'scope + Send>(&self, f: Fut) { + let task = self.external_executor.spawn(f).fallible(); // ConcurrentQueue only errors when closed or full, but we never // close and use an unbounded queue, so it is safe to unwrap self.spawned.push(task).unwrap();