From d6915ef87d0eeda1016130bf0cf4dabfbb7672d5 Mon Sep 17 00:00:00 2001 From: James Liu Date: Mon, 19 Feb 2024 08:31:15 -0800 Subject: [PATCH] refactor: Extract parallel queue abstraction (#7348) # Objective There's a repeating pattern of `ThreadLocal<Cell<Vec<T>>>` which is very useful for low overhead, low contention multithreaded queues that have cropped up in a few places in the engine. This pattern is surprisingly useful when building deferred mutation across multiple threads, as noted by its use in `ParallelCommands`. However, `ThreadLocal<Cell<Vec<T>>>` is not only a mouthful, it's also hard to ensure the thread-local queue is replaced after it's been temporarily removed from the `Cell`. ## Solution Wrap the pattern into `bevy_utils::Parallel` which codifies the entire pattern and ensures the user follows the contract. Instead of fetching individual cells, removing the value, mutating it, and replacing it, `Parallel::get` returns a `ParRef<'a, T>` which contains the temporarily removed value and a reference back to the cell, and will write the mutated value back to the cell upon being dropped. I would like to use this to simplify the remaining part of #4899 that has not been adopted/merged. 
--- ## Changelog TODO --------- Co-authored-by: Joseph <21144246+JoJoJet@users.noreply.github.com> --- crates/bevy_ecs/Cargo.toml | 1 - .../src/system/commands/parallel_scope.rs | 25 +++---- crates/bevy_render/src/view/visibility/mod.rs | 17 ++--- crates/bevy_utils/Cargo.toml | 1 + crates/bevy_utils/src/lib.rs | 2 + crates/bevy_utils/src/parallel_queue.rs | 72 +++++++++++++++++++ 6 files changed, 90 insertions(+), 28 deletions(-) create mode 100644 crates/bevy_utils/src/parallel_queue.rs diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml index dcb744c7450a65..77baf32dc33966 100644 --- a/crates/bevy_ecs/Cargo.toml +++ b/crates/bevy_ecs/Cargo.toml @@ -23,7 +23,6 @@ bevy_utils = { path = "../bevy_utils", version = "0.13.0" } bevy_ecs_macros = { path = "macros", version = "0.13.0" } async-channel = "2.1.0" -thread_local = "1.1.4" fixedbitset = "0.4.2" rustc-hash = "1.1" downcast-rs = "1.2" diff --git a/crates/bevy_ecs/src/system/commands/parallel_scope.rs b/crates/bevy_ecs/src/system/commands/parallel_scope.rs index 6296f0293393c7..865b6eb2b882f7 100644 --- a/crates/bevy_ecs/src/system/commands/parallel_scope.rs +++ b/crates/bevy_ecs/src/system/commands/parallel_scope.rs @@ -1,6 +1,4 @@ -use std::cell::Cell; - -use thread_local::ThreadLocal; +use bevy_utils::Parallel; use crate::{ self as bevy_ecs, @@ -13,7 +11,7 @@ use super::{CommandQueue, Commands}; #[derive(Default)] struct ParallelCommandQueue { - thread_local_storage: ThreadLocal>, + thread_queues: Parallel, } /// An alternative to [`Commands`] that can be used in parallel contexts, such as those in [`Query::par_iter`](crate::system::Query::par_iter) @@ -53,8 +51,8 @@ impl SystemBuffer for ParallelCommandQueue { fn apply(&mut self, _system_meta: &SystemMeta, world: &mut World) { #[cfg(feature = "trace")] let _system_span = _system_meta.commands_span.enter(); - for cq in &mut self.thread_local_storage { - cq.get_mut().apply(world); + for cq in self.thread_queues.iter_mut() { + cq.apply(world); } 
} } @@ -64,16 +62,9 @@ impl<'w, 's> ParallelCommands<'w, 's> { /// /// For an example, see the type-level documentation for [`ParallelCommands`]. pub fn command_scope(&self, f: impl FnOnce(Commands) -> R) -> R { - let store = &self.state.thread_local_storage; - let command_queue_cell = store.get_or_default(); - let mut command_queue = command_queue_cell.take(); - - let r = f(Commands::new_from_entities( - &mut command_queue, - self.entities, - )); - - command_queue_cell.set(command_queue); - r + self.state.thread_queues.scope(|queue| { + let commands = Commands::new_from_entities(queue, self.entities); + f(commands) + }) } } diff --git a/crates/bevy_render/src/view/visibility/mod.rs b/crates/bevy_render/src/view/visibility/mod.rs index ab43a0127fcb5b..ca2393f816d241 100644 --- a/crates/bevy_render/src/view/visibility/mod.rs +++ b/crates/bevy_render/src/view/visibility/mod.rs @@ -9,8 +9,7 @@ use bevy_ecs::prelude::*; use bevy_hierarchy::{Children, Parent}; use bevy_reflect::{std_traits::ReflectDefault, Reflect}; use bevy_transform::{components::GlobalTransform, TransformSystem}; -use std::cell::Cell; -use thread_local::ThreadLocal; +use bevy_utils::Parallel; use crate::deterministic::DeterministicRenderingConfig; use crate::{ @@ -372,7 +371,7 @@ fn reset_view_visibility(mut query: Query<&mut ViewVisibility>) { /// [`ViewVisibility`] of all entities, and for each view also compute the [`VisibleEntities`] /// for that view. 
pub fn check_visibility( - mut thread_queues: Local>>>, + mut thread_queues: Local>>, mut view_query: Query<( &mut VisibleEntities, &Frustum, @@ -440,15 +439,13 @@ pub fn check_visibility( } view_visibility.set(); - let cell = thread_queues.get_or_default(); - let mut queue = cell.take(); - queue.push(entity); - cell.set(queue); + thread_queues.scope(|queue| { + queue.push(entity); + }); }); - for cell in &mut thread_queues { - visible_entities.entities.append(cell.get_mut()); - } + visible_entities.entities.clear(); + thread_queues.drain_into(&mut visible_entities.entities); if deterministic_rendering_config.stable_sort_z_fighting { // We can use the faster unstable sort here because // the values (`Entity`) are guaranteed to be unique. diff --git a/crates/bevy_utils/Cargo.toml b/crates/bevy_utils/Cargo.toml index fb6505fe7ec125..9501044a244d6f 100644 --- a/crates/bevy_utils/Cargo.toml +++ b/crates/bevy_utils/Cargo.toml @@ -20,6 +20,7 @@ hashbrown = { version = "0.14", features = ["serde"] } bevy_utils_proc_macros = { version = "0.13.0", path = "macros" } petgraph = "0.6" thiserror = "1.0" +thread_local = "1.0" nonmax = "0.5" smallvec = { version = "1.11", features = ["serde", "union", "const_generics"] } diff --git a/crates/bevy_utils/src/lib.rs b/crates/bevy_utils/src/lib.rs index 00b2885427d8a5..ffd4f330efd216 100644 --- a/crates/bevy_utils/src/lib.rs +++ b/crates/bevy_utils/src/lib.rs @@ -22,6 +22,7 @@ mod default; mod float_ord; pub mod intern; mod once; +mod parallel_queue; pub use crate::uuid::Uuid; pub use ahash::{AHasher, RandomState}; @@ -30,6 +31,7 @@ pub use cow_arc::*; pub use default::default; pub use float_ord::*; pub use hashbrown; +pub use parallel_queue::*; pub use petgraph; pub use smallvec; pub use thiserror; diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs new file mode 100644 index 00000000000000..91fdb2a9eef543 --- /dev/null +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -0,0 +1,72 @@ +use 
core::cell::Cell; +use thread_local::ThreadLocal; + +/// A cohesive set of thread-local values of a given type. +/// +/// Mutable references can be fetched if `T: Default` via [`Parallel::scope`]. +#[derive(Default)] +pub struct Parallel { + locals: ThreadLocal>, +} + +impl Parallel { + /// Gets a mutable iterator over all of the per-thread queues. + pub fn iter_mut(&mut self) -> impl Iterator { + self.locals.iter_mut().map(|cell| cell.get_mut()) + } + + /// Clears all of the stored thread local values. + pub fn clear(&mut self) { + self.locals.clear(); + } +} + +impl Parallel { + /// Retrieves the thread-local value for the current thread and runs `f` on it. + /// + /// If there is no thread-local value, it will be initialized to its default. + pub fn scope(&self, f: impl FnOnce(&mut T) -> R) -> R { + let cell = self.locals.get_or_default(); + let mut value = cell.take(); + let ret = f(&mut value); + cell.set(value); + ret + } +} + +impl Parallel +where + I: IntoIterator + Default + Send + 'static, +{ + /// Drains all enqueued items from all threads and returns an iterator over them. + /// + /// Unlike [`Vec::drain`], this will piecemeal remove chunks of the data stored. + /// If iteration is terminated part way, the rest of the enqueued items in the same + /// chunk will be dropped, and the rest of the undrained elements will remain. + /// + /// The ordering is not guaranteed. + pub fn drain(&mut self) -> impl Iterator + '_ + where + B: FromIterator, + { + self.locals.iter_mut().flat_map(|item| item.take()) + } +} + +impl Parallel> { + /// Collects all enqueued items from all threads and appends them to the end of a + /// single Vec. + /// + /// The ordering is not guaranteed. + pub fn drain_into(&mut self, out: &mut Vec) { + let size = self + .locals + .iter_mut() + .map(|queue| queue.get_mut().len()) + .sum(); + out.reserve(size); + for queue in self.locals.iter_mut() { + out.append(queue.get_mut()); + } + } +}