Skip to content

Commit

Permalink
[WIP] Implement ThreadSafeFunction class
Browse files Browse the repository at this point in the history
This PR is implementing ThreadSafeFunction class wraps
napi_threadsafe_function*.

Fixes nodejs#312.
  • Loading branch information
romandev committed Feb 5, 2019
1 parent 28df833 commit 3d02ab3
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 1 deletion.
53 changes: 53 additions & 0 deletions napi-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3676,6 +3676,59 @@ inline void AsyncWorker::OnWorkComplete(
delete self;
}

////////////////////////////////////////////////////////////////////////////////
// ThreadSafeFunction class
////////////////////////////////////////////////////////////////////////////////

inline ThreadSafeFunction::ThreadSafeFunction() {
}

inline ThreadSafeFunction::ThreadSafeFunction(napi_env env,
const Function& callback,
const char* resource_name,
const Object& resource,
size_t max_queue_size,
size_t initial_thread_count)
: _env(env) {
napi_value resource_id;
napi_status status = napi_create_string_utf8(
_env, resource_name, NAPI_AUTO_LENGTH, &resource_id);
NAPI_THROW_IF_FAILED_VOID(_env, status);

status = napi_create_threadsafe_function(
_env, callback, resource, resource_id, max_queue_size,
initial_thread_count, nullptr, nullptr, nullptr, CallJS,
&_ts_fn);
NAPI_THROW_IF_FAILED_VOID(_env, status);
}

inline napi_status ThreadSafeFunction::Call(Callback callback, napi_threadsafe_function_call_mode mode) const {
Callback* callback_container = new Callback();
*callback_container = std::move(callback);
return napi_call_threadsafe_function(_ts_fn, callback_container, mode);
}

inline bool ThreadSafeFunction::Acquire() const {
return napi_acquire_threadsafe_function(_ts_fn) == napi_ok;
}

inline bool ThreadSafeFunction::Release(napi_threadsafe_function_release_mode mode) const {
return napi_release_threadsafe_function(_ts_fn, mode) == napi_ok;
}

//static
inline void ThreadSafeFunction::CallJS(napi_env env,
napi_value js_callback,
void* /* hint */,
void* data) {
if (env == nullptr && js_callback == nullptr)
return;

Callback* callback_container = static_cast<Callback*>(data);
(*callback_container)(env, Function(env, js_callback));
delete callback_container;
}

////////////////////////////////////////////////////////////////////////////////
// Memory Management class
////////////////////////////////////////////////////////////////////////////////
Expand Down
27 changes: 27 additions & 0 deletions napi.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef SRC_NAPI_H_
#define SRC_NAPI_H_

#define NAPI_EXPERIMENTAL
#include "node_api.h"
#include <functional>
#include <initializer_list>
Expand Down Expand Up @@ -1760,6 +1761,32 @@ namespace Napi {
std::string _error;
};

class ThreadSafeFunction {
public:
using Callback = std::function<void(Napi::Env, Napi::Function)>;

ThreadSafeFunction();
ThreadSafeFunction(napi_env env,
const Function& callback,
const char* resource_name,
const Object& resource,
size_t max_queue_size,
size_t initial_thread_count);

napi_status Call(Callback callback, napi_threadsafe_function_call_mode mode) const;
bool Acquire() const;
bool Release(napi_threadsafe_function_release_mode mode) const;

private:
static void CallJS(napi_env env,
napi_value callback,
void* hint,
void* data);

napi_env _env;
napi_threadsafe_function _ts_fn;
};

