Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Tasks functional on WASM #13889

Merged
merged 15 commits into from
Jul 16, 2024
2 changes: 2 additions & 0 deletions crates/bevy_tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ concurrent-queue = { version = "2.0.0", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = "0.4"
pin-project = "1"
futures-channel = "0.3"

[dev-dependencies]
web-time = { version = "1.1" }
Expand Down
4 changes: 3 additions & 1 deletion crates/bevy_tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
mod slice;
pub use slice::{ParallelSlice, ParallelSliceMut};

#[cfg_attr(target_arch = "wasm32", path = "wasm_task.rs")]
mod task;

pub use task::Task;

#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
Expand All @@ -19,7 +21,7 @@ pub use task_pool::{Scope, TaskPool, TaskPoolBuilder};
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
mod single_threaded_task_pool;
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
pub use single_threaded_task_pool::{FakeTask, Scope, TaskPool, TaskPoolBuilder, ThreadExecutor};
pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder, ThreadExecutor};

mod usages;
#[cfg(not(target_arch = "wasm32"))]
Expand Down
34 changes: 12 additions & 22 deletions crates/bevy_tasks/src/single_threaded_task_pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::sync::Arc;
use std::{cell::RefCell, future::Future, marker::PhantomData, mem, rc::Rc};

use crate::Task;

thread_local! {
static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = const { async_executor::LocalExecutor::new() };
}
Expand Down Expand Up @@ -145,34 +147,33 @@ impl TaskPool {
.collect()
}

/// Spawns a static future onto the thread pool. The returned Task is a future. It can also be
/// cancelled and "detached" allowing it to continue running without having to be polled by the
/// Spawns a static future onto the thread pool. The returned Task is a future, which can be polled
/// to retrieve the output of the original future. Dropping the task will attempt to cancel it.
/// It can also be "detached", allowing it to continue running without having to be polled by the
/// end-user.
///
/// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead.
pub fn spawn<T>(&self, future: impl Future<Output = T> + 'static) -> FakeTask
pub fn spawn<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
where
T: 'static,
{
#[cfg(target_arch = "wasm32")]
wasm_bindgen_futures::spawn_local(async move {
future.await;
});
return Task::wrap_future(future);

#[cfg(not(target_arch = "wasm32"))]
{
LOCAL_EXECUTOR.with(|executor| {
let _task = executor.spawn(future);
let task = executor.spawn(future);
// Loop until all tasks are done
while executor.try_tick() {}
});
}

FakeTask
Task::new(task)
})
}
}

/// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`].
pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> FakeTask
pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
where
T: 'static,
{
Expand All @@ -198,17 +199,6 @@ impl TaskPool {
}
}

/// An empty task used in single-threaded contexts.
///
/// This does nothing and is therefore safe, and recommended, to ignore.
#[derive(Debug)]
pub struct FakeTask;

impl FakeTask {
/// No op on the single threaded task pool
pub fn detach(self) {}
}

/// A `TaskPool` scope for running one or more non-`'static` futures.
///
/// For more information, see [`TaskPool::scope`].
Expand Down
82 changes: 82 additions & 0 deletions crates/bevy_tasks/src/wasm_task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::{
any::Any,
future::{Future, IntoFuture},
panic::{AssertUnwindSafe, UnwindSafe},
pin::Pin,
task::Poll,
};

use futures_channel::oneshot;

/// Wraps an asynchronous task, a spawned future.
///
/// Tasks are also futures themselves and yield the output of the spawned future.
#[derive(Debug)]
pub struct Task<T>(oneshot::Receiver<Result<T, Panic>>);

impl<T: 'static> Task<T> {
pub(crate) fn wrap_future(future: impl Future<Output = T> + 'static) -> Self {
let (sender, receiver) = oneshot::channel();
wasm_bindgen_futures::spawn_local(async move {
// Catch any panics that occur when polling the future so they can
// be propagated back to the task handle.
let value = CatchUnwind(AssertUnwindSafe(future)).await;
let _ = sender.send(value);
});
Self(receiver.into_future())
}

/// When building for Wasm, this method has no effect.
/// This is only included for feature parity with other platforms.
pub fn detach(self) {}

/// Requests a task to be cancelled and returns a future that suspends until it completes.
/// Returns the output of the future if it has already completed.
///
/// # Implementation
///
/// When building for Wasm, it is not possible to cancel tasks, which means this is the same
/// as just awaiting the task. This method is only included for feature parity with other platforms.
pub async fn cancel(self) -> Option<T> {
match self.0.await {
Ok(Ok(value)) => Some(value),
Err(_) => None,
Ok(Err(panic)) => {
// drop this to prevent the panic payload from resuming the panic on drop.
// this also leaks the box but I'm not sure how to avoid that
std::mem::forget(panic);
None
}
}
}
}

impl<T> Future for Task<T> {
type Output = T;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
match Pin::new(&mut self.0).poll(cx) {
Poll::Ready(Ok(Ok(value))) => Poll::Ready(value),
// NOTE: Propagating the panic here sorta has parity with the async_executor behavior.
// For those tasks, polling them after a panic returns a `None` which gets `unwrap`ed, so
// using `resume_unwind` here is essentially keeping the same behavior while adding more information.
Poll::Ready(Ok(Err(panic))) => std::panic::resume_unwind(panic),
Poll::Ready(Err(_)) => panic!("Polled a task after it was cancelled"),
Poll::Pending => Poll::Pending,
}
}
}

type Panic = Box<dyn Any + Send + 'static>;

#[pin_project::pin_project]
struct CatchUnwind<F: UnwindSafe>(#[pin] F);

impl<F: Future + UnwindSafe> Future for CatchUnwind<F> {
type Output = Result<F::Output, Panic>;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
std::panic::catch_unwind(AssertUnwindSafe(|| self.project().0.poll(cx)))?.map(Ok)
}
}