Skip to content

Commit

Permalink
Implement ThreadSafeFunction class
Browse files Browse the repository at this point in the history
This PR is implementing ThreadSafeFunction class wraps
napi_threadsafe_function features.

FYI, the test files that included in this PR have come from Node.js
repo[1]. They've been rewritten based on C++ and node-addon-api.

Fixes nodejs#312.

[1] https://github.com/nodejs/node/tree/master/test/node-api/test_threadsafe_function
  • Loading branch information
romandev committed Mar 6, 2019
1 parent fcfc612 commit a78aff8
Show file tree
Hide file tree
Showing 7 changed files with 579 additions and 0 deletions.
153 changes: 153 additions & 0 deletions napi-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3618,6 +3618,159 @@ inline void AsyncWorker::OnWorkComplete(
delete self;
}

////////////////////////////////////////////////////////////////////////////////
// ThreadSafeFunction class
////////////////////////////////////////////////////////////////////////////////

// static
template <typename DataType, typename Finalizer,
typename Context, typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
DataType* data,
Finalizer finalizeCallback,
Context* context) {
static_assert(details::can_make_string<ResourceString>::value
|| std::is_convertible<ResourceString, napi_value>::value,
"Resource name should be string convertible type");

napi_threadsafe_function tsFunctionValue;
auto* finalizeData = new details::FinalizeData<DataType, Finalizer>({
finalizeCallback, context });
napi_status status = napi_create_threadsafe_function(env, callback, resource,
Value::From(env, resourceName), maxQueueSize, initialThreadCount, data,
details::FinalizeData<DataType, Finalizer, Context>::WrapperWithHint,
finalizeData, CallJS, &tsFunctionValue);
if (status != napi_ok) {
delete finalizeData;
NAPI_THROW_IF_FAILED(env, status, ThreadSafeFunction());
}

return ThreadSafeFunction(env, tsFunctionValue);
}

inline ThreadSafeFunction::Status ThreadSafeFunction::BlockingCall() const {
return CallInternal(nullptr, napi_tsfn_blocking);
}

template <typename Callback>
inline ThreadSafeFunction::Status ThreadSafeFunction::BlockingCall(
Callback callback) const {
return CallInternal(new CallbackWrapper(callback), napi_tsfn_blocking);
}

template <typename DataType, typename Callback>
inline ThreadSafeFunction::Status ThreadSafeFunction::BlockingCall(
DataType* data, Callback callback) const {
auto wrapper = [data, callback](Env env, Function jsCallback) {
callback(env, jsCallback, data);
};
return CallInternal(new CallbackWrapper(wrapper), napi_tsfn_blocking);
}

inline ThreadSafeFunction::Status ThreadSafeFunction::NonBlockingCall() const {
return CallInternal(nullptr, napi_tsfn_nonblocking);
}

template <typename Callback>
inline ThreadSafeFunction::Status ThreadSafeFunction::NonBlockingCall(
Callback callback) const {
return CallInternal(new CallbackWrapper(callback), napi_tsfn_nonblocking);
}

template <typename DataType, typename Callback>
inline ThreadSafeFunction::Status ThreadSafeFunction::NonBlockingCall(
DataType* data, Callback callback) const {
auto wrapper = [data, callback](Env env, Function jsCallback) {
callback(env, jsCallback, data);
};
return CallInternal(new CallbackWrapper(wrapper), napi_tsfn_nonblocking);
}

inline bool ThreadSafeFunction::Acquire() const {
return !IsAborted() && napi_acquire_threadsafe_function(
_tsFunctionValue) == napi_ok;
}

inline bool ThreadSafeFunction::Release() {
return !IsAborted() && napi_release_threadsafe_function(
_tsFunctionValue, napi_tsfn_release) == napi_ok;
}

inline bool ThreadSafeFunction::Abort() {
if (IsAborted()) {
return false;
}

napi_status status = napi_release_threadsafe_function(
_tsFunctionValue, napi_tsfn_abort);

_tsFunctionValue = nullptr;
_env = nullptr;

return status == napi_ok;
}

inline bool ThreadSafeFunction::IsAborted() const {
return _env == nullptr || _tsFunctionValue == nullptr;
}

inline ThreadSafeFunction::ThreadSafeFunction()
: _env(nullptr),
_tsFunctionValue(nullptr) {
}

inline ThreadSafeFunction::ThreadSafeFunction(
napi_env env, napi_threadsafe_function tsFunctionValue)
: _env(env),
_tsFunctionValue(tsFunctionValue) {
}

