Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Fix asset_debug_server hang. There should be at most one ThreadExecut… #7825

Closed
10 changes: 10 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,16 @@ description = "Demonstrates the creation and registration of a custom plugin"
category = "Application"
wasm = true

[[example]]
shuoli84 marked this conversation as resolved.
Show resolved Hide resolved
name = "nested_app"
path = "examples/app/nested_app.rs"

[package.metadata.example.nested_app]
name = "Nested"
description = "Demonstrates the creation of nested app"
category = "Application"
wasm = false

[[example]]
name = "plugin_group"
path = "examples/app/plugin_group.rs"
Expand Down
10 changes: 8 additions & 2 deletions crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,11 +619,17 @@ fn evaluate_and_fold_conditions(conditions: &mut [BoxedCondition], world: &World
}

/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread
#[derive(Resource, Default, Clone)]
#[derive(Resource, Clone)]
pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);

impl Default for MainThreadExecutor {
fn default() -> Self {
Self::new()
}
}

impl MainThreadExecutor {
pub fn new() -> Self {
MainThreadExecutor(Arc::new(ThreadExecutor::new()))
MainThreadExecutor(TaskPool::get_thread_executor())
}
}
5 changes: 5 additions & 0 deletions crates/bevy_tasks/src/single_threaded_task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ impl TaskPoolBuilder {
pub struct TaskPool {}

impl TaskPool {
/// Just create a new `ThreadExecutor` for wasm
pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
Arc::new(ThreadExecutor::new())
}

/// Create a `TaskPool` with the default configuration.
pub fn new() -> Self {
TaskPoolBuilder::new().build()
Expand Down
27 changes: 22 additions & 5 deletions crates/bevy_tasks/src/task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,12 @@ pub struct TaskPool {
impl TaskPool {
thread_local! {
static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = async_executor::LocalExecutor::new();
static THREAD_EXECUTOR: ThreadExecutor<'static> = ThreadExecutor::new();
static THREAD_EXECUTOR: Arc<ThreadExecutor<'static>> = Arc::new(ThreadExecutor::new());
}

/// Each thread should only create one `ThreadExecutor`, otherwise, there are good chances they will deadlock
pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
Self::THREAD_EXECUTOR.with(|executor| executor.clone())
}

/// Create a `TaskPool` with the default configuration.
Expand Down Expand Up @@ -411,8 +416,14 @@ impl TaskPool {
let execute_forever = async move {
loop {
let tick_forever = async {
loop {
external_ticker.tick().or(scope_ticker.tick()).await;
shuoli84 marked this conversation as resolved.
Show resolved Hide resolved
if external_ticker.conflict_with(&scope_ticker) {
loop {
external_ticker.tick().await;
}
} else {
loop {
external_ticker.tick().or(scope_ticker.tick()).await;
}
}
};
// we don't care if it errors. If a scoped task errors it will propagate
Expand All @@ -435,8 +446,14 @@ impl TaskPool {
let execute_forever = async {
loop {
let tick_forever = async {
loop {
external_ticker.tick().or(scope_ticker.tick()).await;
shuoli84 marked this conversation as resolved.
Show resolved Hide resolved
if external_ticker.conflict_with(&scope_ticker) {
loop {
external_ticker.tick().await;
}
} else {
loop {
external_ticker.tick().or(scope_ticker.tick()).await;
}
}
};
let _result = AssertUnwindSafe(tick_forever).catch_unwind().await.is_ok();
Expand Down
27 changes: 23 additions & 4 deletions crates/bevy_tasks/src/thread_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ impl<'task> ThreadExecutor<'task> {
Self::default()
}

/// check whether same executor, if yes, then the `Ticker` should not
/// be used in same task. E.g, `ticker_1.or(ticker_2).await`. This will leak
/// the ticker and cause dead lock
pub fn is_same_executor(&self, other: &Self) -> bool {
if self.thread_id == other.thread_id {
// for same thread, there should be only one instance
assert!(std::ptr::eq(self, other));
true
} else {
false
}
}

/// Spawn a task on the thread executor
pub fn spawn<T: Send + 'task>(
&self,
Expand All @@ -77,7 +90,7 @@ impl<'task> ThreadExecutor<'task> {
pub fn ticker<'ticker>(&'ticker self) -> Option<ThreadExecutorTicker<'task, 'ticker>> {
if thread::current().id() == self.thread_id {
return Some(ThreadExecutorTicker {
executor: &self.executor,
executor: self,
_marker: PhantomData::default(),
});
}
Expand All @@ -90,20 +103,26 @@ impl<'task> ThreadExecutor<'task> {
/// created on.
#[derive(Debug)]
pub struct ThreadExecutorTicker<'task, 'ticker> {
executor: &'ticker Executor<'task>,
executor: &'ticker ThreadExecutor<'task>,
// make type not send or sync
_marker: PhantomData<*const ()>,
}
impl<'task, 'ticker> ThreadExecutorTicker<'task, 'ticker> {
/// if executor is same, then the Ticker should not used
/// in same task
pub fn conflict_with(&self, other: &Self) -> bool {
self.executor.is_same_executor(other.executor)
}

/// Tick the thread executor.
pub async fn tick(&self) {
self.executor.tick().await;
self.executor.executor.tick().await;
}

/// Synchronously try to tick a task on the executor.
/// Returns false if if does not find a task to tick.
pub fn try_tick(&self) -> bool {
self.executor.try_tick()
self.executor.executor.try_tick()
}
}

Expand Down
1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ Example | Description
[Empty with Defaults](../examples/app/empty_defaults.rs) | An empty application with default plugins
[Headless](../examples/app/headless.rs) | An application that runs without default plugins
[Logs](../examples/app/logs.rs) | Illustrate how to use generate log output
[Nested](../examples/app/nested_app.rs) | Demonstrates the creation of nested app
[No Renderer](../examples/app/no_renderer.rs) | An application that runs with default plugins and displays an empty window, but without an actual renderer
[Plugin](../examples/app/plugin.rs) | Demonstrates the creation and registration of a custom plugin
[Plugin Group](../examples/app/plugin_group.rs) | Demonstrates the creation and registration of a custom plugin group
Expand Down
19 changes: 19 additions & 0 deletions examples/app/nested_app.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use bevy::prelude::*;

fn run_sub_app(mut sub_app: NonSendMut<DebugApp>) {
sub_app.app.update();
}

struct DebugApp {
app: App,
}

fn main() {
let mut app = App::new();

let sub_app = App::new();
app.insert_non_send_resource(DebugApp { app: sub_app });
app.add_system(run_sub_app);

app.update();
}