From 239b070674792a4d158e94bbc6478856e8766cad Mon Sep 17 00:00:00 2001 From: shuo Date: Thu, 2 Mar 2023 08:40:25 +0000 Subject: [PATCH] =?UTF-8?q?Fix=20asset=5Fdebug=5Fserver=20hang.=20There=20?= =?UTF-8?q?should=20be=20at=20most=20one=20ThreadExecut=E2=80=A6=20(#7825)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …or's ticker for one thread. # Objective - Fix debug_asset_server hang. ## Solution - Reuse the thread_local executor for MainThreadExecutor resource, so there will be only one ThreadExecutor for main thread. - If ThreadTickers from same executor, they are conflict with each other. Then only tick one. --- .../src/schedule/executor/multi_threaded.rs | 10 ++++-- .../src/single_threaded_task_pool.rs | 5 +++ crates/bevy_tasks/src/task_pool.rs | 31 ++++++++++++++----- crates/bevy_tasks/src/thread_executor.rs | 13 +++++--- 4 files changed, 45 insertions(+), 14 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs index 881b311e33457..80c6f27d2e3d6 100644 --- a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs @@ -620,11 +620,17 @@ fn evaluate_and_fold_conditions(conditions: &mut [BoxedCondition], world: &World } /// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread -#[derive(Resource, Default, Clone)] +#[derive(Resource, Clone)] pub struct MainThreadExecutor(pub Arc>); +impl Default for MainThreadExecutor { + fn default() -> Self { + Self::new() + } +} + impl MainThreadExecutor { pub fn new() -> Self { - MainThreadExecutor(Arc::new(ThreadExecutor::new())) + MainThreadExecutor(TaskPool::get_thread_executor()) } } diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 83e8084a6d3cd..b1546884bce51 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -56,6 +56,11 @@ impl TaskPoolBuilder { pub struct TaskPool {} impl TaskPool { + /// Just create a new `ThreadExecutor` for wasm + pub fn get_thread_executor() -> Arc> { + Arc::new(ThreadExecutor::new()) + } + /// Create a `TaskPool` with the default configuration. pub fn new() -> Self { TaskPoolBuilder::new().build() diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index a60a74f6ee8b5..69bb2cea27892 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -112,7 +112,12 @@ pub struct TaskPool { impl TaskPool { thread_local! { static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = async_executor::LocalExecutor::new(); - static THREAD_EXECUTOR: ThreadExecutor<'static> = ThreadExecutor::new(); + static THREAD_EXECUTOR: Arc> = Arc::new(ThreadExecutor::new()); + } + + /// Each thread should only create one `ThreadExecutor`, otherwise, there are good chances they will deadlock + pub fn get_thread_executor() -> Arc> { + Self::THREAD_EXECUTOR.with(|executor| executor.clone()) } /// Create a `TaskPool` with the default configuration. @@ -376,9 +381,17 @@ impl TaskPool { let tick_task_pool_executor = tick_task_pool_executor || self.threads.is_empty(); // we get this from a thread local so we should always be on the scope executors thread. + // note: it is possible `scope_executor` and `external_executor` is the same executor, + // in that case, we should only tick one of them, otherwise, it may cause deadlock. let scope_ticker = scope_executor.ticker().unwrap(); - if let Some(external_ticker) = external_executor.ticker() { - if tick_task_pool_executor { + let external_ticker = if !external_executor.is_same(scope_executor) { + external_executor.ticker() + } else { + None + }; + + match (external_ticker, tick_task_pool_executor) { + (Some(external_ticker), true) => { Self::execute_global_external_scope( executor, external_ticker, @@ -386,14 +399,16 @@ impl TaskPool { get_results, ) .await - } else { + } + (Some(external_ticker), false) => { Self::execute_external_scope(external_ticker, scope_ticker, get_results) .await } - } else if tick_task_pool_executor { - Self::execute_global_scope(executor, scope_ticker, get_results).await - } else { - Self::execute_scope(scope_ticker, get_results).await + // either external_executor is none or it is same as scope_executor + (None, true) => { + Self::execute_global_scope(executor, scope_ticker, get_results).await + } + (None, false) => Self::execute_scope(scope_ticker, get_results).await, } }) } diff --git a/crates/bevy_tasks/src/thread_executor.rs b/crates/bevy_tasks/src/thread_executor.rs index 0ba66571db982..7495fe525e052 100644 --- a/crates/bevy_tasks/src/thread_executor.rs +++ b/crates/bevy_tasks/src/thread_executor.rs @@ -77,12 +77,17 @@ impl<'task> ThreadExecutor<'task> { pub fn ticker<'ticker>(&'ticker self) -> Option> { if thread::current().id() == self.thread_id { return Some(ThreadExecutorTicker { - executor: &self.executor, + executor: self, _marker: PhantomData::default(), }); } None } + + /// Returns true if `self` and `other`'s executor is same + pub fn is_same(&self, other: &Self) -> bool { + std::ptr::eq(self, other) + } } /// Used to tick the [`ThreadExecutor`]. The executor does not @@ -90,20 +95,20 @@ impl<'task> ThreadExecutor<'task> { /// created on. #[derive(Debug)] pub struct ThreadExecutorTicker<'task, 'ticker> { - executor: &'ticker Executor<'task>, + executor: &'ticker ThreadExecutor<'task>, // make type not send or sync _marker: PhantomData<*const ()>, } impl<'task, 'ticker> ThreadExecutorTicker<'task, 'ticker> { /// Tick the thread executor. pub async fn tick(&self) { - self.executor.tick().await; + self.executor.executor.tick().await; } /// Synchronously try to tick a task on the executor. /// Returns false if if does not find a task to tick. pub fn try_tick(&self) -> bool { - self.executor.try_tick() + self.executor.executor.try_tick() } }