Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tasks] Handle TaskPool panicking threads #2307

Closed
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/bevy_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ use bevy_ecs::{
schedule::{ExclusiveSystemDescriptorCoercion, SystemLabel},
system::{IntoExclusiveSystem, IntoSystem},
};
use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};
use bevy_utils::HashSet;
use std::ops::Range;
use task_pool_options::handle_task_pool_panicking_threads_system;

NathanSWard marked this conversation as resolved.
Show resolved Hide resolved
/// Adds core functionality to Apps.
#[derive(Default)]
Expand All @@ -46,6 +48,11 @@ impl Plugin for CorePlugin {
.unwrap_or_else(DefaultTaskPoolOptions::default)
.create_default_pools(app.world_mut());

#[cfg(not(target_arch = "wasm32"))]
app.add_system(handle_task_pool_panicking_threads_system::<IoTaskPool>.system())
.add_system(handle_task_pool_panicking_threads_system::<ComputeTaskPool>.system())
.add_system(handle_task_pool_panicking_threads_system::<AsyncComputeTaskPool>.system());

app.init_resource::<Time>()
.init_resource::<EntityLabels>()
.init_resource::<FixedTimesteps>()
Expand Down
79 changes: 57 additions & 22 deletions crates/bevy_core/src/task_pool_options.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
use bevy_ecs::world::World;
use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuilder};
use bevy_ecs::{system::Res, world::World};
use bevy_tasks::{
AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuilder, TaskPoolThreadPanicPolicy,
TaskPoolTrait,
};
use bevy_utils::tracing::trace;

/// DOCS: todo
#[derive(Clone, Debug)]
pub struct TaskPoolPolicies {
/// Used to determine number of threads to allocate
pub assignment_policy: TaskPoolThreadAssignmentPolicy,
/// Used to determine the panic policy of the task pool
pub panic_policy: TaskPoolThreadPanicPolicy,
}

/// Defines a simple way to determine how many threads to use given the number of remaining cores
/// and number of total cores
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct TaskPoolThreadAssignmentPolicy {
/// Force using at least this many threads
pub min_threads: usize,
Expand Down Expand Up @@ -42,13 +54,12 @@ pub struct DefaultTaskPoolOptions {
/// If the number of physical cores is grater than max_total_threads, force using
/// max_total_threads
pub max_total_threads: usize,

/// Used to determine number of IO threads to allocate
pub io: TaskPoolThreadAssignmentPolicy,
/// Used to determine number of async compute threads to allocate
pub async_compute: TaskPoolThreadAssignmentPolicy,
/// Used to determine number of compute threads to allocate
pub compute: TaskPoolThreadAssignmentPolicy,
/// Used to configure the IOTaskPool's inner policies
pub io: TaskPoolPolicies,
/// Used to configure the AsyncTaskPool's inner policies
pub async_compute: TaskPoolPolicies,
/// Used to configure the ComputeTaskPool's inner policies
pub compute: TaskPoolPolicies,
}

impl Default for DefaultTaskPoolOptions {
Expand All @@ -59,24 +70,33 @@ impl Default for DefaultTaskPoolOptions {
max_total_threads: std::usize::MAX,

// Use 25% of cores for IO, at least 1, no more than 4
io: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: 4,
percent: 0.25,
io: TaskPoolPolicies {
assignment_policy: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: 4,
percent: 0.25,
},
panic_policy: TaskPoolThreadPanicPolicy::Restart,
},

// Use 25% of cores for async compute, at least 1, no more than 4
async_compute: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: 4,
percent: 0.25,
async_compute: TaskPoolPolicies {
assignment_policy: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: 4,
percent: 0.25,
},
panic_policy: TaskPoolThreadPanicPolicy::Restart,
},

// Use all remaining cores for compute (at least 1)
compute: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: std::usize::MAX,
percent: 1.0, // This 1.0 here means "whatever is left over"
compute: TaskPoolPolicies {
assignment_policy: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: std::usize::MAX,
percent: 1.0, // This 1.0 here means "whatever is left over"
},
panic_policy: TaskPoolThreadPanicPolicy::Restart,
},
}
}
Expand Down Expand Up @@ -104,6 +124,7 @@ impl DefaultTaskPoolOptions {
// Determine the number of IO threads we will use
let io_threads = self
.io
.assignment_policy
.get_number_of_threads(remaining_threads, total_threads);

trace!("IO Threads: {}", io_threads);
Expand All @@ -113,6 +134,7 @@ impl DefaultTaskPoolOptions {
TaskPoolBuilder::default()
.num_threads(io_threads)
.thread_name("IO Task Pool".to_string())
.panic_policy(self.io.panic_policy)
.build(),
));
}
Expand All @@ -121,6 +143,7 @@ impl DefaultTaskPoolOptions {
// Determine the number of async compute threads we will use
let async_compute_threads = self
.async_compute
.assignment_policy
.get_number_of_threads(remaining_threads, total_threads);

trace!("Async Compute Threads: {}", async_compute_threads);
Expand All @@ -130,6 +153,7 @@ impl DefaultTaskPoolOptions {
TaskPoolBuilder::default()
.num_threads(async_compute_threads)
.thread_name("Async Compute Task Pool".to_string())
.panic_policy(self.async_compute.panic_policy)
.build(),
));
}
Expand All @@ -139,15 +163,26 @@ impl DefaultTaskPoolOptions {
// This is intentionally last so that an end user can specify 1.0 as the percent
let compute_threads = self
.compute
.assignment_policy
.get_number_of_threads(remaining_threads, total_threads);

trace!("Compute Threads: {}", compute_threads);
world.insert_resource(ComputeTaskPool(
TaskPoolBuilder::default()
.num_threads(compute_threads)
.thread_name("Compute Task Pool".to_string())
.panic_policy(self.compute.panic_policy)
.build(),
));
}
}
}

