Skip to content

Commit

Permalink
Implement ThreadSafeFunction class
Browse files Browse the repository at this point in the history
This PR is implementing ThreadSafeFunction class wraps
napi_threadsafe_function features.

FYI, the test files that included in this PR have come from Node.js
repo[1]. They've been rewritten based on C++ and node-addon-api.

Fixes nodejs#312.

[1] https://github.com/nodejs/node/tree/master/test/node-api/test_threadsafe_function
  • Loading branch information
romandev committed May 8, 2019
1 parent 36863f0 commit 8a20144
Show file tree
Hide file tree
Showing 7 changed files with 933 additions and 0 deletions.
382 changes: 382 additions & 0 deletions napi-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,65 @@ struct FinalizeData {
Hint* hint;
};

template <typename ContextType=void,
typename Finalizer=std::function<void(Env, void*, ContextType*)>,
typename FinalizerDataType=void>
struct ThreadSafeFinalize {
static inline
void Wrapper(napi_env env, void* rawFinalizeData, void* /* rawContext */) {
if (rawFinalizeData == nullptr)
return;

ThreadSafeFinalize* finalizeData =
static_cast<ThreadSafeFinalize*>(rawFinalizeData);
finalizeData->callback(Env(env));
delete finalizeData;
}

static inline
void WrapperWithData(napi_env env,
void* rawFinalizeData,
void* /* rawContext */) {
if (rawFinalizeData == nullptr)
return;

ThreadSafeFinalize* finalizeData =
static_cast<ThreadSafeFinalize*>(rawFinalizeData);
finalizeData->callback(Env(env), finalizeData->data);
delete finalizeData;
}

static inline
void WrapperWithContext(napi_env env,
void* rawFinalizeData,
void* rawContext) {
if (rawFinalizeData == nullptr)
return;

ThreadSafeFinalize* finalizeData =
static_cast<ThreadSafeFinalize*>(rawFinalizeData);
finalizeData->callback(Env(env), static_cast<ContextType*>(rawContext));
delete finalizeData;
}

static inline
void WrapperWithDataAndContext(napi_env env,
void* rawFinalizeData,
void* rawContext) {
if (rawFinalizeData == nullptr)
return;

ThreadSafeFinalize* finalizeData =
static_cast<ThreadSafeFinalize*>(rawFinalizeData);
finalizeData->callback(Env(env), finalizeData->data,
static_cast<ContextType*>(rawContext));
delete finalizeData;
}

FinalizerDataType* data;
Finalizer callback;
};

template <typename Getter, typename Setter>
struct AccessorCallbackData {
static inline
Expand Down Expand Up @@ -3627,6 +3686,329 @@ inline void AsyncWorker::OnWorkComplete(
}
}

////////////////////////////////////////////////////////////////////////////////
// ThreadSafeFunction class
////////////////////////////////////////////////////////////////////////////////

// static
template <typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount) {
return New(env, callback, Object(), resourceName, maxQueueSize,
initialThreadCount);
}

// static
template <typename ResourceString, typename ContextType>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
ContextType* context) {
return New(env, callback, Object(), resourceName, maxQueueSize,
initialThreadCount, context);
}

// static
template <typename ResourceString, typename Finalizer>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
Finalizer finalizeCallback) {
return New(env, callback, Object(), resourceName, maxQueueSize,
initialThreadCount, finalizeCallback);
}

// static
template <typename ResourceString, typename Finalizer,
typename FinalizerDataType>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
Finalizer finalizeCallback,
FinalizerDataType* data) {
return New(env, callback, Object(), resourceName, maxQueueSize,
initialThreadCount, finalizeCallback, data);
}

// static
template <typename ResourceString, typename ContextType, typename Finalizer>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
ContextType* context,
Finalizer finalizeCallback) {
return New(env, callback, Object(), resourceName, maxQueueSize,
initialThreadCount, context, finalizeCallback);
}

// static
template <typename ResourceString, typename ContextType,
typename Finalizer, typename FinalizerDataType>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
ContextType* context,
Finalizer finalizeCallback,
FinalizerDataType* data) {
return New(env, callback, Object(), resourceName, maxQueueSize,
initialThreadCount, context, finalizeCallback, data);
}

// static
template <typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount) {
return New(env, callback, resource, resourceName, maxQueueSize,
initialThreadCount, static_cast<void*>(nullptr) /* context */);
}

// static
template <typename ResourceString, typename ContextType>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
ContextType* context) {
return New(env, callback, resource, resourceName, maxQueueSize,
initialThreadCount, context,
[](Env, ContextType*) {} /* empty finalizer */);
}

// static
template <typename ResourceString, typename Finalizer>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
Finalizer finalizeCallback) {
return New(env, callback, resource, resourceName, maxQueueSize,
initialThreadCount, static_cast<void*>(nullptr) /* context */,
finalizeCallback, static_cast<void*>(nullptr) /* data */,
details::ThreadSafeFinalize<void, Finalizer>::Wrapper);
}

// static
template <typename ResourceString, typename Finalizer,
typename FinalizerDataType>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
Finalizer finalizeCallback,
FinalizerDataType* data) {
return New(env, callback, resource, resourceName, maxQueueSize,
initialThreadCount, static_cast<void*>(nullptr) /* context */,
finalizeCallback, data,
details::ThreadSafeFinalize<
void, Finalizer, FinalizerDataType>::WrapperWithData);
}

