Skip to content

Commit

Permalink
Fix asset_debug_server hang. There should be at most one ThreadExecut… (
Browse files Browse the repository at this point in the history
#7825)

…or's ticker for one thread.

# Objective

- Fix debug_asset_server hang.

## Solution

- Reuse the thread_local executor for MainThreadExecutor resource, so there will be only one ThreadExecutor for main thread. 
- If ThreadTickers from same executor, they are conflict with each other. Then only tick one.
  • Loading branch information
shuoli84 committed Mar 2, 2023
1 parent e54103f commit 239b070
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 14 deletions.
10 changes: 8 additions & 2 deletions crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,11 +620,17 @@ fn evaluate_and_fold_conditions(conditions: &mut [BoxedCondition], world: &World
}

/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread
#[derive(Resource, Default, Clone)]
#[derive(Resource, Clone)]
pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);

impl Default for MainThreadExecutor {
fn default() -> Self {
Self::new()
}
}

impl MainThreadExecutor {
pub fn new() -> Self {
MainThreadExecutor(Arc::new(ThreadExecutor::new()))
MainThreadExecutor(TaskPool::get_thread_executor())
}
}
5 changes: 5 additions & 0 deletions crates/bevy_tasks/src/single_threaded_task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ impl TaskPoolBuilder {
pub struct TaskPool {}

impl TaskPool {
/// Just create a new `ThreadExecutor` for wasm
pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
Arc::new(ThreadExecutor::new())
}

/// Create a `TaskPool` with the default configuration.
pub fn new() -> Self {
TaskPoolBuilder::new().build()
Expand Down
31 changes: 23 additions & 8 deletions crates/bevy_tasks/src/task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,12 @@ pub struct TaskPool {
impl TaskPool {
thread_local! {
static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = async_executor::LocalExecutor::new();
static THREAD_EXECUTOR: ThreadExecutor<'static> = ThreadExecutor::new();
static THREAD_EXECUTOR: Arc<ThreadExecutor<'static>> = Arc::new(ThreadExecutor::new());
}

/// Each thread should only create one `ThreadExecutor`, otherwise, there are good chances they will deadlock
pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
Self::THREAD_EXECUTOR.with(|executor| executor.clone())
}

/// Create a `TaskPool` with the default configuration.
Expand Down Expand Up @@ -376,24 +381,34 @@ impl TaskPool {
let tick_task_pool_executor = tick_task_pool_executor || self.threads.is_empty();

// we get this from a thread local so we should always be on the scope executors thread.
// note: it is possible `scope_executor` and `external_executor` is the same executor,
// in that case, we should only tick one of them, otherwise, it may cause deadlock.
let scope_ticker = scope_executor.ticker().unwrap();
if let Some(external_ticker) = external_executor.ticker() {
if tick_task_pool_executor {
let external_ticker = if !external_executor.is_same(scope_executor) {
external_executor.ticker()
} else {
None
};

match (external_ticker, tick_task_pool_executor) {
(Some(external_ticker), true) => {
Self::execute_global_external_scope(
executor,
external_ticker,
scope_ticker,
get_results,
)
.await
} else {
}
(Some(external_ticker), false) => {
Self::execute_external_scope(external_ticker, scope_ticker, get_results)
.await
}
} else if tick_task_pool_executor {
Self::execute_global_scope(executor, scope_ticker, get_results).await
} else {
Self::execute_scope(scope_ticker, get_results).await
// either external_executor is none or it is same as scope_executor
(None, true) => {
Self::execute_global_scope(executor, scope_ticker, get_results).await
}
(None, false) => Self::execute_scope(scope_ticker, get_results).await,
}
})
}
Expand Down
13 changes: 9 additions & 4 deletions crates/bevy_tasks/src/thread_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,33 +77,38 @@ impl<'task> ThreadExecutor<'task> {
pub fn ticker<'ticker>(&'ticker self) -> Option<ThreadExecutorTicker<'task, 'ticker>> {
if thread::current().id() == self.thread_id {
return Some(ThreadExecutorTicker {
executor: &self.executor,
executor: self,
_marker: PhantomData::default(),
});
}
None
}

/// Returns true if `self` and `other`'s executor is same
pub fn is_same(&self, other: &Self) -> bool {
std::ptr::eq(self, other)
}
}

/// Used to tick the [`ThreadExecutor`]. The executor does not
/// make progress unless it is manually ticked on the thread it was
/// created on.
#[derive(Debug)]
pub struct ThreadExecutorTicker<'task, 'ticker> {
executor: &'ticker Executor<'task>,
executor: &'ticker ThreadExecutor<'task>,
// make type not send or sync
_marker: PhantomData<*const ()>,
}
impl<'task, 'ticker> ThreadExecutorTicker<'task, 'ticker> {
/// Tick the thread executor.
pub async fn tick(&self) {
self.executor.tick().await;
self.executor.executor.tick().await;
}

/// Synchronously try to tick a task on the executor.
/// Returns false if if does not find a task to tick.
pub fn try_tick(&self) -> bool {
self.executor.try_tick()
self.executor.executor.try_tick()
}
}

Expand Down

0 comments on commit 239b070

Please sign in to comment.