Skip to content

Commit

Permalink
Implement ThreadSafeFunction class
Browse files Browse the repository at this point in the history
This PR is implementing ThreadSafeFunction class wraps
napi_threadsafe_function features.

FYI, the test files that included in this PR have come from Node.js
repo[1]. They've been rewritten based on C++ and node-addon-api.

Fixes nodejs#312.

[1] https://github.com/nodejs/node/tree/master/test/node-api/test_threadsafe_function
  • Loading branch information
romandev committed Apr 15, 2019
1 parent 36863f0 commit 2d54968
Show file tree
Hide file tree
Showing 7 changed files with 785 additions and 0 deletions.
290 changes: 290 additions & 0 deletions napi-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,21 @@ struct FinalizeData {
Hint* hint;
};

template <typename DataType, typename Finalizer, typename Context=void>
struct ThreadSafeFinalizeData {
static inline
void Wrapper(napi_env env, void* rawFinalizeData, void* rawContext) {
ThreadSafeFinalizeData* finalizeData =
static_cast<ThreadSafeFinalizeData*>(rawFinalizeData);
finalizeData->callback(Env(env), finalizeData->data,
static_cast<Context*>(rawContext));
delete finalizeData;
}

DataType* data;
Finalizer callback;
};

template <typename Getter, typename Setter>
struct AccessorCallbackData {
static inline
Expand Down Expand Up @@ -3627,6 +3642,281 @@ inline void AsyncWorker::OnWorkComplete(
}
}

////////////////////////////////////////////////////////////////////////////////
// ThreadSafeFunction class
////////////////////////////////////////////////////////////////////////////////

// static
template <typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount) {
return New(env, callback, Object(), resourceName, maxQueueSize,
initialThreadCount, static_cast<void*>(nullptr));
}

// static
template <typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount) {
return New(env, callback, resource, resourceName, maxQueueSize,
initialThreadCount, static_cast<void*>(nullptr));
}

// static
template <typename DataType, typename Finalizer, typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
DataType* data,
Finalizer finalizeCallback) {
return New(env, callback, Object(), resourceName, maxQueueSize,
initialThreadCount, data, finalizeCallback);
}

// static
template <typename Context, typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
Context* context) {
return New(env, callback, Object(), resourceName, maxQueueSize,
initialThreadCount, context);
}

// static
template <typename DataType, typename Finalizer,
typename Context, typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
DataType* data,
Finalizer finalizeCallback,
Context* context) {
return New(env, callback, Object(), resourceName, maxQueueSize,
initialThreadCount, data, finalizeCallback, context);
}

// static
template <typename DataType, typename Finalizer, typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
DataType* data,
Finalizer finalizeCallback) {
static_assert(details::can_make_string<ResourceString>::value
|| std::is_convertible<ResourceString, napi_value>::value,
"Resource name should be string convertible type");

napi_threadsafe_function tsFunctionValue;
auto* finalizeData = new details::ThreadSafeFinalizeData<
DataType, Finalizer>({ data, finalizeCallback });
napi_status status = napi_create_threadsafe_function(env, callback, resource,
Value::From(env, resourceName), maxQueueSize,
initialThreadCount, finalizeData,
details::ThreadSafeFinalizeData<DataType, Finalizer>::Wrapper,
nullptr, CallJS, &tsFunctionValue);
if (status != napi_ok) {
delete finalizeData;
NAPI_THROW_IF_FAILED(env, status, ThreadSafeFunction());
}

return ThreadSafeFunction(env, tsFunctionValue);
}

// static
template <typename Context, typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
Context* context) {
static_assert(details::can_make_string<ResourceString>::value
|| std::is_convertible<ResourceString, napi_value>::value,
"Resource name should be string convertible type");

napi_threadsafe_function tsFunctionValue;
napi_status status = napi_create_threadsafe_function(env, callback, resource,
Value::From(env, resourceName), maxQueueSize, initialThreadCount, nullptr,
nullptr, context, CallJS, &tsFunctionValue);
NAPI_THROW_IF_FAILED(env, status, ThreadSafeFunction());

return ThreadSafeFunction(env, tsFunctionValue);
}