// static
template <typename ResourceString, typename ContextType, typename Finalizer>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
ContextType* context,
Finalizer finalizeCallback) {
return New(env, callback, resource, resourceName, maxQueueSize,
initialThreadCount, context, finalizeCallback,
static_cast<void*>(nullptr) /* data */,
details::ThreadSafeFinalize<
ContextType, Finalizer>::WrapperWithContext);
}

// static
template <typename ResourceString, typename ContextType,
typename Finalizer, typename FinalizerDataType>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
ContextType* context,
Finalizer finalizeCallback,
FinalizerDataType* data) {
return New(env, callback, resource, resourceName, maxQueueSize,
initialThreadCount, context, finalizeCallback, data,
details::ThreadSafeFinalize<ContextType, Finalizer,
FinalizerDataType>::WrapperWithDataAndContext);
}

inline ThreadSafeFunction::ThreadSafeFunction()
: _env(nullptr),
_tsFunctionValue(nullptr) {
}

inline ThreadSafeFunction::ThreadSafeFunction(
napi_env env, napi_threadsafe_function tsFunctionValue)
: _env(env),
_tsFunctionValue(tsFunctionValue) {
}

inline ThreadSafeFunction::ThreadSafeFunction(ThreadSafeFunction&& other)
: _env(other._env), _tsFunctionValue(other._tsFunctionValue) {
other._env = nullptr;
other._tsFunctionValue = nullptr;
}

inline ThreadSafeFunction& ThreadSafeFunction::operator =(
ThreadSafeFunction&& other) {
_env = other._env;
_tsFunctionValue = other._tsFunctionValue;
other._env = nullptr;
other._tsFunctionValue = nullptr;
return *this;
}

inline napi_status ThreadSafeFunction::BlockingCall() const {
return CallInternal(nullptr, napi_tsfn_blocking);
}

template <typename Callback>
inline napi_status ThreadSafeFunction::BlockingCall(
Callback callback) const {
return CallInternal(new CallbackWrapper(callback), napi_tsfn_blocking);
}

template <typename DataType, typename Callback>
inline napi_status ThreadSafeFunction::BlockingCall(
DataType* data, Callback callback) const {
auto wrapper = [data, callback](Env env, Function jsCallback) {
callback(env, jsCallback, data);
};
return CallInternal(new CallbackWrapper(wrapper), napi_tsfn_blocking);
}

inline napi_status ThreadSafeFunction::NonBlockingCall() const {
return CallInternal(nullptr, napi_tsfn_nonblocking);
}

template <typename Callback>
inline napi_status ThreadSafeFunction::NonBlockingCall(
Callback callback) const {
return CallInternal(new CallbackWrapper(callback), napi_tsfn_nonblocking);
}

template <typename DataType, typename Callback>
inline napi_status ThreadSafeFunction::NonBlockingCall(
DataType* data, Callback callback) const {
auto wrapper = [data, callback](Env env, Function jsCallback) {
callback(env, jsCallback, data);
};
return CallInternal(new CallbackWrapper(wrapper), napi_tsfn_nonblocking);
}

inline napi_status ThreadSafeFunction::Acquire() const {
return napi_acquire_threadsafe_function(_tsFunctionValue);
}

inline napi_status ThreadSafeFunction::Release() {
return napi_release_threadsafe_function(
_tsFunctionValue, napi_tsfn_release);
}

inline napi_status ThreadSafeFunction::Abort() {
return napi_release_threadsafe_function(
_tsFunctionValue, napi_tsfn_abort);
}

inline ThreadSafeFunction::ConvertibleContext
ThreadSafeFunction::GetContext() const {
void* context;
napi_get_threadsafe_function_context(_tsFunctionValue, &context);
return ConvertibleContext({ context });
}

// static
template <typename ResourceString, typename ContextType,
typename Finalizer, typename FinalizerDataType>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
ContextType* context,
Finalizer finalizeCallback,
FinalizerDataType* data,
napi_finalize wrapper) {
static_assert(details::can_make_string<ResourceString>::value
|| std::is_convertible<ResourceString, napi_value>::value,
"Resource name should be string convertible type");

auto* finalizeData = new details::ThreadSafeFinalize<
ContextType, Finalizer, FinalizerDataType>({ data, finalizeCallback });
napi_threadsafe_function tsFunctionValue;
napi_status status = napi_create_threadsafe_function(env, callback, resource,
Value::From(env, resourceName), maxQueueSize, initialThreadCount,
finalizeData, wrapper, context, CallJS, &tsFunctionValue);
if (status != napi_ok) {
delete finalizeData;
NAPI_THROW_IF_FAILED(env, status, ThreadSafeFunction());
}

return ThreadSafeFunction(env, tsFunctionValue);
}

inline napi_status ThreadSafeFunction::CallInternal(
CallbackWrapper* callbackWrapper,
napi_threadsafe_function_call_mode mode) const {
if (_env == nullptr || _tsFunctionValue == nullptr) {
return napi_closing;
}
napi_status status = napi_call_threadsafe_function(
_tsFunctionValue, callbackWrapper, mode);
if (status != napi_ok && callbackWrapper != nullptr) {
delete callbackWrapper;
}

return status;
}

// static
inline void ThreadSafeFunction::CallJS(napi_env env,
napi_value jsCallback,
void* /* context */,
void* data) {
if (env == nullptr && jsCallback == nullptr)
return;

if (data != nullptr) {
auto* callbackWrapper = static_cast<CallbackWrapper*>(data);
(*callbackWrapper)(env, Function(env, jsCallback));
delete callbackWrapper;
} else {
Function(env, jsCallback).Call({});
}
}

////////////////////////////////////////////////////////////////////////////////
// Memory Management class
////////////////////////////////////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit 8a20144

Please sign in to comment.