diff --git a/crates/bevy_ecs/src/query/fetch.rs b/crates/bevy_ecs/src/query/fetch.rs index 2764f897310f2..aa5dd93e44820 100644 --- a/crates/bevy_ecs/src/query/fetch.rs +++ b/crates/bevy_ecs/src/query/fetch.rs @@ -46,7 +46,7 @@ pub trait WorldQuery { } pub trait Fetch<'w>: Sized { - type Item; + type Item: Send; type State: FetchState; /// Creates a new instance of this fetch. diff --git a/crates/bevy_ecs/src/query/mod.rs b/crates/bevy_ecs/src/query/mod.rs index 99e140846073c..ad687f96cfb7e 100644 --- a/crates/bevy_ecs/src/query/mod.rs +++ b/crates/bevy_ecs/src/query/mod.rs @@ -2,12 +2,14 @@ mod access; mod fetch; mod filter; mod iter; +mod par_iter; mod state; pub use access::*; pub use fetch::*; pub use filter::*; pub use iter::*; +pub use par_iter::*; pub use state::*; #[cfg(test)] diff --git a/crates/bevy_ecs/src/query/par_iter.rs b/crates/bevy_ecs/src/query/par_iter.rs new file mode 100644 index 0000000000000..3c865a477a296 --- /dev/null +++ b/crates/bevy_ecs/src/query/par_iter.rs @@ -0,0 +1,396 @@ +use std::marker::PhantomData; + +use bevy_tasks::{ParallelIterator, TaskPool}; + +use crate::{ + archetype::{ArchetypeId, Archetypes}, + query::{Fetch, FilterFetch, QueryState, WorldQuery}, + storage::{TableId, Tables}, + world::World, +}; + +pub struct ParQueryIter<'w, 's, Q: WorldQuery, F: WorldQuery> +where + F::Fetch: FilterFetch, +{ + batch_size: usize, + offset: usize, + world: &'w World, + query_state: &'s QueryState, + last_change_tick: u32, + change_tick: u32, + dos: DenseOrSparse<'w, 's>, +} + +enum DenseOrSparse<'w, 's> { + Dense { + tables: &'w Tables, + table_id_iter: std::slice::Iter<'s, TableId>, + table: Option, + }, + Sparse { + archetypes: &'w Archetypes, + archetype_id_iter: std::slice::Iter<'s, ArchetypeId>, + archetype: Option, + }, +} + +impl<'w, 's, Q: WorldQuery, F: WorldQuery> ParQueryIter<'w, 's, Q, F> +where + F::Fetch: FilterFetch, +{ + pub(crate) unsafe fn new( + world: &'w World, + query_state: &'s QueryState, + batch_size: usize, + last_change_tick: u32, + change_tick: u32, + ) -> Self { + let fetch = ::init( + world, + &query_state.fetch_state, + last_change_tick, + change_tick, + ); + let filter = ::init( + world, + &query_state.filter_state, + last_change_tick, + change_tick, + ); + if fetch.is_dense() && filter.is_dense() { + let tables = &world.storages().tables; + let mut table_id_iter = query_state.matched_table_ids.iter(); + ParQueryIter { + batch_size, + offset: 0, + world, + query_state, + last_change_tick, + change_tick, + dos: DenseOrSparse::Dense { + table: table_id_iter.next().copied(), + table_id_iter, + tables, + }, + } + } else { + let archetypes = world.archetypes(); + let mut archetype_id_iter = query_state.matched_archetype_ids.iter(); + ParQueryIter { + batch_size, + offset: 0, + world, + query_state, + last_change_tick, + change_tick, + dos: DenseOrSparse::Sparse { + archetype: archetype_id_iter.next().copied(), + archetype_id_iter, + archetypes, + }, + } + } + } +} + +type QItem<'w, Q> = <::Fetch as Fetch<'w>>::Item; + +pub struct IntoBatchIterator<'w, 's, Q: WorldQuery, F: WorldQuery> +where + F::Fetch: FilterFetch, +{ + world: &'w World, + state: &'s QueryState, + index_range: as IntoIterator>::IntoIter, + last_change_tick: u32, + change_tick: u32, + tor: TableOrArchetype, +} +enum TableOrArchetype { + Table(TableId), + Archetype(ArchetypeId), +} + +impl<'w, 's, Q: WorldQuery, F: WorldQuery> IntoIterator for IntoBatchIterator<'w, 's, Q, F> +where + F::Fetch: FilterFetch, +{ + type Item = QItem<'w, Q>; + + type IntoIter = 
BatchIterator<'w, Q, F>; + + fn into_iter(self) -> Self::IntoIter { + unsafe { + let mut fetch = ::init( + self.world, + &self.state.fetch_state, + self.last_change_tick, + self.change_tick, + ); + let mut filter = ::init( + self.world, + &self.state.filter_state, + self.last_change_tick, + self.change_tick, + ); + + let tables = &self.world.storages().tables; + let dense = match self.tor { + TableOrArchetype::Table(table) => { + let table = &tables[table]; + fetch.set_table(&self.state.fetch_state, table); + filter.set_table(&self.state.filter_state, table); + true + } + TableOrArchetype::Archetype(archetype_id) => { + let archetype = &self.world.archetypes[archetype_id]; + fetch.set_archetype(&self.state.fetch_state, archetype, tables); + filter.set_archetype(&self.state.filter_state, archetype, tables); + false + } + }; + + BatchIterator { + marker: PhantomData, + dense, + fetch, + filter, + ir: self.index_range, + } + } + } +} + +pub struct BatchIterator<'w, Q: WorldQuery, F: WorldQuery> +where + F::Fetch: FilterFetch, +{ + marker: PhantomData<&'w ()>, + dense: bool, + fetch: Q::Fetch, + filter: F::Fetch, + ir: as IntoIterator>::IntoIter, +} + +impl<'w, Q: WorldQuery, F: WorldQuery> Iterator for BatchIterator<'w, Q, F> +where + F::Fetch: FilterFetch, +{ + type Item = QItem<'w, Q>; + + fn next(&mut self) -> Option { + if let Some(index) = self.ir.next() { + unsafe { + Some(if self.dense { + if !self.filter.table_filter_fetch(index) { + return self.next(); + } + self.fetch.table_fetch(index) + } else { + if !self.filter.archetype_filter_fetch(index) { + return self.next(); + } + self.fetch.archetype_fetch(index) + }) + } + } else { + None + } + } +} + +impl<'w, 's, Q: WorldQuery, F: WorldQuery> ParallelIterator> + for ParQueryIter<'w, 's, Q, F> +where + F::Fetch: FilterFetch, +{ + type Item = QItem<'w, Q>; + + fn next_batch(&mut self) -> Option> { + let Self { + batch_size, + offset, + world, + query_state, + last_change_tick, + change_tick, + dos, + } = self; + match dos { + DenseOrSparse::Dense { + tables, + table_id_iter, + table, + } => { + if let Some(table) = table { + if *offset >= tables[*table].len() { + if let Some(&id) = table_id_iter.next() { + dbg!(id); + *table = id; + *offset = 0; + } else { + return None; + } + } + let table_id = *table; + let table = &tables[table_id]; + let len = (*batch_size).min(table.len() - *offset); + let range = (*offset)..((*offset) + len); + *offset += *batch_size; + Some(IntoBatchIterator { + world: *world, + state: *query_state, + index_range: range, + last_change_tick: *last_change_tick, + change_tick: *change_tick, + tor: TableOrArchetype::Table(table_id), + }) + } else if let Some(&id) = table_id_iter.next() { + dbg!(id); + *table = Some(id); + *offset = 0; + self.next_batch() + } else { + None + } + } + DenseOrSparse::Sparse { + archetypes, + archetype_id_iter, + archetype, + } => { + if let Some(archetype) = archetype { + if *offset >= archetypes[*archetype].len() { + if let Some(&id) = archetype_id_iter.next() { + *archetype = id; + *offset = 0; + } else { + return None; + } + } + let archetype_id = *archetype; + let archetype = &archetypes[archetype_id]; + let len = (*batch_size).min(archetype.len() - *offset); + let range = (*offset)..((*offset) + len); + *offset += *batch_size; + Some(IntoBatchIterator { + world: *world, + state: *query_state, + index_range: range, + last_change_tick: *last_change_tick, + change_tick: *change_tick, + tor: TableOrArchetype::Archetype(archetype_id), + }) + } else if let Some(&id) = 
archetype_id_iter.next() { + *archetype = Some(id); + *offset = 0; + self.next_batch() + } else { + None + } + } + } + } + + fn fold(self, pool: &TaskPool, init: C, f: E) -> Vec + where + E: FnMut(C, Self::Item) -> C + Send + Sync + Clone, + C: Clone + Send + Sync + 'static, + { + let Self { + offset, + batch_size, + world, + query_state, + last_change_tick, + change_tick, + dos, + } = self; + pool.scope(move |scope| match dos { + DenseOrSparse::Dense { + tables, + table_id_iter, + table, + } => { + for table_id in table.into_iter().chain(table_id_iter.copied()) { + let table = &tables[table_id]; + let mut offset = offset; + while offset < table.len() { + let func = f.clone(); + let init = init.clone(); + scope.spawn(async move { + unsafe { + let mut fetch = ::init( + world, + &query_state.fetch_state, + last_change_tick, + change_tick, + ); + let mut filter = ::init( + world, + &query_state.filter_state, + last_change_tick, + change_tick, + ); + let tables = &world.storages().tables; + let table = &tables[table_id]; + fetch.set_table(&query_state.fetch_state, table); + filter.set_table(&query_state.filter_state, table); + let len = batch_size.min(table.len() - offset); + (offset..offset + len) + .filter(|&table_index| filter.table_filter_fetch(table_index)) + .map(|table_index| fetch.table_fetch(table_index)) + .fold(init, func) + } + }); + offset += batch_size; + } + } + } + DenseOrSparse::Sparse { + archetypes, + archetype_id_iter, + archetype, + } => { + for archetype_id in archetype.into_iter().chain(archetype_id_iter.copied()) { + let mut offset = offset; + let archetype = &archetypes[archetype_id]; + while offset < archetype.len() { + let func = f.clone(); + let init = init.clone(); + scope.spawn(async move { + unsafe { + let mut fetch = ::init( + world, + &query_state.fetch_state, + last_change_tick, + change_tick, + ); + let mut filter = ::init( + world, + &query_state.filter_state, + last_change_tick, + change_tick, + ); + let tables = &world.storages().tables; + let archetype = &world.archetypes[archetype_id]; + fetch.set_archetype(&query_state.fetch_state, archetype, tables); + filter.set_archetype(&query_state.filter_state, archetype, tables); + + let len = batch_size.min(archetype.len() - offset); + (offset..offset + len) + .filter(|&table_index| { + filter.archetype_filter_fetch(table_index) + }) + .map(|table_index| fetch.archetype_fetch(table_index)) + .fold(init, func) + } + }); + offset += batch_size; + } + } + } + }) + } +} diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs index 05c009c6e82db..eaf4b8ec130fc 100644 --- a/crates/bevy_ecs/src/query/state.rs +++ b/crates/bevy_ecs/src/query/state.rs @@ -3,8 +3,8 @@ use crate::{ component::ComponentId, entity::Entity, query::{ - Access, Fetch, FetchState, FilterFetch, FilteredAccess, QueryCombinationIter, QueryIter, - ReadOnlyFetch, WorldQuery, + Access, Fetch, FetchState, FilterFetch, FilteredAccess, ParQueryIter, QueryCombinationIter, + QueryIter, ReadOnlyFetch, WorldQuery, }, storage::TableId, world::{World, WorldId}, @@ -202,6 +202,19 @@ where unsafe { self.iter_unchecked(world) } } + #[inline] + pub fn par_iter<'w, 's>( + &'s mut self, + world: &'w World, + batch_size: usize, + ) -> ParQueryIter<'w, 's, Q, F> + where + Q::Fetch: ReadOnlyFetch, + { + // SAFETY: query is read only + unsafe { self.par_iter_unchecked(world, batch_size) } + } + #[inline] pub fn iter_mut<'w, 's>(&'s mut self, world: &'w mut World) -> QueryIter<'w, 's, Q, F> { // SAFETY: query has unique world access @@ -209,6 
+222,14 @@ where } #[inline] + pub fn par_iter_mut<'w, 's>( + &'s mut self, + world: &'w mut World, + batch_size: usize, + ) -> ParQueryIter<'w, 's, Q, F> { + // SAFETY: query has unique world access + unsafe { self.par_iter_unchecked(world, batch_size) } + } pub fn iter_combinations<'w, 's, const K: usize>( &'s mut self, world: &'w World, @@ -247,6 +268,19 @@ where /// This does not check for mutable query correctness. To be safe, make sure mutable queries /// have unique access to the components they query. #[inline] + pub unsafe fn par_iter_unchecked<'w, 's>( + &'s mut self, + world: &'w World, + batch_size: usize, + ) -> ParQueryIter<'w, 's, Q, F> { + self.validate_world_and_update_archetypes(world); + self.par_iter_unchecked_manual( + world, + batch_size, + world.last_change_tick(), + world.read_change_tick(), + ) + } pub unsafe fn iter_combinations_unchecked<'w, 's, const K: usize>( &'s mut self, world: &'w World, @@ -280,6 +314,15 @@ where /// This does not validate that `world.id()` matches `self.world_id`. Calling this on a `world` /// with a mismatched WorldId is unsound. #[inline] + pub(crate) unsafe fn par_iter_unchecked_manual<'w, 's>( + &'s self, + world: &'w World, + batch_size: usize, + last_change_tick: u32, + change_tick: u32, + ) -> ParQueryIter<'w, 's, Q, F> { + ParQueryIter::new(world, self, batch_size, last_change_tick, change_tick) + } pub(crate) unsafe fn iter_combinations_unchecked_manual<'w, 's, const K: usize>( &'s self, world: &'w World, diff --git a/crates/bevy_ecs/src/system/query.rs b/crates/bevy_ecs/src/system/query.rs index 6173889eeef33..925417e410a50 100644 --- a/crates/bevy_ecs/src/system/query.rs +++ b/crates/bevy_ecs/src/system/query.rs @@ -2,8 +2,8 @@ use crate::{ component::Component, entity::Entity, query::{ - Fetch, FilterFetch, QueryCombinationIter, QueryEntityError, QueryIter, QueryState, - ReadOnlyFetch, WorldQuery, + Fetch, FilterFetch, ParQueryIter, QueryCombinationIter, QueryEntityError, QueryIter, + QueryState, ReadOnlyFetch, WorldQuery, }, world::{Mut, World}, }; @@ -192,6 +192,40 @@ where } } + /// Returns an [`bevy_tasks::ParallelIterator`] over the query results. + /// + /// This can only be called for read-only queries, see [`Self::iter_mut`] for write-queries. + #[inline] + pub fn par_iter(&self, batch_size: usize) -> ParQueryIter<'_, '_, Q, F> + where + Q::Fetch: ReadOnlyFetch, + { + // SAFE: system runs without conflicts with other systems. + // same-system queries have runtime borrow checks when they conflict + unsafe { + self.state.par_iter_unchecked_manual( + self.world, + batch_size, + self.last_change_tick, + self.change_tick, + ) + } + } + + /// Returns an [`bevy_tasks::ParallelIterator`] over the query results. + #[inline] + pub fn par_iter_mut(&mut self, batch_size: usize) -> ParQueryIter<'_, '_, Q, F> { + // SAFE: system runs without conflicts with other systems. + // same-system queries have runtime borrow checks when they conflict + unsafe { + self.state.par_iter_unchecked_manual( + self.world, + batch_size, + self.last_change_tick, + self.change_tick, + ) + } + } /// Iterates over all possible combinations of `K` query results without repetition. /// /// The returned value is not an `Iterator`, because that would lead to aliasing of mutable references. 
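The bevy_ecs hunks above add `par_iter` / `par_iter_mut` to `Query` and the matching `par_iter_unchecked*` constructors to `QueryState`. As a hypothetical usage sketch (not part of this diff): it assumes a `ComputeTaskPool` resource has been inserted, as Bevy's default plugins do, and the component names and the batch size of 32 are made up for illustration.

```rust
use bevy_ecs::prelude::*;
use bevy_tasks::{ComputeTaskPool, ParallelIterator};

struct Velocity(f32);
struct Position(f32);

// A system that advances every Position in parallel, one task per batch
// of up to 32 matching entities.
fn movement(pool: Res<ComputeTaskPool>, mut query: Query<(&Velocity, &mut Position)>) {
    query
        .par_iter_mut(32)
        .for_each(&pool, |(velocity, mut position)| {
            position.0 += velocity.0;
        });
}
```

`for_each` here is the `ParallelIterator` combinator from `bevy_tasks`, which after this diff routes through `fold`, so each batch is folded on its own task.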
diff --git a/crates/bevy_tasks/src/iter/adapters.rs b/crates/bevy_tasks/src/iter/adapters.rs index 047f6fb8ad5f0..d6595ebb56e0e 100644 --- a/crates/bevy_tasks/src/iter/adapters.rs +++ b/crates/bevy_tasks/src/iter/adapters.rs @@ -7,22 +7,67 @@ pub struct Chain { pub(crate) left_in_progress: bool, } -impl ParallelIterator for Chain +enum ChainBatch { + A(A), + B(B), +} + +enum Either { + A(A), + B(B), +} + +impl IntoIterator for ChainBatch +where + A: IntoIterator + Send, + B: IntoIterator + Send, + I1: Iterator, + I2: Iterator, +{ + type Item = T; + + type IntoIter = Either; + + fn into_iter(self) -> Self::IntoIter { + match self { + ChainBatch::A(a) => Either::A(a.into_iter()), + ChainBatch::B(b) => Either::B(b.into_iter()), + } + } +} + +impl Iterator for Either where - B: Iterator + Send, - T: ParallelIterator, - U: ParallelIterator, + A: Iterator, + B: Iterator, +{ + type Item = T; + + fn next(&mut self) -> Option { + match self { + Either::A(a) => a.into_iter().next(), + Either::B(b) => b.into_iter().next(), + } + } +} + +impl ParallelIterator> for Chain +where + I1: IntoIterator + Send, + I2: IntoIterator + Send, + T: ParallelIterator, + U: ParallelIterator, { type Item = T::Item; - fn next_batch(&mut self) -> Option { + fn next_batch(&mut self) -> Option> { if self.left_in_progress { match self.left.next_batch() { - b @ Some(_) => return b, + b @ Some(_) => return b.map(ChainBatch::A), None => self.left_in_progress = false, } } - self.right.next_batch() + self.right.next_batch().map(ChainBatch::B) } } @@ -32,16 +77,33 @@ pub struct Map { pub(crate) f: F, } -impl ParallelIterator> for Map +impl ParallelIterator> for Map where - B: Iterator + Send, + B: IntoIterator + Send, U: ParallelIterator, F: FnMut(U::Item) -> T + Send + Clone, { type Item = T; - fn next_batch(&mut self) -> Option> { - self.iter.next_batch().map(|b| b.map(self.f.clone())) + fn next_batch(&mut self) -> Option> { + self.iter.next_batch().map(|b| Map { + f: self.f.clone(), + iter: b, + }) + } +} + +impl IntoIterator for Map +where + B: IntoIterator + Send, + F: FnMut(B::Item) -> T + Send + Clone, +{ + type Item = T; + + type IntoIter = std::iter::Map; + + fn into_iter(self) -> Self::IntoIter { + self.iter.into_iter().map(self.f) } } @@ -51,18 +113,33 @@ pub struct Filter { pub(crate) predicate: F, } -impl ParallelIterator> for Filter +impl ParallelIterator> for Filter where - B: Iterator + Send, + B: IntoIterator + Send, P: ParallelIterator, F: FnMut(&P::Item) -> bool + Send + Clone, { type Item = P::Item; - fn next_batch(&mut self) -> Option> { - self.iter - .next_batch() - .map(|b| b.filter(self.predicate.clone())) + fn next_batch(&mut self) -> Option> { + self.iter.next_batch().map(|iter| Filter { + iter, + predicate: self.predicate.clone(), + }) + } +} + +impl IntoIterator for Filter +where + I: IntoIterator + Send, + F: FnMut(&I::Item) -> bool + Send + Clone, +{ + type Item = I::Item; + + type IntoIter = std::iter::Filter; + + fn into_iter(self) -> Self::IntoIter { + self.iter.into_iter().filter(self.predicate) } } @@ -72,16 +149,33 @@ pub struct FilterMap { pub(crate) f: F, } -impl ParallelIterator> for FilterMap +impl ParallelIterator> for FilterMap where - B: Iterator + Send, + B: IntoIterator + Send, P: ParallelIterator, F: FnMut(P::Item) -> Option + Send + Clone, { type Item = R; - fn next_batch(&mut self) -> Option> { - self.iter.next_batch().map(|b| b.filter_map(self.f.clone())) + fn next_batch(&mut self) -> Option> { + self.iter.next_batch().map(|b| FilterMap { + iter: b, + f: self.f.clone(), + }) + } +} 
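Every adapter in this file is rewritten to the same shape: `next_batch` returns a lazy wrapper holding the inner batch and the closure, and the wrapper's `IntoIterator` impl only builds the actual `std::iter` adapter on the task that consumes it. A minimal standalone sketch of that pattern, using only the standard library; `MapBatch` is a made-up name, not a type from this diff:

```rust
// A batch that remembers "map f over this inner batch" without doing the work yet.
struct MapBatch<B, F> {
    inner: B,
    f: F,
}

impl<B, F, T> IntoIterator for MapBatch<B, F>
where
    B: IntoIterator + Send,
    F: FnMut(B::Item) -> T + Send + Clone,
{
    type Item = T;
    type IntoIter = std::iter::Map<B::IntoIter, F>;

    fn into_iter(self) -> Self::IntoIter {
        // The closure runs on whichever task calls `into_iter()`,
        // not on the thread that produced the batch.
        self.inner.into_iter().map(self.f)
    }
}
```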
+ +impl IntoIterator for FilterMap +where + I: IntoIterator + Send, + F: FnMut(I::Item) -> Option + Send + Clone, +{ + type Item = R; + + type IntoIter = std::iter::FilterMap; + + fn into_iter(self) -> Self::IntoIter { + self.iter.into_iter().filter_map(self.f) } } @@ -91,9 +185,9 @@ pub struct FlatMap { pub(crate) f: F, } -impl ParallelIterator> for FlatMap +impl ParallelIterator> for FlatMap where - B: Iterator + Send, + B: IntoIterator + Send, P: ParallelIterator, F: FnMut(P::Item) -> U + Send + Clone, U: IntoIterator, @@ -103,8 +197,27 @@ where // This extends each batch using the flat map. The other option is // to turn each IntoIter into its own batch. - fn next_batch(&mut self) -> Option> { - self.iter.next_batch().map(|b| b.flat_map(self.f.clone())) + fn next_batch(&mut self) -> Option> { + self.iter.next_batch().map(|b| FlatMap { + iter: b, + f: self.f.clone(), + }) + } +} + +impl IntoIterator for FlatMap +where + I: IntoIterator + Send, + F: FnMut(I::Item) -> U + Send + Clone, + U: IntoIterator, + U::IntoIter: Send, +{ + type Item = U::Item; + + type IntoIter = std::iter::FlatMap; + + fn into_iter(self) -> Self::IntoIter { + self.iter.into_iter().flat_map(self.f) } } @@ -113,9 +226,9 @@ pub struct Flatten

{ pub(crate) iter: P, } -impl ParallelIterator> for Flatten

+impl ParallelIterator> for Flatten

where - B: Iterator + Send, + B: IntoIterator + Send, P: ParallelIterator, B::Item: IntoIterator, ::IntoIter: Send, @@ -124,8 +237,22 @@ where // This extends each batch using the flatten. The other option is to // turn each IntoIter into its own batch. - fn next_batch(&mut self) -> Option> { - self.iter.next_batch().map(|b| b.flatten()) + fn next_batch(&mut self) -> Option> { + self.iter.next_batch().map(|b| Flatten { iter: b }) + } +} + +impl IntoIterator for Flatten +where + I: IntoIterator + Send, + I::Item: IntoIterator, +{ + type Item = ::Item; + + type IntoIter = std::iter::Flatten; + + fn into_iter(self) -> Self::IntoIter { + self.iter.into_iter().flatten() } } @@ -136,7 +263,7 @@ pub struct Fuse

{ impl ParallelIterator for Fuse

where - B: Iterator + Send, + B: IntoIterator + Send, P: ParallelIterator, { type Item = P::Item; @@ -161,16 +288,33 @@ pub struct Inspect { pub(crate) f: F, } -impl ParallelIterator> for Inspect +impl ParallelIterator> for Inspect where - B: Iterator + Send, + B: IntoIterator + Send, P: ParallelIterator, F: FnMut(&P::Item) + Send + Clone, { type Item = P::Item; - fn next_batch(&mut self) -> Option> { - self.iter.next_batch().map(|b| b.inspect(self.f.clone())) + fn next_batch(&mut self) -> Option> { + self.iter.next_batch().map(|b| Inspect { + iter: b, + f: self.f.clone(), + }) + } +} + +impl IntoIterator for Inspect +where + I: IntoIterator + Send, + F: FnMut(&I::Item) + Send + Clone, +{ + type Item = I::Item; + + type IntoIter = std::iter::Inspect; + + fn into_iter(self) -> Self::IntoIter { + self.iter.into_iter().inspect(self.f) } } @@ -179,16 +323,30 @@ pub struct Copied

{ pub(crate) iter: P, } -impl<'a, B, P, T> ParallelIterator> for Copied

+impl<'a, B, P, T> ParallelIterator> for Copied

where - B: Iterator + Send, + B: IntoIterator + Send, P: ParallelIterator, T: 'a + Copy, { type Item = T; - fn next_batch(&mut self) -> Option> { - self.iter.next_batch().map(|b| b.copied()) + fn next_batch(&mut self) -> Option> { + self.iter.next_batch().map(|b| Copied { iter: b }) + } +} + +impl<'a, I, T> IntoIterator for Copied +where + I: IntoIterator + Send, + T: 'a + Copy, +{ + type Item = T; + + type IntoIter = std::iter::Copied; + + fn into_iter(self) -> Self::IntoIter { + self.iter.into_iter().copied() } } @@ -197,16 +355,30 @@ pub struct Cloned

{ pub(crate) iter: P, } -impl<'a, B, P, T> ParallelIterator> for Cloned

+impl<'a, B, P, T> ParallelIterator> for Cloned

where - B: Iterator + Send, + B: IntoIterator + Send, P: ParallelIterator, T: 'a + Copy, { type Item = T; - fn next_batch(&mut self) -> Option> { - self.iter.next_batch().map(|b| b.cloned()) + fn next_batch(&mut self) -> Option> { + self.iter.next_batch().map(|b| Cloned { iter: b }) + } +} + +impl<'a, I, T> IntoIterator for Cloned +where + I: IntoIterator + Send, + T: 'a + Copy, +{ + type Item = T; + + type IntoIter = std::iter::Cloned; + + fn into_iter(self) -> Self::IntoIter { + self.iter.into_iter().cloned() } } @@ -218,7 +390,7 @@ pub struct Cycle

{ impl ParallelIterator for Cycle

where - B: Iterator + Send, + B: IntoIterator + Send, P: ParallelIterator + Clone, { type Item = P::Item; diff --git a/crates/bevy_tasks/src/iter/mod.rs b/crates/bevy_tasks/src/iter/mod.rs index 91147fceb1acf..7f5a42c7eef15 100644 --- a/crates/bevy_tasks/src/iter/mod.rs +++ b/crates/bevy_tasks/src/iter/mod.rs @@ -13,8 +13,8 @@ pub use adapters::*; /// using ParallelIterator. pub trait ParallelIterator where - B: Iterator + Send, - Self: Sized + Send, + B: IntoIterator + Send, + Self: Sized, { type Item; @@ -38,7 +38,7 @@ where fn count(mut self, pool: &TaskPool) -> usize { pool.scope(|s| { while let Some(batch) = self.next_batch() { - s.spawn(async move { batch.count() }) + s.spawn(async move { batch.into_iter().count() }) } }) .iter() @@ -51,7 +51,7 @@ where fn last(mut self, _pool: &TaskPool) -> Option { let mut last_item = None; while let Some(batch) = self.next_batch() { - last_item = batch.last(); + last_item = batch.into_iter().last(); } last_item } @@ -103,18 +103,11 @@ where /// Calls a closure on each item of a parallel iterator. /// /// See [`Iterator::for_each()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.for_each) - fn for_each(mut self, pool: &TaskPool, f: F) + fn for_each(self, pool: &TaskPool, mut f: F) where F: FnMut(Self::Item) + Send + Clone + Sync, { - pool.scope(|s| { - while let Some(batch) = self.next_batch() { - let newf = f.clone(); - s.spawn(async move { - batch.for_each(newf); - }); - } - }); + self.fold(pool, (), move |(), v| f(v)); } /// Creates a parallel iterator which uses a closure to determine @@ -199,7 +192,7 @@ where { pool.scope(|s| { while let Some(batch) = self.next_batch() { - s.spawn(async move { batch.collect::>() }); + s.spawn(async move { batch.into_iter().collect::>() }); } }) .into_iter() @@ -221,7 +214,7 @@ where pool.scope(|s| { while let Some(batch) = self.next_batch() { let newf = f.clone(); - s.spawn(async move { batch.partition::, F>(newf) }) + s.spawn(async move { batch.into_iter().partition::, F>(newf) }) } }) .into_iter() @@ -239,7 +232,7 @@ where /// results (in batch order).* /// /// See [`Iterator::fold()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold) - fn fold(mut self, pool: &TaskPool, init: C, f: F) -> Vec + fn fold(mut self, pool: &TaskPool, init: C, f: F) -> Vec where F: FnMut(C, Self::Item) -> C + Send + Sync + Clone, C: Clone + Send + Sync + 'static, @@ -248,7 +241,7 @@ where while let Some(batch) = self.next_batch() { let newf = f.clone(); let newi = init.clone(); - s.spawn(async move { batch.fold(newi, newf) }); + s.spawn(async move { batch.into_iter().fold(newi, newf) }); } }) } @@ -263,9 +256,9 @@ where F: FnMut(Self::Item) -> bool + Send + Sync + Clone, { pool.scope(|s| { - while let Some(mut batch) = self.next_batch() { + while let Some(batch) = self.next_batch() { let newf = f.clone(); - s.spawn(async move { batch.all(newf) }); + s.spawn(async move { batch.into_iter().all(newf) }); } }) .into_iter() @@ -282,9 +275,9 @@ where F: FnMut(Self::Item) -> bool + Send + Sync + Clone, { pool.scope(|s| { - while let Some(mut batch) = self.next_batch() { + while let Some(batch) = self.next_batch() { let newf = f.clone(); - s.spawn(async move { batch.any(newf) }); + s.spawn(async move { batch.into_iter().any(newf) }); } }) .into_iter() @@ -336,7 +329,7 @@ where { pool.scope(|s| { while let Some(batch) = self.next_batch() { - s.spawn(async move { batch.max() }); + s.spawn(async move { batch.into_iter().max() }); } }) .into_iter() @@ -353,7 +346,7 @@ where { pool.scope(|s| { while let 
Some(batch) = self.next_batch() { - s.spawn(async move { batch.min() }); + s.spawn(async move { batch.into_iter().min() }); } }) .into_iter() @@ -373,7 +366,7 @@ where pool.scope(|s| { while let Some(batch) = self.next_batch() { let newf = f.clone(); - s.spawn(async move { batch.max_by_key(newf) }); + s.spawn(async move { batch.into_iter().max_by_key(newf) }); } }) .into_iter() @@ -393,7 +386,7 @@ where pool.scope(|s| { while let Some(batch) = self.next_batch() { let newf = f.clone(); - s.spawn(async move { batch.max_by(newf) }); + s.spawn(async move { batch.into_iter().max_by(newf) }); } }) .into_iter() @@ -413,7 +406,7 @@ where pool.scope(|s| { while let Some(batch) = self.next_batch() { let newf = f.clone(); - s.spawn(async move { batch.min_by_key(newf) }); + s.spawn(async move { batch.into_iter().min_by_key(newf) }); } }) .into_iter() @@ -433,7 +426,7 @@ where pool.scope(|s| { while let Some(batch) = self.next_batch() { let newf = f.clone(); - s.spawn(async move { batch.min_by(newf) }); + s.spawn(async move { batch.into_iter().min_by(newf) }); } }) .into_iter() @@ -486,7 +479,7 @@ where { pool.scope(|s| { while let Some(batch) = self.next_batch() { - s.spawn(async move { batch.sum() }); + s.spawn(async move { batch.into_iter().sum() }); } }) .into_iter() @@ -503,7 +496,7 @@ where { pool.scope(|s| { while let Some(batch) = self.next_batch() { - s.spawn(async move { batch.product() }); + s.spawn(async move { batch.into_iter().product() }); } }) .into_iter() diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 8f248e1005c38..8a7221aa04d41 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -59,7 +59,7 @@ impl TaskPool { /// This is similar to `rayon::scope` and `crossbeam::scope` pub fn scope<'scope, F, T>(&self, f: F) -> Vec where - F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, + F: FnOnce(&mut Scope<'scope, T>) + 'scope, T: Send + 'static, { let executor = &async_executor::LocalExecutor::new(); diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 9331799120583..db2655c273c9d 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -163,7 +163,7 @@ impl TaskPool { /// This is similar to `rayon::scope` and `crossbeam::scope` pub fn scope<'scope, F, T>(&self, f: F) -> Vec where - F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, + F: FnOnce(&mut Scope<'scope, T>) + 'scope, T: Send + 'static, { TaskPool::LOCAL_EXECUTOR.with(|local_executor| {
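Taken together, the `bevy_tasks` changes relax the trait in three ways: batches are `B: IntoIterator + Send` instead of ready-made `Iterator`s, the parallel iterator itself no longer has to be `Send`, and the closure handed to `TaskPool::scope` no longer has to be `Send`. Here is a minimal sketch of what the relaxed contract looks like from the implementor's side; `ChunkedRange` and its fields are invented for illustration:

```rust
use bevy_tasks::{ParallelIterator, TaskPool};

// Splits 0..end into `batch_size`-sized ranges. Each batch is just a
// `Range<usize>` description of work; nothing is iterated until a worker
// task calls `.into_iter()` on it.
struct ChunkedRange {
    next: usize,
    end: usize,
    batch_size: usize,
}

impl ParallelIterator<std::ops::Range<usize>> for ChunkedRange {
    type Item = usize;

    fn next_batch(&mut self) -> Option<std::ops::Range<usize>> {
        if self.next >= self.end {
            return None;
        }
        let start = self.next;
        self.next = (self.next + self.batch_size).min(self.end);
        Some(start..self.next)
    }
}

fn main() {
    let pool = TaskPool::new();
    let iter = ChunkedRange { next: 0, end: 100, batch_size: 16 };
    // `count` spawns one task per batch and sums the per-batch counts: 6 * 16 + 4.
    assert_eq!(iter.count(&pool), 100);
}
```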