Skip to content

Commit

Permalink
Update futures implementation to not destroy callbacks
Browse files Browse the repository at this point in the history
JS engines guarantee that at least one of our `then` callbacks are
invoked, so that means if we destroy them prematurely they're guaranteed
to log an exception to the console! Instead to prevent exceptions from
happening tweak how the completion callbacks for JS futures are managed
and ensure that the closures stay alive until they're invoked later.

Closes rustwasm#1637
  • Loading branch information
alexcrichton committed Jul 9, 2019
1 parent 604c036 commit d32b6a9
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 71 deletions.
75 changes: 37 additions & 38 deletions crates/futures/src/futures_0_3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll};

Expand All @@ -24,10 +25,7 @@ use wasm_bindgen::prelude::*;
///
/// Currently this type is constructed with `JsFuture::from`.
pub struct JsFuture {
resolved: oneshot::Receiver<JsValue>,
rejected: oneshot::Receiver<JsValue>,
_cb_resolve: Closure<dyn FnMut(JsValue)>,
_cb_reject: Closure<dyn FnMut(JsValue)>,
rx: oneshot::Receiver<Result<JsValue, JsValue>>,
}

impl fmt::Debug for JsFuture {
Expand All @@ -38,31 +36,37 @@ impl fmt::Debug for JsFuture {

impl From<Promise> for JsFuture {
fn from(js: Promise) -> JsFuture {
// Use the `then` method to schedule two callbacks, one for the
// resolved value and one for the rejected value. These two callbacks
// will be connected to oneshot channels which feed back into our
// future.
//
// This may not be the speediest option today but it should work!
let (tx1, rx1) = oneshot::channel();

let cb_resolve = Closure::once(move |val| {
tx1.send(val).unwrap_throw();
});

let (tx2, rx2) = oneshot::channel();

let cb_reject = Closure::once(move |val| {
tx2.send(val).unwrap_throw();
});

js.then2(&cb_resolve, &cb_reject);

JsFuture {
resolved: rx1,
rejected: rx2,
_cb_resolve: cb_resolve,
_cb_reject: cb_reject,
// See comments in `src/lib.rs` for why we're using one self-contained
// callback here.
let (tx, rx) = oneshot::channel();
let state = Rc::new(RefCell::new(None));
let state2 = state.clone();
let resolve = Closure::once(move |val| finish(&state2, Ok(val)));
let state2 = state.clone();
let reject = Closure::once(move |val| finish(&state2, Err(val)));

js.then2(&resolve, &reject);
*state.borrow_mut() = Some((tx, resolve, reject));

return JsFuture { rx };

fn finish(
state: &RefCell<
Option<(
oneshot::Sender<Result<JsValue, JsValue>>,
Closure<dyn FnMut(JsValue)>,
Closure<dyn FnMut(JsValue)>,
)>,
>,
val: Result<JsValue, JsValue>,
) {
match state.borrow_mut().take() {
// We don't have any guarantee that anyone's still listening at this
// point (the Rust `JsFuture` could have been dropped) so simply
// ignore any errors here.
Some((tx, _, _)) => drop(tx.send(val)),
None => wasm_bindgen::throw_str("cannot finish twice"),
}
}
}
}
Expand All @@ -71,16 +75,11 @@ impl Future for JsFuture {
type Output = Result<JsValue, JsValue>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
// Test if either our resolved or rejected side is finished yet.
if let Poll::Ready(val) = self.resolved.poll_unpin(cx) {
return Poll::Ready(Ok(val.unwrap_throw()));
}

if let Poll::Ready(val) = self.rejected.poll_unpin(cx) {
return Poll::Ready(Err(val.unwrap_throw()));
match self.rx.poll_unpin(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(val)) => Poll::Ready(val),
Poll::Ready(Err(_)) => wasm_bindgen::throw_str("cannot cancel"),
}

Poll::Pending
}
}

Expand Down
77 changes: 44 additions & 33 deletions crates/futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,7 @@ use wasm_bindgen::prelude::*;
///
/// Currently this type is constructed with `JsFuture::from`.
pub struct JsFuture {
resolved: oneshot::Receiver<JsValue>,
rejected: oneshot::Receiver<JsValue>,
callbacks: Option<(Closure<dyn FnMut(JsValue)>, Closure<dyn FnMut(JsValue)>)>,
rx: oneshot::Receiver<Result<JsValue, JsValue>>,
}

impl fmt::Debug for JsFuture {
Expand All @@ -142,28 +140,49 @@ impl fmt::Debug for JsFuture {
impl From<Promise> for JsFuture {
fn from(js: Promise) -> JsFuture {
// Use the `then` method to schedule two callbacks, one for the
// resolved value and one for the rejected value. These two callbacks
// will be connected to oneshot channels which feed back into our
// future.
// resolved value and one for the rejected value. We're currently
// assuming that JS engines will unconditionally invoke precisely one of
// these callbacks, no matter what.
//
// This may not be the speediest option today but it should work!
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let mut tx1 = Some(tx1);
let resolve = Closure::wrap(Box::new(move |val| {
drop(tx1.take().unwrap().send(val));
}) as Box<dyn FnMut(_)>);
let mut tx2 = Some(tx2);
let reject = Closure::wrap(Box::new(move |val| {
drop(tx2.take().unwrap().send(val));
}) as Box<dyn FnMut(_)>);
// Ideally we'd have a way to cancel the callbacks getting invoked and
// free up state ourselves when this `JsFuture` is dropped. We don't
// have that, though, and one of the callbacks is likely always going to
// be invoked.
//
// As a result we need to make sure that no matter when the callbacks
// are invoked they are valid to be called at any time, which means they
// have to be self-contained. Through the `Closure::once` and some
// `Rc`-trickery we can arrange for both instances of `Closure`, and the
// `Rc`, to all be destroyed once the first one is called.
let (tx, rx) = oneshot::channel();
let state = Rc::new(RefCell::new(None));
let state2 = state.clone();
let resolve = Closure::once(move |val| finish(&state2, Ok(val)));
let state2 = state.clone();
let reject = Closure::once(move |val| finish(&state2, Err(val)));

js.then2(&resolve, &reject);
*state.borrow_mut() = Some((tx, resolve, reject));

return JsFuture { rx };

JsFuture {
resolved: rx1,
rejected: rx2,
callbacks: Some((resolve, reject)),
fn finish(
state: &RefCell<
Option<(
oneshot::Sender<Result<JsValue, JsValue>>,
Closure<dyn FnMut(JsValue)>,
Closure<dyn FnMut(JsValue)>,
)>,
>,
val: Result<JsValue, JsValue>,
) {
match state.borrow_mut().take() {
// We don't have any guarantee that anyone's still listening at this
// point (the Rust `JsFuture` could have been dropped) so simply
// ignore any errors here.
Some((tx, _, _)) => drop(tx.send(val)),
None => wasm_bindgen::throw_str("cannot finish twice"),
}
}
}
}
Expand All @@ -173,19 +192,11 @@ impl Future for JsFuture {
type Error = JsValue;

fn poll(&mut self) -> Poll<JsValue, JsValue> {
// Test if either our resolved or rejected side is finished yet. Note
// that they will return errors if they're disconnected which can't
// happen until we drop the `callbacks` field, which doesn't happen
// till we're done, so we dont need to handle that.
if let Ok(Async::Ready(val)) = self.resolved.poll() {
drop(self.callbacks.take());
return Ok(val.into());
}
if let Ok(Async::Ready(val)) = self.rejected.poll() {
drop(self.callbacks.take());
return Err(val);
match self.rx.poll() {
Ok(Async::Ready(val)) => val.map(Async::Ready),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => wasm_bindgen::throw_str("cannot cancel"),
}
Ok(Async::NotReady)
}
}

Expand Down

0 comments on commit d32b6a9

Please sign in to comment.