Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delayed queuing of spawn_local tasks #2854

Merged
merged 6 commits into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ diff = "0.1"
predicates = "1.0.0"
rayon = "1.0"
tempfile = "3.0"
wasmprinter = "0.2, <=0.2.33" # pinned for wit-printer
wit-printer = "0.2"
wit-text = "0.8"
wit-validator = "0.2"
Expand Down
44 changes: 23 additions & 21 deletions crates/futures/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,27 @@ use std::rc::Rc;
use wasm_bindgen::prelude::*;

struct QueueState {
// The queue of Tasks which will be run in order. In practice this is all the
// The queue of Tasks which are to be run in order. In practice this is all the
// synchronous work of futures, and each `Task` represents calling `poll` on
// a future "at the right time"
// a future "at the right time".
tasks: RefCell<VecDeque<Rc<crate::task::Task>>>,

// This flag indicates whether we're currently executing inside of
// `run_all` or have scheduled `run_all` to run in the future. This is
// used to ensure that it's only scheduled once.
is_spinning: Cell<bool>,
// This flag indicates whether we've scheduled `run_all` to run in the future.
// This is used to ensure that it's only scheduled once.
is_scheduled: Cell<bool>,
}

impl QueueState {
fn run_all(&self) {
debug_assert!(self.is_spinning.get());
// "consume" the schedule
let _was_scheduled = self.is_scheduled.replace(false);
debug_assert!(_was_scheduled);

// Runs all Tasks until empty. This blocks the event loop if a Future is
// stuck in an infinite loop, so we may want to yield back to the main
// event loop occasionally. For now though greedy execution should get
// the job done.
loop {
// Stop when all tasks that have been scheduled before this tick have been run.
// Tasks that are scheduled while running tasks will run on the next tick.
let mut task_count_left = self.tasks.borrow().len();
while task_count_left > 0 {
task_count_left -= 1;
let task = match self.tasks.borrow_mut().pop_front() {
Some(task) => task,
None => break,
Expand All @@ -34,7 +35,6 @@ impl QueueState {

// All of the Tasks have been run, so it's now possible to schedule the
// next tick again
self.is_spinning.set(false);
}
}

Expand All @@ -45,26 +45,28 @@ pub(crate) struct Queue {
}

impl Queue {
pub(crate) fn push_task(&self, task: Rc<crate::task::Task>) {
// Schedule a task to run on the next tick
pub(crate) fn schedule_task(&self, task: Rc<crate::task::Task>) {
self.state.tasks.borrow_mut().push_back(task);

// If we're already inside the `run_all` loop then that'll pick up the
// task we just enqueued. If we're not in `run_all`, though, then we need
// to schedule a microtask.
//
// Note that we currently use a promise and a closure to do this, but
// eventually we should probably use something like `queueMicrotask`:
// https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/queueMicrotask
if !self.state.is_spinning.replace(true) {
if !self.state.is_scheduled.replace(true) {
let _ = self.promise.then(&self.closure);
}
}
// Append a task to the currently running queue, or schedule it
pub(crate) fn push_task(&self, task: Rc<crate::task::Task>) {
// It would make sense to run this task on the same tick. For now, we
// make the simplifying choice of always scheduling tasks for a future tick.
self.schedule_task(task)
}
}

impl Queue {
fn new() -> Self {
let state = Rc::new(QueueState {
is_spinning: Cell::new(false),
is_scheduled: Cell::new(false),
tasks: RefCell::new(VecDeque::new()),
});

Expand Down
2 changes: 1 addition & 1 deletion crates/futures/src/task/multithread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl Task {
*this.inner.borrow_mut() = Some(Inner { future, closure });

// Queue up the Future's work to happen on the next microtask tick.
crate::queue::QUEUE.with(move |queue| queue.push_task(this));
crate::queue::QUEUE.with(move |queue| queue.schedule_task(this));
}

pub(crate) fn run(&self) {
Expand Down
4 changes: 2 additions & 2 deletions crates/futures/src/task/singlethread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ impl Task {
pub(crate) fn spawn(future: Pin<Box<dyn Future<Output = ()> + 'static>>) {
let this = Rc::new(Self {
inner: RefCell::new(None),
is_queued: Cell::new(false),
is_queued: Cell::new(true),
});

let waker = unsafe { Waker::from_raw(Task::into_raw_waker(Rc::clone(&this))) };

*this.inner.borrow_mut() = Some(Inner { future, waker });

Task::wake_by_ref(&this);
crate::queue::QUEUE.with(|queue| queue.schedule_task(this));
}

fn wake_by_ref(this: &Rc<Self>) {
Expand Down
47 changes: 46 additions & 1 deletion crates/futures/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

use futures_channel::oneshot;
use wasm_bindgen::prelude::*;
use js_sys::Promise;
use std::ops::FnMut;
use wasm_bindgen::{prelude::*, JsValue};
use wasm_bindgen_futures::{future_to_promise, spawn_local, JsFuture};
use wasm_bindgen_test::*;

Expand Down Expand Up @@ -68,6 +70,49 @@ async fn spawn_local_runs() {
assert_eq!(rx.await.unwrap(), 42);
}

#[wasm_bindgen_test]
async fn spawn_local_nested() {
let (ta, mut ra) = oneshot::channel::<u32>();
let (ts, rs) = oneshot::channel::<u32>();
let (tx, rx) = oneshot::channel::<u32>();
// The order in which the various promises and tasks run is important!
// We want, on different ticks each, the following things to happen
// 1. A promise resolves, off of which we can spawn our inbetween assertion
// 2. The outer task runs, spawns in the inner task, and the inbetween promise, then yields
// 3. The inbetween promise runs and asserts that the inner task hasn't run
// 4. The inner task runs
// This depends crucially on two facts:
// - JsFuture schedules on ticks independently from tasks
// - The order of ticks is the same as the code flow
let promise = Promise::resolve(&JsValue::null());

spawn_local(async move {
// Create a closure that runs in between the two ticks and
// assert that the inner task hasn't run yet
let inbetween = Closure::wrap(Box::new(move |_| {
assert_eq!(
ra.try_recv().unwrap(),
None,
"Nested task should not have run yet"
);
}) as Box<dyn FnMut(JsValue)>);
let inbetween = promise.then(&inbetween);
spawn_local(async {
ta.send(0xdead).unwrap();
ts.send(0xbeaf).unwrap();
});
JsFuture::from(inbetween).await.unwrap();
assert_eq!(
rs.await.unwrap(),
0xbeaf,
"Nested task should run eventually"
);
tx.send(42).unwrap();
});

assert_eq!(rx.await.unwrap(), 42);
}

#[wasm_bindgen_test]
async fn spawn_local_err_no_exception() {
let (tx, rx) = oneshot::channel::<u32>();
Expand Down