// Memory management.
class MemoryManagement {
public:
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
"url": "git://github.com/nodejs/node-addon-api.git"
},
"scripts": {
"pretest": "node-gyp rebuild -C test",
"pretest": "node-gyp build -C test",
"test": "node test",
"doc": "doxygen doc/Doxyfile"
},
Expand Down
6 changes: 6 additions & 0 deletions test/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ Object InitObject(Env env);
Object InitObjectDeprecated(Env env);
#endif // !NODE_ADDON_API_DISABLE_DEPRECATED
Object InitPromise(Env env);
#if (NAPI_VERSION > 3)
Object InitThreadSafeFunction(Env env);
#endif
Object InitTypedArray(Env env);
Object InitObjectWrap(Env env);
Object InitObjectReference(Env env);
Expand Down Expand Up @@ -69,6 +72,9 @@ Object Init(Env env, Object exports) {
exports.Set("object_deprecated", InitObjectDeprecated(env));
#endif // !NODE_ADDON_API_DISABLE_DEPRECATED
exports.Set("promise", InitPromise(env));
#if (NAPI_VERSION > 3)
exports.Set("threadsafe_function", InitThreadSafeFunction(env));
#endif
exports.Set("typedarray", InitTypedArray(env));
exports.Set("objectwrap", InitObjectWrap(env));
exports.Set("objectreference", InitObjectReference(env));
Expand Down
1 change: 1 addition & 0 deletions test/binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
'object/object.cc',
'object/set_property.cc',
'promise.cc',
'threadsafe_function/threadsafe_function.cc',
'typedarray.cc',
'objectwrap.cc',
'objectreference.cc',
Expand Down
1 change: 1 addition & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ let testModules = [
'object/object_deprecated',
'object/set_property',
'promise',
'threadsafe_function/threadsafe_function',
'typedarray',
'typedarray-bigint',
'objectwrap',
Expand Down
85 changes: 85 additions & 0 deletions test/threadsafe_function/threadsafe_function.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#define NAPI_EXPERIMENTAL
#include <uv.h>
#include "napi.h"

using namespace Napi;

const size_t ARRAY_LENGTH = 10;
const size_t MAX_QUEUE_SIZE = 2;

// Thread data to transmit to JS
static int ints[ARRAY_LENGTH];

static uv_thread_t uv_threads[2];
static napi_threadsafe_function ts_fn;
static ThreadSafeFunction ts_fn_wrap;

int sum = 0;

static void data_source_thread(void* data) {
int args = 15;
ts_fn_wrap.Call([args](Napi::Env env, Napi::Function jsCallback) {
jsCallback.Call({ Number::New(env, args) });
}, napi_tsfn_blocking);
sum += 5;
if (data) {}
}

static Value StartThreadInternal(const CallbackInfo& info, bool block_on_full) {
ts_fn_wrap = ThreadSafeFunction(info.Env(), info[0].As<Function>(), "", Object(), 10, 2);

if (uv_thread_create(&uv_threads[0], data_source_thread, (void*)ts_fn) == 0) {
printf("created thread1\n");
}
if (uv_thread_create(&uv_threads[1], data_source_thread, (void*)ts_fn) == 0) {
printf("created thread2\n");
}

uv_thread_join(&uv_threads[1]);
uv_thread_join(&uv_threads[0]);
printf("%d\n", sum);
if (block_on_full) {}

return Object::New(info.Env());
}

static Value StartThread(const CallbackInfo& info) {
return StartThreadInternal(info, true);
}

static Value StartThreadNoNative(const CallbackInfo& info) {
return StartThreadInternal(info, true);
}

static Value StartThreadNonblocking(const CallbackInfo& info) {
return StartThreadInternal(info, false);
}

static Value StopThread(const CallbackInfo& info) {
return Object::New(info.Env());
}

static Value Unref(const CallbackInfo& info) {
return Object::New(info.Env());
}

static Value Release(const CallbackInfo& info) {
return Object::New(info.Env());
}

Object InitThreadSafeFunction(Env env) {
for (size_t index = 0; index < ARRAY_LENGTH; index++) {
ints[index] = index;
}

Object exports = Object::New(env);
exports["ARRAY_LENGTH"] = Number::New(env, ARRAY_LENGTH);
exports["MAX_QUEUE_SIZE"] = Number::New(env, MAX_QUEUE_SIZE);
exports["startThread"] = Function::New(env, StartThread);
exports["startThreadNoNative"] = Function::New(env, StartThreadNoNative);
exports["startThreadNonblocking"] = Function::New(env, StartThreadNonblocking);
exports["stopThread"] = Function::New(env, StopThread);
exports["unref"] = Function::New(env, Unref);
exports["release"] = Function::New(env, Release);
return exports;
}
15 changes: 15 additions & 0 deletions test/threadsafe_function/threadsafe_function.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict';

const buildType = process.config.target_defaults.default_configuration;
const assert = require('assert');

test(require(`../build/${buildType}/binding.node`));
test(require(`../build/${buildType}/binding_noexcept.node`));

function test(binding) {
binding.threadsafe_function.startThread(function(number) {
console.log("callback-ok?" + number);
}, "hello");
console.log(binding.threadsafe_function.ARRAY_LENGTH);
console.log(binding.threadsafe_function.MAX_QUEUE_SIZE);
}

0 comments on commit 3d02ab3

Please sign in to comment.