Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Fix asset_debug_server hang. There should be at most one ThreadExecut… #7825

Closed
10 changes: 8 additions & 2 deletions crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,11 +619,17 @@ fn evaluate_and_fold_conditions(conditions: &mut [BoxedCondition], world: &World
}

/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread
#[derive(Resource, Default, Clone)]
#[derive(Resource, Clone)]
pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);

impl Default for MainThreadExecutor {
fn default() -> Self {
Self::new()
}
}

impl MainThreadExecutor {
pub fn new() -> Self {
MainThreadExecutor(Arc::new(ThreadExecutor::new()))
MainThreadExecutor(TaskPool::get_thread_executor())
}
}
5 changes: 5 additions & 0 deletions crates/bevy_tasks/src/single_threaded_task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ impl TaskPoolBuilder {
pub struct TaskPool {}

impl TaskPool {
/// Just create a new `ThreadExecutor` for wasm
pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
Arc::new(ThreadExecutor::new())
}

/// Create a `TaskPool` with the default configuration.
pub fn new() -> Self {
TaskPoolBuilder::new().build()
Expand Down
11 changes: 8 additions & 3 deletions crates/bevy_tasks/src/task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,12 @@ pub struct TaskPool {
impl TaskPool {
thread_local! {
static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = async_executor::LocalExecutor::new();
static THREAD_EXECUTOR: ThreadExecutor<'static> = ThreadExecutor::new();
static THREAD_EXECUTOR: Arc<ThreadExecutor<'static>> = Arc::new(ThreadExecutor::new());
}

/// Each thread should only create one `ThreadExecutor`, otherwise, there are good chances they will deadlock
pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
Self::THREAD_EXECUTOR.with(|executor| executor.clone())
}

/// Create a `TaskPool` with the default configuration.
Expand Down Expand Up @@ -412,7 +417,7 @@ impl TaskPool {
loop {
let tick_forever = async {
loop {
external_ticker.tick().or(scope_ticker.tick()).await;
shuoli84 marked this conversation as resolved.
Show resolved Hide resolved
external_ticker.or_tick(&scope_ticker).await;
}
};
// we don't care if it errors. If a scoped task errors it will propagate
Expand All @@ -436,7 +441,7 @@ impl TaskPool {
loop {
let tick_forever = async {
loop {
external_ticker.tick().or(scope_ticker.tick()).await;
shuoli84 marked this conversation as resolved.
Show resolved Hide resolved
external_ticker.or_tick(&scope_ticker).await;
}
};
let _result = AssertUnwindSafe(tick_forever).catch_unwind().await.is_ok();
Expand Down
25 changes: 20 additions & 5 deletions crates/bevy_tasks/src/thread_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
};

use async_executor::{Executor, Task};
use futures_lite::Future;
use futures_lite::{Future, FutureExt};

/// An executor that can only be ticked on the thread it was instantiated on. But
/// can spawn `Send` tasks from other threads.
Expand Down Expand Up @@ -77,33 +77,48 @@ impl<'task> ThreadExecutor<'task> {
pub fn ticker<'ticker>(&'ticker self) -> Option<ThreadExecutorTicker<'task, 'ticker>> {
if thread::current().id() == self.thread_id {
return Some(ThreadExecutorTicker {
executor: &self.executor,
executor: self,
_marker: PhantomData::default(),
});
}
None
}

/// check whether `self` and `other` is the same executor
fn is_same_executor(&self, other: &Self) -> bool {
std::ptr::eq(self, other)
}
}

/// Used to tick the [`ThreadExecutor`]. The executor does not
/// make progress unless it is manually ticked on the thread it was
/// created on.
#[derive(Debug)]
pub struct ThreadExecutorTicker<'task, 'ticker> {
executor: &'ticker Executor<'task>,
executor: &'ticker ThreadExecutor<'task>,
// make type not send or sync
_marker: PhantomData<*const ()>,
}
impl<'task, 'ticker> ThreadExecutorTicker<'task, 'ticker> {
/// Tick the thread executor.
pub async fn tick(&self) {
self.executor.tick().await;
self.executor.executor.tick().await;
}

/// Synchronously try to tick a task on the executor.
/// Returns false if if does not find a task to tick.
pub fn try_tick(&self) -> bool {
self.executor.try_tick()
self.executor.executor.try_tick()
}

/// join tick `self` and `other` with `or`.
/// check [this pr](https://github.com/bevyengine/bevy/pull/7825) for reference
shuoli84 marked this conversation as resolved.
Show resolved Hide resolved
pub async fn or_tick(&self, other: &Self) {
if self.executor.is_same_executor(other.executor) {
shuoli84 marked this conversation as resolved.
Show resolved Hide resolved
self.tick().await;
} else {
self.tick().or(other.tick()).await;
}
}
}

Expand Down