inline ThreadSafeFunction::Status ThreadSafeFunction::CallInternal(
CallbackWrapper* callbackWrapper,
napi_threadsafe_function_call_mode mode) const {
if (IsAborted()) {
return CLOSE;
}
napi_status status = napi_call_threadsafe_function(
_tsFunctionValue, callbackWrapper, mode);
if (status != napi_ok && callbackWrapper != nullptr) {
delete callbackWrapper;
}

switch (status) {
case napi_ok:
return OK;
case napi_closing:
return CLOSE;
case napi_queue_full:
return FULL;
default:
return ERROR;
}
}

// static
inline void ThreadSafeFunction::CallJS(napi_env env,
napi_value jsCallback,
void* /* context */,
void* data) {
if (env == nullptr && jsCallback == nullptr)
return;

if (data != nullptr) {
auto* callbackWrapper = static_cast<CallbackWrapper*>(data);
(*callbackWrapper)(env, Function(env, jsCallback));
delete callbackWrapper;
} else {
Function(env, jsCallback).Call({});
}
}

////////////////////////////////////////////////////////////////////////////////
// Memory Management class
////////////////////////////////////////////////////////////////////////////////
Expand Down
62 changes: 62 additions & 0 deletions napi.h
Original file line number Diff line number Diff line change
Expand Up @@ -1818,6 +1818,68 @@ namespace Napi {
std::string _error;
};

class ThreadSafeFunction {
public:
enum Status {
CLOSE,
FULL,
ERROR,
OK
};

template <typename DataType, typename Finalizer,
typename Context, typename ResourceString>
static ThreadSafeFunction New(napi_env env,
const Function& callback,
const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
DataType* data,
Finalizer finalizeCallback,
Context* context);

ThreadSafeFunction();

Status BlockingCall() const;

template <typename Callback>
Status BlockingCall(Callback callback) const;

template <typename DataType, typename Callback>
Status BlockingCall(DataType* data, Callback callback) const;

Status NonBlockingCall() const;

template <typename Callback>
Status NonBlockingCall(Callback callback) const;

template <typename DataType, typename Callback>
Status NonBlockingCall(DataType* data, Callback callback) const;

bool Acquire() const;
bool Release();
bool Abort();

bool IsAborted() const;

private:
using CallbackWrapper = std::function<void(Napi::Env, Napi::Function)>;

ThreadSafeFunction(napi_env env, napi_threadsafe_function tsFunctionValue);

Status CallInternal(CallbackWrapper* callbackWrapper,
napi_threadsafe_function_call_mode mode) const;

static void CallJS(napi_env env,
napi_value jsCallback,
void* context,
void* data);

napi_env _env;
napi_threadsafe_function _tsFunctionValue;
};

// Memory management.
class MemoryManagement {
public:
Expand Down
6 changes: 6 additions & 0 deletions test/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ Object InitObject(Env env);
Object InitObjectDeprecated(Env env);
#endif // !NODE_ADDON_API_DISABLE_DEPRECATED
Object InitPromise(Env env);
#if (NAPI_VERSION > 3)
Object InitThreadSafeFunction(Env env);
#endif
Object InitTypedArray(Env env);
Object InitObjectWrap(Env env);
Object InitObjectReference(Env env);
Expand Down Expand Up @@ -69,6 +72,9 @@ Object Init(Env env, Object exports) {
exports.Set("object_deprecated", InitObjectDeprecated(env));
#endif // !NODE_ADDON_API_DISABLE_DEPRECATED
exports.Set("promise", InitPromise(env));
#if (NAPI_VERSION > 3)
exports.Set("threadsafe_function", InitThreadSafeFunction(env));
#endif
exports.Set("typedarray", InitTypedArray(env));
exports.Set("objectwrap", InitObjectWrap(env));
exports.Set("objectreference", InitObjectReference(env));
Expand Down
1 change: 1 addition & 0 deletions test/binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
'object/object.cc',
'object/set_property.cc',
'promise.cc',
'threadsafe_function/threadsafe_function.cc',
'typedarray.cc',
'objectwrap.cc',
'objectreference.cc',
Expand Down
1 change: 1 addition & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ let testModules = [
'object/object_deprecated',
'object/set_property',
'promise',
'threadsafe_function/threadsafe_function',
'typedarray',
'typedarray-bigint',
'objectwrap',
Expand Down
Loading

0 comments on commit a78aff8

Please sign in to comment.