Skip to content

Commit

Permalink
feat(neon): Implement Futures feature
Browse files Browse the repository at this point in the history
  • Loading branch information
kjvalencik committed Jun 3, 2022
1 parent 7272922 commit 957a23b
Show file tree
Hide file tree
Showing 13 changed files with 404 additions and 34 deletions.
8 changes: 4 additions & 4 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[alias]
# Neon defines mutually exclusive feature flags which prevents using `cargo clippy --all-features`
# The following aliases simplify linting the entire workspace
neon-check = " check --all --all-targets --features napi-experimental"
neon-clippy = "clippy --all --all-targets --features napi-experimental -- -A clippy::missing_safety_doc"
neon-test = " test --all --features=napi-experimental"
neon-doc = " rustdoc -p neon --features=napi-experimental -- --cfg docsrs"
neon-check = " check --all --all-targets --features napi-experimental,futures"
neon-clippy = "clippy --all --all-targets --features napi-experimental,futures -- -A clippy::missing_safety_doc"
neon-test = " test --all --features=napi-experimental,futures"
neon-doc = " rustdoc -p neon --features=napi-experimental,futures -- --cfg docsrs"
11 changes: 11 additions & 0 deletions crates/neon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,19 @@ semver = "0.9.0"
smallvec = "1.4.2"
neon-macros = { version = "=0.10.1", path = "../neon-macros" }

[dependencies.tokio]
version = "1.18.2"
default-features = false
features = ["sync"]
optional = true

[features]
default = ["napi-1"]

# Experimental Rust Futures API
# https://github.com/neon-bindings/rfcs/pull/46
futures = ["tokio"]

# Default N-API version. Prefer to select a minimum required version.
# DEPRECATED: This is an alias that should be removed
napi-runtime = ["napi-8"]
Expand Down Expand Up @@ -62,5 +72,6 @@ proc-macros = []
[package.metadata.docs.rs]
rustdoc-args = ["--cfg", "docsrs"]
features = [
"futures",
"napi-experimental",
]
122 changes: 97 additions & 25 deletions crates/neon/src/event/channel.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,38 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
mpsc, Arc,
use std::{
error, fmt, mem,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};

use crate::{
context::{Context, TaskContext},
result::NeonResult,
result::ResultExt,
sys::{raw::Env, tsfn::ThreadsafeFunction},
};

#[cfg(feature = "futures")]
use {
std::future::Future,
std::pin::Pin,
std::task::{self, Poll},
tokio::sync::oneshot,
};

#[cfg(not(feature = "futures"))]
// Synchronous oneshot channel API compatible with `futures-channel`
mod oneshot {
use std::sync::mpsc;

pub(super) use std::sync::mpsc::Receiver;

pub(super) fn channel<T>() -> (mpsc::SyncSender<T>, mpsc::Receiver<T>) {
mpsc::sync_channel(1)
}
}

type Callback = Box<dyn FnOnce(Env) + Send + 'static>;

/// Channel for scheduling Rust closures to execute on the JavaScript main thread.
Expand Down Expand Up @@ -70,8 +94,8 @@ pub struct Channel {
has_ref: bool,
}