// static
template <typename DataType, typename Finalizer,
typename Context, typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
DataType* data,
Finalizer finalizeCallback,
Context* context) {
static_assert(details::can_make_string<ResourceString>::value
|| std::is_convertible<ResourceString, napi_value>::value,
"Resource name should be string convertible type");

napi_threadsafe_function tsFunctionValue;
auto* finalizeData = new details::ThreadSafeFinalizeData<
DataType, Finalizer, Context>({ data, finalizeCallback });
napi_status status = napi_create_threadsafe_function(env, callback, resource,
Value::From(env, resourceName), maxQueueSize,
initialThreadCount, finalizeData,
details::ThreadSafeFinalizeData<DataType, Finalizer, Context>::Wrapper,
context, CallJS, &tsFunctionValue);
if (status != napi_ok) {
delete finalizeData;
NAPI_THROW_IF_FAILED(env, status, ThreadSafeFunction());
}

return ThreadSafeFunction(env, tsFunctionValue);
}

inline ThreadSafeFunction::Status ThreadSafeFunction::BlockingCall() const {
return CallInternal(nullptr, napi_tsfn_blocking);
}

template <typename Callback>
inline ThreadSafeFunction::Status ThreadSafeFunction::BlockingCall(
Callback callback) const {
return CallInternal(new CallbackWrapper(callback), napi_tsfn_blocking);
}

template <typename DataType, typename Callback>
inline ThreadSafeFunction::Status ThreadSafeFunction::BlockingCall(
DataType* data, Callback callback) const {
auto wrapper = [data, callback](Env env, Function jsCallback) {
callback(env, jsCallback, data);
};
return CallInternal(new CallbackWrapper(wrapper), napi_tsfn_blocking);
}

inline ThreadSafeFunction::Status ThreadSafeFunction::NonBlockingCall() const {
return CallInternal(nullptr, napi_tsfn_nonblocking);
}

template <typename Callback>
inline ThreadSafeFunction::Status ThreadSafeFunction::NonBlockingCall(
Callback callback) const {
return CallInternal(new CallbackWrapper(callback), napi_tsfn_nonblocking);
}

template <typename DataType, typename Callback>
inline ThreadSafeFunction::Status ThreadSafeFunction::NonBlockingCall(
DataType* data, Callback callback) const {
auto wrapper = [data, callback](Env env, Function jsCallback) {
callback(env, jsCallback, data);
};
return CallInternal(new CallbackWrapper(wrapper), napi_tsfn_nonblocking);
}

inline bool ThreadSafeFunction::Acquire() const {
return napi_acquire_threadsafe_function(_tsFunctionValue) == napi_ok;
}

inline bool ThreadSafeFunction::Release() {
return napi_release_threadsafe_function(
_tsFunctionValue, napi_tsfn_release) == napi_ok;
}

inline bool ThreadSafeFunction::Abort() {
if (IsAborted()) {
return false;
}

napi_status status = napi_release_threadsafe_function(
_tsFunctionValue, napi_tsfn_abort);

_tsFunctionValue = nullptr;
_env = nullptr;

return status == napi_ok;
}

inline bool ThreadSafeFunction::IsAborted() const {
return _env == nullptr || _tsFunctionValue == nullptr;
}

inline ThreadSafeFunction::ConvertibleContext
ThreadSafeFunction::GetContext() const {
void* context;
napi_get_threadsafe_function_context(_tsFunctionValue, &context);
return ConvertibleContext({ context });
}

inline ThreadSafeFunction::ThreadSafeFunction()
: _env(nullptr),
_tsFunctionValue(nullptr) {
}

inline ThreadSafeFunction::ThreadSafeFunction(
napi_env env, napi_threadsafe_function tsFunctionValue)
: _env(env),
_tsFunctionValue(tsFunctionValue) {
}

inline ThreadSafeFunction::Status ThreadSafeFunction::CallInternal(
CallbackWrapper* callbackWrapper,
napi_threadsafe_function_call_mode mode) const {
if (IsAborted()) {
return CLOSE;
}
napi_status status = napi_call_threadsafe_function(
_tsFunctionValue, callbackWrapper, mode);
if (status != napi_ok && callbackWrapper != nullptr) {
delete callbackWrapper;
}

switch (status) {
case napi_ok:
return OK;
case napi_closing:
return CLOSE;
case napi_queue_full:
return FULL;
default:
return ERROR;
}
}

// static
inline void ThreadSafeFunction::CallJS(napi_env env,
napi_value jsCallback,
void* /* context */,
void* data) {
if (env == nullptr && jsCallback == nullptr)
return;

if (data != nullptr) {
auto* callbackWrapper = static_cast<CallbackWrapper*>(data);
(*callbackWrapper)(env, Function(env, jsCallback));
delete callbackWrapper;
} else {
Function(env, jsCallback).Call({});
}
}

////////////////////////////////////////////////////////////////////////////////
// Memory Management class
////////////////////////////////////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit 2d54968

Please sign in to comment.