Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleaned up panic messages #8219

Merged
merged 15 commits into from
Apr 12, 2023
Merged
160 changes: 115 additions & 45 deletions crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::sync::Arc;
use std::{
any::Any,
sync::{Arc, Mutex},
};

use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
use bevy_utils::default;
Expand Down Expand Up @@ -63,12 +66,18 @@ struct SystemTaskMetadata {
is_exclusive: bool,
}

/// The result of running a system that is sent across a channel.
///
/// Spawned system tasks send one of these to the executor when the system
/// finishes, whether it returned normally or panicked.
struct SystemResult {
/// Index of the system that finished; the executor uses it to look up the
/// system's task metadata and update its bookkeeping.
system_index: usize,
/// `true` if the system ran to completion, `false` if a panic was caught
/// while running it.
success: bool,
}

/// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
pub struct MultiThreadedExecutor {
/// Sends system completion events.
sender: Sender<usize>,
sender: Sender<SystemResult>,
/// Receives system completion events.
receiver: Receiver<usize>,
receiver: Receiver<SystemResult>,
/// Metadata for scheduling and running system tasks.
system_task_metadata: Vec<SystemTaskMetadata>,
/// Union of the accesses of all currently running systems.
Expand All @@ -77,6 +86,8 @@ pub struct MultiThreadedExecutor {
local_thread_running: bool,
/// Returns `true` if an exclusive system is running.
exclusive_running: bool,
/// The number of systems expected to run.
num_systems: usize,
/// The number of systems that are running.
num_running_systems: usize,
/// The number of systems that have completed.
Expand All @@ -99,6 +110,10 @@ pub struct MultiThreadedExecutor {
unapplied_systems: FixedBitSet,
/// Setting when true applies system buffers after all systems have run
apply_final_buffers: bool,
/// When set, tells the executor that a thread has panicked.
panic_payload: Arc<Mutex<Option<Box<dyn Any + Send>>>>,
/// When set, stops the executor from running any more systems.
stop_spawning: bool,
}

impl Default for MultiThreadedExecutor {
Expand Down Expand Up @@ -148,8 +163,8 @@ impl SystemExecutor for MultiThreadedExecutor {

fn run(&mut self, schedule: &mut SystemSchedule, world: &mut World) {
// reset counts
let num_systems = schedule.systems.len();
if num_systems == 0 {
self.num_systems = schedule.systems.len();
if self.num_systems == 0 {
return;
}
self.num_running_systems = 0;
Expand Down Expand Up @@ -182,23 +197,22 @@ impl SystemExecutor for MultiThreadedExecutor {
// the executor itself is a `Send` future so that it can run
// alongside systems that claim the local thread
let executor = async {
while self.num_completed_systems < num_systems {
while self.num_completed_systems < self.num_systems {
// SAFETY: self.ready_systems does not contain running systems
unsafe {
self.spawn_system_tasks(scope, systems, &mut conditions, world);
}

if self.num_running_systems > 0 {
// wait for systems to complete
let index =
self.receiver.recv().await.expect(
"A system has panicked so the executor cannot continue.",
);

self.finish_system_and_signal_dependents(index);
if let Ok(result) = self.receiver.recv().await {
self.finish_system_and_handle_dependents(result);
} else {
panic!("Channel closed unexpectedly!");
}

while let Ok(index) = self.receiver.try_recv() {
self.finish_system_and_signal_dependents(index);
while let Ok(result) = self.receiver.try_recv() {
self.finish_system_and_handle_dependents(result);
}

self.rebuild_active_access();
Expand All @@ -217,11 +231,21 @@ impl SystemExecutor for MultiThreadedExecutor {
if self.apply_final_buffers {
// Do one final apply buffers after all systems have completed
// Commands should be applied while on the scope's thread, not the executor's thread
apply_system_buffers(&self.unapplied_systems, systems, world.get_mut());
let res = apply_system_buffers(&self.unapplied_systems, systems, world.get_mut());
if let Err(payload) = res {
let mut panic_payload = self.panic_payload.lock().unwrap();
*panic_payload = Some(payload);
}
self.unapplied_systems.clear();
debug_assert!(self.unapplied_systems.is_clear());
}

// check to see if there was a panic
let mut payload = self.panic_payload.lock().unwrap();
if let Some(payload) = payload.take() {
std::panic::resume_unwind(payload);
}

debug_assert!(self.ready_systems.is_clear());
debug_assert!(self.running_systems.is_clear());
self.active_access.clear();
Expand All @@ -238,6 +262,7 @@ impl MultiThreadedExecutor {
sender,
receiver,
system_task_metadata: Vec::new(),
num_systems: 0,
num_running_systems: 0,
num_completed_systems: 0,
num_dependencies_remaining: Vec::new(),
Expand All @@ -252,6 +277,8 @@ impl MultiThreadedExecutor {
completed_systems: FixedBitSet::new(),
unapplied_systems: FixedBitSet::new(),
apply_final_buffers: true,
panic_payload: Arc::new(Mutex::new(None)),
stop_spawning: false,
}
}

Expand Down Expand Up @@ -438,6 +465,7 @@ impl MultiThreadedExecutor {
let system_span = info_span!("system", name = &*system.name());

let sender = self.sender.clone();
let panic_payload = self.panic_payload.clone();
let task = async move {
#[cfg(feature = "trace")]
let system_guard = system_span.enter();
Expand All @@ -447,14 +475,20 @@ impl MultiThreadedExecutor {
}));
#[cfg(feature = "trace")]
drop(system_guard);
if res.is_err() {
// close the channel to propagate the error to the
// multithreaded executor
sender.close();
} else {
sender
.try_send(system_index)
.unwrap_or_else(|error| unreachable!("{}", error));
// tell the executor that the system finished
sender
.try_send(SystemResult {
system_index,
success: res.is_ok(),
})
.unwrap_or_else(|error| unreachable!("{}", error));
if let Err(payload) = res {
eprintln!("Encountered a panic in system `{}`!", &*system.name());
// set the payload to propagate the error
{
let mut panic_payload = panic_payload.lock().unwrap();
*panic_payload = Some(payload);
}
}
};

Expand Down Expand Up @@ -491,26 +525,28 @@ impl MultiThreadedExecutor {
let system_span = info_span!("system", name = &*system.name());

let sender = self.sender.clone();
let panic_payload = self.panic_payload.clone();
if is_apply_system_buffers(system) {
// TODO: avoid allocation
let unapplied_systems = self.unapplied_systems.clone();
self.unapplied_systems.clear();
let task = async move {
#[cfg(feature = "trace")]
let system_guard = system_span.enter();
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
apply_system_buffers(&unapplied_systems, systems, world);
}));
let res = apply_system_buffers(&unapplied_systems, systems, world);
#[cfg(feature = "trace")]
drop(system_guard);
if res.is_err() {
// close the channel to propagate the error to the
// multithreaded executor
sender.close();
} else {
sender
.try_send(system_index)
.unwrap_or_else(|error| unreachable!("{}", error));
// tell the executor that the system finished
sender
.try_send(SystemResult {
system_index,
success: res.is_ok(),
})
.unwrap_or_else(|error| unreachable!("{}", error));
if let Err(payload) = res {
// set the payload to propagate the error
let mut panic_payload = panic_payload.lock().unwrap();
*panic_payload = Some(payload);
}
};

Expand All @@ -526,14 +562,21 @@ impl MultiThreadedExecutor {
}));
#[cfg(feature = "trace")]
drop(system_guard);
if res.is_err() {
// close the channel to propagate the error to the
// multithreaded executor
sender.close();
} else {
sender
.try_send(system_index)
.unwrap_or_else(|error| unreachable!("{}", error));
// tell the executor that the system finished
sender
.try_send(SystemResult {
system_index,
success: res.is_ok(),
})
.unwrap_or_else(|error| unreachable!("{}", error));
if let Err(payload) = res {
eprintln!(
"Encountered a panic in exclusive system `{}`!",
&*system.name()
);
// set the payload to propagate the error
let mut panic_payload = panic_payload.lock().unwrap();
*panic_payload = Some(payload);
}
};

Expand All @@ -546,7 +589,12 @@ impl MultiThreadedExecutor {
self.local_thread_running = true;
}

fn finish_system_and_signal_dependents(&mut self, system_index: usize) {
fn finish_system_and_handle_dependents(&mut self, result: SystemResult) {
let SystemResult {
system_index,
success,
} = result;

if self.system_task_metadata[system_index].is_exclusive {
self.exclusive_running = false;
}
Expand All @@ -561,7 +609,12 @@ impl MultiThreadedExecutor {
self.running_systems.set(system_index, false);
self.completed_systems.insert(system_index);
self.unapplied_systems.insert(system_index);

self.signal_dependents(system_index);

if !success {
self.stop_spawning_systems();
}
}

fn skip_system_and_signal_dependents(&mut self, system_index: usize) {
Expand All @@ -581,6 +634,13 @@ impl MultiThreadedExecutor {
}
}

/// Marks the executor as stopped and shrinks the expected system count.
///
/// `num_systems` is reduced to the systems that have already completed plus
/// those currently running. The executor loop keeps going only while
/// `num_completed_systems < num_systems`, so it ends once the systems that
/// are still running have finished.
fn stop_spawning_systems(&mut self) {
// Only the first call adjusts the count; later calls are no-ops.
if !self.stop_spawning {
self.num_systems = self.num_completed_systems + self.num_running_systems;
self.stop_spawning = true;
}
}

fn rebuild_active_access(&mut self) {
self.active_access.clear();
for index in self.running_systems.ones() {
Expand All @@ -595,12 +655,22 @@ fn apply_system_buffers(
unapplied_systems: &FixedBitSet,
systems: &[SyncUnsafeCell<BoxedSystem>],
world: &mut World,
) {
) -> Result<(), Box<dyn std::any::Any + Send>> {
for system_index in unapplied_systems.ones() {
// SAFETY: none of these systems are running, no other references exist
let system = unsafe { &mut *systems[system_index].get() };
system.apply_buffers(world);
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
system.apply_buffers(world);
}));
if let Err(payload) = res {
eprintln!(
"Encountered a panic when applying buffers for system `{}`!",
&*system.name()
);
return Err(payload);
}
}
Ok(())
}

fn evaluate_and_fold_conditions(conditions: &mut [BoxedCondition], world: &World) -> bool {
Expand Down
9 changes: 8 additions & 1 deletion crates/bevy_ecs/src/schedule/executor/simple.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[cfg(feature = "trace")]
use bevy_utils::tracing::info_span;
use fixedbitset::FixedBitSet;
use std::panic::AssertUnwindSafe;

use crate::{
schedule::{BoxedCondition, ExecutorKind, SystemExecutor, SystemSchedule},
Expand Down Expand Up @@ -78,9 +79,15 @@ impl SystemExecutor for SimpleExecutor {
let system = &mut schedule.systems[system_index];
#[cfg(feature = "trace")]
let system_span = info_span!("system", name = &*name).entered();
system.run((), world);
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
system.run((), world);
}));
#[cfg(feature = "trace")]
system_span.exit();
if let Err(payload) = res {
eprintln!("Encountered a panic in system `{}`!", &*system.name());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit too late now, but in a future PR something like this could make the error stand out a bit more

Suggested change
eprintln!("Encountered a panic in system `{}`!", &*system.name());
eprintln!("\x1b[91mERROR\x1b[0m: Encountered a panic in system `{}`!", &*system.name());

This will make the ERROR red and look like this:
image

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this supported in all terminals on all platforms?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it works on Windows at least, and I would be surprised if it didn't work on Linux, but I don't have a way to confirm it right now. There are crates that can handle that more gracefully, though; I'm just not sure if adding a crate just for that is required. That said, tracing probably already uses one of them, so we could just reuse that one since it's already in the dep tree.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And when logs are redirected to a file? I don't know of a single way to have colors...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use error! macro to have colors in terminal and normal output if colors aren't supported.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use error!

error! wasn't used because of verbosity concerns. I don't personally agree, but I suggested that approach as an alternative. I think we should instead change the default format of logs if the verbosity is a concern instead of avoiding it, but that's out of scope for here.

And when logs are redirected to a file

True, I didn't really consider that. At least it would just be a couple of weird characters at the end of the file, but it's indeed not ideal.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should instead change the default format of logs if the verbosity is a concern instead of avoiding it,

Makes sense to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know of a single way to have colors...

Well, there's error!() that can deal with it correctly 😉. So it's a way, but like mentioned it was purposefully not used.

Copy link
Member

@mockersf mockersf Apr 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, there's error!() that can deal with it correctly

I meant by adding control characters. As far as I know the tracing library is using environment detection.

error! wasn't used because of verbosity concerns

Oh I thought it was because bevy_ecs can be used without logs and tracing enabled

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I was basing it on this comment #8219 (comment) but alice mentioned just after that that it won't work with tracing disabled.

Not sure what the best solution is, maybe we could just use a feature flag for tracing and if it's disabled just use the current log with no color?

It just seems unfortunate to not use tracing where possible. For example, if someone sets up a tracing layer to output to a file, they won't get those logs currently, which really isn't ideal.

std::panic::resume_unwind(payload);
}

system.apply_buffers(world);
}
Expand Down
9 changes: 8 additions & 1 deletion crates/bevy_ecs/src/schedule/executor/single_threaded.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[cfg(feature = "trace")]
use bevy_utils::tracing::info_span;
use fixedbitset::FixedBitSet;
use std::panic::AssertUnwindSafe;

use crate::{
schedule::{
Expand Down Expand Up @@ -95,9 +96,15 @@ impl SystemExecutor for SingleThreadedExecutor {
} else {
#[cfg(feature = "trace")]
let system_span = info_span!("system", name = &*name).entered();
system.run((), world);
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
system.run((), world);
}));
#[cfg(feature = "trace")]
system_span.exit();
if let Err(payload) = res {
eprintln!("Encountered a panic in system `{}`!", &*system.name());
std::panic::resume_unwind(payload);
}
self.unapplied_systems.insert(system_index);
}
}
Expand Down
Loading