Skip to content

Commit

Permalink
tick local executor (bevyengine#6121)
Browse files Browse the repository at this point in the history
# Objective

- bevyengine#4466 broke local tasks running.
- Fixes bevyengine#6120

## Solution

- Add system for ticking local executors on main thread into bevy_core where the tasks pools are initialized.
- Add ticking local executors into thread executors

## Changelog

- tick all thread local executors in task pool.

## Notes

- ~~Not 100% sure about this PR. Ticking the local executor for the main thread in scope feels a little kludgy as it requires users of bevy_tasks to be calling scope periodically for those tasks to make progress.~~ took this out in favor of a system that ticks the local executors.
  • Loading branch information
hymm authored and ItsDoot committed Feb 1, 2023
1 parent 342be38 commit 0e8310c
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 4 deletions.
3 changes: 3 additions & 0 deletions crates/bevy_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ bevy_utils = { path = "../bevy_utils", version = "0.9.0-dev" }

# other
bytemuck = "1.5"

[dev-dependencies]
crossbeam-channel = "0.5.0"
51 changes: 51 additions & 0 deletions crates/bevy_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ use bevy_utils::{Duration, HashSet, Instant};
use std::borrow::Cow;
use std::ops::Range;

#[cfg(not(target_arch = "wasm32"))]
use bevy_ecs::schedule::IntoSystemDescriptor;
#[cfg(not(target_arch = "wasm32"))]
use bevy_tasks::tick_global_task_pools_on_main_thread;

/// Adds core functionality to Apps.
#[derive(Default)]
pub struct CorePlugin;
Expand All @@ -35,6 +40,13 @@ impl Plugin for CorePlugin {
.unwrap_or_default()
.create_default_pools();

#[cfg(not(target_arch = "wasm32"))]
app.add_system_to_stage(
bevy_app::CoreStage::Last,
tick_global_task_pools_on_main_thread.at_end(),
);

app.register_type::<Entity>().register_type::<Name>();
app.register_type::<Entity>()
.register_type::<Name>()
.register_type::<Range<f32>>()
Expand Down Expand Up @@ -97,3 +109,42 @@ fn register_math_types(app: &mut App) {
/// Wraps to 0 when it reaches the maximum u32 value
#[derive(Default, Resource, Clone, Copy)]
pub struct FrameCount(pub u32);

#[cfg(test)]
mod tests {
use super::*;
use bevy_tasks::prelude::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};

#[test]
fn runs_spawn_local_tasks() {
let mut app = App::new();
app.add_plugin(CorePlugin);

let (async_tx, async_rx) = crossbeam_channel::unbounded();
AsyncComputeTaskPool::get()
.spawn_local(async move {
async_tx.send(()).unwrap();
})
.detach();

let (compute_tx, compute_rx) = crossbeam_channel::unbounded();
ComputeTaskPool::get()
.spawn_local(async move {
compute_tx.send(()).unwrap();
})
.detach();

let (io_tx, io_rx) = crossbeam_channel::unbounded();
IoTaskPool::get()
.spawn_local(async move {
io_tx.send(()).unwrap();
})
.detach();

app.run();

async_rx.try_recv().unwrap();
compute_rx.try_recv().unwrap();
io_rx.try_recv().unwrap();
}
}
2 changes: 2 additions & 0 deletions crates/bevy_tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ mod single_threaded_task_pool;
pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder};

mod usages;
#[cfg(not(target_arch = "wasm32"))]
pub use usages::tick_global_task_pools_on_main_thread;
pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};

mod iter;
Expand Down
33 changes: 29 additions & 4 deletions crates/bevy_tasks/src/task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
};

use concurrent_queue::ConcurrentQueue;
use futures_lite::{future, pin};
use futures_lite::{future, pin, FutureExt};

use crate::Task;

Expand Down Expand Up @@ -117,9 +117,16 @@ impl TaskPool {

thread_builder
.spawn(move || {
let shutdown_future = ex.run(shutdown_rx.recv());
// Use unwrap_err because we expect a Closed error
future::block_on(shutdown_future).unwrap_err();
TaskPool::LOCAL_EXECUTOR.with(|local_executor| {
let tick_forever = async move {
loop {
local_executor.tick().await;
}
};
let shutdown_future = ex.run(tick_forever.or(shutdown_rx.recv()));
// Use unwrap_err because we expect a Closed error
future::block_on(shutdown_future).unwrap_err();
});
})
.expect("Failed to spawn thread.")
})
Expand Down Expand Up @@ -314,6 +321,24 @@ impl TaskPool {
{
Task::new(TaskPool::LOCAL_EXECUTOR.with(|executor| executor.spawn(future)))
}

/// Runs a function with the local executor. Typically used to tick
/// the local executor on the main thread as it needs to share time with
/// other things.
///
/// ```rust
/// use bevy_tasks::TaskPool;
///
/// TaskPool::new().with_local_executor(|local_executor| {
/// local_executor.try_tick();
/// });
/// ```
pub fn with_local_executor<F, R>(&self, f: F) -> R
where
F: FnOnce(&async_executor::LocalExecutor) -> R,
{
Self::LOCAL_EXECUTOR.with(f)
}
}

impl Default for TaskPool {
Expand Down
26 changes: 26 additions & 0 deletions crates/bevy_tasks/src/usages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,29 @@ impl Deref for IoTaskPool {
&self.0
}
}

/// Used by `bevy_core` to tick the global tasks pools on the main thread.
/// This will run a maximum of 100 local tasks per executor per call to this function.
#[cfg(not(target_arch = "wasm32"))]
pub fn tick_global_task_pools_on_main_thread() {
COMPUTE_TASK_POOL
.get()
.unwrap()
.with_local_executor(|compute_local_executor| {
ASYNC_COMPUTE_TASK_POOL
.get()
.unwrap()
.with_local_executor(|async_local_executor| {
IO_TASK_POOL
.get()
.unwrap()
.with_local_executor(|io_local_executor| {
for _ in 0..100 {
compute_local_executor.try_tick();
async_local_executor.try_tick();
io_local_executor.try_tick();
}
});
});
});
}

0 comments on commit 0e8310c

Please sign in to comment.