Skip to content

Commit

Permalink
ecs: only prepare executor on changes. use parallel executor in App
Browse files Browse the repository at this point in the history
  • Loading branch information
cart committed Jul 16, 2020
1 parent 4712e96 commit 362fb92
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 75 deletions.
9 changes: 5 additions & 4 deletions crates/bevy_app/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use super::AppBuilder;
use bevy_ecs::{Resources, Schedule, World};
use bevy_ecs::{Resources, Schedule, World, ParallelExecutor};

#[derive(Default)]
pub struct App {
pub world: World,
pub resources: Resources,
pub runner: Option<Box<dyn Fn(App)>>,
pub schedule: Schedule,
pub executor: ParallelExecutor,
pub startup_schedule: Schedule,
pub startup_executor: ParallelExecutor,
}

impl App {
Expand All @@ -17,13 +19,12 @@ impl App {

pub fn update(&mut self) {
self.schedule.initialize(&mut self.resources);
self.schedule.run(&mut self.world, &mut self.resources);
self.executor.run(&mut self.schedule, &mut self.world, &mut self.resources);
}

pub fn run(mut self) {
self.startup_schedule.initialize(&mut self.resources);
self.startup_schedule
.run(&mut self.world, &mut self.resources);
self.startup_executor.run(&mut self.startup_schedule, &mut self.world, &mut self.resources);
if let Some(run) = self.runner.take() {
run(self)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_ecs/hecs/src/world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ impl<A: DynamicBundle> core::iter::FromIterator<A> for World {

/// Determines freshness of information derived from `World::archetypes`
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct ArchetypesGeneration(u64);
pub struct ArchetypesGeneration(pub u64);

/// Entity IDs created by `World::spawn_batch`
pub struct SpawnBatchIter<'a, I>
Expand Down
128 changes: 58 additions & 70 deletions crates/bevy_ecs/src/parallel_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,42 @@ use crate::{
};
use crossbeam_channel::{Receiver, Sender};
use fixedbitset::FixedBitSet;
use hecs::World;
use hecs::{ArchetypesGeneration, World};
use rayon::ScopeFifo;
use std::sync::{Arc, Mutex};

#[derive(Default)]
pub struct ParallelExecutor {
stages: Vec<ExecutorStage>,
last_schedule_generation: usize,
}

impl Default for ParallelExecutor {
fn default() -> Self {
Self {
stages: Default::default(),
last_schedule_generation: usize::MAX, // MAX forces prepare to run the first time
}
}
}

impl ParallelExecutor {
pub fn prepare(&mut self, schedule: &mut Schedule, world: &World) {
let mut executor_stages = vec![ExecutorStage::default(); schedule.stage_order.len()];
let schedule_generation = schedule.generation();
let schedule_changed = schedule_generation != self.last_schedule_generation;

if schedule_changed {
self.stages.clear();
self.stages.resize_with(schedule.stage_order.len(), || ExecutorStage::default());
}

for (stage_index, stage_name) in schedule.stage_order.iter().enumerate() {
let executor_stage = &mut executor_stages[stage_index];
let executor_stage = &mut self.stages[stage_index];
if let Some(systems) = schedule.stages.get(stage_name) {
executor_stage.prepare(world, systems, schedule_generation);
executor_stage.prepare(world, systems, schedule_changed);
}
}

self.stages = executor_stages;
self.last_schedule_generation = schedule_generation;
}

pub fn run(&mut self, schedule: &mut Schedule, world: &mut World, resources: &mut Resources) {
Expand All @@ -50,7 +65,7 @@ pub struct ExecutorStage {

sender: Sender<usize>,
receiver: Receiver<usize>,
last_prepare_schedule_generation: usize,
last_archetypes_generation: ArchetypesGeneration,
}

impl Default for ExecutorStage {
Expand All @@ -63,7 +78,7 @@ impl Default for ExecutorStage {
running_systems: Default::default(),
sender,
receiver,
last_prepare_schedule_generation: usize::MAX, // MAX forces prepare to run the first time
last_archetypes_generation: ArchetypesGeneration(u64::MAX), // MAX forces prepare to run the first time
}
}
}
Expand All @@ -84,12 +99,8 @@ impl ExecutorStage {
&mut self,
world: &World,
systems: &Vec<Arc<Mutex<Box<dyn System>>>>,
schedule_generation: usize,
// last_world_generation
// last_world_generation_start_index <- for cases where we do a partial update midway through execution (then need to update everything that came before in the next execution)
schedule_changed: bool,
) {
let schedule_changed = self.last_prepare_schedule_generation != schedule_generation;

// if the schedule has changed, clear executor state / fill it with new defaults
if schedule_changed {
self.system_dependencies.clear();
Expand All @@ -103,18 +114,17 @@ impl ExecutorStage {
self.running_systems.grow(systems.len());
}

// TODO: check archetype generation here
let world_generation_changed = true;
let archetypes_generation = world.archetypes_generation();
let archetypes_generation_changed =
self.last_archetypes_generation != archetypes_generation;

if world_generation_changed {
if schedule_changed || archetypes_generation_changed {
// update each system's archetype access to latest world archetypes
for system in systems.iter() {
let mut system = system.lock().unwrap();
system.update_archetype_access(world);
}
}

if schedule_changed || world_generation_changed {
// calculate dependencies between systems and build execution order
let mut current_archetype_access = ArchetypeAccess::default();
let mut current_resource_access = TypeAccess::default();
Expand Down Expand Up @@ -176,7 +186,7 @@ impl ExecutorStage {
}
}

self.last_prepare_schedule_generation = schedule_generation;
self.last_archetypes_generation = archetypes_generation;
}

fn run_ready_systems<'run>(
Expand Down Expand Up @@ -316,7 +326,9 @@ impl ExecutorStage {
#[cfg(test)]
mod tests {
use super::ParallelExecutor;
use crate::{IntoQuerySystem, IntoThreadLocalSystem, Query, Res, Resources, Schedule, World, ResMut};
use crate::{
IntoQuerySystem, IntoThreadLocalSystem, Query, Res, ResMut, Resources, Schedule, World,
};
use fixedbitset::FixedBitSet;
use std::sync::{Arc, Mutex};

Expand Down Expand Up @@ -346,80 +358,58 @@ mod tests {

fn read_u32(counter: Res<Counter>, _query: Query<&u32>) {
let mut count = counter.count.lock().unwrap();
assert!(
*count < 2,
"should be one of the first two systems to run"
);
assert!(*count < 2, "should be one of the first two systems to run");
*count += 1;
}

fn write_float(counter: Res<Counter>, _query: Query<&f32>) {
let mut count = counter.count.lock().unwrap();
assert!(
*count < 2,
"should be one of the first two systems to run"
);
assert!(*count < 2, "should be one of the first two systems to run");
*count += 1;
}

fn read_u32_write_u64(counter: Res<Counter>, _query: Query<(&u32, &mut u64)>) {
let mut count = counter.count.lock().unwrap();
assert_eq!(
*count, 2,
"should always be the 3rd system to run"
);
assert_eq!(*count, 2, "should always be the 3rd system to run");
*count += 1;
}

fn read_u64(counter: Res<Counter>, _query: Query<&u64>) {
let mut count = counter.count.lock().unwrap();
assert_eq!(
*count, 3,
"should always be the 4th system to run"
);
assert_eq!(*count, 3, "should always be the 4th system to run");
*count += 1;
}

schedule.add_system_to_stage("A", read_u32.system());
schedule.add_system_to_stage("A", write_float.system());
schedule.add_system_to_stage("A", read_u32_write_u64.system());
schedule.add_system_to_stage("A", read_u64.system());

// B systems

fn write_u64(counter: Res<Counter>, _query: Query<&mut u64>) {
let mut count = counter.count.lock().unwrap();
assert_eq!(
*count, 4,
"should always be the 5th system to run"
);
assert_eq!(*count, 4, "should always be the 5th system to run");
*count += 1;
}

fn thread_local_system(_world: &mut World, resources: &mut Resources) {
let counter = resources.get::<Counter>().unwrap();
let mut count = counter.count.lock().unwrap();
assert_eq!(
*count, 5,
"should always be the 6th system to run"
);
assert_eq!(*count, 5, "should always be the 6th system to run");
*count += 1;
}

fn write_f32(counter: Res<Counter>, _query: Query<&mut f32>) {
let mut count = counter.count.lock().unwrap();
assert_eq!(
*count, 6,
"should always be the 7th system to run"
);
assert_eq!(*count, 6, "should always be the 7th system to run");
*count += 1;
}

schedule.add_system_to_stage("B", write_u64.system());
schedule.add_system_to_stage("B", thread_local_system.thread_local_system());
schedule.add_system_to_stage("B", write_f32.system());


// C systems

fn read_f64_res(counter: Res<Counter>, _f64_res: Res<f64>) {
Expand All @@ -440,21 +430,19 @@ mod tests {
*count += 1;
}

fn read_isize_write_f64_res(counter: Res<Counter>, _isize_res: Res<isize>, _f64_res: ResMut<f64>) {
fn read_isize_write_f64_res(
counter: Res<Counter>,
_isize_res: Res<isize>,
_f64_res: ResMut<f64>,
) {
let mut count = counter.count.lock().unwrap();
assert_eq!(
*count, 9,
"should always be the 10th system to run"
);
assert_eq!(*count, 9, "should always be the 10th system to run");
*count += 1;
}

fn write_f64_res(counter: Res<Counter>, _f64_res: ResMut<f64>) {
let mut count = counter.count.lock().unwrap();
assert_eq!(
*count, 10,
"should always be the 11th system to run"
);
assert_eq!(*count, 10, "should always be the 11th system to run");
*count += 1;
}

Expand All @@ -470,7 +458,7 @@ mod tests {
resources: &mut Resources,
) {
executor.prepare(schedule, world);

assert_eq!(
executor.stages[0].system_dependents,
vec![vec![2], vec![], vec![3], vec![]]
Expand All @@ -483,13 +471,13 @@ mod tests {
executor.stages[2].system_dependents,
vec![vec![2], vec![2], vec![3], vec![]]
);

let stage_0_len = executor.stages[0].system_dependencies.len();
let mut read_u32_write_u64_deps = FixedBitSet::with_capacity(stage_0_len);
read_u32_write_u64_deps.insert(0);
let mut read_u64_deps = FixedBitSet::with_capacity(stage_0_len);
read_u64_deps.insert(2);

assert_eq!(
executor.stages[0].system_dependencies,
vec![
Expand All @@ -499,7 +487,7 @@ mod tests {
read_u64_deps,
]
);

let stage_1_len = executor.stages[1].system_dependencies.len();
let mut thread_local_deps = FixedBitSet::with_capacity(stage_1_len);
thread_local_deps.insert(0);
Expand Down Expand Up @@ -528,9 +516,9 @@ mod tests {
write_f64_res_deps
]
);

executor.run(schedule, world, resources);

let counter = resources.get::<Counter>().unwrap();
assert_eq!(
*counter.count.lock().unwrap(),
Expand Down

0 comments on commit 362fb92

Please sign in to comment.