diff --git a/crates/futures/Cargo.toml b/crates/futures/Cargo.toml index 23cd230d6b2..b07b78f1770 100644 --- a/crates/futures/Cargo.toml +++ b/crates/futures/Cargo.toml @@ -11,6 +11,7 @@ version = "0.3.25" edition = "2018" [dependencies] +cfg-if = "0.1.9" futures = "0.1.20" js-sys = { path = "../js-sys", version = '0.3.25' } wasm-bindgen = { path = "../..", version = '0.2.48' } @@ -18,6 +19,14 @@ futures-util-preview = { version = "0.3.0-alpha.15", optional = true } futures-channel-preview = { version = "0.3.0-alpha.15", optional = true } lazy_static = { version = "1.3.0", optional = true } +[target.'cfg(target_feature = "atomics")'.dependencies.web-sys] +path = "../web-sys" +version = "0.3.24" +features = [ + "MessageEvent", + "Worker", +] + [target.'cfg(target_arch = "wasm32")'.dev-dependencies] wasm-bindgen-test = { path = '../test', version = '0.2.48' } diff --git a/crates/futures/src/legacy.rs b/crates/futures/src/legacy.rs new file mode 100644 index 00000000000..a200d088c60 --- /dev/null +++ b/crates/futures/src/legacy.rs @@ -0,0 +1,204 @@ +use futures::executor::{self, Notify, Spawn}; +use futures::prelude::*; +use js_sys::{Function, Promise}; +use std::cell::{Cell, RefCell}; +use std::rc::Rc; +use std::sync::Arc; +use wasm_bindgen::prelude::*; + +/// Converts a Rust `Future` into a JavaScript `Promise`. +/// +/// This function will take any future in Rust and schedule it to be executed, +/// returning a JavaScript `Promise` which can then be passed back to JavaScript +/// to get plumbed into the rest of a system. +/// +/// The `future` provided must adhere to `'static` because it'll be scheduled +/// to run in the background and cannot contain any stack references. The +/// returned `Promise` will be resolved or rejected when the future completes, +/// depending on whether it finishes with `Ok` or `Err`. +/// +/// # Panics +/// +/// Note that in wasm panics are currently translated to aborts, but "abort" in +/// this case means that a JavaScript exception is thrown. The wasm module is +/// still usable (likely erroneously) after Rust panics. +/// +/// If the `future` provided panics then the returned `Promise` **will not +/// resolve**. Instead it will be a leaked promise. This is an unfortunate +/// limitation of wasm currently that's hoped to be fixed one day! +pub fn future_to_promise(future: F) -> Promise +where + F: Future + 'static, +{ + _future_to_promise(Box::new(future)) +} + +// Implementation of actually transforming a future into a JavaScript `Promise`. +// +// The only primitive we have to work with here is `Promise::new`, which gives +// us two callbacks that we can use to either reject or resolve the promise. +// It's our job to ensure that one of those callbacks is called at the +// appropriate time. +// +// Now we know that JavaScript (in general) can't block and is largely +// notification/callback driven. That means that our future must either have +// synchronous computational work to do, or it's "scheduled a notification" to +// happen. These notifications are likely callbacks to get executed when things +// finish (like a different promise or something like `setTimeout`). The general +// idea here is thus to do as much synchronous work as we can and then otherwise +// translate notifications of a future's task into "let's poll the future!" +// +// This isn't necessarily the greatest future executor in the world, but it +// should get the job done for now hopefully. +fn _future_to_promise(future: Box>) -> Promise { + let mut future = Some(executor::spawn(future)); + return Promise::new(&mut |resolve, reject| { + Package::poll(&Arc::new(Package { + spawn: RefCell::new(future.take().unwrap()), + resolve, + reject, + notified: Cell::new(State::Notified), + })); + }); + + struct Package { + // Our "spawned future". This'll have everything we need to poll the + // future and continue to move it forward. + spawn: RefCell>>>, + + // The current state of this future, expressed in an enum below. This + // indicates whether we're currently polling the future, received a + // notification and need to keep polling, or if we're waiting for a + // notification to come in (and no one is polling). + notified: Cell, + + // Our two callbacks connected to the `Promise` that we returned to + // JavaScript. We'll be invoking one of these at the end. + resolve: Function, + reject: Function, + } + + // The possible states our `Package` (future) can be in, tracked internally + // and used to guide what happens when polling a future. + enum State { + // This future is currently and actively being polled. Attempting to + // access the future will result in a runtime panic and is considered a + // bug. + Polling, + + // This future has been notified, while it was being polled. This marker + // is used in the `Notify` implementation below, and indicates that a + // notification was received that the future is ready to make progress. + // If seen, however, it probably means that the future is also currently + // being polled. + Notified, + + // The future is blocked, waiting for something to happen. Stored here + // is a self-reference to the future itself so we can pull it out in + // `Notify` and continue polling. + // + // Note that the self-reference here is an Arc-cycle that will leak + // memory unless the future completes, but currently that should be ok + // as we'll have to stick around anyway while the future is executing! + // + // This state is removed as soon as a notification comes in, so the leak + // should only be "temporary" + Waiting(Arc), + } + + // No shared memory right now, wasm is single threaded, no need to worry + // about this! + unsafe impl Send for Package {} + unsafe impl Sync for Package {} + + impl Package { + // Move the future contained in `me` as far forward as we can. This will + // do as much synchronous work as possible to complete the future, + // ensuring that when it blocks we're scheduled to get notified via some + // callback somewhere at some point (vague, right?) + // + // TODO: this probably shouldn't do as much synchronous work as possible + // as it can starve other computations. Rather it should instead + // yield every so often with something like `setTimeout` with the + // timeout set to zero. + fn poll(me: &Arc) { + loop { + match me.notified.replace(State::Polling) { + // We received a notification while previously polling, or + // this is the initial poll. We've got work to do below! + State::Notified => {} + + // We've gone through this loop once and no notification was + // received while we were executing work. That means we got + // `NotReady` below and we're scheduled to receive a + // notification. Block ourselves and wait for later. + // + // When the notification comes in it'll notify our task, see + // our `Waiting` state, and resume the polling process + State::Polling => { + me.notified.set(State::Waiting(me.clone())); + break; + } + + State::Waiting(_) => panic!("shouldn't see waiting state!"), + } + + let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) { + // If the future is ready, immediately call the + // resolve/reject callback and then return as we're done. + Ok(Async::Ready(value)) => (value, &me.resolve), + Err(value) => (value, &me.reject), + + // Otherwise keep going in our loop, if we weren't notified + // we'll break out and start waiting. + Ok(Async::NotReady) => continue, + }; + + drop(f.call1(&JsValue::undefined(), &val)); + break; + } + } + } + + impl Notify for Package { + fn notify(&self, _id: usize) { + let me = match self.notified.replace(State::Notified) { + // we need to schedule polling to resume, so keep going + State::Waiting(me) => me, + + // we were already notified, and were just notified again; + // having now coalesced the notifications we return as it's + // still someone else's job to process this + State::Notified => return, + + // the future was previously being polled, and we've just + // switched it to the "you're notified" state. We don't have + // access to the future as it's being polled, so the future + // polling process later sees this notification and will + // continue polling. For us, though, there's nothing else to do, + // so we bail out. + // later see + State::Polling => return, + }; + + // Use `Promise.then` on a resolved promise to place our execution + // onto the next turn of the microtask queue, enqueueing our poll + // operation. We don't currently poll immediately as it turns out + // `futures` crate adapters aren't compatible with it and it also + // helps avoid blowing the stack by accident. + // + // Note that the `Rc`/`RefCell` trick here is basically to just + // ensure that our `Closure` gets cleaned up appropriately. + let promise = Promise::resolve(&JsValue::undefined()); + let slot = Rc::new(RefCell::new(None)); + let slot2 = slot.clone(); + let closure = Closure::wrap(Box::new(move |_| { + let myself = slot2.borrow_mut().take(); + debug_assert!(myself.is_some()); + Package::poll(&me); + }) as Box); + promise.then(&closure); + *slot.borrow_mut() = Some(closure); + } + } +} diff --git a/crates/futures/src/legacy_atomics.rs b/crates/futures/src/legacy_atomics.rs new file mode 100644 index 00000000000..b09e8b7ced5 --- /dev/null +++ b/crates/futures/src/legacy_atomics.rs @@ -0,0 +1,166 @@ +use futures::executor::{self, Notify, Spawn}; +use futures::prelude::*; +use js_sys::Function; +use std::sync::atomic::{AtomicI32, Ordering}; +use std::sync::Arc; +use wasm_bindgen::prelude::*; +use wasm_bindgen::JsCast; + +// Duplicate a bit here because `then` takes a `JsValue` instead of a `Closure`. +#[wasm_bindgen] +extern "C" { + type Promise; + #[wasm_bindgen(method)] + fn then(this: &Promise, cb: &JsValue) -> Promise; + + type Atomics; + #[wasm_bindgen(static_method_of = Atomics, js_name = waitAsync)] + fn wait_async(buf: &JsValue, index: i32, value: i32) -> js_sys::Promise; + #[wasm_bindgen(static_method_of = Atomics, js_name = waitAsync, getter)] + fn get_wait_async() -> JsValue; +} + +/// Converts a Rust `Future` into a JavaScript `Promise`. +/// +/// This function will take any future in Rust and schedule it to be executed, +/// returning a JavaScript `Promise` which can then be passed back to JavaScript +/// to get plumbed into the rest of a system. +/// +/// The `future` provided must adhere to `'static` because it'll be scheduled +/// to run in the background and cannot contain any stack references. The +/// returned `Promise` will be resolved or rejected when the future completes, +/// depending on whether it finishes with `Ok` or `Err`. +/// +/// # Panics +/// +/// Note that in wasm panics are currently translated to aborts, but "abort" in +/// this case means that a JavaScript exception is thrown. The wasm module is +/// still usable (likely erroneously) after Rust panics. +/// +/// If the `future` provided panics then the returned `Promise` **will not +/// resolve**. Instead it will be a leaked promise. This is an unfortunate +/// limitation of wasm currently that's hoped to be fixed one day! +pub fn future_to_promise(future: F) -> js_sys::Promise +where + F: Future + 'static, +{ + _future_to_promise(Box::new(future)) +} + +// Implementation of actually transforming a future into a JavaScript `Promise`. +// +// The main primitives used here are `Promise::new` to actually create a JS +// promise to return as well as `Atomics.waitAsync` to create a promise that we +// can asynchronously wait on. The general idea here is that we'll create a +// promise to return and schedule work to happen in `Atomics.waitAsync` +// callbacks. +// +// After we've created a promise we start polling a future, and whenever it's +// not ready we'll execute `Atomics.waitAsync`. When that resolves we'll keep +// polling the future, and this happens until the future is done. Finally +// when it's all finished we call either resolver or reject depending on the +// result of the future. +fn _future_to_promise(future: Box>) -> js_sys::Promise { + let mut future = Some(executor::spawn(future)); + return js_sys::Promise::new(&mut |resolve, reject| { + Package { + spawn: future.take().unwrap(), + resolve, + reject, + waker: Arc::new(Waker { + value: AtomicI32::new(1), // 1 == "notified, ready to poll" + }), + } + .poll(); + }); + + struct Package { + // Our "spawned future". This'll have everything we need to poll the + // future and continue to move it forward. + spawn: Spawn>>, + + // Our two callbacks connected to the `Promise` that we returned to + // JavaScript. We'll be invoking one of these at the end. + resolve: Function, + reject: Function, + + // Shared state used to communicate waking up this future, this is the + // `Send + Sync` piece needed by the async task system. + waker: Arc, + } + + struct Waker { + value: AtomicI32, + }; + + impl Notify for Waker { + fn notify(&self, _id: usize) { + // Attempt to notify us by storing 1. If we're already 1 then we + // were previously notified and there's nothing to do. Otherwise + // we execute the native `notify` instruction to wake up the + // corresponding `waitAsync` that was waiting for the transition + // from 0 to 1. + let prev = self.value.swap(1, Ordering::SeqCst); + if prev == 1 { + return; + } + debug_assert_eq!(prev, 0); + unsafe { + core::arch::wasm32::atomic_notify( + &self.value as *const AtomicI32 as *mut i32, + 1, // number of threads to notify + ); + } + } + } + + impl Package { + fn poll(mut self) { + // Poll in a loop waiting for the future to become ready. Note that + // we probably shouldn't maximize synchronous work here but rather + // we should occasionally yield back to the runtime and schedule + // ourselves to resume this future later on. + // + // Note that 0 here means "need a notification" and 1 means "we got + // a notification". That means we're storing 0 into the `notified` + // slot and we're trying to read 1 to keep on going. + while self.waker.value.swap(0, Ordering::SeqCst) == 1 { + let (val, f) = match self.spawn.poll_future_notify(&self.waker, 0) { + // If the future is ready, immediately call the + // resolve/reject callback and then return as we're done. + Ok(Async::Ready(value)) => (value, &self.resolve), + Err(value) => (value, &self.reject), + + // ... otherwise let's break out and wait + Ok(Async::NotReady) => break, + }; + + // Call the resolution function, and then when we're done + // destroy ourselves through `drop` since our future is no + // longer needed. + drop(f.call1(&JsValue::undefined(), &val)); + return; + } + + // Create a `js_sys::Promise` using `Atomics.waitAsync` (or our + // polyfill) and then register its completion callback as simply + // calling this function again. + let promise = wait_async(&self.waker.value, 0).unchecked_into::(); + let closure = Closure::once_into_js(move || { + self.poll(); + }); + promise.then(&closure); + } + } +} + +fn wait_async(ptr: &AtomicI32, val: i32) -> js_sys::Promise { + // If `Atomics.waitAsync` isn't defined (as it isn't defined anywhere today) + // then we use our fallback, otherwise we use the native function. + if Atomics::get_wait_async().is_undefined() { + crate::wait_async_polyfill::wait_async(ptr, val) + } else { + let mem = wasm_bindgen::memory().unchecked_into::(); + Atomics::wait_async(&mem.buffer(), ptr as *const AtomicI32 as i32 / 4, val) + } +} diff --git a/crates/futures/src/legacy_shared.rs b/crates/futures/src/legacy_shared.rs new file mode 100644 index 00000000000..a53e32e108e --- /dev/null +++ b/crates/futures/src/legacy_shared.rs @@ -0,0 +1,108 @@ +use std::cell::RefCell; +use futures::future; +use std::fmt; +use std::rc::Rc; +use futures::prelude::*; +use futures::sync::oneshot; +use js_sys::Promise; +use wasm_bindgen::prelude::*; + +/// A Rust `Future` backed by a JavaScript `Promise`. +/// +/// This type is constructed with a JavaScript `Promise` object and translates +/// it to a Rust `Future`. This type implements the `Future` trait from the +/// `futures` crate and will either succeed or fail depending on what happens +/// with the JavaScript `Promise`. +/// +/// Currently this type is constructed with `JsFuture::from`. +pub struct JsFuture { + rx: oneshot::Receiver>, +} + +impl fmt::Debug for JsFuture { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "JsFuture {{ ... }}") + } +} + +impl From for JsFuture { + fn from(js: Promise) -> JsFuture { + // Use the `then` method to schedule two callbacks, one for the + // resolved value and one for the rejected value. We're currently + // assuming that JS engines will unconditionally invoke precisely one of + // these callbacks, no matter what. + // + // Ideally we'd have a way to cancel the callbacks getting invoked and + // free up state ourselves when this `JsFuture` is dropped. We don't + // have that, though, and one of the callbacks is likely always going to + // be invoked. + // + // As a result we need to make sure that no matter when the callbacks + // are invoked they are valid to be called at any time, which means they + // have to be self-contained. Through the `Closure::once` and some + // `Rc`-trickery we can arrange for both instances of `Closure`, and the + // `Rc`, to all be destroyed once the first one is called. + let (tx, rx) = oneshot::channel(); + let state = Rc::new(RefCell::new(None)); + let state2 = state.clone(); + let resolve = Closure::once(move |val| finish(&state2, Ok(val))); + let state2 = state.clone(); + let reject = Closure::once(move |val| finish(&state2, Err(val))); + + js.then2(&resolve, &reject); + *state.borrow_mut() = Some((tx, resolve, reject)); + + return JsFuture { rx }; + + fn finish( + state: &RefCell< + Option<( + oneshot::Sender>, + Closure, + Closure, + )>, + >, + val: Result, + ) { + match state.borrow_mut().take() { + // We don't have any guarantee that anyone's still listening at this + // point (the Rust `JsFuture` could have been dropped) so simply + // ignore any errors here. + Some((tx, _, _)) => drop(tx.send(val)), + None => wasm_bindgen::throw_str("cannot finish twice"), + } + } + } +} + +impl Future for JsFuture { + type Item = JsValue; + type Error = JsValue; + + fn poll(&mut self) -> Poll { + match self.rx.poll() { + Ok(Async::Ready(val)) => val.map(Async::Ready), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => wasm_bindgen::throw_str("cannot cancel"), + } + } +} + +/// Converts a Rust `Future` on a local task queue. +/// +/// The `future` provided must adhere to `'static` because it'll be scheduled +/// to run in the background and cannot contain any stack references. +/// +/// # Panics +/// +/// This function has the same panic behavior as `future_to_promise`. +pub fn spawn_local(future: F) +where + F: Future + 'static, +{ + crate::future_to_promise( + future + .map(|()| JsValue::undefined()) + .or_else(|()| future::ok::(JsValue::undefined())), + ); +} diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index e17eaaee30d..57da7b365d0 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -101,317 +101,34 @@ //! } //! ``` +#![cfg_attr(target_feature = "atomics", feature(stdsimd))] #![deny(missing_docs)] -#[cfg(feature = "futures_0_3")] -/// Contains a Futures 0.3 implementation of this crate. -pub mod futures_0_3; - -use std::cell::{Cell, RefCell}; -use std::fmt; -use std::rc::Rc; -use std::sync::Arc; +use cfg_if::cfg_if; -use futures::executor::{self, Notify, Spawn}; -use futures::future; -use futures::prelude::*; -use futures::sync::oneshot; -use js_sys::{Function, Promise}; -use wasm_bindgen::prelude::*; +mod legacy_shared; +pub use legacy_shared::*; -/// A Rust `Future` backed by a JavaScript `Promise`. -/// -/// This type is constructed with a JavaScript `Promise` object and translates -/// it to a Rust `Future`. This type implements the `Future` trait from the -/// `futures` crate and will either succeed or fail depending on what happens -/// with the JavaScript `Promise`. -/// -/// Currently this type is constructed with `JsFuture::from`. -pub struct JsFuture { - rx: oneshot::Receiver>, -} +cfg_if! { + if #[cfg(target_feature = "atomics")] { + /// Contains a thread-safe version of this crate, with Futures 0.1 + mod legacy_atomics; + pub use legacy_atomics::*; -impl fmt::Debug for JsFuture { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "JsFuture {{ ... }}") - } + /// Polyfill for `Atomics.waitAsync` function + mod wait_async_polyfill; + } else { + mod legacy; + pub use legacy::*; + } } -impl From for JsFuture { - fn from(js: Promise) -> JsFuture { - // Use the `then` method to schedule two callbacks, one for the - // resolved value and one for the rejected value. We're currently - // assuming that JS engines will unconditionally invoke precisely one of - // these callbacks, no matter what. - // - // Ideally we'd have a way to cancel the callbacks getting invoked and - // free up state ourselves when this `JsFuture` is dropped. We don't - // have that, though, and one of the callbacks is likely always going to - // be invoked. - // - // As a result we need to make sure that no matter when the callbacks - // are invoked they are valid to be called at any time, which means they - // have to be self-contained. Through the `Closure::once` and some - // `Rc`-trickery we can arrange for both instances of `Closure`, and the - // `Rc`, to all be destroyed once the first one is called. - let (tx, rx) = oneshot::channel(); - let state = Rc::new(RefCell::new(None)); - let state2 = state.clone(); - let resolve = Closure::once(move |val| finish(&state2, Ok(val))); - let state2 = state.clone(); - let reject = Closure::once(move |val| finish(&state2, Err(val))); - - js.then2(&resolve, &reject); - *state.borrow_mut() = Some((tx, resolve, reject)); - - return JsFuture { rx }; - - fn finish( - state: &RefCell< - Option<( - oneshot::Sender>, - Closure, - Closure, - )>, - >, - val: Result, - ) { - match state.borrow_mut().take() { - // We don't have any guarantee that anyone's still listening at this - // point (the Rust `JsFuture` could have been dropped) so simply - // ignore any errors here. - Some((tx, _, _)) => drop(tx.send(val)), - None => wasm_bindgen::throw_str("cannot finish twice"), - } - } - } -} - -impl Future for JsFuture { - type Item = JsValue; - type Error = JsValue; - - fn poll(&mut self) -> Poll { - match self.rx.poll() { - Ok(Async::Ready(val)) => val.map(Async::Ready), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(_) => wasm_bindgen::throw_str("cannot cancel"), - } - } -} - -/// Converts a Rust `Future` into a JavaScript `Promise`. -/// -/// This function will take any future in Rust and schedule it to be executed, -/// returning a JavaScript `Promise` which can then be passed back to JavaScript -/// to get plumbed into the rest of a system. -/// -/// The `future` provided must adhere to `'static` because it'll be scheduled -/// to run in the background and cannot contain any stack references. The -/// returned `Promise` will be resolved or rejected when the future completes, -/// depending on whether it finishes with `Ok` or `Err`. -/// -/// # Panics -/// -/// Note that in wasm panics are currently translated to aborts, but "abort" in -/// this case means that a JavaScript exception is thrown. The wasm module is -/// still usable (likely erroneously) after Rust panics. -/// -/// If the `future` provided panics then the returned `Promise` **will not -/// resolve**. Instead it will be a leaked promise. This is an unfortunate -/// limitation of wasm currently that's hoped to be fixed one day! -pub fn future_to_promise(future: F) -> Promise -where - F: Future + 'static, -{ - _future_to_promise(Box::new(future)) -} - -// Implementation of actually transforming a future into a JavaScript `Promise`. -// -// The only primitive we have to work with here is `Promise::new`, which gives -// us two callbacks that we can use to either reject or resolve the promise. -// It's our job to ensure that one of those callbacks is called at the -// appropriate time. -// -// Now we know that JavaScript (in general) can't block and is largely -// notification/callback driven. That means that our future must either have -// synchronous computational work to do, or it's "scheduled a notification" to -// happen. These notifications are likely callbacks to get executed when things -// finish (like a different promise or something like `setTimeout`). The general -// idea here is thus to do as much synchronous work as we can and then otherwise -// translate notifications of a future's task into "let's poll the future!" -// -// This isn't necessarily the greatest future executor in the world, but it -// should get the job done for now hopefully. -fn _future_to_promise(future: Box>) -> Promise { - let mut future = Some(executor::spawn(future)); - return Promise::new(&mut |resolve, reject| { - Package::poll(&Arc::new(Package { - spawn: RefCell::new(future.take().unwrap()), - resolve, - reject, - notified: Cell::new(State::Notified), - })); - }); - - struct Package { - // Our "spawned future". This'll have everything we need to poll the - // future and continue to move it forward. - spawn: RefCell>>>, - - // The current state of this future, expressed in an enum below. This - // indicates whether we're currently polling the future, received a - // notification and need to keep polling, or if we're waiting for a - // notification to come in (and no one is polling). - notified: Cell, - - // Our two callbacks connected to the `Promise` that we returned to - // JavaScript. We'll be invoking one of these at the end. - resolve: Function, - reject: Function, - } - - // The possible states our `Package` (future) can be in, tracked internally - // and used to guide what happens when polling a future. - enum State { - // This future is currently and actively being polled. Attempting to - // access the future will result in a runtime panic and is considered a - // bug. - Polling, - - // This future has been notified, while it was being polled. This marker - // is used in the `Notify` implementation below, and indicates that a - // notification was received that the future is ready to make progress. - // If seen, however, it probably means that the future is also currently - // being polled. - Notified, - - // The future is blocked, waiting for something to happen. Stored here - // is a self-reference to the future itself so we can pull it out in - // `Notify` and continue polling. - // - // Note that the self-reference here is an Arc-cycle that will leak - // memory unless the future completes, but currently that should be ok - // as we'll have to stick around anyway while the future is executing! - // - // This state is removed as soon as a notification comes in, so the leak - // should only be "temporary" - Waiting(Arc), - } - - // No shared memory right now, wasm is single threaded, no need to worry - // about this! - unsafe impl Send for Package {} - unsafe impl Sync for Package {} - - impl Package { - // Move the future contained in `me` as far forward as we can. This will - // do as much synchronous work as possible to complete the future, - // ensuring that when it blocks we're scheduled to get notified via some - // callback somewhere at some point (vague, right?) - // - // TODO: this probably shouldn't do as much synchronous work as possible - // as it can starve other computations. Rather it should instead - // yield every so often with something like `setTimeout` with the - // timeout set to zero. - fn poll(me: &Arc) { - loop { - match me.notified.replace(State::Polling) { - // We received a notification while previously polling, or - // this is the initial poll. We've got work to do below! - State::Notified => {} - - // We've gone through this loop once and no notification was - // received while we were executing work. That means we got - // `NotReady` below and we're scheduled to receive a - // notification. Block ourselves and wait for later. - // - // When the notification comes in it'll notify our task, see - // our `Waiting` state, and resume the polling process - State::Polling => { - me.notified.set(State::Waiting(me.clone())); - break; - } - - State::Waiting(_) => panic!("shouldn't see waiting state!"), - } - - let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) { - // If the future is ready, immediately call the - // resolve/reject callback and then return as we're done. - Ok(Async::Ready(value)) => (value, &me.resolve), - Err(value) => (value, &me.reject), - - // Otherwise keep going in our loop, if we weren't notified - // we'll break out and start waiting. - Ok(Async::NotReady) => continue, - }; - - drop(f.call1(&JsValue::undefined(), &val)); - break; - } - } - } - - impl Notify for Package { - fn notify(&self, _id: usize) { - let me = match self.notified.replace(State::Notified) { - // we need to schedule polling to resume, so keep going - State::Waiting(me) => me, - - // we were already notified, and were just notified again; - // having now coalesced the notifications we return as it's - // still someone else's job to process this - State::Notified => return, - - // the future was previously being polled, and we've just - // switched it to the "you're notified" state. We don't have - // access to the future as it's being polled, so the future - // polling process later sees this notification and will - // continue polling. For us, though, there's nothing else to do, - // so we bail out. - // later see - State::Polling => return, - }; - - // Use `Promise.then` on a resolved promise to place our execution - // onto the next turn of the microtask queue, enqueueing our poll - // operation. We don't currently poll immediately as it turns out - // `futures` crate adapters aren't compatible with it and it also - // helps avoid blowing the stack by accident. - // - // Note that the `Rc`/`RefCell` trick here is basically to just - // ensure that our `Closure` gets cleaned up appropriately. - let promise = Promise::resolve(&JsValue::undefined()); - let slot = Rc::new(RefCell::new(None)); - let slot2 = slot.clone(); - let closure = Closure::wrap(Box::new(move |_| { - let myself = slot2.borrow_mut().take(); - debug_assert!(myself.is_some()); - Package::poll(&me); - }) as Box); - promise.then(&closure); - *slot.borrow_mut() = Some(closure); - } - } -} - -/// Converts a Rust `Future` on a local task queue. -/// -/// The `future` provided must adhere to `'static` because it'll be scheduled -/// to run in the background and cannot contain any stack references. -/// -/// # Panics -/// -/// This function has the same panic behavior as `future_to_promise`. -pub fn spawn_local(future: F) -where - F: Future + 'static, -{ - future_to_promise( - future - .map(|()| JsValue::undefined()) - .or_else(|()| future::ok::(JsValue::undefined())), - ); +#[cfg(feature = "futures_0_3")] +cfg_if! { + if #[cfg(target_feature = "atomics")] { + compile_error!("futures 0.3 support is not implemented with atomics yet"); + } else { + /// Contains a Futures 0.3 implementation of this crate. + pub mod futures_0_3; + } } diff --git a/crates/futures/src/wait_async_polyfill.rs b/crates/futures/src/wait_async_polyfill.rs new file mode 100644 index 00000000000..14b8f0eadf5 --- /dev/null +++ b/crates/futures/src/wait_async_polyfill.rs @@ -0,0 +1,104 @@ +//! +//! The polyfill was kindly borrowed from https://github.com/tc39/proposal-atomics-wait-async +//! and ported to Rust +//! + +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * Author: Lars T Hansen, lhansen@mozilla.com + */ + +/* Polyfill for Atomics.waitAsync() for web browsers. + * + * Any kind of agent that is able to create a new Worker can use this polyfill. + * + * Load this file in all agents that will use Atomics.waitAsync. + * + * Agents that don't call Atomics.waitAsync need do nothing special. + * + * Any kind of agent can wake another agent that is sleeping in + * Atomics.waitAsync by just calling Atomics.wake for the location being slept + * on, as normal. + * + * The implementation is not completely faithful to the proposed semantics: in + * the case where an agent first asyncWaits and then waits on the same location: + * when it is woken, the two waits will be woken in order, while in the real + * semantics, the sync wait will be woken first. + * + * In this polyfill Atomics.waitAsync is not very fast. + */ + +/* Implementation: + * + * For every wait we fork off a Worker to perform the wait. Workers are reused + * when possible. The worker communicates with its parent using postMessage. + */ + +use js_sys::{encode_uri_component, Array, Promise}; +use std::cell::RefCell; +use std::sync::atomic::AtomicI32; +use wasm_bindgen::prelude::*; +use wasm_bindgen::JsCast; +use web_sys::{MessageEvent, Worker}; + +const HELPER_CODE: &'static str = " +onmessage = function (ev) { + let [ia, index, value] = ev.data; + ia = new Int32Array(ia.buffer); + let result = Atomics.wait(ia, index, value); + postMessage(result); +}; +"; + +thread_local! { + static HELPERS: RefCell> = RefCell::new(vec![]); +} + +fn alloc_helper() -> Worker { + HELPERS.with(|helpers| { + if let Some(helper) = helpers.borrow_mut().pop() { + return helper; + } + + let mut initialization_string = "data:application/javascript,".to_owned(); + let encoded: String = encode_uri_component(HELPER_CODE).into(); + initialization_string.push_str(&encoded); + + Worker::new(&initialization_string).unwrap_or_else(|js| wasm_bindgen::throw_val(js)) + }) +} + +fn free_helper(helper: Worker) { + HELPERS.with(move |helpers| { + let mut helpers = helpers.borrow_mut(); + helpers.push(helper.clone()); + helpers.truncate(10); // random arbitrary limit chosen here + }); +} + +pub fn wait_async(ptr: &AtomicI32, value: i32) -> Promise { + Promise::new(&mut |resolve, _reject| { + let helper = alloc_helper(); + let helper_ref = helper.clone(); + + let onmessage_callback = Closure::once_into_js(move |e: MessageEvent| { + // Our helper is done waiting so it's available to wait on a + // different location, so return it to the free list. + free_helper(helper_ref); + drop(resolve.call1(&JsValue::NULL, &e.data())); + }); + helper.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref())); + + let data = Array::of3( + &wasm_bindgen::memory(), + &JsValue::from(ptr as *const AtomicI32 as i32 / 4), + &JsValue::from(value), + ); + + helper + .post_message(&data) + .unwrap_or_else(|js| wasm_bindgen::throw_val(js)); + }) +} diff --git a/crates/futures/tests/tests.rs b/crates/futures/tests/tests.rs index 07a3a04a557..3fa5cda4df4 100755 --- a/crates/futures/tests/tests.rs +++ b/crates/futures/tests/tests.rs @@ -6,6 +6,8 @@ extern crate wasm_bindgen; extern crate wasm_bindgen_futures; extern crate wasm_bindgen_test; +wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + use futures::unsync::oneshot; use futures::Future; use wasm_bindgen::prelude::*; diff --git a/crates/js-sys/src/lib.rs b/crates/js-sys/src/lib.rs index b3e92161821..83c27c22783 100644 --- a/crates/js-sys/src/lib.rs +++ b/crates/js-sys/src/lib.rs @@ -598,10 +598,19 @@ pub mod Atomics { /// The static `Atomics.notify()` method notifies up some agents that /// are sleeping in the wait queue. /// Note: This operation works with a shared `Int32Array` only. + /// If `count` is not provided, notifies all the agents in the queue. /// /// [MDN documentation](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics/notify) #[wasm_bindgen(js_namespace = Atomics, catch)] - pub fn notify(typed_array: &Int32Array, index: u32, count: u32) -> Result; + pub fn notify(typed_array: &Int32Array, index: u32) -> Result; + + /// Notifies up to `count` agents in the wait queue. + #[wasm_bindgen(js_namespace = Atomics, catch, js_name = notify)] + pub fn notify_with_count( + typed_array: &Int32Array, + index: u32, + count: u32, + ) -> Result; /// The static `Atomics.or()` method computes a bitwise OR with a given value /// at a given position in the array, and returns the old value at that position. diff --git a/examples/raytrace-parallel/Cargo.toml b/examples/raytrace-parallel/Cargo.toml index 0a6dabae308..2521574db79 100644 --- a/examples/raytrace-parallel/Cargo.toml +++ b/examples/raytrace-parallel/Cargo.toml @@ -18,7 +18,7 @@ wasm-bindgen = { version = "0.2.48", features = ['serde-serialize'] } wasm-bindgen-futures = "0.3.25" [dependencies.web-sys] -version = "0.3.4" +version = "0.3.23" features = [ 'CanvasRenderingContext2d', 'ErrorEvent', diff --git a/examples/raytrace-parallel/src/lib.rs b/examples/raytrace-parallel/src/lib.rs index 8f99ed9236c..cfc05d61e19 100644 --- a/examples/raytrace-parallel/src/lib.rs +++ b/examples/raytrace-parallel/src/lib.rs @@ -1,3 +1,4 @@ +use futures::sync::oneshot; use futures::Future; use js_sys::{Promise, Uint8ClampedArray, WebAssembly}; use rayon::prelude::*; @@ -69,27 +70,28 @@ impl Scene { // threads so we don't lock up the main thread, so we ship off a thread // which actually does the whole rayon business. When our returned // future is resolved we can pull out the final version of the image. - let done = pool - .run_notify(move || { - thread_pool.install(|| { - rgb_data - .par_chunks_mut(4) - .enumerate() - .for_each(|(i, chunk)| { - let i = i as u32; - let x = i % width; - let y = i / width; - let ray = raytracer::Ray::create_prime(x, y, &scene); - let result = raytracer::cast_ray(&scene, &ray, 0).to_rgba(); - chunk[0] = result.data[0]; - chunk[1] = result.data[1]; - chunk[2] = result.data[2]; - chunk[3] = result.data[3]; - }); - }); + let (tx, rx) = oneshot::channel(); + pool.run(move || { + thread_pool.install(|| { rgb_data - })? - .map(move |_data| image_data(base, len, width, height).into()); + .par_chunks_mut(4) + .enumerate() + .for_each(|(i, chunk)| { + let i = i as u32; + let x = i % width; + let y = i / width; + let ray = raytracer::Ray::create_prime(x, y, &scene); + let result = raytracer::cast_ray(&scene, &ray, 0).to_rgba(); + chunk[0] = result.data[0]; + chunk[1] = result.data[1]; + chunk[2] = result.data[2]; + chunk[3] = result.data[3]; + }); + }); + drop(tx.send(rgb_data)); + })?; + let done = rx.map(move |_data| image_data(base, len, width, height).into()) + .map_err(|_| JsValue::undefined()); Ok(RenderingScene { promise: wasm_bindgen_futures::future_to_promise(done), diff --git a/examples/raytrace-parallel/src/pool.rs b/examples/raytrace-parallel/src/pool.rs index 921d3c1611f..b5cd4dd7994 100644 --- a/examples/raytrace-parallel/src/pool.rs +++ b/examples/raytrace-parallel/src/pool.rs @@ -1,13 +1,8 @@ //! A small module that's intended to provide an example of creating a pool of //! web workers which can be used to execute `rayon`-style work. -use futures::sync::oneshot; -use futures::Future; -use std::cell::{RefCell, UnsafeCell}; -use std::mem; +use std::cell::RefCell; use std::rc::Rc; -use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; -use std::sync::Arc; use wasm_bindgen::prelude::*; use wasm_bindgen::JsCast; use web_sys::{DedicatedWorkerGlobalScope, MessageEvent}; @@ -141,12 +136,11 @@ impl WorkerPool { /// whatn it's done the worker is ready to execute more work. This method is /// used for all spawned workers to ensure that when the work is finished /// the worker is reclaimed back into this pool. - fn reclaim_on_message(&self, worker: Worker, on_finish: impl FnOnce() + 'static) { + fn reclaim_on_message(&self, worker: Worker) { let state = Rc::downgrade(&self.state); let worker2 = worker.clone(); let reclaim_slot = Rc::new(RefCell::new(None)); let slot2 = reclaim_slot.clone(); - let mut on_finish = Some(on_finish); let reclaim = Closure::wrap(Box::new(move |event: Event| { if let Some(error) = event.dyn_ref::() { console_log!("error in worker: {}", error.message()); @@ -155,11 +149,9 @@ impl WorkerPool { return; } - // If this is a completion event then we can execute our `on_finish` - // callback and we can also deallocate our own callback by clearing - // out `slot2` which contains our own closure. + // If this is a completion event then can deallocate our own + // callback by clearing out `slot2` which contains our own closure. if let Some(_msg) = event.dyn_ref::() { - on_finish.take().unwrap()(); if let Some(state) = state.upgrade() { state.push(worker2.clone()); } @@ -193,80 +185,9 @@ impl WorkerPool { /// a web worker, that error is returned. pub fn run(&self, f: impl FnOnce() + Send + 'static) -> Result<(), JsValue> { let worker = self.execute(f)?; - self.reclaim_on_message(worker, || {}); + self.reclaim_on_message(worker); Ok(()) } - - /// Executes the closure `f` in a web worker, returning a future of the - /// value that `f` produces. - /// - /// This method is the same as `run` execept that it allows recovering the - /// return value of the closure `f` in a nonblocking fashion with the future - /// returned. - /// - /// # Errors - /// - /// If an error happens while spawning a web worker or sending a message to - /// a web worker, that error is returned. - pub fn run_notify( - &self, - f: impl FnOnce() -> T + Send + 'static, - ) -> Result + 'static, JsValue> - where - T: Send + 'static, - { - // FIXME(#1379) we should just use the `oneshot` directly as the future, - // but we have to use JS callbacks to ensure we don't have futures cross - // threads as that's currently not safe to do so. - let (tx, rx) = oneshot::channel(); - let storage = Arc::new(AtomicValue::new(None)); - let storage2 = storage.clone(); - let worker = self.execute(move || { - assert!(storage2.replace(Some(f())).is_ok()); - })?; - self.reclaim_on_message(worker, move || match storage.replace(None) { - Ok(Some(val)) => drop(tx.send(val)), - _ => unreachable!(), - }); - - Ok(rx.map_err(|_| JsValue::undefined())) - } -} - -/// A small helper struct representing atomic access to an internal value `T` -/// -/// This struct only supports one API, `replace`, which will either succeed and -/// replace the internal value with another (returning the previous one), or it -/// will fail returning the value passed in. Failure happens when two threads -/// try to `replace` at the same time. -/// -/// This is only really intended to help safely transfer information between -/// threads, it doesn't provide any synchronization capabilities itself other -/// than a guaranteed safe API. -struct AtomicValue { - modifying: AtomicBool, - slot: UnsafeCell, -} - -unsafe impl Send for AtomicValue {} -unsafe impl Sync for AtomicValue {} - -impl AtomicValue { - fn new(val: T) -> AtomicValue { - AtomicValue { - modifying: AtomicBool::new(false), - slot: UnsafeCell::new(val), - } - } - - fn replace(&self, val: T) -> Result { - if self.modifying.swap(true, SeqCst) { - return Err(val); - } - let ret = unsafe { mem::replace(&mut *self.slot.get(), val) }; - self.modifying.store(false, SeqCst); - Ok(ret) - } } impl PoolState {