impl std::fmt::Debug for Channel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl fmt::Debug for Channel {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("Channel")
}
}
Expand Down Expand Up @@ -131,9 +155,9 @@ impl Channel {
T: Send + 'static,
F: FnOnce(TaskContext) -> NeonResult<T> + Send + 'static,
{
let (tx, rx) = mpsc::sync_channel(1);
let (tx, rx) = oneshot::channel();
let callback = Box::new(move |env| {
let env = unsafe { std::mem::transmute(env) };
let env = unsafe { mem::transmute(env) };

// Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because
// N-API creates a `HandleScope` before calling the callback.
Expand Down Expand Up @@ -225,20 +249,34 @@ impl Drop for Channel {
/// thread with [`Channel::send`].
pub struct JoinHandle<T> {
// `Err` is always `Throw`, but `Throw` cannot be sent across threads
rx: mpsc::Receiver<Result<T, ()>>,
rx: oneshot::Receiver<Result<T, ()>>,
}

impl<T> JoinHandle<T> {
/// Waits for the associated closure to finish executing
///
/// If the closure panics or throws an exception, `Err` is returned
///
/// # Panics
///
/// This function panics if called within an asynchronous execution context.
pub fn join(self) -> Result<T, JoinError> {
self.rx
.recv()
// If the sending side dropped without sending, it must have panicked
.map_err(|_| JoinError(JoinErrorType::Panic))?
// If the closure returned `Err`, a JavaScript exception was thrown
.map_err(|_| JoinError(JoinErrorType::Throw))
#[cfg(feature = "futures")]
let result = self.rx.blocking_recv();
#[cfg(not(feature = "futures"))]
let result = self.rx.recv();

JoinError::map_res(result)
}
}

#[cfg(feature = "futures")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
impl<T> Future for JoinHandle<T> {
type Output = Result<T, JoinError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
JoinError::map_poll(&mut self.rx, cx)
}
}

Expand All @@ -253,16 +291,50 @@ enum JoinErrorType {
Throw,
}

impl std::fmt::Display for JoinError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl JoinError {
fn as_str(&self) -> &str {
match &self.0 {
JoinErrorType::Panic => f.write_str("Closure panicked before returning"),
JoinErrorType::Throw => f.write_str("Closure threw an exception"),
JoinErrorType::Panic => "Closure panicked before returning",
JoinErrorType::Throw => "Closure threw an exception",
}
}

#[cfg(feature = "futures")]
// Helper for writing a `Future` implementation by wrapping a `Future` and
// mapping to `Result<T, JoinError>`
pub(crate) fn map_poll<T, E>(
f: &mut (impl Future<Output = Result<Result<T, ()>, E>> + Unpin),
cx: &mut task::Context,
) -> Poll<Result<T, Self>> {
match Pin::new(f).poll(cx) {
Poll::Ready(result) => Poll::Ready(Self::map_res(result)),
Poll::Pending => Poll::Pending,
}
}

// Helper for mapping a nested `Result` from joining to a `Result<T, JoinError>`
pub(crate) fn map_res<T, E>(res: Result<Result<T, ()>, E>) -> Result<T, Self> {
res
// If the sending side dropped without sending, it must have panicked
.map_err(|_| JoinError(JoinErrorType::Panic))?
// If the closure returned `Err`, a JavaScript exception was thrown
.map_err(|_| JoinError(JoinErrorType::Throw))
}
}

impl fmt::Display for JoinError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(self.as_str())
}
}

impl std::error::Error for JoinError {}
impl error::Error for JoinError {}

impl<T> ResultExt<T> for Result<T, JoinError> {
fn or_throw<'a, C: Context<'a>>(self, cx: &mut C) -> NeonResult<T> {
self.or_else(|err| cx.throw_error(err.as_str()))
}
}

/// Error indicating that a closure was unable to be scheduled to execute on the event loop.
///
Expand All @@ -275,19 +347,19 @@ impl std::error::Error for JoinError {}
#[cfg_attr(docsrs, doc(cfg(feature = "napi-4")))]
pub struct SendError;

impl std::fmt::Display for SendError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl fmt::Display for SendError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "SendError")
}
}

impl std::fmt::Debug for SendError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(self, f)
impl fmt::Debug for SendError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(self, f)
}
}

impl std::error::Error for SendError {}
impl error::Error for SendError {}

struct ChannelState {
tsfn: ThreadsafeFunction<Callback>,
Expand Down
10 changes: 10 additions & 0 deletions crates/neon/src/result/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,13 @@ pub trait JsResultExt<'a, V: Value> {
pub trait ResultExt<T> {
fn or_throw<'a, C: Context<'a>>(self, cx: &mut C) -> NeonResult<T>;
}

impl<'a, 'b, T, E> ResultExt<Handle<'a, T>> for Result<Handle<'a, T>, Handle<'b, E>>
where
T: Value,
E: Value,
{
fn or_throw<'cx, C: Context<'cx>>(self, cx: &mut C) -> JsResult<'a, T> {
self.or_else(|err| cx.throw(err))
}
}
2 changes: 1 addition & 1 deletion crates/neon/src/sys/async_work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! a more idiomatic Rust ownership pattern by passing the output of `execute`
//! into the input of `complete`.
//!
//! https://nodejs.org/api/n-api.html#n_api_simple_asynchronous_operations
//! <https://nodejs.org/api/n-api.html#n_api_simple_asynchronous_operations>
use std::{
ffi::c_void,
Expand Down
2 changes: 1 addition & 1 deletion crates/neon/src/sys/promise.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! JavaScript Promise and Deferred handle
//!
//! https://nodejs.org/api/n-api.html#n_api_promises
//! <https://nodejs.org/api/n-api.html#n_api_promises>
use std::mem::MaybeUninit;

Expand Down
4 changes: 4 additions & 0 deletions crates/neon/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ pub use self::{
#[cfg(feature = "napi-5")]
pub use self::date::{DateError, DateErrorKind, JsDate};

#[cfg(all(feature = "napi-5", feature = "futures"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "napi-5", feature = "futures"))))]
pub use self::promise::JsFuture;

pub(crate) fn build<'a, T: Managed, F: FnOnce(&mut raw::Local) -> bool>(
env: Env,
init: F,
Expand Down
Loading

0 comments on commit 957a23b

Please sign in to comment.