From 3e79c65bd71fb968de3587cb83c6cebf16a92ff0 Mon Sep 17 00:00:00 2001 From: Mike Hsu Date: Tue, 31 Jan 2023 11:46:14 -0800 Subject: [PATCH] close the channel so executor doesn't deadlock --- .../schedule_v3/executor/multi_threaded.rs | 60 +++++++++++++------ 1 file changed, 43 insertions(+), 17 deletions(-) diff --git a/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs index 21d6c5d4cdb7f7..94f63e95c9848e 100644 --- a/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs @@ -1,3 +1,5 @@ +use std::panic::AssertUnwindSafe; + use bevy_tasks::{ComputeTaskPool, Scope, TaskPool}; use bevy_utils::default; use bevy_utils::syncunsafecell::SyncUnsafeCell; @@ -167,7 +169,7 @@ impl SystemExecutor for MultiThreadedExecutor { .receiver .recv() .await - .unwrap_or_else(|error| unreachable!("{}", error)); + .expect("A system has panicked so the executor cannot continue."); self.finish_system_and_signal_dependents(index); @@ -416,14 +418,22 @@ impl MultiThreadedExecutor { let task = async move { #[cfg(feature = "trace")] let system_guard = system_span.enter(); - // SAFETY: access is compatible - unsafe { system.run_unsafe((), world) }; + let res = std::panic::catch_unwind(AssertUnwindSafe(|| { + // SAFETY: access is compatible + unsafe { system.run_unsafe((), world) }; + })); #[cfg(feature = "trace")] drop(system_guard); - sender - .send(system_index) - .await - .unwrap_or_else(|error| unreachable!("{}", error)); + if res.is_err() { + // close the channel to propagate the error to the + // multithreaded executor + sender.close(); + } else { + sender + .send(system_index) + .await + .unwrap_or_else(|error| unreachable!("{}", error)); + } }; #[cfg(feature = "trace")] @@ -466,13 +476,21 @@ impl MultiThreadedExecutor { let task = async move { #[cfg(feature = "trace")] let system_guard = system_span.enter(); - apply_system_buffers(&unapplied_systems, systems, world); + let res = std::panic::catch_unwind(AssertUnwindSafe(|| { + apply_system_buffers(&unapplied_systems, systems, world); + })); #[cfg(feature = "trace")] drop(system_guard); - sender - .send(system_index) - .await - .unwrap_or_else(|error| unreachable!("{}", error)); + if res.is_err() { + // close the channel to propagate the error to the + // multithreaded executor + sender.close(); + } else { + sender + .send(system_index) + .await + .unwrap_or_else(|error| unreachable!("{}", error)); + } }; #[cfg(feature = "trace")] @@ -482,13 +500,21 @@ impl MultiThreadedExecutor { let task = async move { #[cfg(feature = "trace")] let system_guard = system_span.enter(); - system.run((), world); + let res = std::panic::catch_unwind(AssertUnwindSafe(|| { + system.run((), world); + })); #[cfg(feature = "trace")] drop(system_guard); - sender - .send(system_index) - .await - .unwrap_or_else(|error| unreachable!("{}", error)); + if res.is_err() { + // close the channel to propagate the error to the + // multithreaded executor + sender.close(); + } else { + sender + .send(system_index) + .await + .unwrap_or_else(|error| unreachable!("{}", error)); + } }; #[cfg(feature = "trace")]