From cb415ff9959245274e6ecc5685ba91078b25cf27 Mon Sep 17 00:00:00 2001 From: james7132 Date: Tue, 25 May 2021 03:56:20 -0700 Subject: [PATCH 01/16] Add global init and get accessors for all newtyped TaskPools --- crates/bevy_asset/src/lib.rs | 3 +- crates/bevy_core/src/task_pool_options.rs | 46 ++++++----- .../src/schedule/executor_parallel.rs | 5 +- crates/bevy_tasks/Cargo.toml | 2 + crates/bevy_tasks/src/usages.rs | 77 ++++++++++++++++++- 5 files changed, 110 insertions(+), 23 deletions(-) diff --git a/crates/bevy_asset/src/lib.rs b/crates/bevy_asset/src/lib.rs index 62808e38a42c2..5bb5cbc2070a6 100644 --- a/crates/bevy_asset/src/lib.rs +++ b/crates/bevy_asset/src/lib.rs @@ -32,6 +32,7 @@ use bevy_ecs::{ system::IntoSystem, }; use bevy_tasks::IoTaskPool; +use std::ops::Deref; /// The names of asset stages in an App Schedule #[derive(Debug, Hash, PartialEq, Eq, Clone, StageLabel)] @@ -83,7 +84,7 @@ impl Plugin for AssetPlugin { .world() .get_resource::() .expect("`IoTaskPool` resource not found.") - .0 + .deref() .clone(); let source = create_platform_default_asset_io(app); diff --git a/crates/bevy_core/src/task_pool_options.rs b/crates/bevy_core/src/task_pool_options.rs index 19c9dad5bfe2f..401e88f9c3646 100644 --- a/crates/bevy_core/src/task_pool_options.rs +++ b/crates/bevy_core/src/task_pool_options.rs @@ -109,12 +109,15 @@ impl DefaultTaskPoolOptions { trace!("IO Threads: {}", io_threads); remaining_threads = remaining_threads.saturating_sub(io_threads); - world.insert_resource(IoTaskPool( - TaskPoolBuilder::default() - .num_threads(io_threads) - .thread_name("IO Task Pool".to_string()) - .build(), - )); + let task_pool = TaskPoolBuilder::default() + .num_threads(io_threads) + .thread_name("IO Task Pool".to_string()) + .build(); + + let io_task_pool = + IoTaskPool::init(task_pool).unwrap_or_else(|_| IoTaskPool::get().clone()); + + world.insert_resource(io_task_pool); } if !world.contains_resource::() { @@ -126,12 +129,15 @@ impl DefaultTaskPoolOptions { trace!("Async Compute Threads: {}", async_compute_threads); remaining_threads = remaining_threads.saturating_sub(async_compute_threads); - world.insert_resource(AsyncComputeTaskPool( - TaskPoolBuilder::default() - .num_threads(async_compute_threads) - .thread_name("Async Compute Task Pool".to_string()) - .build(), - )); + let task_pool = TaskPoolBuilder::default() + .num_threads(async_compute_threads) + .thread_name("Async Compute Task Pool".to_string()) + .build(); + + let async_task_pool = AsyncComputeTaskPool::init(task_pool) + .unwrap_or_else(|_| AsyncComputeTaskPool::get().clone()); + + world.insert_resource(async_task_pool); } if !world.contains_resource::() { @@ -142,12 +148,16 @@ impl DefaultTaskPoolOptions { .get_number_of_threads(remaining_threads, total_threads); trace!("Compute Threads: {}", compute_threads); - world.insert_resource(ComputeTaskPool( - TaskPoolBuilder::default() - .num_threads(compute_threads) - .thread_name("Compute Task Pool".to_string()) - .build(), - )); + + let task_pool = TaskPoolBuilder::default() + .num_threads(compute_threads) + .thread_name("Compute Task Pool".to_string()) + .build(); + + let compute_task_pool = + ComputeTaskPool::init(task_pool).unwrap_or_else(|_| ComputeTaskPool::get().clone()); + + world.insert_resource(compute_task_pool); } } } diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index af989ef9eecc1..4fbe70179f3fb 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -115,7 +115,10 @@ impl ParallelSystemExecutor for ParallelExecutor { self.update_archetypes(systems, world); let compute_pool = world - .get_resource_or_insert_with(|| ComputeTaskPool(TaskPool::default())) + .get_resource_or_insert_with(|| { + ComputeTaskPool::init(TaskPool::default()) + .unwrap_or_else(|_| ComputeTaskPool::get().clone()) + }) .clone(); compute_pool.scope(|scope| { self.prepare_systems(scope, systems, world); diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index f2c1bf41f396b..08b0092c00fb4 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -22,5 +22,7 @@ async-executor = "1.3.0" async-channel = "1.4.2" instant = { version = "0.1", features = ["wasm-bindgen"] } num_cpus = "1" +once_cell = "1.7" + [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs index 7adf60639b116..56db0b67b0844 100644 --- a/crates/bevy_tasks/src/usages.rs +++ b/crates/bevy_tasks/src/usages.rs @@ -11,12 +11,39 @@ //! for consumption. (likely via channels) use super::TaskPool; +use once_cell::sync::OnceCell; use std::ops::Deref; +static COMPUTE_TASK_POOL: OnceCell = OnceCell::new(); +static ASYNC_COMPUTE_TASK_POOL: OnceCell = OnceCell::new(); +static IO_TASK_POOL: OnceCell = OnceCell::new(); + /// A newtype for a task pool for CPU-intensive work that must be completed to deliver the next /// frame #[derive(Clone, Debug)] -pub struct ComputeTaskPool(pub TaskPool); +pub struct ComputeTaskPool(TaskPool); + +impl ComputeTaskPool { + /// Initializes the global ComputeTaskPool instance. + /// + /// Returns the provided `[TaskPool]` the global instance has already been initialized. + pub fn init(task_pool: TaskPool) -> Result { + COMPUTE_TASK_POOL + .set(Self(task_pool)) + .map(|_| Self::get().clone()) + .map_err(|pool| pool.0) + } + + /// Gets the global ComputeTaskPool instance. + /// + /// Panics if no pool has been initialized yet. + pub fn get() -> &'static Self { + COMPUTE_TASK_POOL.get().expect( + "A ComputeTaskPool has not been initialized yet. Please call \ + ComputeTaskPool::init beforehand.", + ) + } +} impl Deref for ComputeTaskPool { type Target = TaskPool; @@ -28,7 +55,29 @@ impl Deref for ComputeTaskPool { /// A newtype for a task pool for CPU-intensive work that may span across multiple frames #[derive(Clone, Debug)] -pub struct AsyncComputeTaskPool(pub TaskPool); +pub struct AsyncComputeTaskPool(TaskPool); + +impl AsyncComputeTaskPool { + /// Initializes the global AsyncComputeTaskPool instance. + /// + /// Returns the provided `[TaskPool]` the global instance has already been initialized. + pub fn init(task_pool: TaskPool) -> Result { + ASYNC_COMPUTE_TASK_POOL + .set(Self(task_pool)) + .map(|_| Self::get().clone()) + .map_err(|pool| pool.0) + } + + /// Gets the global AsyncComputeTaskPool instance. + /// + /// Panics if no pool has been initialized yet. + pub fn get() -> &'static Self { + ASYNC_COMPUTE_TASK_POOL.get().expect( + "A AsyncComputeTaskPool has not been initialized yet. Please call \ + AsyncComputeTaskPool::init beforehand.", + ) + } +} impl Deref for AsyncComputeTaskPool { type Target = TaskPool; @@ -41,7 +90,29 @@ impl Deref for AsyncComputeTaskPool { /// A newtype for a task pool for IO-intensive work (i.e. tasks that spend very little time in a /// "woken" state) #[derive(Clone, Debug)] -pub struct IoTaskPool(pub TaskPool); +pub struct IoTaskPool(TaskPool); + +impl IoTaskPool { + /// Initializes the global IoTaskPool instance. + /// + /// Returns the provided `[TaskPool]` the global instance has already been initialized. + pub fn init(task_pool: TaskPool) -> Result { + IO_TASK_POOL + .set(Self(task_pool)) + .map(|_| Self::get().clone()) + .map_err(|pool| pool.0) + } + + /// Gets the global IoTaskPool instance. + /// + /// Panics if no pool has been initialized yet. + pub fn get() -> &'static Self { + IO_TASK_POOL.get().expect( + "A IoTaskPool has not been initialized yet. Please call \ + IoTaskPool::init beforehand.", + ) + } +} impl Deref for IoTaskPool { type Target = TaskPool; From 69f5e2372498954755d8198067959f6ec0cc2126 Mon Sep 17 00:00:00 2001 From: james7132 Date: Tue, 25 May 2021 07:36:19 -0700 Subject: [PATCH 02/16] Change init to return a static reference instead of a cloned copy --- crates/bevy_core/src/task_pool_options.rs | 11 +++++++---- crates/bevy_ecs/src/schedule/executor_parallel.rs | 1 + crates/bevy_tasks/src/usages.rs | 12 ++++++------ 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/crates/bevy_core/src/task_pool_options.rs b/crates/bevy_core/src/task_pool_options.rs index 401e88f9c3646..08eb10e609a01 100644 --- a/crates/bevy_core/src/task_pool_options.rs +++ b/crates/bevy_core/src/task_pool_options.rs @@ -114,8 +114,9 @@ impl DefaultTaskPoolOptions { .thread_name("IO Task Pool".to_string()) .build(); - let io_task_pool = - IoTaskPool::init(task_pool).unwrap_or_else(|_| IoTaskPool::get().clone()); + let io_task_pool = IoTaskPool::init(task_pool) + .map(|pool| pool.clone()) + .unwrap_or_else(|_| IoTaskPool::get().clone()); world.insert_resource(io_task_pool); } @@ -135,6 +136,7 @@ impl DefaultTaskPoolOptions { .build(); let async_task_pool = AsyncComputeTaskPool::init(task_pool) + .map(|pool| pool.clone()) .unwrap_or_else(|_| AsyncComputeTaskPool::get().clone()); world.insert_resource(async_task_pool); @@ -154,8 +156,9 @@ impl DefaultTaskPoolOptions { .thread_name("Compute Task Pool".to_string()) .build(); - let compute_task_pool = - ComputeTaskPool::init(task_pool).unwrap_or_else(|_| ComputeTaskPool::get().clone()); + let compute_task_pool = ComputeTaskPool::init(task_pool) + .map(|pool| pool.clone()) + .unwrap_or_else(|_| ComputeTaskPool::get().clone()); world.insert_resource(compute_task_pool); } diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 4fbe70179f3fb..2351a3dbe6bb9 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -117,6 +117,7 @@ impl ParallelSystemExecutor for ParallelExecutor { let compute_pool = world .get_resource_or_insert_with(|| { ComputeTaskPool::init(TaskPool::default()) + .map(|pool| pool.clone()) .unwrap_or_else(|_| ComputeTaskPool::get().clone()) }) .clone(); diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs index 56db0b67b0844..75c570d1f023c 100644 --- a/crates/bevy_tasks/src/usages.rs +++ b/crates/bevy_tasks/src/usages.rs @@ -27,10 +27,10 @@ impl ComputeTaskPool { /// Initializes the global ComputeTaskPool instance. /// /// Returns the provided `[TaskPool]` the global instance has already been initialized. - pub fn init(task_pool: TaskPool) -> Result { + pub fn init(task_pool: TaskPool) -> Result<&'static Self, TaskPool> { COMPUTE_TASK_POOL .set(Self(task_pool)) - .map(|_| Self::get().clone()) + .map(|_| Self::get()) .map_err(|pool| pool.0) } @@ -61,10 +61,10 @@ impl AsyncComputeTaskPool { /// Initializes the global AsyncComputeTaskPool instance. /// /// Returns the provided `[TaskPool]` the global instance has already been initialized. - pub fn init(task_pool: TaskPool) -> Result { + pub fn init(task_pool: TaskPool) -> Result<&'static Self, TaskPool> { ASYNC_COMPUTE_TASK_POOL .set(Self(task_pool)) - .map(|_| Self::get().clone()) + .map(|_| Self::get()) .map_err(|pool| pool.0) } @@ -96,10 +96,10 @@ impl IoTaskPool { /// Initializes the global IoTaskPool instance. /// /// Returns the provided `[TaskPool]` the global instance has already been initialized. - pub fn init(task_pool: TaskPool) -> Result { + pub fn init(task_pool: TaskPool) -> Result<&'static Self, TaskPool> { IO_TASK_POOL .set(Self(task_pool)) - .map(|_| Self::get().clone()) + .map(|_| Self::get()) .map_err(|pool| pool.0) } From f75eb85e40a63223423167a9b79dd17fbdf0e3d0 Mon Sep 17 00:00:00 2001 From: james7132 Date: Tue, 25 May 2021 09:02:44 -0700 Subject: [PATCH 03/16] Doc comment fixes --- crates/bevy_tasks/src/usages.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs index 75c570d1f023c..08ecb05263905 100644 --- a/crates/bevy_tasks/src/usages.rs +++ b/crates/bevy_tasks/src/usages.rs @@ -26,7 +26,7 @@ pub struct ComputeTaskPool(TaskPool); impl ComputeTaskPool { /// Initializes the global ComputeTaskPool instance. /// - /// Returns the provided `[TaskPool]` the global instance has already been initialized. + /// Returns the provided `[TaskPool]` if the global instance has already been initialized. pub fn init(task_pool: TaskPool) -> Result<&'static Self, TaskPool> { COMPUTE_TASK_POOL .set(Self(task_pool)) @@ -36,6 +36,7 @@ impl ComputeTaskPool { /// Gets the global ComputeTaskPool instance. /// + /// # Panics /// Panics if no pool has been initialized yet. pub fn get() -> &'static Self { COMPUTE_TASK_POOL.get().expect( @@ -60,7 +61,7 @@ pub struct AsyncComputeTaskPool(TaskPool); impl AsyncComputeTaskPool { /// Initializes the global AsyncComputeTaskPool instance. /// - /// Returns the provided `[TaskPool]` the global instance has already been initialized. + /// Returns the provided `[TaskPool]` if the global instance has already been initialized. pub fn init(task_pool: TaskPool) -> Result<&'static Self, TaskPool> { ASYNC_COMPUTE_TASK_POOL .set(Self(task_pool)) @@ -70,6 +71,7 @@ impl AsyncComputeTaskPool { /// Gets the global AsyncComputeTaskPool instance. /// + /// # Panics /// Panics if no pool has been initialized yet. pub fn get() -> &'static Self { ASYNC_COMPUTE_TASK_POOL.get().expect( @@ -95,7 +97,7 @@ pub struct IoTaskPool(TaskPool); impl IoTaskPool { /// Initializes the global IoTaskPool instance. /// - /// Returns the provided `[TaskPool]` the global instance has already been initialized. + /// Returns the provided `[TaskPool]` if the global instance has already been initialized. pub fn init(task_pool: TaskPool) -> Result<&'static Self, TaskPool> { IO_TASK_POOL .set(Self(task_pool)) @@ -105,6 +107,7 @@ impl IoTaskPool { /// Gets the global IoTaskPool instance. /// + /// # Panics /// Panics if no pool has been initialized yet. pub fn get() -> &'static Self { IO_TASK_POOL.get().expect( From f2fc6f8da3bb9922acf0344f115bb13b980d42cb Mon Sep 17 00:00:00 2001 From: james7132 Date: Tue, 25 May 2021 16:05:43 -0700 Subject: [PATCH 04/16] Update relevant examples --- examples/asset/custom_asset_io.rs | 12 +++++------- examples/async_tasks/async_compute.rs | 3 ++- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/examples/asset/custom_asset_io.rs b/examples/asset/custom_asset_io.rs index 6c2b01126c319..5a87ced715a4e 100644 --- a/examples/asset/custom_asset_io.rs +++ b/examples/asset/custom_asset_io.rs @@ -3,7 +3,10 @@ use bevy::{ prelude::*, utils::BoxedFuture, }; -use std::path::{Path, PathBuf}; +use std::{ + ops::Deref, + path::{Path, PathBuf}, +}; /// A custom asset io implementation that simply defers to the platform default /// implementation. @@ -49,12 +52,7 @@ impl Plugin for CustomAssetIoPlugin { fn build(&self, app: &mut AppBuilder) { // must get a hold of the task pool in order to create the asset server - let task_pool = app - .world() - .get_resource::() - .expect("`IoTaskPool` resource not found.") - .0 - .clone(); + let task_pool = bevy::tasks::IoTaskPool::get().deref().clone(); let asset_io = { // the platform default asset io requires a reference to the app diff --git a/examples/async_tasks/async_compute.rs b/examples/async_tasks/async_compute.rs index a02433cd1ac72..d660bb0bfa2ee 100644 --- a/examples/async_tasks/async_compute.rs +++ b/examples/async_tasks/async_compute.rs @@ -45,7 +45,8 @@ fn add_assets( /// work that potentially spans multiple frames/ticks. A separate /// system, handle_tasks, will poll the spawned tasks on subsequent /// frames/ticks, and use the results to spawn cubes -fn spawn_tasks(mut commands: Commands, thread_pool: Res) { +fn spawn_tasks(mut commands: Commands) { + let thread_pool = AsyncComputeTaskPool::get(); for x in 0..NUM_CUBES { for y in 0..NUM_CUBES { for z in 0..NUM_CUBES { From 65bd41f3dcb90f675cf01f334749fe9e774095f3 Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 9 May 2022 12:45:34 -0700 Subject: [PATCH 05/16] Remove task_pool parameter from par_for_each(_mut) --- .../bevy_ecs/ecs_bench_suite/heavy_compute.rs | 6 ++-- crates/bevy_ecs/src/lib.rs | 10 +++---- crates/bevy_ecs/src/query/state.rs | 28 +++++++++++-------- crates/bevy_ecs/src/system/query.rs | 22 +++++++++------ examples/ecs/parallel_query.rs | 16 ++++------- 5 files changed, 43 insertions(+), 39 deletions(-) diff --git a/benches/benches/bevy_ecs/ecs_bench_suite/heavy_compute.rs b/benches/benches/bevy_ecs/ecs_bench_suite/heavy_compute.rs index 4ddae1781ea1b..58dae22e89507 100644 --- a/benches/benches/bevy_ecs/ecs_bench_suite/heavy_compute.rs +++ b/benches/benches/bevy_ecs/ecs_bench_suite/heavy_compute.rs @@ -29,8 +29,8 @@ impl Benchmark { ) })); - fn sys(task_pool: Res, mut query: Query<(&mut Position, &mut Transform)>) { - query.par_for_each_mut(&task_pool, 128, |(mut pos, mut mat)| { + fn sys(mut query: Query<(&mut Position, &mut Transform)>) { + query.par_for_each_mut(128, |(mut pos, mut mat)| { for _ in 0..100 { mat.0 = mat.0.inverse(); } @@ -39,7 +39,7 @@ impl Benchmark { }); } - world.insert_resource(TaskPool::default()); + world.insert_resource(ComputeTaskPool(TaskPool::default())); let mut system = IntoSystem::into_system(sys); system.initialize(&mut world); system.update_archetype_component_access(&world); diff --git a/crates/bevy_ecs/src/lib.rs b/crates/bevy_ecs/src/lib.rs index 15a5ac64a8d22..5c2f3a71316d6 100644 --- a/crates/bevy_ecs/src/lib.rs +++ b/crates/bevy_ecs/src/lib.rs @@ -56,7 +56,7 @@ mod tests { query::{Added, ChangeTrackers, Changed, FilteredAccess, With, Without, WorldQuery}, world::{Mut, World}, }; - use bevy_tasks::TaskPool; + use bevy_tasks::{ComputeTaskPool, TaskPool}; use std::{ any::TypeId, sync::{ @@ -373,7 +373,7 @@ mod tests { #[test] fn par_for_each_dense() { let mut world = World::new(); - let task_pool = TaskPool::default(); + world.insert_resource(ComputeTaskPool(TaskPool::default())); let e1 = world.spawn().insert(A(1)).id(); let e2 = world.spawn().insert(A(2)).id(); let e3 = world.spawn().insert(A(3)).id(); @@ -382,7 +382,7 @@ mod tests { let results = Arc::new(Mutex::new(Vec::new())); world .query::<(Entity, &A)>() - .par_for_each(&world, &task_pool, 2, |(e, &A(i))| { + .par_for_each(&world, 2, |(e, &A(i))| { results.lock().unwrap().push((e, i)); }); results.lock().unwrap().sort(); @@ -395,8 +395,7 @@ mod tests { #[test] fn par_for_each_sparse() { let mut world = World::new(); - - let task_pool = TaskPool::default(); + world.insert_resource(ComputeTaskPool(TaskPool::default())); let e1 = world.spawn().insert(SparseStored(1)).id(); let e2 = world.spawn().insert(SparseStored(2)).id(); let e3 = world.spawn().insert(SparseStored(3)).id(); @@ -405,7 +404,6 @@ mod tests { let results = Arc::new(Mutex::new(Vec::new())); world.query::<(Entity, &SparseStored)>().par_for_each( &world, - &task_pool, 2, |(e, &SparseStored(i))| results.lock().unwrap().push((e, i)), ); diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs index e01d8df5f2b35..da81765bf1d24 100644 --- a/crates/bevy_ecs/src/query/state.rs +++ b/crates/bevy_ecs/src/query/state.rs @@ -10,7 +10,7 @@ use crate::{ storage::TableId, world::{World, WorldId}, }; -use bevy_tasks::TaskPool; +use bevy_tasks::ComputeTaskPool; use fixedbitset::FixedBitSet; use std::fmt; @@ -683,15 +683,17 @@ impl QueryState { ); } - /// Runs `func` on each query result in parallel using the given `task_pool`. + /// Runs `func` on each query result in parallel. /// /// This can only be called for read-only queries, see [`Self::par_for_each_mut`] for /// write-queries. + /// + /// # Panics + /// [`ComputeTaskPool`] is not stored as a resource in `world`. #[inline] pub fn par_for_each<'w, FN: Fn(ROQueryItem<'w, Q>) + Send + Sync + Clone>( &mut self, world: &'w World, - task_pool: &TaskPool, batch_size: usize, func: FN, ) { @@ -700,7 +702,6 @@ impl QueryState { self.update_archetypes(world); self.par_for_each_unchecked_manual::, FN>( world, - task_pool, batch_size, func, world.last_change_tick(), @@ -709,12 +710,14 @@ impl QueryState { } } - /// Runs `func` on each query result in parallel using the given `task_pool`. + /// Runs `func` on each query result in parallel. + /// + /// # Panics + /// [`ComputeTaskPool`] is not stored as a resource in `world`. #[inline] pub fn par_for_each_mut<'w, FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>( &mut self, world: &'w mut World, - task_pool: &TaskPool, batch_size: usize, func: FN, ) { @@ -723,7 +726,6 @@ impl QueryState { self.update_archetypes(world); self.par_for_each_unchecked_manual::, FN>( world, - task_pool, batch_size, func, world.last_change_tick(), @@ -732,10 +734,13 @@ impl QueryState { } } - /// Runs `func` on each query result in parallel using the given `task_pool`. + /// Runs `func` on each query result in parallel. /// /// This can only be called for read-only queries. /// + /// # Panics + /// [`ComputeTaskPool`] is not stored as a resource in `world`. + /// /// # Safety /// /// This does not check for mutable query correctness. To be safe, make sure mutable queries @@ -744,14 +749,12 @@ impl QueryState { pub unsafe fn par_for_each_unchecked<'w, FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>( &mut self, world: &'w World, - task_pool: &TaskPool, batch_size: usize, func: FN, ) { self.update_archetypes(world); self.par_for_each_unchecked_manual::, FN>( world, - task_pool, batch_size, func, world.last_change_tick(), @@ -827,6 +830,9 @@ impl QueryState { /// the current change tick are given. This is faster than the equivalent /// iter() method, but cannot be chained like a normal [`Iterator`]. /// + /// # Panics + /// [`ComputeTaskPool`] is not stored as a resource in `world`. + /// /// # Safety /// /// This does not check for mutable query correctness. To be safe, make sure mutable queries @@ -840,12 +846,12 @@ impl QueryState { >( &self, world: &'w World, - task_pool: &TaskPool, batch_size: usize, func: FN, last_change_tick: u32, change_tick: u32, ) { + let task_pool = world.resource::().clone(); // NOTE: If you are changing query iteration code, remember to update the following places, where relevant: // QueryIter, QueryIterationCursor, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual task_pool.scope(|scope| { diff --git a/crates/bevy_ecs/src/system/query.rs b/crates/bevy_ecs/src/system/query.rs index c65c85b487433..b9e4d34bc98b3 100644 --- a/crates/bevy_ecs/src/system/query.rs +++ b/crates/bevy_ecs/src/system/query.rs @@ -7,7 +7,6 @@ use crate::{ }, world::{Mut, World}, }; -use bevy_tasks::TaskPool; use std::{any::TypeId, fmt::Debug}; /// Provides scoped access to components in a [`World`]. @@ -493,7 +492,7 @@ impl<'w, 's, Q: WorldQuery, F: WorldQuery> Query<'w, 's, Q, F> { }; } - /// Runs `f` on each query result in parallel using the given [`TaskPool`]. + /// Runs `f` on each query result in parallel using the [`World`]'s [`ComputeTaskPool`]. /// /// This can only be called for immutable data, see [`Self::par_for_each_mut`] for /// mutable access. @@ -502,7 +501,7 @@ impl<'w, 's, Q: WorldQuery, F: WorldQuery> Query<'w, 's, Q, F> { /// /// The items in the query get sorted into batches. /// Internally, this function spawns a group of futures that each take on a `batch_size` sized section of the items (or less if the division is not perfect). - /// Then, the tasks in the [`TaskPool`] work through these futures. + /// Then, the tasks in the [`ComputeTaskPool`] work through these futures. /// /// You can use this value to tune between maximum multithreading ability (many small batches) and minimum parallelization overhead (few big batches). /// Rule of thumb: If the function body is (mostly) computationally expensive but there are not many items, a small batch size (=more batches) may help to even out the load. @@ -510,13 +509,16 @@ impl<'w, 's, Q: WorldQuery, F: WorldQuery> Query<'w, 's, Q, F> { /// /// # Arguments /// - ///* `task_pool` - The [`TaskPool`] to use ///* `batch_size` - The number of batches to spawn ///* `f` - The function to run on each item in the query + /// + /// # Panics + /// [`ComputeTaskPool`] is not stored as a resource in `world`. + /// + /// [`ComputeTaskPool`]: bevy_tasks::prelude::ComputeTaskPool #[inline] pub fn par_for_each<'this>( &'this self, - task_pool: &TaskPool, batch_size: usize, f: impl Fn(ROQueryItem<'this, Q>) + Send + Sync + Clone, ) { @@ -526,7 +528,6 @@ impl<'w, 's, Q: WorldQuery, F: WorldQuery> Query<'w, 's, Q, F> { self.state .par_for_each_unchecked_manual::, _>( self.world, - task_pool, batch_size, f, self.last_change_tick, @@ -535,12 +536,16 @@ impl<'w, 's, Q: WorldQuery, F: WorldQuery> Query<'w, 's, Q, F> { }; } - /// Runs `f` on each query result in parallel using the given [`TaskPool`]. + /// Runs `f` on each query result in parallel using the [`World`]'s [`ComputeTaskPool`]. /// See [`Self::par_for_each`] for more details. + /// + /// # Panics + /// [`ComputeTaskPool`] is not stored as a resource in `world`. + /// + /// [`ComputeTaskPool`]: bevy_tasks::prelude::ComputeTaskPool #[inline] pub fn par_for_each_mut<'a, FN: Fn(QueryItem<'a, Q>) + Send + Sync + Clone>( &'a mut self, - task_pool: &TaskPool, batch_size: usize, f: FN, ) { @@ -550,7 +555,6 @@ impl<'w, 's, Q: WorldQuery, F: WorldQuery> Query<'w, 's, Q, F> { self.state .par_for_each_unchecked_manual::, FN>( self.world, - task_pool, batch_size, f, self.last_change_tick, diff --git a/examples/ecs/parallel_query.rs b/examples/ecs/parallel_query.rs index a70eecf6d8b1a..a26706577d694 100644 --- a/examples/ecs/parallel_query.rs +++ b/examples/ecs/parallel_query.rs @@ -1,4 +1,4 @@ -use bevy::{prelude::*, tasks::prelude::*}; +use bevy::prelude::*; use rand::random; #[derive(Component, Deref)] @@ -21,26 +21,22 @@ fn spawn_system(mut commands: Commands, asset_server: Res) { } // Move sprites according to their velocity -fn move_system(pool: Res, mut sprites: Query<(&mut Transform, &Velocity)>) { +fn move_system(mut sprites: Query<(&mut Transform, &Velocity)>) { // Compute the new location of each sprite in parallel on the // ComputeTaskPool using batches of 32 sprites // - // This example is only for demonstrative purposes. Using a + // This example is only for demonstrative purposes. Using a // ParallelIterator for an inexpensive operation like addition on only 128 // elements will not typically be faster than just using a normal Iterator. // See the ParallelIterator documentation for more information on when // to use or not use ParallelIterator over a normal Iterator. - sprites.par_for_each_mut(&pool, 32, |(mut transform, velocity)| { + sprites.par_for_each_mut(32, |(mut transform, velocity)| { transform.translation += velocity.extend(0.0); }); } // Bounce sprites outside the window -fn bounce_system( - pool: Res, - windows: Res, - mut sprites: Query<(&Transform, &mut Velocity)>, -) { +fn bounce_system(windows: Res, mut sprites: Query<(&Transform, &mut Velocity)>) { let window = windows.primary(); let width = window.width(); let height = window.height(); @@ -51,7 +47,7 @@ fn bounce_system( sprites // Batch size of 32 is chosen to limit the overhead of // ParallelIterator, since negating a vector is very inexpensive. - .par_for_each_mut(&pool, 32, |(transform, mut v)| { + .par_for_each_mut(32, |(transform, mut v)| { if !(left < transform.translation.x && transform.translation.x < right && bottom < transform.translation.y From 8b32bcb39e614b13dc21007b8207b9711fe4b1dc Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 9 May 2022 20:44:24 -0700 Subject: [PATCH 06/16] Remove Clone on TaskPool --- crates/bevy_asset/src/asset_server.rs | 14 +++----- crates/bevy_asset/src/lib.rs | 4 --- crates/bevy_asset/src/loader.rs | 8 ----- crates/bevy_gltf/Cargo.toml | 1 + crates/bevy_gltf/src/loader.rs | 4 +-- crates/bevy_tasks/src/task_pool.rs | 47 ++++++++++++--------------- crates/bevy_tasks/src/usages.rs | 6 ++-- 7 files changed, 31 insertions(+), 53 deletions(-) diff --git a/crates/bevy_asset/src/asset_server.rs b/crates/bevy_asset/src/asset_server.rs index 04d1f3bda07d4..ba3e48c7112e6 100644 --- a/crates/bevy_asset/src/asset_server.rs +++ b/crates/bevy_asset/src/asset_server.rs @@ -7,7 +7,7 @@ use crate::{ use anyhow::Result; use bevy_ecs::system::{Res, ResMut}; use bevy_log::warn; -use bevy_tasks::TaskPool; +use bevy_tasks::IoTaskPool; use bevy_utils::{Entry, HashMap, Uuid}; use crossbeam_channel::TryRecvError; use parking_lot::{Mutex, RwLock}; @@ -56,7 +56,6 @@ pub struct AssetServerInternal { loaders: RwLock>>, extension_to_loader_index: RwLock>, handle_to_path: Arc>>>, - task_pool: TaskPool, } /// Loads assets from the filesystem on background threads @@ -66,11 +65,11 @@ pub struct AssetServer { } impl AssetServer { - pub fn new(source_io: T, task_pool: TaskPool) -> Self { - Self::with_boxed_io(Box::new(source_io), task_pool) + pub fn new(source_io: T) -> Self { + Self::with_boxed_io(Box::new(source_io)) } - pub fn with_boxed_io(asset_io: Box, task_pool: TaskPool) -> Self { + pub fn with_boxed_io(asset_io: Box) -> Self { AssetServer { server: Arc::new(AssetServerInternal { loaders: Default::default(), @@ -79,7 +78,6 @@ impl AssetServer { asset_ref_counter: Default::default(), handle_to_path: Default::default(), asset_lifecycles: Default::default(), - task_pool, asset_io, }), } @@ -315,7 +313,6 @@ impl AssetServer { &self.server.asset_ref_counter.channel, self.asset_io(), version, - &self.server.task_pool, ); if let Err(err) = asset_loader @@ -377,8 +374,7 @@ impl AssetServer { pub(crate) fn load_untracked(&self, asset_path: AssetPath<'_>, force: bool) -> HandleId { let server = self.clone(); let owned_path = asset_path.to_owned(); - self.server - .task_pool + IoTaskPool::get() .spawn(async move { if let Err(err) = server.load_async(owned_path, force).await { warn!("{}", err); diff --git a/crates/bevy_asset/src/lib.rs b/crates/bevy_asset/src/lib.rs index f321a9475f643..b5ba1a1854d02 100644 --- a/crates/bevy_asset/src/lib.rs +++ b/crates/bevy_asset/src/lib.rs @@ -30,8 +30,6 @@ pub use path::*; use bevy_app::{prelude::Plugin, App}; use bevy_ecs::schedule::{StageLabel, SystemStage}; -use bevy_tasks::IoTaskPool; -use std::ops::Deref; /// The names of asset stages in an App Schedule #[derive(Debug, Hash, PartialEq, Eq, Clone, StageLabel)] @@ -81,8 +79,6 @@ pub fn create_platform_default_asset_io(app: &mut App) -> Box { } impl Plugin for AssetPlugin { - fn build(&self, app: &mut AppBuilder) { - if app.world().get_resource::().is_none() { fn build(&self, app: &mut App) { if !app.world.contains_resource::() { let source = create_platform_default_asset_io(app); diff --git a/crates/bevy_asset/src/loader.rs b/crates/bevy_asset/src/loader.rs index 5a5de9b8c11eb..5d6b87d8388ba 100644 --- a/crates/bevy_asset/src/loader.rs +++ b/crates/bevy_asset/src/loader.rs @@ -5,7 +5,6 @@ use crate::{ use anyhow::Result; use bevy_ecs::system::{Res, ResMut}; use bevy_reflect::{TypeUuid, TypeUuidDynamic}; -use bevy_tasks::TaskPool; use bevy_utils::{BoxedFuture, HashMap}; use crossbeam_channel::{Receiver, Sender}; use downcast_rs::{impl_downcast, Downcast}; @@ -84,7 +83,6 @@ pub struct LoadContext<'a> { pub(crate) labeled_assets: HashMap, BoxedLoadedAsset>, pub(crate) path: &'a Path, pub(crate) version: usize, - pub(crate) task_pool: &'a TaskPool, } impl<'a> LoadContext<'a> { @@ -93,7 +91,6 @@ impl<'a> LoadContext<'a> { ref_change_channel: &'a RefChangeChannel, asset_io: &'a dyn AssetIo, version: usize, - task_pool: &'a TaskPool, ) -> Self { Self { ref_change_channel, @@ -101,7 +98,6 @@ impl<'a> LoadContext<'a> { labeled_assets: Default::default(), version, path, - task_pool, } } @@ -144,10 +140,6 @@ impl<'a> LoadContext<'a> { asset_metas } - pub fn task_pool(&self) -> &TaskPool { - self.task_pool - } - pub fn asset_io(&self) -> &dyn AssetIo { self.asset_io } diff --git a/crates/bevy_gltf/Cargo.toml b/crates/bevy_gltf/Cargo.toml index c2cb4bcf6dfd0..03f36ebd06fa7 100644 --- a/crates/bevy_gltf/Cargo.toml +++ b/crates/bevy_gltf/Cargo.toml @@ -23,6 +23,7 @@ bevy_reflect = { path = "../bevy_reflect", version = "0.8.0-dev", features = ["b bevy_render = { path = "../bevy_render", version = "0.8.0-dev" } bevy_scene = { path = "../bevy_scene", version = "0.8.0-dev" } bevy_transform = { path = "../bevy_transform", version = "0.8.0-dev" } +bevy_tasks = { path = "../bevy_tasks", version = "0.8.0-dev" } bevy_utils = { path = "../bevy_utils", version = "0.8.0-dev" } # other diff --git a/crates/bevy_gltf/src/loader.rs b/crates/bevy_gltf/src/loader.rs index 9233d619a8dda..45debd495cde9 100644 --- a/crates/bevy_gltf/src/loader.rs +++ b/crates/bevy_gltf/src/loader.rs @@ -31,6 +31,7 @@ use bevy_render::{ }; use bevy_scene::Scene; use bevy_transform::{components::Transform, TransformBundle}; +use bevy_tasks::IoTaskPool; use bevy_utils::{HashMap, HashSet}; use gltf::{ @@ -394,8 +395,7 @@ async fn load_gltf<'a, 'b>( } } else { #[cfg(not(target_arch = "wasm32"))] - load_context - .task_pool() + IoTaskPool::get() .scope(|scope| { gltf.textures().for_each(|gltf_texture| { let linear_textures = &linear_textures; diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index ebd6ba6b41f4c..1d0f86e7cb5ed 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -60,29 +60,9 @@ impl TaskPoolBuilder { } } -#[derive(Debug)] -struct TaskPoolInner { - threads: Vec>, - shutdown_tx: async_channel::Sender<()>, -} - -impl Drop for TaskPoolInner { - fn drop(&mut self) { - self.shutdown_tx.close(); - - let panicking = thread::panicking(); - for join_handle in self.threads.drain(..) { - let res = join_handle.join(); - if !panicking { - res.expect("Task thread panicked while executing."); - } - } - } -} - /// A thread pool for executing tasks. Tasks are futures that are being automatically driven by /// the pool on threads owned by the pool. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct TaskPool { /// The executor for the pool /// @@ -92,7 +72,8 @@ pub struct TaskPool { executor: Arc>, /// Inner state of the pool - inner: Arc, + threads: Vec>, + shutdown_tx: async_channel::Sender<()>, } impl TaskPool { @@ -155,16 +136,14 @@ impl TaskPool { Self { executor, - inner: Arc::new(TaskPoolInner { - threads, - shutdown_tx, - }), + threads, + shutdown_tx, } } /// Return the number of threads owned by the task pool pub fn thread_num(&self) -> usize { - self.inner.threads.len() + self.threads.len() } /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback, @@ -268,6 +247,20 @@ impl Default for TaskPool { } } +impl Drop for TaskPool { + fn drop(&mut self) { + self.shutdown_tx.close(); + + let panicking = thread::panicking(); + for join_handle in self.threads.drain(..) { + let res = join_handle.join(); + if !panicking { + res.expect("Task thread panicked while executing."); + } + } + } +} + /// A `TaskPool` scope for running one or more non-`'static` futures. /// /// For more information, see [`TaskPool::scope`]. diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs index bffae22aa840e..42724a4f0ee19 100644 --- a/crates/bevy_tasks/src/usages.rs +++ b/crates/bevy_tasks/src/usages.rs @@ -20,7 +20,7 @@ static IO_TASK_POOL: OnceCell = OnceCell::new(); /// A newtype for a task pool for CPU-intensive work that must be completed to deliver the next /// frame -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct ComputeTaskPool(TaskPool); impl ComputeTaskPool { @@ -55,7 +55,7 @@ impl Deref for ComputeTaskPool { } /// A newtype for a task pool for CPU-intensive work that may span across multiple frames -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct AsyncComputeTaskPool(TaskPool); impl AsyncComputeTaskPool { @@ -91,7 +91,7 @@ impl Deref for AsyncComputeTaskPool { /// A newtype for a task pool for IO-intensive work (i.e. tasks that spend very little time in a /// "woken" state) -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct IoTaskPool(TaskPool); impl IoTaskPool { From 26413f19dde5542812c423df3962c3eac24110b9 Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 9 May 2022 21:03:39 -0700 Subject: [PATCH 07/16] More fixes --- .../bevy_ecs/ecs_bench_suite/heavy_compute.rs | 3 ++- crates/bevy_core/src/task_pool_options.rs | 26 +++++++------------ crates/bevy_ecs/src/query/state.rs | 3 +-- .../src/schedule/executor_parallel.rs | 12 +++------ crates/bevy_gltf/src/loader.rs | 2 +- examples/asset/custom_asset_io.rs | 11 ++------ 6 files changed, 20 insertions(+), 37 deletions(-) diff --git a/benches/benches/bevy_ecs/ecs_bench_suite/heavy_compute.rs b/benches/benches/bevy_ecs/ecs_bench_suite/heavy_compute.rs index 58dae22e89507..912dcb91c1359 100644 --- a/benches/benches/bevy_ecs/ecs_bench_suite/heavy_compute.rs +++ b/benches/benches/bevy_ecs/ecs_bench_suite/heavy_compute.rs @@ -18,6 +18,8 @@ pub struct Benchmark(World, Box>); impl Benchmark { pub fn new() -> Self { + ComputeTaskPool::init(TaskPool::default()); + let mut world = World::default(); world.spawn_batch((0..1000).map(|_| { @@ -39,7 +41,6 @@ impl Benchmark { }); } - world.insert_resource(ComputeTaskPool(TaskPool::default())); let mut system = IntoSystem::into_system(sys); system.initialize(&mut world); system.update_archetype_component_access(&world); diff --git a/crates/bevy_core/src/task_pool_options.rs b/crates/bevy_core/src/task_pool_options.rs index 08eb10e609a01..0c31203800598 100644 --- a/crates/bevy_core/src/task_pool_options.rs +++ b/crates/bevy_core/src/task_pool_options.rs @@ -1,6 +1,6 @@ use bevy_ecs::world::World; use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuilder}; -use bevy_utils::tracing::trace; +use bevy_utils::tracing::{trace, warn}; /// Defines a simple way to determine how many threads to use given the number of remaining cores /// and number of total cores @@ -114,11 +114,9 @@ impl DefaultTaskPoolOptions { .thread_name("IO Task Pool".to_string()) .build(); - let io_task_pool = IoTaskPool::init(task_pool) - .map(|pool| pool.clone()) - .unwrap_or_else(|_| IoTaskPool::get().clone()); - - world.insert_resource(io_task_pool); + if let Err(_) = IoTaskPool::init(task_pool) { + warn!("IoTaskPool already initialized."); + } } if !world.contains_resource::() { @@ -135,11 +133,9 @@ impl DefaultTaskPoolOptions { .thread_name("Async Compute Task Pool".to_string()) .build(); - let async_task_pool = AsyncComputeTaskPool::init(task_pool) - .map(|pool| pool.clone()) - .unwrap_or_else(|_| AsyncComputeTaskPool::get().clone()); - - world.insert_resource(async_task_pool); + if let Err(_) = AsyncComputeTaskPool::init(task_pool) { + warn!("AsynComputeTaskPool already initialized."); + } } if !world.contains_resource::() { @@ -156,11 +152,9 @@ impl DefaultTaskPoolOptions { .thread_name("Compute Task Pool".to_string()) .build(); - let compute_task_pool = ComputeTaskPool::init(task_pool) - .map(|pool| pool.clone()) - .unwrap_or_else(|_| ComputeTaskPool::get().clone()); - - world.insert_resource(compute_task_pool); + if let Err(_) = ComputeTaskPool::init(task_pool) { + warn!("ComputeTaskPool already initialized."); + } } } } diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs index da81765bf1d24..b676305cfb78e 100644 --- a/crates/bevy_ecs/src/query/state.rs +++ b/crates/bevy_ecs/src/query/state.rs @@ -851,10 +851,9 @@ impl QueryState { last_change_tick: u32, change_tick: u32, ) { - let task_pool = world.resource::().clone(); // NOTE: If you are changing query iteration code, remember to update the following places, where relevant: // QueryIter, QueryIterationCursor, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual - task_pool.scope(|scope| { + ComputeTaskPool::get().scope(|scope| { if QF::IS_DENSE && >::IS_DENSE { let tables = &world.storages().tables; for table_id in &self.matched_table_ids { diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 7289b7be5ab48..4d4218fbcf9c8 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -56,6 +56,9 @@ pub struct ParallelExecutor { impl Default for ParallelExecutor { fn default() -> Self { + ComputeTaskPool::init(TaskPool::default()) + .map(|pool| pool.clone()) + .unwrap_or_else(|_| ComputeTaskPool::get().clone()); let (finish_sender, finish_receiver) = async_channel::unbounded(); Self { system_metadata: Default::default(), @@ -123,14 +126,7 @@ impl ParallelSystemExecutor for ParallelExecutor { } } - let compute_pool = world - .get_resource_or_insert_with(|| { - ComputeTaskPool::init(TaskPool::default()) - .map(|pool| pool.clone()) - .unwrap_or_else(|_| ComputeTaskPool::get().clone()) - }) - .clone(); - compute_pool.scope(|scope| { + ComputeTaskPool::get().scope(|scope| { self.prepare_systems(scope, systems, world); let parallel_executor = async { // All systems have been ran if there are no queued or running systems. diff --git a/crates/bevy_gltf/src/loader.rs b/crates/bevy_gltf/src/loader.rs index 45debd495cde9..9408c7447a341 100644 --- a/crates/bevy_gltf/src/loader.rs +++ b/crates/bevy_gltf/src/loader.rs @@ -30,8 +30,8 @@ use bevy_render::{ view::VisibleEntities, }; use bevy_scene::Scene; -use bevy_transform::{components::Transform, TransformBundle}; use bevy_tasks::IoTaskPool; +use bevy_transform::{components::Transform, TransformBundle}; use bevy_utils::{HashMap, HashSet}; use gltf::{ diff --git a/examples/asset/custom_asset_io.rs b/examples/asset/custom_asset_io.rs index bc58739022d20..d6c00af25cec9 100644 --- a/examples/asset/custom_asset_io.rs +++ b/examples/asset/custom_asset_io.rs @@ -3,10 +3,7 @@ use bevy::{ prelude::*, utils::BoxedFuture, }; -use std::{ - ops::Deref, - path::{Path, PathBuf}, -}; +use std::path::{Path, PathBuf}; /// A custom asset io implementation that simply defers to the platform default /// implementation. @@ -50,10 +47,6 @@ struct CustomAssetIoPlugin; impl Plugin for CustomAssetIoPlugin { fn build(&self, app: &mut App) { - // must get a hold of the task pool in order to create the asset server - - let task_pool = bevy::tasks::IoTaskPool::get().deref().clone(); - let asset_io = { // the platform default asset io requires a reference to the app // builder to find its configuration @@ -67,7 +60,7 @@ impl Plugin for CustomAssetIoPlugin { // the asset server is constructed and added the resource manager - app.insert_resource(AssetServer::new(asset_io, task_pool)); + app.insert_resource(AssetServer::new(asset_io)); } } From ee4f701e21388550c951600dc6dd71957589051c Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 9 May 2022 21:39:09 -0700 Subject: [PATCH 08/16] Fix CI --- .../bevy_ecs/ecs_bench_suite/heavy_compute.rs | 2 +- crates/bevy_asset/src/asset_server.rs | 4 ++-- crates/bevy_asset/src/debug_asset_server.rs | 12 ++++++------ crates/bevy_core/src/lib.rs | 2 +- crates/bevy_core/src/task_pool_options.rs | 15 +++++++-------- crates/bevy_ecs/src/lib.rs | 4 ++-- crates/bevy_ecs/src/schedule/executor_parallel.rs | 4 +--- crates/bevy_tasks/src/usages.rs | 12 ++++++------ 8 files changed, 26 insertions(+), 29 deletions(-) diff --git a/benches/benches/bevy_ecs/ecs_bench_suite/heavy_compute.rs b/benches/benches/bevy_ecs/ecs_bench_suite/heavy_compute.rs index 912dcb91c1359..fa07cd9ccf4be 100644 --- a/benches/benches/bevy_ecs/ecs_bench_suite/heavy_compute.rs +++ b/benches/benches/bevy_ecs/ecs_bench_suite/heavy_compute.rs @@ -1,5 +1,5 @@ use bevy_ecs::prelude::*; -use bevy_tasks::TaskPool; +use bevy_tasks::{ComputeTaskPool, TaskPool}; use glam::*; #[derive(Component, Copy, Clone)] diff --git a/crates/bevy_asset/src/asset_server.rs b/crates/bevy_asset/src/asset_server.rs index ba3e48c7112e6..6b24d4c7037b6 100644 --- a/crates/bevy_asset/src/asset_server.rs +++ b/crates/bevy_asset/src/asset_server.rs @@ -616,8 +616,8 @@ mod test { fn setup(asset_path: impl AsRef) -> AssetServer { use crate::FileAssetIo; - - AssetServer::new(FileAssetIo::new(asset_path, false), Default::default()) + let _ = IoTaskPool::init(Default::default()); + AssetServer::new(FileAssetIo::new(asset_path, false)) } #[test] diff --git a/crates/bevy_asset/src/debug_asset_server.rs b/crates/bevy_asset/src/debug_asset_server.rs index d4970dabcc071..93ced91875474 100644 --- a/crates/bevy_asset/src/debug_asset_server.rs +++ b/crates/bevy_asset/src/debug_asset_server.rs @@ -58,14 +58,14 @@ impl Default for HandleMap { impl Plugin for DebugAssetServerPlugin { fn build(&self, app: &mut bevy_app::App) { + let _ = IoTaskPool::init( + TaskPoolBuilder::default() + .num_threads(2) + .thread_name("Debug Asset Server IO Task Pool".to_string()) + .build(), + ); let mut debug_asset_app = App::new(); debug_asset_app - .insert_resource(IoTaskPool( - TaskPoolBuilder::default() - .num_threads(2) - .thread_name("Debug Asset Server IO Task Pool".to_string()) - .build(), - )) .insert_resource(AssetServerSettings { asset_folder: "crates".to_string(), watch_for_changes: true, diff --git a/crates/bevy_core/src/lib.rs b/crates/bevy_core/src/lib.rs index 44b2be465b9cf..68197ed8c6096 100644 --- a/crates/bevy_core/src/lib.rs +++ b/crates/bevy_core/src/lib.rs @@ -44,7 +44,7 @@ impl Plugin for CorePlugin { .get_resource::() .cloned() .unwrap_or_default() - .create_default_pools(&mut app.world); + .create_default_pools(); app.init_resource::