From fabbc478445316fa3dab6b34b65a09bb48e6e9ac Mon Sep 17 00:00:00 2001 From: Mike Hsu Date: Tue, 31 Jan 2023 10:46:20 -0800 Subject: [PATCH] move MainThreadExecutor to schedule_v3 --- .../src/schedule/executor_parallel.rs | 22 +--- .../bevy_ecs/src/schedule_v3/executor/mod.rs | 2 +- .../schedule_v3/executor/multi_threaded.rs | 105 +++++++++++------- crates/bevy_render/src/pipelined_rendering.rs | 3 +- 4 files changed, 73 insertions(+), 59 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 0db9627633ba1e..dd966ac3cf0362 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -1,15 +1,12 @@ -use std::sync::Arc; - -use crate as bevy_ecs; use crate::{ archetype::ArchetypeComponentId, query::Access, schedule::{ParallelSystemExecutor, SystemContainer}, - system::Resource, + schedule_v3::MainThreadExecutor, world::World, }; use async_channel::{Receiver, Sender}; -use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor}; +use bevy_tasks::{ComputeTaskPool, Scope, TaskPool}; #[cfg(feature = "trace")] use bevy_utils::tracing::Instrument; use event_listener::Event; @@ -18,16 +15,6 @@ use fixedbitset::FixedBitSet; #[cfg(test)] use scheduling_event::*; -/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread -#[derive(Resource, Default, Clone)] -pub struct MainThreadExecutor(pub Arc>); - -impl MainThreadExecutor { - pub fn new() -> Self { - MainThreadExecutor(Arc::new(ThreadExecutor::new())) - } -} - struct SystemSchedulingMetadata { /// Used to signal the system's task to start the system. start: Event, @@ -138,7 +125,10 @@ impl ParallelSystemExecutor for ParallelExecutor { } } - let thread_executor = world.get_resource::().map(|e| &*e.0); + let thread_executor = world + .get_resource::() + .map(|e| e.0.clone()); + let thread_executor = thread_executor.as_deref(); ComputeTaskPool::init(TaskPool::default).scope_with_executor( false, diff --git a/crates/bevy_ecs/src/schedule_v3/executor/mod.rs b/crates/bevy_ecs/src/schedule_v3/executor/mod.rs index bfc1eef14d6098..8fc1788cb72b57 100644 --- a/crates/bevy_ecs/src/schedule_v3/executor/mod.rs +++ b/crates/bevy_ecs/src/schedule_v3/executor/mod.rs @@ -2,7 +2,7 @@ mod multi_threaded; mod simple; mod single_threaded; -pub use self::multi_threaded::MultiThreadedExecutor; +pub use self::multi_threaded::{MainThreadExecutor, MultiThreadedExecutor}; pub use self::simple::SimpleExecutor; pub use self::single_threaded::SingleThreadedExecutor; diff --git a/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs index 3debba3ac59b41..107bfe8d0707c1 100644 --- a/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs @@ -1,14 +1,16 @@ -use bevy_tasks::{ComputeTaskPool, Scope, TaskPool}; +use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor}; use bevy_utils::default; use bevy_utils::syncunsafecell::SyncUnsafeCell; #[cfg(feature = "trace")] use bevy_utils::tracing::{info_span, Instrument}; +use std::sync::Arc; use async_channel::{Receiver, Sender}; use fixedbitset::FixedBitSet; use crate::{ archetype::ArchetypeComponentId, + prelude::Resource, query::Access, schedule_v3::{ is_apply_system_buffers, BoxedCondition, ExecutorKind, SystemExecutor, SystemSchedule, @@ -17,6 +19,8 @@ use crate::{ world::World, }; +use crate as bevy_ecs; + /// A funky borrow split of [`SystemSchedule`] required by the [`MultiThreadedExecutor`]. struct SyncUnsafeSchedule<'a> { systems: &'a [SyncUnsafeCell], @@ -145,59 +149,68 @@ impl SystemExecutor for MultiThreadedExecutor { } } + let thread_executor = world + .get_resource::() + .map(|e| e.0.clone()); + let thread_executor = thread_executor.as_deref(); + let world = SyncUnsafeCell::from_mut(world); let SyncUnsafeSchedule { systems, mut conditions, } = SyncUnsafeSchedule::new(schedule); - ComputeTaskPool::init(TaskPool::default).scope(|scope| { - // the executor itself is a `Send` future so that it can run - // alongside systems that claim the local thread - let executor = async { - while self.num_completed_systems < num_systems { - // SAFETY: self.ready_systems does not contain running systems - unsafe { - self.spawn_system_tasks(scope, systems, &mut conditions, world); - } - - if self.num_running_systems > 0 { - // wait for systems to complete - let index = self - .receiver - .recv() - .await - .unwrap_or_else(|error| unreachable!("{}", error)); + ComputeTaskPool::init(TaskPool::default).scope_with_executor( + false, + thread_executor, + |scope| { + // the executor itself is a `Send` future so that it can run + // alongside systems that claim the local thread + let executor = async { + while self.num_completed_systems < num_systems { + // SAFETY: self.ready_systems does not contain running systems + unsafe { + self.spawn_system_tasks(scope, systems, &mut conditions, world); + } - self.finish_system_and_signal_dependents(index); + if self.num_running_systems > 0 { + // wait for systems to complete + let index = self + .receiver + .recv() + .await + .unwrap_or_else(|error| unreachable!("{}", error)); - while let Ok(index) = self.receiver.try_recv() { self.finish_system_and_signal_dependents(index); - } - self.rebuild_active_access(); + while let Ok(index) = self.receiver.try_recv() { + self.finish_system_and_signal_dependents(index); + } + + self.rebuild_active_access(); + } } - } - // SAFETY: all systems have completed - let world = unsafe { &mut *world.get() }; - apply_system_buffers(&mut self.unapplied_systems, systems, world); - - debug_assert!(self.ready_systems.is_clear()); - debug_assert!(self.running_systems.is_clear()); - debug_assert!(self.unapplied_systems.is_clear()); - self.active_access.clear(); - self.evaluated_sets.clear(); - self.skipped_systems.clear(); - self.completed_systems.clear(); - }; + // SAFETY: all systems have completed + let world = unsafe { &mut *world.get() }; + apply_system_buffers(&mut self.unapplied_systems, systems, world); - #[cfg(feature = "trace")] - let executor_span = info_span!("schedule_task"); - #[cfg(feature = "trace")] - let executor = executor.instrument(executor_span); - scope.spawn(executor); - }); + debug_assert!(self.ready_systems.is_clear()); + debug_assert!(self.running_systems.is_clear()); + debug_assert!(self.unapplied_systems.is_clear()); + self.active_access.clear(); + self.evaluated_sets.clear(); + self.skipped_systems.clear(); + self.completed_systems.clear(); + }; + + #[cfg(feature = "trace")] + let executor_span = info_span!("schedule_task"); + #[cfg(feature = "trace")] + let executor = executor.instrument(executor_span); + scope.spawn(executor); + }, + ); } } @@ -573,3 +586,13 @@ fn evaluate_and_fold_conditions(conditions: &mut [BoxedCondition], world: &World }) .fold(true, |acc, res| acc && res) } + +/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread +#[derive(Resource, Default, Clone)] +pub struct MainThreadExecutor(pub Arc>); + +impl MainThreadExecutor { + pub fn new() -> Self { + MainThreadExecutor(Arc::new(ThreadExecutor::new())) + } +} diff --git a/crates/bevy_render/src/pipelined_rendering.rs b/crates/bevy_render/src/pipelined_rendering.rs index 43bbbf84090609..516911aa038aee 100644 --- a/crates/bevy_render/src/pipelined_rendering.rs +++ b/crates/bevy_render/src/pipelined_rendering.rs @@ -2,7 +2,8 @@ use async_channel::{Receiver, Sender}; use bevy_app::{App, AppLabel, Plugin, SubApp}; use bevy_ecs::{ - schedule::{MainThreadExecutor, StageLabel, SystemStage}, + schedule::{StageLabel, SystemStage}, + schedule_v3::MainThreadExecutor, system::Resource, world::{Mut, World}, };