#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn handle_task_pool_panicking_threads_system<
T: TaskPoolTrait + Send + Sync + 'static,
>(
task_pool: Res<T>,
) {
bevy_tasks::handle_task_pool_panicking_threads(&**task_pool);
}
2 changes: 2 additions & 0 deletions crates/bevy_tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ keywords = ["bevy"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tracing = { version = "0.1", features = ["release_max_level_info"] }
NathanSWard marked this conversation as resolved.
Show resolved Hide resolved
parking_lot = "0.11.0"
futures-lite = "1.4.0"
event-listener = "2.4.0"
async-executor = "1.3.0"
Expand Down
7 changes: 5 additions & 2 deletions crates/bevy_tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@ pub use slice::{ParallelSlice, ParallelSliceMut};
mod task;
pub use task::Task;

mod task_pool_common;
pub use task_pool_common::TaskPoolThreadPanicPolicy;

#[cfg(not(target_arch = "wasm32"))]
mod task_pool;
#[cfg(not(target_arch = "wasm32"))]
pub use task_pool::{Scope, TaskPool, TaskPoolBuilder};
pub use task_pool::{handle_task_pool_panicking_threads, Scope, TaskPool, TaskPoolBuilder};

#[cfg(target_arch = "wasm32")]
mod single_threaded_task_pool;
#[cfg(target_arch = "wasm32")]
pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder};

mod usages;
pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};
pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolTrait};

mod countdown_event;
pub use countdown_event::CountdownEvent;
Expand Down
5 changes: 5 additions & 0 deletions crates/bevy_tasks/src/single_threaded_task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
mem,
sync::{Arc, Mutex},
};
use crate::TaskPoolThreadPanicPolicy;

/// Used to create a TaskPool
#[derive(Debug, Default, Clone)]
Expand All @@ -26,6 +27,10 @@ impl TaskPoolBuilder {
self
}

pub fn panic_policy(self, _policy: TaskPoolThreadPanicPolicy) -> Self {
self
}

pub fn build(self) -> TaskPool {
TaskPool::new_internal()
}
Expand Down
Loading