diff --git a/.cargo/config.toml b/.cargo/config.toml index 981b93099..827cbe62a 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,9 +1,10 @@ [alias] # Neon defines mutually exclusive feature flags which prevents using `cargo clippy --all-features` # The following aliases simplify linting the entire workspace -check-napi = "check --all-targets --no-default-features -p neon -p neon-runtime -p neon-build -p neon-macros -p electron-tests -p napi-tests --features proc-macros,try-catch-api,napi-experimental" +check-napi = "check --all-targets --no-default-features -p neon -p neon-runtime -p neon-build -p neon-macros -p electron-tests -p napi-tests --features proc-macros,try-catch-api,napi-experimental,promise-api,task-api" check-legacy = "check --all-targets --no-default-features -p neon -p neon-runtime -p neon-build -p neon-macros -p tests -p static_tests --features event-handler-api,proc-macros,try-catch-api,legacy-runtime" clippy-legacy = "clippy --all-targets --no-default-features -p neon -p neon-runtime -p neon-build -p neon-macros -p tests -p static_tests --features event-handler-api,proc-macros,try-catch-api,legacy-runtime -- -A clippy::missing_safety_doc" -clippy-napi = "clippy --all-targets --no-default-features -p neon -p neon-runtime -p neon-build -p neon-macros -p electron-tests -p napi-tests --features proc-macros,try-catch-api,napi-experimental -- -A clippy::missing_safety_doc" +clippy-napi = "clippy --all-targets --no-default-features -p neon -p neon-runtime -p neon-build -p neon-macros -p electron-tests -p napi-tests --features proc-macros,try-catch-api,napi-experimental,promise-api,task-api -- -A clippy::missing_safety_doc" neon-test = "test --no-default-features --features napi-experimental" -neon-doc = "rustdoc --no-default-features --features=channel-api,napi-experimental,proc-macros,try-catch-api -- --cfg docsrs" +neon-doc = "rustdoc --no-default-features --features=channel-api,napi-experimental,proc-macros,try-catch-api,promise-api,task-api -- --cfg docsrs" +neon-doc-test = "test --doc --no-default-features --features=channel-api,napi-experimental,proc-macros,try-catch-api,promise-api,task-api" diff --git a/Cargo.toml b/Cargo.toml index 4d8963f93..2e2423bc6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,13 @@ event-queue-api = ["channel-api"] # Feature flag to include procedural macros proc-macros = ["neon-macros"] +# Enable `JsPromise` and `Deferred` +# https://github.com/neon-bindings/rfcs/pull/35 +promise-api = [] +# Enable `TaskBuilder` +# https://github.com/neon-bindings/rfcs/pull/35 +task-api = [] + [package.metadata.docs.rs] no-default-features = true rustdoc-args = ["--cfg", "docsrs"] @@ -86,6 +93,8 @@ features = [ "napi-experimental", "proc-macros", "try-catch-api", + "promise-api", + "task-api", ] [workspace] diff --git a/crates/neon-runtime/src/napi/async_work.rs b/crates/neon-runtime/src/napi/async_work.rs new file mode 100644 index 000000000..d0677c4b2 --- /dev/null +++ b/crates/neon-runtime/src/napi/async_work.rs @@ -0,0 +1,138 @@ +//! Rust wrappers for Node-API simple asynchronous operations +//! +//! Unlike `napi_async_work` which threads a single mutable pointer to a data +//! struct to both the `execute` and `complete` callbacks, the wrapper follows +//! a more idiomatic Rust ownership pattern by passing the output of `execute` +//! into the input of `complete`. +//! +//! https://nodejs.org/api/n-api.html#n_api_simple_asynchronous_operations + +use std::ffi::c_void; +use std::mem; +use std::ptr; + +use crate::napi::bindings as napi; +use crate::raw::Env; + +type Execute = fn(input: T) -> O; +type Complete = fn(env: Env, output: O); + +/// Schedule work to execute on the libuv thread pool +/// +/// # Safety +/// * `env` must be a valid `napi_env` for the current thread +pub unsafe fn schedule(env: Env, input: T, execute: Execute, complete: Complete) +where + T: Send + 'static, + O: Send + 'static, +{ + let mut data = Box::new(Data { + state: State::Input(input), + execute, + complete, + // Work is initialized as a null pointer, but set by `create_async_work` + // `data` must not be used until this value has been set. + work: ptr::null_mut(), + }); + + // Store a pointer to `work` before ownership is transferred to `Box::into_raw` + let work = &mut data.work as *mut _; + + // Create the `async_work` + assert_eq!( + napi::create_async_work( + env, + ptr::null_mut(), + super::string(env, "neon_async_work"), + Some(call_execute::), + Some(call_complete::), + Box::into_raw(data).cast(), + work, + ), + napi::Status::Ok, + ); + + // Queue the work + match napi::queue_async_work(env, *work) { + napi::Status::Ok => {} + status => { + // If queueing failed, delete the work to prevent a leak + napi::delete_async_work(env, *work); + assert_eq!(status, napi::Status::Ok); + } + } +} + +/// A pointer to data is passed to the `execute` and `complete` callbacks +struct Data { + state: State, + execute: Execute, + complete: Complete, + work: napi::AsyncWork, +} + +/// State of the task that is transitioned by `execute` and `complete` +enum State { + /// Initial data input passed to `execute` + Input(T), + /// Transient state while `execute` is running + Executing, + /// Return data of `execute` passed to `complete` + Output(O), +} + +impl State { + /// Return the input if `State::Input`, replacing with `State::Executing` + fn take_execute_input(&mut self) -> Option { + match mem::replace(self, Self::Executing) { + Self::Input(input) => Some(input), + _ => None, + } + } + + /// Return the output if `State::Output`, replacing with `State::Executing` + fn into_output(self) -> Option { + match self { + Self::Output(output) => Some(output), + _ => None, + } + } +} + +/// Callback executed on the libuv thread pool +/// +/// # Safety +/// * `Env` should not be used because it could attempt to call JavaScript +/// * `data` is expected to be a pointer to `Data` +unsafe extern "C" fn call_execute(_: Env, data: *mut c_void) { + let data = &mut *data.cast::>(); + // `unwrap` is ok because `call_execute` should be called exactly once + // after initialization + let input = data.state.take_execute_input().unwrap(); + let output = (data.execute)(input); + + data.state = State::Output(output); +} + +/// Callback executed on the JavaScript main thread +/// +/// # Safety +/// * `data` is expected to be a pointer to `Data` +unsafe extern "C" fn call_complete(env: Env, status: napi::Status, data: *mut c_void) { + let Data { + state, + complete, + work, + .. + } = *Box::>::from_raw(data.cast()); + + napi::delete_async_work(env, work); + + match status { + // `unwrap` is okay because `call_complete` should be called exactly once + // if and only if `call_execute` has completed successfully + napi::Status::Ok => complete(env, state.into_output().unwrap()), + napi::Status::Cancelled => {} + _ => assert_eq!(status, napi::Status::Ok), + } +} diff --git a/crates/neon-runtime/src/napi/bindings/functions.rs b/crates/neon-runtime/src/napi/bindings/functions.rs index 268667a87..4456b71af 100644 --- a/crates/neon-runtime/src/napi/bindings/functions.rs +++ b/crates/neon-runtime/src/napi/bindings/functions.rs @@ -57,6 +57,7 @@ mod napi1 { fn is_buffer(env: Env, value: Value, result: *mut bool) -> Status; fn is_error(env: Env, value: Value, result: *mut bool) -> Status; fn is_array(env: Env, value: Value, result: *mut bool) -> Status; + fn is_promise(env: Env, value: Value, result: *mut bool) -> Status; fn get_value_string_utf8( env: Env, @@ -209,6 +210,22 @@ mod napi1 { ) -> Status; fn run_script(env: Env, script: Value, result: *mut Value) -> Status; + + fn create_async_work( + env: Env, + async_resource: Value, + async_resource_name: Value, + execute: AsyncExecuteCallback, + complete: AsyncCompleteCallback, + data: *mut c_void, + result: *mut AsyncWork, + ) -> Status; + + fn delete_async_work(env: Env, work: AsyncWork) -> Status; + fn queue_async_work(env: Env, work: AsyncWork) -> Status; + fn create_promise(env: Env, deferred: *mut Deferred, promise: *mut Value) -> Status; + fn resolve_deferred(env: Env, deferred: Deferred, resolution: Value) -> Status; + fn reject_deferred(env: Env, deferred: Deferred, rejection: Value) -> Status; } ); } diff --git a/crates/neon-runtime/src/napi/bindings/mod.rs b/crates/neon-runtime/src/napi/bindings/mod.rs index 8771255bd..85d141a08 100644 --- a/crates/neon-runtime/src/napi/bindings/mod.rs +++ b/crates/neon-runtime/src/napi/bindings/mod.rs @@ -169,8 +169,8 @@ macro_rules! generate { use std::sync::Once; pub(crate) use functions::*; -pub use types::TypedArrayType; pub(crate) use types::*; +pub use types::{Deferred, TypedArrayType}; mod functions; mod types; diff --git a/crates/neon-runtime/src/napi/bindings/types.rs b/crates/neon-runtime/src/napi/bindings/types.rs index 55f86c384..6ab691c9a 100644 --- a/crates/neon-runtime/src/napi/bindings/types.rs +++ b/crates/neon-runtime/src/napi/bindings/types.rs @@ -29,6 +29,7 @@ pub type CallbackInfo = *mut CallbackInfo__; pub struct EscapableHandleScope__ { _unused: [u8; 0], } + pub type EscapableHandleScope = *mut EscapableHandleScope__; #[repr(C)] @@ -203,3 +204,24 @@ impl std::ops::BitAndAssign for KeyFilter { self.0 &= rhs.0; } } + +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct AsyncWork__ { + _unused: [u8; 0], +} + +pub type AsyncWork = *mut AsyncWork__; + +pub type AsyncExecuteCallback = Option; + +pub type AsyncCompleteCallback = + Option; + +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct Deferred__ { + _unused: [u8; 0], +} + +pub type Deferred = *mut Deferred__; diff --git a/crates/neon-runtime/src/napi/mod.rs b/crates/neon-runtime/src/napi/mod.rs index 5ab9f8484..edc971f00 100644 --- a/crates/neon-runtime/src/napi/mod.rs +++ b/crates/neon-runtime/src/napi/mod.rs @@ -1,5 +1,6 @@ pub mod array; pub mod arraybuffer; +pub mod async_work; pub mod buffer; pub mod call; pub mod convert; @@ -13,6 +14,7 @@ pub mod lifecycle; pub mod mem; pub mod object; pub mod primitive; +pub mod promise; pub mod raw; pub mod reference; pub mod scope; @@ -23,4 +25,29 @@ pub mod tsfn; pub mod typedarray; mod bindings; + pub use bindings::*; + +use std::mem::MaybeUninit; + +/// Create a JavaScript `String`, panicking if unsuccessful +/// +/// # Safety +/// * `env` is a `napi_env` valid for the current thread +/// * The returned value does not outlive `env` +unsafe fn string(env: Env, s: impl AsRef) -> raw::Local { + let s = s.as_ref(); + let mut result = MaybeUninit::uninit(); + + assert_eq!( + create_string_utf8( + env, + s.as_bytes().as_ptr() as *const _, + s.len(), + result.as_mut_ptr(), + ), + Status::Ok, + ); + + result.assume_init() +} diff --git a/crates/neon-runtime/src/napi/promise.rs b/crates/neon-runtime/src/napi/promise.rs new file mode 100644 index 000000000..cac765123 --- /dev/null +++ b/crates/neon-runtime/src/napi/promise.rs @@ -0,0 +1,66 @@ +//! JavaScript Promise and Deferred handle +//! +//! https://nodejs.org/api/n-api.html#n_api_promises + +use std::mem::MaybeUninit; +use std::ptr; + +use crate::napi::bindings as napi; +use crate::raw::Env; + +/// Create a `Promise` and a `napi::Deferred` handle for resolving it +/// +/// # Safety +/// * `env` is a valid `napi_env` for the current thread +/// * The returned `napi::Value` does not outlive `env` +pub unsafe fn create(env: Env) -> (napi::Deferred, napi::Value) { + let mut deferred = MaybeUninit::uninit(); + let mut promise = MaybeUninit::uninit(); + + assert_eq!( + napi::create_promise(env, deferred.as_mut_ptr(), promise.as_mut_ptr()), + napi::Status::Ok, + ); + + (deferred.assume_init(), promise.assume_init()) +} + +/// Resolve a promise from a `napi::Deferred` handle +/// +/// # Safety +/// * `env` is a valid `napi_env` for the current thread +/// * `resolution` is a valid `napi::Value` +pub unsafe fn resolve(env: Env, deferred: napi::Deferred, resolution: napi::Value) { + assert_eq!( + napi::resolve_deferred(env, deferred, resolution), + napi::Status::Ok, + ); +} + +/// Rejects a promise from a `napi::Deferred` handle +/// +/// # Safety +/// * `env` is a valid `napi_env` for the current thread +/// * `rejection` is a valid `napi::Value` +pub unsafe fn reject(env: Env, deferred: napi::Deferred, rejection: napi::Value) { + assert_eq!( + napi::reject_deferred(env, deferred, rejection), + napi::Status::Ok, + ); +} + +/// Rejects a promise from a `napi::Deferred` handle with a string message +/// +/// # Safety +/// * `env` is a valid `napi_env` for the current thread +pub unsafe fn reject_err_message(env: Env, deferred: napi::Deferred, msg: impl AsRef) { + let msg = super::string(env, msg); + let mut err = MaybeUninit::uninit(); + + assert_eq!( + napi::create_error(env, ptr::null_mut(), msg, err.as_mut_ptr()), + napi::Status::Ok, + ); + + reject(env, deferred, err.assume_init()); +} diff --git a/crates/neon-runtime/src/napi/tag.rs b/crates/neon-runtime/src/napi/tag.rs index d1b969182..c56944f34 100644 --- a/crates/neon-runtime/src/napi/tag.rs +++ b/crates/neon-runtime/src/napi/tag.rs @@ -99,3 +99,16 @@ pub unsafe fn is_date(env: Env, val: Local) -> bool { ); result } + +/// Is `val` a Promise? +/// +/// # Safety +/// * `env` is a valid `napi_env` for the current thread +pub unsafe fn is_promise(env: Env, val: Local) -> bool { + let mut result = false; + assert_eq!( + napi::is_promise(env, val, &mut result as *mut _), + napi::Status::Ok + ); + result +} diff --git a/crates/neon-runtime/src/napi/tsfn.rs b/crates/neon-runtime/src/napi/tsfn.rs index eea90f78c..2b007199c 100644 --- a/crates/neon-runtime/src/napi/tsfn.rs +++ b/crates/neon-runtime/src/napi/tsfn.rs @@ -5,24 +5,7 @@ use std::mem::MaybeUninit; use std::sync::{Arc, Mutex}; use crate::napi::bindings as napi; -use crate::raw::{Env, Local}; - -unsafe fn string(env: Env, s: impl AsRef) -> Local { - let s = s.as_ref(); - let mut result = MaybeUninit::uninit(); - - assert_eq!( - napi::create_string_utf8( - env, - s.as_bytes().as_ptr() as *const _, - s.len(), - result.as_mut_ptr(), - ), - napi::Status::Ok, - ); - - result.assume_init() -} +use crate::raw::Env; #[derive(Debug)] struct Tsfn(napi::ThreadsafeFunction); @@ -85,7 +68,7 @@ impl ThreadsafeFunction { env, std::ptr::null_mut(), std::ptr::null_mut(), - string(env, "neon threadsafe function"), + super::string(env, "neon threadsafe function"), max_queue_size, // Always set the reference count to 1. Prefer using // Rust `Arc` to maintain the struct. diff --git a/src/context/mod.rs b/src/context/mod.rs index 93254b774..e9c405864 100644 --- a/src/context/mod.rs +++ b/src/context/mod.rs @@ -155,6 +155,8 @@ use crate::borrow::{Borrow, BorrowMut, Ref, RefMut}; use crate::context::internal::Env; #[cfg(all(feature = "napi-4", feature = "channel-api"))] use crate::event::Channel; +#[cfg(all(feature = "napi-1", feature = "task-api"))] +use crate::event::TaskBuilder; use crate::handle::{Handle, Managed}; #[cfg(all(feature = "napi-6", feature = "channel-api"))] use crate::lifecycle::InstanceData; @@ -169,6 +171,8 @@ pub use crate::types::buffer::lock::Lock; #[cfg(feature = "napi-5")] use crate::types::date::{DateError, JsDate}; use crate::types::error::JsError; +#[cfg(all(feature = "napi-1", feature = "promise-api"))] +use crate::types::{Deferred, JsPromise}; use crate::types::{ JsArray, JsArrayBuffer, JsBoolean, JsBuffer, JsFunction, JsNull, JsNumber, JsObject, JsString, JsUndefined, JsValue, StringResult, Value, @@ -608,6 +612,57 @@ pub trait Context<'a>: ContextInternal<'a> { fn queue(&mut self) -> Channel { self.channel() } + + #[cfg(all(feature = "napi-1", feature = "promise-api"))] + #[cfg_attr(docsrs, doc(cfg(feature = "promise-api")))] + /// Creates a [`Deferred`] and [`JsPromise`] pair. The [`Deferred`] handle can be + /// used to resolve or reject the [`JsPromise`]. + /// + /// ``` + /// # #[cfg(all(feature = "napi-1", feature = "promise-api"))] { + /// # use neon::prelude::*; + /// fn resolve_promise(mut cx: FunctionContext) -> JsResult { + /// let (deferred, promise) = cx.promise(); + /// let msg = cx.string("Hello, World!"); + /// + /// deferred.resolve(&mut cx, msg); + /// + /// Ok(promise) + /// } + /// # } + /// ``` + fn promise(&mut self) -> (Deferred, Handle<'a, JsPromise>) { + JsPromise::new(self) + } + + #[cfg(all(feature = "napi-1", feature = "task-api"))] + #[cfg_attr(docsrs, doc(cfg(feature = "task-api")))] + /// Creates a [`TaskBuilder`] which can be used to schedule the `execute` + /// callback to asynchronously execute on the + /// [Node worker pool](https://nodejs.org/en/docs/guides/dont-block-the-event-loop/). + /// + /// ``` + /// # #[cfg(all(feature = "napi-1", feature = "promise-api", feature = "task-api"))] { + /// # use neon::prelude::*; + /// fn greet(mut cx: FunctionContext) -> JsResult { + /// let name = cx.argument::(0)?.value(&mut cx); + /// + /// let promise = cx + /// .task(move || format!("Hello, {}!", name)) + /// .promise(move |cx, greeting| Ok(cx.string(greeting))); + /// + /// Ok(promise) + /// } + /// # } + /// ``` + fn task<'cx, O, E>(&'cx mut self, execute: E) -> TaskBuilder + where + 'a: 'cx, + O: Send + 'static, + E: FnOnce() -> O + Send + 'static, + { + TaskBuilder::new(self, execute) + } } /// An execution context of module initialization. @@ -851,7 +906,10 @@ impl<'a> TaskContext<'a> { Scope::with(env, |scope| f(TaskContext { scope })) } - #[cfg(all(feature = "napi-4", feature = "channel-api"))] + #[cfg(any( + all(feature = "napi-1", feature = "task-api"), + all(feature = "napi-4", feature = "channel-api"), + ))] pub(crate) fn with_context FnOnce(TaskContext<'b>) -> T>(env: Env, f: F) -> T { Scope::with(env, |scope| f(TaskContext { scope })) } diff --git a/src/event/event_queue.rs b/src/event/channel.rs similarity index 82% rename from src/event/event_queue.rs rename to src/event/channel.rs index ae06d9cae..7f7d9a136 100644 --- a/src/event/event_queue.rs +++ b/src/event/channel.rs @@ -5,7 +5,11 @@ use neon_runtime::raw::Env; use neon_runtime::tsfn::ThreadsafeFunction; use crate::context::{Context, TaskContext}; +#[cfg(feature = "promise-api")] +use crate::result::JsResult; use crate::result::NeonResult; +#[cfg(feature = "promise-api")] +use crate::types::{Deferred, Value}; type Callback = Box; @@ -54,7 +58,7 @@ type Callback = Box; /// Ok(cx.undefined()) /// } /// ``` - +#[cfg_attr(docsrs, doc(cfg(all(feature = "napi-4", feature = "task-api"))))] pub struct Channel { state: Arc, has_ref: bool, @@ -146,6 +150,58 @@ impl Channel { pub fn has_ref(&self) -> bool { self.has_ref } + + #[cfg_attr(docsrs, doc(cfg(feature = "promise-api")))] + #[cfg(feature = "promise-api")] + /// Settle a [`JsPromise`](crate::types::JsPromise) from [`Deferred`] by sending a + /// closure to be executed on the main JavaScript thread. + /// + /// The `JsPromise` will be resolved with the value returned by the `complete` + /// closure. If an exception is thrown, the promise will be rejected with the exception. + /// + /// Panics if there is a libuv error. + /// + /// ``` + /// # #[cfg(feature = "promise-api")] { + /// # use neon::prelude::*; + /// # fn example(mut cx: FunctionContext) -> JsResult { + /// let channel = cx.channel(); + /// let (deferred, promise) = cx.promise(); + /// + /// channel.settle_with(deferred, move |cx| Ok(cx.number(42))); + /// + /// # Ok(promise) + /// # } + /// # } + /// ``` + pub fn settle_with(&self, deferred: Deferred, complete: F) -> JoinHandle<()> + where + V: Value, + for<'a> F: FnOnce(&mut TaskContext<'a>) -> JsResult<'a, V> + Send + 'static, + { + self.send(move |mut cx| Ok(deferred.settle_with(&mut cx, complete))) + } + + #[cfg_attr(docsrs, doc(cfg(feature = "promise-api")))] + #[cfg(feature = "promise-api")] + /// Settle a [`JsPromise`](crate::types::JsPromise) from [`Deferred`](crate::types::Deferred) by + /// sending a closure to be executed on the main JavaScript thread. + /// + /// Usage is identical to [`Channel::settle_with`]. + /// + /// Returns a `SendError` if sending the closure to the main JavaScript thread fails. + /// See [`Channel::try_send`] and [`SendError`] for more details. + pub fn try_settle_with( + &self, + deferred: Deferred, + complete: F, + ) -> Result, SendError> + where + V: Value, + for<'a> F: FnOnce(&mut TaskContext<'a>) -> JsResult<'a, V> + Send + 'static, + { + self.try_send(move |mut cx| Ok(deferred.settle_with(&mut cx, complete))) + } } impl Clone for Channel { @@ -261,6 +317,7 @@ impl std::error::Error for JoinError {} // // NOTE: These docs will need to be updated to include `QueueFull` if bounded queues are // implemented. +#[cfg_attr(docsrs, doc(cfg(all(feature = "napi-4", feature = "task-api"))))] pub struct SendError; impl std::fmt::Display for SendError { diff --git a/src/event/mod.rs b/src/event/mod.rs index e80d3b8e2..935be793f 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -124,20 +124,26 @@ //! [psd-file]: https://www.adobe.com/devnet-apps/photoshop/fileformatashtml/ #[cfg(all(feature = "napi-4", feature = "channel-api"))] -mod event_queue; +mod channel; + +#[cfg(all(feature = "napi-1", feature = "task-api"))] +mod task; #[cfg(all(feature = "napi-4", feature = "channel-api"))] -pub use self::event_queue::{Channel, JoinError, JoinHandle, SendError}; +pub use self::channel::{Channel, JoinError, JoinHandle, SendError}; + +#[cfg(all(feature = "napi-1", feature = "task-api"))] +pub use self::task::TaskBuilder; #[cfg(all(feature = "napi-4", feature = "channel-api"))] #[deprecated(since = "0.9.0", note = "Please use the Channel type instead")] #[doc(hidden)] -pub type EventQueue = self::event_queue::Channel; +pub type EventQueue = self::channel::Channel; #[cfg(all(feature = "napi-4", feature = "channel-api"))] #[deprecated(since = "0.9.0", note = "Please use the SendError type instead")] #[doc(hidden)] -pub type EventQueueError = self::event_queue::SendError; +pub type EventQueueError = self::channel::SendError; #[cfg(all(not(feature = "napi-1"), feature = "event-handler-api"))] mod event_handler; diff --git a/src/event/task.rs b/src/event/task.rs new file mode 100644 index 000000000..cbbce7cf1 --- /dev/null +++ b/src/event/task.rs @@ -0,0 +1,175 @@ +use neon_runtime::{async_work, raw}; + +#[cfg(feature = "promise-api")] +use crate::context::internal::ContextInternal; +use crate::context::{internal::Env, Context, TaskContext}; +#[cfg(feature = "promise-api")] +use crate::handle::Handle; +#[cfg(feature = "promise-api")] +use crate::result::JsResult; +use crate::result::NeonResult; +#[cfg(feature = "promise-api")] +use crate::types::Value; +#[cfg(feature = "promise-api")] +use crate::types::{Deferred, JsPromise}; + +type ExecuteOutput = (O, F); +#[cfg(feature = "promise-api")] +type PromiseOutput = (O, F, Deferred); + +#[cfg_attr(docsrs, doc(cfg(feature = "task-api")))] +/// Node asynchronous task builder +/// +/// ``` +/// # #[cfg(feature = "promise-api")] { +/// # use neon::prelude::*; +/// fn greet(mut cx: FunctionContext) -> JsResult { +/// let name = cx.argument::(0)?.value(&mut cx); +/// +/// let promise = cx +/// .task(move || format!("Hello, {}!", name)) +/// .promise(move |cx, greeting| Ok(cx.string(greeting))); +/// +/// Ok(promise) +/// } +/// # } +/// ``` +pub struct TaskBuilder<'cx, C, E> { + cx: &'cx mut C, + execute: E, +} + +impl<'a: 'cx, 'cx, C, O, E> TaskBuilder<'cx, C, E> +where + C: Context<'a>, + O: Send + 'static, + E: FnOnce() -> O + Send + 'static, +{ + /// Construct a new task builder from an `execute` callback that can be + /// scheduled to execute on the Node worker pool + pub fn new(cx: &'cx mut C, execute: E) -> Self { + Self { cx, execute } + } + + /// Schedules a task to execute on the Node worker pool, executing the + /// `complete` callback on the JavaScript main thread with the result + /// of the `execute` callback + pub fn and_then(self, complete: F) + where + F: FnOnce(&mut TaskContext, O) -> NeonResult<()> + Send + 'static, + { + let env = self.cx.env(); + let execute = self.execute; + + // Wrap the user provided `execute` callback with a callback that + // runs it but, also passes the user's `complete` callback to the + // static `complete` function. + let execute = move || (execute(), complete); + + schedule(env, execute); + } + + #[cfg_attr(docsrs, doc(cfg(feature = "promise-api")))] + #[cfg(feature = "promise-api")] + /// Schedules a task to execute on the Node worker pool and returns a + /// promise that is resolved with the value from the `complete` callback. + /// + /// The `complete` callback will execute on the JavaScript main thread and + /// is passed the return value from `execute`. If the `complete` callback + /// throws, the promise will be rejected with the exception + pub fn promise(self, complete: F) -> Handle<'a, JsPromise> + where + V: Value, + for<'b> F: FnOnce(&mut TaskContext<'b>, O) -> JsResult<'b, V> + Send + 'static, + { + let env = self.cx.env(); + let (deferred, promise) = JsPromise::new(self.cx); + let execute = self.execute; + + // Wrap the user provided `execute` callback with a callback that runs it + // but, also passes the user's `complete` callback and `Deferred` into the + // static `complete` function. + let execute = move || (execute(), complete, deferred); + + schedule_promise(env, execute); + + promise + } +} + +// Schedule a task to execute on the Node worker pool +fn schedule(env: Env, input: I) +where + O: Send + 'static, + F: FnOnce(&mut TaskContext, O) -> NeonResult<()> + Send + 'static, + I: FnOnce() -> ExecuteOutput + Send + 'static, +{ + unsafe { + async_work::schedule( + env.to_raw(), + input, + execute::>, + complete::, + ); + } +} + +fn execute(input: I) -> O +where + I: FnOnce() -> O + Send + 'static, + O: Send + 'static, +{ + input() +} + +// Unpack an `(output, complete)` tuple returned by `execute` and execute +// `complete` with the `output` argument +fn complete(env: raw::Env, (output, complete): ExecuteOutput) +where + O: Send + 'static, + F: FnOnce(&mut TaskContext, O) -> NeonResult<()> + Send + 'static, +{ + let env = unsafe { std::mem::transmute(env) }; + + TaskContext::with_context(env, move |mut cx| { + let _ = complete(&mut cx, output); + }); +} + +#[cfg(feature = "promise-api")] +// Schedule a task to execute on the Node worker pool and settle a `Promise` with the result +fn schedule_promise(env: Env, input: I) +where + O: Send + 'static, + V: Value, + for<'b> F: FnOnce(&mut TaskContext<'b>, O) -> JsResult<'b, V> + Send + 'static, + I: FnOnce() -> PromiseOutput + Send + 'static, +{ + unsafe { + async_work::schedule( + env.to_raw(), + input, + execute::>, + complete_promise::, + ); + } +} + +#[cfg(feature = "promise-api")] +// Unpack an `(output, complete, deferred)` tuple returned by `execute` and settle the +// deferred with the result of passing `output` to the `complete` callback +fn complete_promise(env: raw::Env, (output, complete, deferred): PromiseOutput) +where + O: Send + 'static, + V: Value, + for<'b> F: FnOnce(&mut TaskContext<'b>, O) -> JsResult<'b, V> + Send + 'static, +{ + let env = unsafe { std::mem::transmute(env) }; + + TaskContext::with_context(env, move |mut cx| { + match cx.try_catch_internal(move |cx| complete(cx, output)) { + Ok(value) => deferred.resolve(&mut cx, value), + Err(err) => deferred.reject(&mut cx, err), + } + }); +} diff --git a/src/handle/root.rs b/src/handle/root.rs index 30313fb6d..6571315c3 100644 --- a/src/handle/root.rs +++ b/src/handle/root.rs @@ -3,14 +3,14 @@ use std::marker::PhantomData; #[cfg(feature = "napi-6")] use std::sync::Arc; -use neon_runtime::reference; #[cfg(feature = "napi-6")] use neon_runtime::tsfn::ThreadsafeFunction; +use neon_runtime::{raw, reference}; use crate::context::Context; use crate::handle::Handle; #[cfg(feature = "napi-6")] -use crate::lifecycle::InstanceData; +use crate::lifecycle::{DropData, InstanceData}; use crate::object::Object; use crate::types::boxed::Finalize; @@ -18,12 +18,19 @@ use crate::types::boxed::Finalize; #[derive(Clone)] pub(crate) struct NapiRef(*mut c_void); +impl NapiRef { + pub(crate) unsafe fn unref(self, env: raw::Env) { + reference::unreference(env, self.0.cast()); + } +} + // # Safety // `NapiRef` are reference counted types that allow references to JavaScript objects // to outlive a `Context` (`napi_env`). Since access is serialized by obtaining a // `Context`, they are both `Send` and `Sync`. // https://nodejs.org/api/n-api.html#n_api_references_to_objects_with_a_lifespan_longer_than_that_of_the_native_method unsafe impl Send for NapiRef {} + unsafe impl Sync for NapiRef {} /// A thread-safe handle that holds a reference to a JavaScript object and @@ -36,7 +43,7 @@ pub struct Root { // It will *always* be `Some` when a user is interacting with `Root`. internal: Option, #[cfg(feature = "napi-6")] - drop_queue: Arc>, + drop_queue: Arc>, _phantom: PhantomData, } @@ -50,6 +57,7 @@ impl std::fmt::Debug for Root { // Safety: `Root` contains two types. A `NapiRef` which is `Send` and `Sync` and a // `PhantomData` that does not impact the safety. unsafe impl Send for Root {} + unsafe impl Sync for Root {} impl Root { @@ -106,22 +114,20 @@ impl Root { /// object. pub fn drop<'a, C: Context<'a>>(self, cx: &mut C) { let env = cx.env().to_raw(); - let internal = self.into_napi_ref().0 as *mut _; unsafe { - reference::unreference(env, internal); + self.into_napi_ref().unref(env); } } /// Return the referenced JavaScript object and allow it to be garbage collected pub fn into_inner<'a, C: Context<'a>>(self, cx: &mut C) -> Handle<'a, T> { let env = cx.env(); - let internal = self.into_napi_ref().0 as *mut _; - - let local = unsafe { reference::get(env.to_raw(), internal) }; + let internal = self.into_napi_ref(); + let local = unsafe { reference::get(env.to_raw(), internal.0.cast()) }; unsafe { - reference::unreference(env.to_raw(), internal); + internal.unref(env.to_raw()); } Handle::new_internal(T::from_raw(env, local)) @@ -173,16 +179,13 @@ impl Drop for Root { // Destructors are called during stack unwinding, prevent a double // panic and instead prefer to leak. if std::thread::panicking() { - eprintln!("Warning: neon::sync::Root leaked during a panic"); + eprintln!("Warning: neon::handle::Root leaked during a panic"); return; } // Only panic if the event loop is still running if let Ok(true) = crate::context::internal::IS_RUNNING.try_with(|v| *v.borrow()) { - panic!( - "Must call `into_inner` or `drop` on `Root` \ - https://docs.rs/neon/latest/neon/sync/index.html#drop-safety" - ); + panic!("Must call `into_inner` or `drop` on `neon::handle::Root`"); } } @@ -190,7 +193,7 @@ impl Drop for Root { fn drop(&mut self) { // If `None`, the `NapiRef` has already been manually dropped if let Some(internal) = self.internal.take() { - let _ = self.drop_queue.call(internal.clone(), None); + let _ = self.drop_queue.call(DropData::Ref(internal), None); } } } diff --git a/src/lib.rs b/src/lib.rs index a39835ece..f7ecbbf59 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -85,9 +85,9 @@ pub mod borrow; pub mod context; #[cfg(any( feature = "event-handler-api", + all(feature = "napi-1", feature = "task-api"), all(feature = "napi-4", feature = "channel-api") ))] -#[cfg_attr(docsrs, doc(cfg(all(feature = "napi-4", feature = "channel-api"))))] pub mod event; pub mod handle; pub mod meta; diff --git a/src/lifecycle.rs b/src/lifecycle.rs index 23f77ccdb..4802d9753 100644 --- a/src/lifecycle.rs +++ b/src/lifecycle.rs @@ -8,17 +8,17 @@ //! //! [napi-docs]: https://nodejs.org/api/n-api.html#n_api_environment_life_cycle_apis -use std::mem; use std::sync::Arc; use neon_runtime::raw::Env; -use neon_runtime::reference; use neon_runtime::tsfn::ThreadsafeFunction; use crate::context::Context; -#[cfg(all(feature = "channel-api"))] +#[cfg(feature = "channel-api")] use crate::event::Channel; use crate::handle::root::NapiRef; +#[cfg(feature = "promise-api")] +use crate::types::promise::NodeApiDeferred; /// `InstanceData` holds Neon data associated with a particular instance of a /// native module. If a module is loaded multiple times (e.g., worker threads), this @@ -31,17 +31,31 @@ pub(crate) struct InstanceData { /// could be replaced with a leaked `&'static ThreadsafeFunction`. However, /// given the cost of FFI, this optimization is omitted until the cost of an /// `Arc` is demonstrated as significant. - drop_queue: Arc>, + drop_queue: Arc>, /// Shared `Channel` that is cloned to be returned by the `cx.channel()` method #[cfg(all(feature = "channel-api"))] shared_channel: Channel, } -fn drop_napi_ref(env: Option, data: NapiRef) { - if let Some(env) = env { - unsafe { - reference::unreference(env, mem::transmute(data)); +/// Wrapper for raw Node-API values to be dropped on the main thread +pub(crate) enum DropData { + #[cfg(feature = "promise-api")] + Deferred(NodeApiDeferred), + Ref(NapiRef), +} + +impl DropData { + /// Drop a value on the main thread + fn drop(env: Option, data: Self) { + if let Some(env) = env { + unsafe { + match data { + #[cfg(feature = "promise-api")] + DropData::Deferred(data) => data.leaked(env), + DropData::Ref(data) => data.unref(env), + } + } } } } @@ -63,7 +77,7 @@ impl InstanceData { } let drop_queue = unsafe { - let queue = ThreadsafeFunction::new(env, drop_napi_ref); + let queue = ThreadsafeFunction::new(env, DropData::drop); queue.unref(env); queue }; @@ -85,7 +99,7 @@ impl InstanceData { } /// Helper to return a reference to the `drop_queue` field of `InstanceData` - pub(crate) fn drop_queue<'a, C: Context<'a>>(cx: &mut C) -> Arc> { + pub(crate) fn drop_queue<'a, C: Context<'a>>(cx: &mut C) -> Arc> { Arc::clone(&InstanceData::get(cx).drop_queue) } diff --git a/src/prelude.rs b/src/prelude.rs index 08ab087fc..44eab7ac2 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -37,6 +37,9 @@ pub use crate::task::Task; #[cfg(feature = "legacy-runtime")] #[doc(no_inline)] pub use crate::types::BinaryData; +#[cfg(all(feature = "napi-1", feature = "promise-api"))] +#[doc(no_inline)] +pub use crate::types::JsPromise; #[cfg(feature = "napi-1")] #[doc(no_inline)] pub use crate::types::JsTypedArray; diff --git a/src/types/mod.rs b/src/types/mod.rs index 3d4316e36..d80f4fe91 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -81,6 +81,8 @@ pub mod buffer; #[cfg(feature = "napi-5")] pub(crate) mod date; pub(crate) mod error; +#[cfg(all(feature = "napi-1", feature = "promise-api"))] +pub(crate) mod promise; pub(crate) mod internal; pub(crate) mod utf8; @@ -111,6 +113,8 @@ pub use self::buffer::types::{JsArrayBuffer, JsBuffer, JsTypedArray}; #[cfg(feature = "napi-5")] pub use self::date::{DateError, DateErrorKind, JsDate}; pub use self::error::JsError; +#[cfg(all(feature = "napi-1", feature = "promise-api"))] +pub use self::promise::{Deferred, JsPromise}; pub(crate) fn build<'a, T: Managed, F: FnOnce(&mut raw::Local) -> bool>( env: Env, diff --git a/src/types/promise.rs b/src/types/promise.rs new file mode 100644 index 000000000..70bebe117 --- /dev/null +++ b/src/types/promise.rs @@ -0,0 +1,162 @@ +#[cfg(feature = "napi-6")] +use std::sync::Arc; + +#[cfg(feature = "napi-6")] +use neon_runtime::tsfn::ThreadsafeFunction; +use neon_runtime::{napi, raw}; + +use crate::context::{internal::Env, Context}; +use crate::handle::Managed; +#[cfg(feature = "napi-6")] +use crate::lifecycle::{DropData, InstanceData}; +use crate::result::JsResult; +use crate::types::{Handle, Object, Value, ValueInternal}; + +#[cfg_attr(docsrs, doc(cfg(feature = "promise-api")))] +#[repr(C)] +#[derive(Clone, Copy)] +/// The JavaScript [`Promise`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise) value. +/// +/// [`JsPromise`] may be constructed with [`Context::promise`]. +pub struct JsPromise(raw::Local); + +impl JsPromise { + pub(crate) fn new<'a, C: Context<'a>>(cx: &mut C) -> (Deferred, Handle<'a, Self>) { + let (deferred, promise) = unsafe { napi::promise::create(cx.env().to_raw()) }; + let deferred = Deferred { + internal: Some(NodeApiDeferred(deferred)), + #[cfg(feature = "napi-6")] + drop_queue: InstanceData::drop_queue(cx), + }; + + (deferred, Handle::new_internal(JsPromise(promise))) + } +} + +impl Managed for JsPromise { + fn to_raw(self) -> raw::Local { + self.0 + } + + fn from_raw(_env: Env, h: raw::Local) -> Self { + Self(h) + } +} + +impl ValueInternal for JsPromise { + fn name() -> String { + "Promise".to_string() + } + + fn is_typeof(env: Env, other: Other) -> bool { + unsafe { neon_runtime::tag::is_promise(env.to_raw(), other.to_raw()) } + } +} + +impl Value for JsPromise {} + +impl Object for JsPromise {} + +#[cfg_attr(docsrs, doc(cfg(feature = "promise-api")))] +/// [`Deferred`] is a handle that can be used to resolve or reject a [`JsPromise`] +/// +/// It is recommended to settle a [`Deferred`] with [`Deferred::settle_with`] to ensure +/// exceptions are caught. +/// +/// On Node-API versions less than 6, dropping a [`Deferred`] without settling will +/// cause a panic. On Node-API 6+, the associated [`JsPromise`] will be automatically +/// rejected. +pub struct Deferred { + internal: Option, + #[cfg(feature = "napi-6")] + drop_queue: Arc>, +} + +impl Deferred { + /// Resolve a [`JsPromise`] with a JavaScript value + pub fn resolve<'a, V, C>(self, cx: &mut C, value: Handle) + where + V: Value, + C: Context<'a>, + { + unsafe { + napi::promise::resolve(cx.env().to_raw(), self.into_inner(), value.to_raw()); + } + } + + /// Reject a [`JsPromise`] with a JavaScript value + pub fn reject<'a, V, C>(self, cx: &mut C, value: Handle) + where + V: Value, + C: Context<'a>, + { + unsafe { + napi::promise::reject(cx.env().to_raw(), self.into_inner(), value.to_raw()); + } + } + + /// Resolve or reject a [`JsPromise`] with the result of a closure + /// + /// If the closure throws, the promise will be rejected with the promise + pub fn settle_with<'a, V, F, C>(self, cx: &mut C, f: F) + where + V: Value, + F: FnOnce(&mut C) -> JsResult<'a, V> + 'static, + C: Context<'a>, + { + match cx.try_catch_internal(f) { + Ok(value) => self.resolve(cx, value), + Err(err) => self.reject(cx, err), + } + } + + fn into_inner(mut self) -> napi::Deferred { + self.internal.take().unwrap().0 + } +} + +#[repr(transparent)] +pub(crate) struct NodeApiDeferred(napi::Deferred); + +unsafe impl Send for NodeApiDeferred {} + +#[cfg(feature = "napi-6")] +impl NodeApiDeferred { + pub(crate) unsafe fn leaked(self, env: raw::Env) { + napi::promise::reject_err_message( + env, + self.0, + "`neon::types::Deferred` was dropped without being settled", + ); + } +} + +impl Drop for Deferred { + #[cfg(not(feature = "napi-6"))] + fn drop(&mut self) { + // If `None`, the `Deferred` has already been settled + if self.internal.is_none() { + return; + } + + // Destructors are called during stack unwinding, prevent a double + // panic and instead prefer to leak. + if std::thread::panicking() { + eprintln!("Warning: neon::types::JsPromise leaked during a panic"); + return; + } + + // Only panic if the event loop is still running + if let Ok(true) = crate::context::internal::IS_RUNNING.try_with(|v| *v.borrow()) { + panic!("Must settle a `neon::types::JsPromise` with `neon::types::Deferred`"); + } + } + + #[cfg(feature = "napi-6")] + fn drop(&mut self) { + // If `None`, the `Deferred` has already been settled + if let Some(internal) = self.internal.take() { + let _ = self.drop_queue.call(DropData::Deferred(internal), None); + } + } +} diff --git a/test/napi/Cargo.toml b/test/napi/Cargo.toml index 6957a3a9d..8c3ef4176 100644 --- a/test/napi/Cargo.toml +++ b/test/napi/Cargo.toml @@ -13,4 +13,4 @@ crate-type = ["cdylib"] version = "*" path = "../.." default-features = false -features = ["default-panic-hook", "napi-6", "try-catch-api", "channel-api"] +features = ["default-panic-hook", "napi-6", "try-catch-api", "channel-api", "promise-api", "task-api"] diff --git a/test/napi/lib/threads.js b/test/napi/lib/threads.js index bef118c4a..854895132 100644 --- a/test/napi/lib/threads.js +++ b/test/napi/lib/threads.js @@ -93,4 +93,37 @@ const assert = require('chai').assert; msg = "Hello, World!"; }, 10); }); + + it('should be able to sum numbers on the libuv pool', async function () { + const nums = new Float64Array([...new Array(10000)].map(() => Math.random())); + const expected = nums.reduce((y, x) => y + x, 0); + const actual = await addon.sum(nums); + + assert.strictEqual(expected, actual); + }); + + it('should be able to resolve a promise manually', async function () { + const nums = new Float64Array([...new Array(10000)].map(() => Math.random())); + const expected = nums.reduce((y, x) => y + x, 0); + const actual = await addon.sum_manual_promise(nums); + + assert.strictEqual(expected, actual); + }); + + it('should be able to resolve a promise from a rust thread', async function () { + const nums = new Float64Array([...new Array(10000)].map(() => Math.random())); + const expected = nums.reduce((y, x) => y + x, 0); + const actual = await addon.sum_rust_thread(nums); + + assert.strictEqual(expected, actual); + }); + + it('should reject promise if leaked', async function () { + try { + await addon.leak_promise(); + } catch (err) { + assert.instanceOf(err, Error); + assert.ok(/Deferred/.test(err)); + } + }); }); diff --git a/test/napi/src/js/threads.rs b/test/napi/src/js/threads.rs index 8b6ce4446..8527a7b31 100644 --- a/test/napi/src/js/threads.rs +++ b/test/napi/src/js/threads.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::time::Duration; use neon::prelude::*; +use neon::types::buffer::TypedArray; pub fn useless_root(mut cx: FunctionContext) -> JsResult { let object = cx.argument::(0)?; @@ -229,3 +230,49 @@ pub fn channel_join(mut cx: FunctionContext) -> JsResult { Ok(cx.undefined()) } + +pub fn sum(mut cx: FunctionContext) -> JsResult { + let nums = cx.argument::>(0)?.as_slice(&cx).to_vec(); + + let promise = cx + .task(move || nums.into_iter().sum()) + .promise(|cx, n: f64| Ok(cx.number(n))); + + Ok(promise) +} + +pub fn sum_manual_promise(mut cx: FunctionContext) -> JsResult { + let nums = cx.argument::>(0)?.as_slice(&cx).to_vec(); + + let (deferred, promise) = cx.promise(); + + cx.task(move || nums.into_iter().sum()) + .and_then(move |cx, n: f64| { + let n = cx.number(n); + deferred.resolve(cx, n); + Ok(()) + }); + + Ok(promise) +} + +pub fn sum_rust_thread(mut cx: FunctionContext) -> JsResult { + let nums = cx.argument::>(0)?.as_slice(&cx).to_vec(); + + let channel = cx.channel(); + let (deferred, promise) = cx.promise(); + + std::thread::spawn(move || { + let n: f64 = nums.into_iter().sum(); + + channel.settle_with(deferred, move |cx| Ok(cx.number(n))); + }); + + Ok(promise) +} + +pub fn leak_promise(mut cx: FunctionContext) -> JsResult { + let (_, promise) = cx.promise(); + + Ok(promise) +} diff --git a/test/napi/src/lib.rs b/test/napi/src/lib.rs index 3c0f06523..cf0225763 100644 --- a/test/napi/src/lib.rs +++ b/test/napi/src/lib.rs @@ -256,6 +256,10 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> { cx.export_function("leak_channel", leak_channel)?; cx.export_function("drop_global_queue", drop_global_queue)?; cx.export_function("channel_join", channel_join)?; + cx.export_function("sum", sum)?; + cx.export_function("sum_manual_promise", sum_manual_promise)?; + cx.export_function("sum_rust_thread", sum_rust_thread)?; + cx.export_function("leak_promise", leak_promise)?; Ok(()) }