Skip to content

Commit

Permalink
Merge pull request #234 from Imberflur/send-dispatcher
Browse files Browse the repository at this point in the history
Add a dispatcher that can be sent between threads
  • Loading branch information
Imberflur authored Jan 3, 2024
2 parents 5450bf3 + 3f15a49 commit 5d52c6f
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 42 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,6 @@ jobs:
# Miri currently reports leaks in some tests so we disable that check
# here (might be due to ptr-int-ptr in crossbeam-epoch so might be
# resolved in future versions of that crate).
run: MIRIFLAGS="-Zmiri-ignore-leaks" cargo miri test
#
# crossbeam-epoch doesn't pass with stacked borrows https://github.com/crossbeam-rs/crossbeam/issues/545
run: MIRIFLAGS="-Zmiri-ignore-leaks -Zmiri-tree-borrows" cargo miri test
72 changes: 33 additions & 39 deletions src/dispatch/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use smallvec::SmallVec;

use crate::{dispatch::stage::Stage, system::RunNow, world::World};
use crate::{
dispatch::{stage::Stage, SendDispatcher},
system::RunNow,
world::World,
};

/// This wrapper is used to share a replaceable ThreadPool with other
/// dispatchers. Useful with batch dispatchers.
Expand All @@ -10,19 +14,15 @@ pub type ThreadPoolWrapper = Option<::std::sync::Arc<::rayon::ThreadPool>>;
/// The dispatcher struct, allowing
/// systems to be executed in parallel.
pub struct Dispatcher<'a, 'b> {
stages: Vec<Stage<'a>>,
inner: SendDispatcher<'a>,
thread_local: ThreadLocal<'b>,
#[cfg(feature = "parallel")]
thread_pool: ::std::sync::Arc<::std::sync::RwLock<ThreadPoolWrapper>>,
}

