From c330629b4cabb1384386bc3d8b0daa28a9d1c057 Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 23 Jan 2023 17:25:45 -0800 Subject: [PATCH 01/10] Parallel queues --- crates/bevy_ecs/Cargo.toml | 1 - .../src/system/commands/parallel_scope.rs | 22 ++--- crates/bevy_render/src/view/visibility/mod.rs | 19 +--- crates/bevy_utils/Cargo.toml | 1 + crates/bevy_utils/src/lib.rs | 2 + crates/bevy_utils/src/parallel_queue.rs | 92 +++++++++++++++++++ 6 files changed, 107 insertions(+), 30 deletions(-) create mode 100644 crates/bevy_utils/src/parallel_queue.rs diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml index cff59621cf887..63eb5286ff723 100644 --- a/crates/bevy_ecs/Cargo.toml +++ b/crates/bevy_ecs/Cargo.toml @@ -22,7 +22,6 @@ bevy_ecs_macros = { path = "macros", version = "0.9.0" } async-channel = "1.4" event-listener = "2.5" -thread_local = "1.1.4" fixedbitset = "0.4.2" fxhash = "0.2" downcast-rs = "1.2" diff --git a/crates/bevy_ecs/src/system/commands/parallel_scope.rs b/crates/bevy_ecs/src/system/commands/parallel_scope.rs index c249ba3e967c0..4524e48803eb8 100644 --- a/crates/bevy_ecs/src/system/commands/parallel_scope.rs +++ b/crates/bevy_ecs/src/system/commands/parallel_scope.rs @@ -1,6 +1,4 @@ -use std::cell::Cell; - -use thread_local::ThreadLocal; +use bevy_utils::Parallel; use crate::{ entity::Entities, @@ -14,7 +12,7 @@ use super::{CommandQueue, Commands}; #[doc(hidden)] #[derive(Default)] pub struct ParallelCommandsState { - thread_local_storage: ThreadLocal>, + thread_queues: Parallel, } /// An alternative to [`Commands`] that can be used in parallel contexts, such as those in [`Query::par_iter`](crate::system::Query::par_iter) @@ -62,8 +60,8 @@ unsafe impl SystemParam for ParallelCommands<'_, '_> { let _system_span = bevy_utils::tracing::info_span!("system_commands", name = _system_meta.name()) .entered(); - for cq in &mut state.thread_local_storage { - cq.get_mut().apply(world); + for cq in state.thread_queues.iter_mut() { + cq.apply(world); } } @@ -82,16 +80,10 @@ unsafe impl SystemParam for ParallelCommands<'_, '_> { impl<'w, 's> ParallelCommands<'w, 's> { pub fn command_scope(&self, f: impl FnOnce(Commands) -> R) -> R { - let store = &self.state.thread_local_storage; - let command_queue_cell = store.get_or_default(); - let mut command_queue = command_queue_cell.take(); - - let r = f(Commands::new_from_entities( + let mut command_queue = self.state.thread_queues.get(); + f(Commands::new_from_entities( &mut command_queue, self.entities, - )); - - command_queue_cell.set(command_queue); - r + )) } } diff --git a/crates/bevy_render/src/view/visibility/mod.rs b/crates/bevy_render/src/view/visibility/mod.rs index 30f28b8638c4c..11ce8701f9fb6 100644 --- a/crates/bevy_render/src/view/visibility/mod.rs +++ b/crates/bevy_render/src/view/visibility/mod.rs @@ -10,8 +10,7 @@ use bevy_reflect::Reflect; use bevy_reflect::{std_traits::ReflectDefault, FromReflect}; use bevy_transform::components::GlobalTransform; use bevy_transform::TransformSystem; -use std::cell::Cell; -use thread_local::ThreadLocal; +use bevy_utils::Parallel; use crate::{ camera::{ @@ -356,7 +355,7 @@ fn propagate_recursive( /// [`ComputedVisibility`] of all entities, and for each view also compute the [`VisibleEntities`] /// for that view. pub fn check_visibility( - mut thread_queues: Local>>>, + mut thread_queues: Local>>, mut view_query: Query<(&mut VisibleEntities, &Frustum, Option<&RenderLayers>), With>, mut visible_aabb_query: Query<( Entity, @@ -413,10 +412,7 @@ pub fn check_visibility( } computed_visibility.set_visible_in_view(); - let cell = thread_queues.get_or_default(); - let mut queue = cell.take(); - queue.push(entity); - cell.set(queue); + thread_queues.get().push(entity); }, ); @@ -434,16 +430,11 @@ pub fn check_visibility( } computed_visibility.set_visible_in_view(); - let cell = thread_queues.get_or_default(); - let mut queue = cell.take(); - queue.push(entity); - cell.set(queue); + thread_queues.get().push(entity); }, ); - for cell in thread_queues.iter_mut() { - visible_entities.entities.append(cell.get_mut()); - } + thread_queues.drain_into(&mut visible_entities.entities); } } diff --git a/crates/bevy_utils/Cargo.toml b/crates/bevy_utils/Cargo.toml index 5a019dbc38455..6fc49bfb6f03e 100644 --- a/crates/bevy_utils/Cargo.toml +++ b/crates/bevy_utils/Cargo.toml @@ -16,6 +16,7 @@ uuid = { version = "1.1", features = ["v4", "serde"] } hashbrown = { version = "0.12", features = ["serde"] } petgraph = "0.6" thiserror = "1.0" +thread_local = "1.0" [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = {version = "0.2.0", features = ["js"]} diff --git a/crates/bevy_utils/src/lib.rs b/crates/bevy_utils/src/lib.rs index a6a4aebcb254c..a31e482000ea4 100644 --- a/crates/bevy_utils/src/lib.rs +++ b/crates/bevy_utils/src/lib.rs @@ -19,12 +19,14 @@ pub mod syncunsafecell; mod default; mod float_ord; +mod parallel_queue; pub use ahash::AHasher; pub use default::default; pub use float_ord::*; pub use hashbrown; pub use instant::{Duration, Instant}; +pub use parallel_queue::*; pub use petgraph; pub use thiserror; pub use tracing; diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs new file mode 100644 index 0000000000000..fdb129688b8b8 --- /dev/null +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -0,0 +1,92 @@ +use core::{ + cell::Cell, + ops::{Deref, DerefMut, Drop}, +}; +use thread_local::ThreadLocal; + +/// A cohesive set of thread-local values of a given type. +/// +/// Mutable references can be fetched if `T: Default` via [`Parallel::get`]. +#[derive(Default)] +pub struct Parallel { + locals: ThreadLocal>, +} + +impl Parallel { + /// Gets a mutable iterator over all of the per-thread queues. + pub fn iter_mut(&mut self) -> impl Iterator { + self.locals.iter_mut().map(|cell| cell.get_mut()) + } + + /// Clears all of the stored thread local values. + pub fn clear(&mut self) { + self.locals.clear() + } +} + +impl Parallel { + /// Takes the thread-local value and replaces it with the default. + #[inline] + pub fn get(&self) -> ParRef<'_, T> { + let cell = self.locals.get_or_default(); + let value = cell.take(); + ParRef { cell, value } + } +} + +impl Parallel +where + I: IntoIterator + Default + Send + 'static, +{ + /// Collect all enqueued items from all threads and them into one + pub fn drain(&mut self) -> B + where + B: FromIterator, + { + self.locals + .iter_mut() + .flat_map(|item| item.take().into_iter()) + .collect() + } +} + +impl Parallel> { + /// Collect all enqueued items from all threads and them into one + pub fn drain_into(&mut self, out: &mut Vec) { + out.clear(); + let size = self + .locals + .iter_mut() + .map(|queue| queue.get_mut().len()) + .sum(); + out.reserve(size); + for queue in self.locals.iter_mut() { + out.append(queue.get_mut()); + } + } +} + +/// A retrieved thread-local reference to a value in [`Parallel`]. +pub struct ParRef<'a, T: Default> { + cell: &'a Cell, + value: T, +} + +impl<'a, T: Default> Deref for ParRef<'a, T> { + type Target = T; + fn deref(&self) -> &T { + &self.value + } +} + +impl<'a, T: Default> DerefMut for ParRef<'a, T> { + fn deref_mut(&mut self) -> &mut T { + &mut self.value + } +} + +impl<'a, T: Default> Drop for ParRef<'a, T> { + fn drop(&mut self) { + self.cell.set(core::mem::take(&mut self.value)); + } +} From a4e7700afd92468c74f873ee278b64733d1212f9 Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 23 Jan 2023 17:58:31 -0800 Subject: [PATCH 02/10] Fix CI --- crates/bevy_utils/src/parallel_queue.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs index fdb129688b8b8..3038392f6d409 100644 --- a/crates/bevy_utils/src/parallel_queue.rs +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -20,7 +20,7 @@ impl Parallel { /// Clears all of the stored thread local values. pub fn clear(&mut self) { - self.locals.clear() + self.locals.clear(); } } From ac3e1d3f837197d91a5f9c243e5d6c2174fe959d Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 10 Apr 2023 17:23:59 -0700 Subject: [PATCH 03/10] Remove ParRef --- .../src/system/commands/parallel_scope.rs | 8 ++-- crates/bevy_render/src/view/visibility/mod.rs | 8 +++- crates/bevy_utils/src/parallel_queue.rs | 45 +++++-------------- 3 files changed, 19 insertions(+), 42 deletions(-) diff --git a/crates/bevy_ecs/src/system/commands/parallel_scope.rs b/crates/bevy_ecs/src/system/commands/parallel_scope.rs index b34b091e1776e..b9d66330b05dc 100644 --- a/crates/bevy_ecs/src/system/commands/parallel_scope.rs +++ b/crates/bevy_ecs/src/system/commands/parallel_scope.rs @@ -61,10 +61,8 @@ impl SystemBuffer for ParallelCommandQueue { impl<'w, 's> ParallelCommands<'w, 's> { pub fn command_scope(&self, f: impl FnOnce(Commands) -> R) -> R { - let mut command_queue = self.state.thread_queues.get(); - f(Commands::new_from_entities( - &mut command_queue, - self.entities, - )) + self.state + .thread_queues + .scope(|command_queue| f(Commands::new_from_entities(command_queue, self.entities))) } } diff --git a/crates/bevy_render/src/view/visibility/mod.rs b/crates/bevy_render/src/view/visibility/mod.rs index e32d73b957b11..986ad8bd62b0b 100644 --- a/crates/bevy_render/src/view/visibility/mod.rs +++ b/crates/bevy_render/src/view/visibility/mod.rs @@ -407,7 +407,9 @@ pub fn check_visibility( } computed_visibility.set_visible_in_view(); - thread_queues.get().push(entity); + thread_queues.scope(|queue| { + queue.push(entity); + }) }, ); @@ -425,7 +427,9 @@ pub fn check_visibility( } computed_visibility.set_visible_in_view(); - thread_queues.get().push(entity); + thread_queues.scope(|queue| { + queue.push(entity); + }); }, ); diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs index 3038392f6d409..d2c9722d87438 100644 --- a/crates/bevy_utils/src/parallel_queue.rs +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -1,12 +1,9 @@ -use core::{ - cell::Cell, - ops::{Deref, DerefMut, Drop}, -}; +use core::cell::Cell; use thread_local::ThreadLocal; /// A cohesive set of thread-local values of a given type. /// -/// Mutable references can be fetched if `T: Default` via [`Parallel::get`]. +/// Mutable references can be fetched if `T: Default` via [`Parallel::scope`]. #[derive(Default)] pub struct Parallel { locals: ThreadLocal>, @@ -25,12 +22,15 @@ impl Parallel { } impl Parallel { - /// Takes the thread-local value and replaces it with the default. - #[inline] - pub fn get(&self) -> ParRef<'_, T> { + /// Retrieves the thread-local value for the current thread and runs `f` on it. + /// + /// If there is no thread-local value, it will be initialized to it's default. + pub fn scope(&self, f: impl FnOnce(&mut T) -> R) -> R { let cell = self.locals.get_or_default(); - let value = cell.take(); - ParRef { cell, value } + let mut value = cell.take(); + let ret = f(&mut value); + cell.set(value); + ret } } @@ -65,28 +65,3 @@ impl Parallel> { } } } - -/// A retrieved thread-local reference to a value in [`Parallel`]. -pub struct ParRef<'a, T: Default> { - cell: &'a Cell, - value: T, -} - -impl<'a, T: Default> Deref for ParRef<'a, T> { - type Target = T; - fn deref(&self) -> &T { - &self.value - } -} - -impl<'a, T: Default> DerefMut for ParRef<'a, T> { - fn deref_mut(&mut self) -> &mut T { - &mut self.value - } -} - -impl<'a, T: Default> Drop for ParRef<'a, T> { - fn drop(&mut self) { - self.cell.set(core::mem::take(&mut self.value)); - } -} From 8a4ad8236f51403a7089552401e12d0ddc552fad Mon Sep 17 00:00:00 2001 From: james7132 Date: Thu, 28 Sep 2023 23:30:48 -0700 Subject: [PATCH 04/10] Address review comments --- crates/bevy_render/src/view/visibility/mod.rs | 1 + crates/bevy_utils/src/parallel_queue.rs | 11 +++++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/crates/bevy_render/src/view/visibility/mod.rs b/crates/bevy_render/src/view/visibility/mod.rs index 444a96baeec6b..a0263bfabc2e8 100644 --- a/crates/bevy_render/src/view/visibility/mod.rs +++ b/crates/bevy_render/src/view/visibility/mod.rs @@ -472,6 +472,7 @@ pub fn check_visibility( }); }); + visible_entities.entities.clear(); thread_queues.drain_into(&mut visible_entities.entities); } } diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs index d2c9722d87438..cb847a52f150e 100644 --- a/crates/bevy_utils/src/parallel_queue.rs +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -39,21 +39,24 @@ where I: IntoIterator + Default + Send + 'static, { /// Collect all enqueued items from all threads and them into one - pub fn drain(&mut self) -> B + /// + /// The ordering is not guarenteed. + pub fn drain(&mut self) -> impl Iterator + '_ where B: FromIterator, { self.locals .iter_mut() .flat_map(|item| item.take().into_iter()) - .collect() } } impl Parallel> { - /// Collect all enqueued items from all threads and them into one + /// Collect all enqueued items from all threads and appends them to the end of a + /// single Vec. + /// + /// The ordering is not guarenteed. pub fn drain_into(&mut self, out: &mut Vec) { - out.clear(); let size = self .locals .iter_mut() From 94bf9ed1c31f2a1e081d5035abf3df9e19a7749e Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 29 Sep 2023 00:55:47 -0700 Subject: [PATCH 05/10] Fix CI --- crates/bevy_utils/src/parallel_queue.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs index cb847a52f150e..98952989be919 100644 --- a/crates/bevy_utils/src/parallel_queue.rs +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -39,7 +39,7 @@ where I: IntoIterator + Default + Send + 'static, { /// Collect all enqueued items from all threads and them into one - /// + /// /// The ordering is not guarenteed. pub fn drain(&mut self) -> impl Iterator + '_ where @@ -52,9 +52,9 @@ where } impl Parallel> { - /// Collect all enqueued items from all threads and appends them to the end of a + /// Collect all enqueued items from all threads and appends them to the end of a /// single Vec. - /// + /// /// The ordering is not guarenteed. pub fn drain_into(&mut self, out: &mut Vec) { let size = self From 8bda9e18ac7569832c50a41f6d2b52b93c0a0d20 Mon Sep 17 00:00:00 2001 From: James Liu Date: Mon, 12 Feb 2024 13:53:19 -0800 Subject: [PATCH 06/10] Apply suggestions from code review Co-authored-by: Joseph <21144246+JoJoJet@users.noreply.github.com> --- crates/bevy_ecs/src/system/commands/parallel_scope.rs | 5 ++++- crates/bevy_utils/src/parallel_queue.rs | 4 ++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/bevy_ecs/src/system/commands/parallel_scope.rs b/crates/bevy_ecs/src/system/commands/parallel_scope.rs index f2ddea0f5102e..2739ae6a57e50 100644 --- a/crates/bevy_ecs/src/system/commands/parallel_scope.rs +++ b/crates/bevy_ecs/src/system/commands/parallel_scope.rs @@ -64,6 +64,9 @@ impl<'w, 's> ParallelCommands<'w, 's> { pub fn command_scope(&self, f: impl FnOnce(Commands) -> R) -> R { self.state .thread_queues - .scope(|command_queue| f(Commands::new_from_entities(command_queue, self.entities))) + .scope(|queue| { + let commands = Commands::new_from_entities(queue, self.entities); + f(commands) + }) } } diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs index 98952989be919..7c9c6f3d514cc 100644 --- a/crates/bevy_utils/src/parallel_queue.rs +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -38,7 +38,7 @@ impl Parallel where I: IntoIterator + Default + Send + 'static, { - /// Collect all enqueued items from all threads and them into one + /// Drains all enqueued items from all threads and returns an iterator over them. /// /// The ordering is not guarenteed. pub fn drain(&mut self) -> impl Iterator + '_ @@ -47,7 +47,7 @@ where { self.locals .iter_mut() - .flat_map(|item| item.take().into_iter()) + .flat_map(|item| item.take()) } } From 5fda9822c1f08101fba1014ef10c6e5533dceef4 Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 12 Feb 2024 14:26:46 -0800 Subject: [PATCH 07/10] Formatting --- crates/bevy_ecs/src/system/commands/parallel_scope.rs | 10 ++++------ crates/bevy_utils/src/lib.rs | 2 +- crates/bevy_utils/src/parallel_queue.rs | 4 +--- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/crates/bevy_ecs/src/system/commands/parallel_scope.rs b/crates/bevy_ecs/src/system/commands/parallel_scope.rs index 2739ae6a57e50..865b6eb2b882f 100644 --- a/crates/bevy_ecs/src/system/commands/parallel_scope.rs +++ b/crates/bevy_ecs/src/system/commands/parallel_scope.rs @@ -62,11 +62,9 @@ impl<'w, 's> ParallelCommands<'w, 's> { /// /// For an example, see the type-level documentation for [`ParallelCommands`]. pub fn command_scope(&self, f: impl FnOnce(Commands) -> R) -> R { - self.state - .thread_queues - .scope(|queue| { - let commands = Commands::new_from_entities(queue, self.entities); - f(commands) - }) + self.state.thread_queues.scope(|queue| { + let commands = Commands::new_from_entities(queue, self.entities); + f(commands) + }) } } diff --git a/crates/bevy_utils/src/lib.rs b/crates/bevy_utils/src/lib.rs index 8c46ffa18f918..27d253fe742e4 100644 --- a/crates/bevy_utils/src/lib.rs +++ b/crates/bevy_utils/src/lib.rs @@ -20,9 +20,9 @@ pub mod uuid; mod cow_arc; mod default; mod float_ord; -mod parallel_queue; pub mod intern; mod once; +mod parallel_queue; pub use crate::uuid::Uuid; pub use ahash::{AHasher, RandomState}; diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs index 7c9c6f3d514cc..2f6038a9d3b6f 100644 --- a/crates/bevy_utils/src/parallel_queue.rs +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -45,9 +45,7 @@ where where B: FromIterator, { - self.locals - .iter_mut() - .flat_map(|item| item.take()) + self.locals.iter_mut().flat_map(|item| item.take()) } } From d1954d6f0bf49a3db8d8a7fd3d4c96d74e58e4b7 Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 12 Feb 2024 14:39:09 -0800 Subject: [PATCH 08/10] Document the caveats about drain --- crates/bevy_utils/src/parallel_queue.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs index 2f6038a9d3b6f..d2eded7a2a561 100644 --- a/crates/bevy_utils/src/parallel_queue.rs +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -39,6 +39,10 @@ where I: IntoIterator + Default + Send + 'static, { /// Drains all enqueued items from all threads and returns an iterator over them. + /// + /// Unlike [`Vec::drain`], this will piecemeal remove chunks of the data stored. + /// If iteration is terminated part way, the rest of the enqueued items in the same + /// chunk will be dropped, and the rest of the undrained elements will remain. /// /// The ordering is not guarenteed. pub fn drain(&mut self) -> impl Iterator + '_ From 0e9c51a38504a7bc7c552ff478587d69813b3af7 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 18 Feb 2024 01:08:51 -0800 Subject: [PATCH 09/10] Formatting --- crates/bevy_utils/src/parallel_queue.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs index d2eded7a2a561..500493747c163 100644 --- a/crates/bevy_utils/src/parallel_queue.rs +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -39,7 +39,7 @@ where I: IntoIterator + Default + Send + 'static, { /// Drains all enqueued items from all threads and returns an iterator over them. - /// + /// /// Unlike [`Vec::drain`], this will piecemeal remove chunks of the data stored. /// If iteration is terminated part way, the rest of the enqueued items in the same /// chunk will be dropped, and the rest of the undrained elements will remain. From 41ba7e21c3619386a75986e8a79c0a36155c3688 Mon Sep 17 00:00:00 2001 From: James Liu Date: Mon, 19 Feb 2024 00:43:51 -0800 Subject: [PATCH 10/10] typo. Co-authored-by: Joseph <21144246+JoJoJet@users.noreply.github.com> --- crates/bevy_utils/src/parallel_queue.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs index 500493747c163..91fdb2a9eef54 100644 --- a/crates/bevy_utils/src/parallel_queue.rs +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -44,7 +44,7 @@ where /// If iteration is terminated part way, the rest of the enqueued items in the same /// chunk will be dropped, and the rest of the undrained elements will remain. /// - /// The ordering is not guarenteed. + /// The ordering is not guaranteed. pub fn drain(&mut self) -> impl Iterator + '_ where B: FromIterator,