impl<'a, 'b> Dispatcher<'a, 'b> {
/// Sets up all the systems which means they are gonna add default values
/// for the resources they need.
pub fn setup(&mut self, world: &mut World) {
for stage in &mut self.stages {
stage.setup(world);
}
self.inner.setup(world);

for sys in &mut self.thread_local {
sys.setup(world);
Expand All @@ -34,9 +34,7 @@ impl<'a, 'b> Dispatcher<'a, 'b> {
/// / or resources from the `World` which are associated with external
/// resources.
pub fn dispose(self, world: &mut World) {
for stage in self.stages {
stage.dispose(world);
}
self.inner.dispose(world);

for sys in self.thread_local {
sys.dispose(world);
Expand All @@ -56,12 +54,7 @@ impl<'a, 'b> Dispatcher<'a, 'b> {
/// Please note that this method assumes that no resource
/// is currently borrowed. If that's the case, it panics.
pub fn dispatch(&mut self, world: &World) {
#[cfg(feature = "parallel")]
self.dispatch_par(world);

#[cfg(not(feature = "parallel"))]
self.dispatch_seq(world);

self.inner.dispatch(world);
self.dispatch_thread_local(world);
}

Expand All @@ -77,18 +70,7 @@ impl<'a, 'b> Dispatcher<'a, 'b> {
/// is currently borrowed. If that's the case, it panics.
#[cfg(feature = "parallel")]
pub fn dispatch_par(&mut self, world: &World) {
let stages = &mut self.stages;

self.thread_pool
.read()
.unwrap()
.as_ref()
.unwrap()
.install(move || {
for stage in stages {
stage.execute(world);
}
});
self.inner.dispatch_par(world);
}

/// Dispatches the systems (except thread local systems) sequentially.
Expand All @@ -99,9 +81,7 @@ impl<'a, 'b> Dispatcher<'a, 'b> {
/// Please note that this method assumes that no resource
/// is currently borrowed. If that's the case, it panics.
pub fn dispatch_seq(&mut self, world: &World) {
for stage in &mut self.stages {
stage.execute_seq(world);
}
self.inner.dispatch_seq(world);
}

/// Dispatch only thread local systems sequentially.
Expand All @@ -114,16 +94,28 @@ impl<'a, 'b> Dispatcher<'a, 'b> {
}
}

/// Converts this to a [`SendDispatcher`].
///
/// Fails and returns the original distpatcher if it contains thread local systems.
pub fn try_into_sendable(self) -> Result<SendDispatcher<'a>, Self> {
let Dispatcher {
inner: _,
thread_local,
} = &self;

if thread_local.is_empty() {
Ok(self.inner)
} else {
Err(self)
}
}

/// This method returns the largest amount of threads this dispatcher
/// can make use of. This is mainly for debugging purposes so you can see
/// how well your systems can make use of multi-threading.
#[cfg(feature = "parallel")]
pub fn max_threads(&self) -> usize {
self.stages
.iter()
.map(Stage::max_threads)
.max()
.unwrap_or(0)
self.inner.max_threads()
}
}

Expand Down Expand Up @@ -154,9 +146,11 @@ pub fn new_dispatcher<'a, 'b>(
thread_pool: ::std::sync::Arc<::std::sync::RwLock<ThreadPoolWrapper>>,
) -> Dispatcher<'a, 'b> {
Dispatcher {
stages,
inner: SendDispatcher {
stages,
thread_pool,
},
thread_local,
thread_pool,
}
}

Expand All @@ -166,7 +160,7 @@ pub fn new_dispatcher<'a, 'b>(
thread_local: ThreadLocal<'b>,
) -> Dispatcher<'a, 'b> {
Dispatcher {
stages,
inner: SendDispatcher { stages },
thread_local,
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/dispatch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub use self::{
},
builder::DispatcherBuilder,
dispatcher::Dispatcher,
send_dispatcher::SendDispatcher,
};

#[cfg(feature = "parallel")]
Expand All @@ -18,5 +19,6 @@ mod builder;
mod dispatcher;
#[cfg(feature = "parallel")]
mod par_seq;
mod send_dispatcher;
mod stage;
mod util;
128 changes: 128 additions & 0 deletions src/dispatch/send_dispatcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#[cfg(feature = "parallel")]
use crate::dispatch::dispatcher::ThreadPoolWrapper;
use crate::{dispatch::stage::Stage, system::RunNow, world::World};

/// `Send`able version of [`Dispatcher`](crate::dispatch::Dispatcher).
///
/// Can't hold thread local systems.
///
/// Create using [`Dispatcher::try_into_sendable`](crate::dispatch::Dispatcher::try_into_sendable).
pub struct SendDispatcher<'a> {
pub(super) stages: Vec<Stage<'a>>,
#[cfg(feature = "parallel")]
pub(super) thread_pool: ::std::sync::Arc<::std::sync::RwLock<ThreadPoolWrapper>>,
}

impl<'a> SendDispatcher<'a> {
/// Sets up all the systems which means they are gonna add default values
/// for the resources they need.
pub fn setup(&mut self, world: &mut World) {
for stage in &mut self.stages {
stage.setup(world);
}
}

/// Calls the `dispose` method of all systems and allows them to release
/// external resources. It is common this method removes components and
/// / or resources from the `World` which are associated with external
/// resources.
pub fn dispose(self, world: &mut World) {
for stage in self.stages {
stage.dispose(world);
}
}

/// Dispatch all the systems with given resources and context
/// and then run thread local systems.
///
/// This function automatically redirects to
///
/// * [SendDispatcher::dispatch_par] in case it is supported
/// * [SendDispatcher::dispatch_seq] otherwise
///
/// and runs `dispatch_thread_local` afterwards.
///
/// Please note that this method assumes that no resource
/// is currently borrowed. If that's the case, it panics.
pub fn dispatch(&mut self, world: &World) {
#[cfg(feature = "parallel")]
self.dispatch_par(world);

#[cfg(not(feature = "parallel"))]
self.dispatch_seq(world);
}

/// Dispatches the systems (except thread local systems)
/// in parallel given the resources to operate on.
///
/// This operation blocks the
/// executing thread.
///
/// Only available with "parallel" feature enabled.
///
/// Please note that this method assumes that no resource
/// is currently borrowed. If that's the case, it panics.
#[cfg(feature = "parallel")]
pub fn dispatch_par(&mut self, world: &World) {
let stages = &mut self.stages;

self.thread_pool
.read()
.unwrap()
.as_ref()
.unwrap()
.install(move || {
for stage in stages {
stage.execute(world);
}
});
}

/// Dispatches the systems (except thread local systems) sequentially.
///
/// This is useful if parallel overhead is
/// too big or the platform does not support multithreading.
///
/// Please note that this method assumes that no resource
/// is currently borrowed. If that's the case, it panics.
pub fn dispatch_seq(&mut self, world: &World) {
for stage in &mut self.stages {
stage.execute_seq(world);
}
}

/// This method returns the largest amount of threads this dispatcher
/// can make use of. This is mainly for debugging purposes so you can see
/// how well your systems can make use of multi-threading.
#[cfg(feature = "parallel")]
pub fn max_threads(&self) -> usize {
self.stages
.iter()
.map(Stage::max_threads)
.max()
.unwrap_or(0)
}
}

impl<'a, 'b> RunNow<'a> for SendDispatcher<'b> {
fn run_now(&mut self, world: &World) {
self.dispatch(world);
}

fn setup(&mut self, world: &mut World) {
self.setup(world);
}

fn dispose(self: Box<Self>, world: &mut World) {
(*self).dispose(world);
}
}

#[cfg(test)]
mod tests {
#[test]
fn send_dispatcher_is_send() {
fn is_send<T: Send>() {}
is_send::<super::SendDispatcher>();
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub use crate::dispatch::{Par, ParSeq, RunWithPool, Seq};
pub use crate::{
dispatch::{
BatchAccessor, BatchController, BatchUncheckedWorld, Dispatcher, DispatcherBuilder,
MultiDispatchController, MultiDispatcher,
MultiDispatchController, MultiDispatcher, SendDispatcher,
},
meta::{CastFrom, MetaIter, MetaIterMut, MetaTable},
system::{
Expand Down
2 changes: 1 addition & 1 deletion src/world/res_downcast/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Code is based on https://github.com/chris-morgan/mopa
//! Code is based on <https://github.com/chris-morgan/mopa>
//! with the macro inlined for `Resource`. License files can be found in the
//! directory of this source file, see COPYRIGHT, LICENSE-APACHE and
//! LICENSE-MIT.
Expand Down

0 comments on commit 5d52c6f

Please sign in to comment.