Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tsfn: Implement copy constructor #546

Merged
merged 13 commits into from
Nov 1, 2019
39 changes: 13 additions & 26 deletions napi-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -4018,29 +4018,16 @@ inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
}

inline ThreadSafeFunction::ThreadSafeFunction()
: _tsfn(new napi_threadsafe_function(nullptr), _d) {
: _tsfn() {
}

inline ThreadSafeFunction::ThreadSafeFunction(
napi_threadsafe_function tsfn)
: _tsfn(new napi_threadsafe_function(tsfn), _d) {
: _tsfn(tsfn) {
}

inline ThreadSafeFunction::ThreadSafeFunction(ThreadSafeFunction&& other)
: _tsfn(std::move(other._tsfn)) {
other._tsfn.reset();
}

inline ThreadSafeFunction& ThreadSafeFunction::operator =(
ThreadSafeFunction&& other) {
if (*_tsfn != nullptr) {
Error::Fatal("ThreadSafeFunction::operator =",
"You cannot assign a new TSFN because existing one is still alive.");
return *this;
}
_tsfn = std::move(other._tsfn);
other._tsfn.reset();
return *this;
inline ThreadSafeFunction::operator napi_threadsafe_function() const {
return _tsfn;
}

inline napi_status ThreadSafeFunction::BlockingCall() const {
Expand Down Expand Up @@ -4083,34 +4070,34 @@ inline napi_status ThreadSafeFunction::NonBlockingCall(

inline void ThreadSafeFunction::Ref(napi_env env) const {
if (_tsfn != nullptr) {
napi_status status = napi_ref_threadsafe_function(env, *_tsfn);
napi_status status = napi_ref_threadsafe_function(env, _tsfn);
NAPI_THROW_IF_FAILED_VOID(env, status);
}
}

inline void ThreadSafeFunction::Unref(napi_env env) const {
if (_tsfn != nullptr) {
napi_status status = napi_unref_threadsafe_function(env, *_tsfn);
napi_status status = napi_unref_threadsafe_function(env, _tsfn);
NAPI_THROW_IF_FAILED_VOID(env, status);
}
}

inline napi_status ThreadSafeFunction::Acquire() const {
return napi_acquire_threadsafe_function(*_tsfn);
return napi_acquire_threadsafe_function(_tsfn);
}

inline napi_status ThreadSafeFunction::Release() {
return napi_release_threadsafe_function(*_tsfn, napi_tsfn_release);
return napi_release_threadsafe_function(_tsfn, napi_tsfn_release);
}

inline napi_status ThreadSafeFunction::Abort() {
return napi_release_threadsafe_function(*_tsfn, napi_tsfn_abort);
return napi_release_threadsafe_function(_tsfn, napi_tsfn_abort);
}

inline ThreadSafeFunction::ConvertibleContext
ThreadSafeFunction::GetContext() const {
void* context;
napi_get_threadsafe_function_context(*_tsfn, &context);
napi_get_threadsafe_function_context(_tsfn, &context);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Abort if failed here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the change doesn't change the behavior here, we could address this issue in another PR. Opened issue here #581.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@legendecas I agree, can you open an issue to make sure we fix it later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mhdawson the issue is above, and PR for fix (which is also conflicting with this PR) is #583

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KevinEady - oops should have read more carefullly. Thanks.

return ConvertibleContext({ context });
}

Expand All @@ -4133,10 +4120,10 @@ inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,

ThreadSafeFunction tsfn;
auto* finalizeData = new details::ThreadSafeFinalize<ContextType, Finalizer,
FinalizerDataType>({ data, finalizeCallback, tsfn._tsfn.get() });
FinalizerDataType>({ data, finalizeCallback, &tsfn._tsfn });
napi_status status = napi_create_threadsafe_function(env, callback, resource,
Value::From(env, resourceName), maxQueueSize, initialThreadCount,
finalizeData, wrapper, context, CallJS, tsfn._tsfn.get());
finalizeData, wrapper, context, CallJS, &tsfn._tsfn);
if (status != napi_ok) {
delete finalizeData;
NAPI_THROW_IF_FAILED(env, status, ThreadSafeFunction());
Expand All @@ -4149,7 +4136,7 @@ inline napi_status ThreadSafeFunction::CallInternal(
CallbackWrapper* callbackWrapper,
napi_threadsafe_function_call_mode mode) const {
napi_status status = napi_call_threadsafe_function(
*_tsfn, callbackWrapper, mode);
_tsfn, callbackWrapper, mode);
if (status != napi_ok && callbackWrapper != nullptr) {
delete callbackWrapper;
}
Expand Down
10 changes: 2 additions & 8 deletions napi.h
Original file line number Diff line number Diff line change
Expand Up @@ -2008,8 +2008,7 @@ namespace Napi {
ThreadSafeFunction();
ThreadSafeFunction(napi_threadsafe_function tsFunctionValue);

ThreadSafeFunction(ThreadSafeFunction&& other);
ThreadSafeFunction& operator=(ThreadSafeFunction&& other);
operator napi_threadsafe_function() const;

// This API may be called from any thread.
napi_status BlockingCall() const;
Expand Down Expand Up @@ -2081,13 +2080,8 @@ namespace Napi {
napi_value jsCallback,
void* context,
void* data);
struct Deleter {
// napi_threadsafe_function is managed by Node.js, leave it alone.
void operator()(napi_threadsafe_function*) const {};
};

std::unique_ptr<napi_threadsafe_function, Deleter> _tsfn;
Deleter _d;
napi_threadsafe_function _tsfn;
};
#endif

Expand Down
2 changes: 2 additions & 0 deletions test/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Object InitObjectDeprecated(Env env);
Object InitPromise(Env env);
#if (NAPI_VERSION > 3)
Object InitThreadSafeFunctionPtr(Env env);
Object InitThreadSafeFunctionSum(Env env);
Object InitThreadSafeFunctionUnref(Env env);
Object InitThreadSafeFunction(Env env);
#endif
Expand Down Expand Up @@ -84,6 +85,7 @@ Object Init(Env env, Object exports) {
exports.Set("promise", InitPromise(env));
#if (NAPI_VERSION > 3)
exports.Set("threadsafe_function_ptr", InitThreadSafeFunctionPtr(env));
exports.Set("threadsafe_function_sum", InitThreadSafeFunctionSum(env));
exports.Set("threadsafe_function_unref", InitThreadSafeFunctionUnref(env));
exports.Set("threadsafe_function", InitThreadSafeFunction(env));
#endif
Expand Down
1 change: 1 addition & 0 deletions test/binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
'object/set_property.cc',
'promise.cc',
'threadsafe_function/threadsafe_function_ptr.cc',
'threadsafe_function/threadsafe_function_sum.cc',
'threadsafe_function/threadsafe_function_unref.cc',
'threadsafe_function/threadsafe_function.cc',
'typedarray.cc',
Expand Down
2 changes: 2 additions & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ let testModules = [
'object/set_property',
'promise',
'threadsafe_function/threadsafe_function_ptr',
'threadsafe_function/threadsafe_function_sum',
'threadsafe_function/threadsafe_function_unref',
'threadsafe_function/threadsafe_function',
'typedarray',
Expand Down Expand Up @@ -69,6 +70,7 @@ if ((process.env.npm_config_NAPI_VERSION !== undefined) &&
if ((process.env.npm_config_NAPI_VERSION !== undefined) &&
(process.env.npm_config_NAPI_VERSION < 4)) {
testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_ptr'), 1);
testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_sum'), 1);
testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_unref'), 1);
testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function'), 1);
}
Expand Down
199 changes: 199 additions & 0 deletions test/threadsafe_function/threadsafe_function_sum.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
#include "napi.h"
#include <thread>
#include <cstdlib>
#include <condition_variable>
#include <mutex>

#if (NAPI_VERSION > 3)

using namespace Napi;

namespace {

struct TestData {

TestData(Promise::Deferred&& deferred) : deferred(std::move(deferred)) {};

// Native Promise returned to JavaScript
Promise::Deferred deferred;

// List of threads created for test. This list only ever accessed via main
// thread.
std::vector<std::thread> threads = {};

ThreadSafeFunction tsfn = ThreadSafeFunction();
};

void FinalizerCallback(Napi::Env env, TestData* finalizeData){
for (size_t i = 0; i < finalizeData->threads.size(); ++i) {
finalizeData->threads[i].join();
}
finalizeData->deferred.Resolve(Boolean::New(env,true));
delete finalizeData;
}

/**
* See threadsafe_function_sum.js for descriptions of the tests in this file
*/

void entryWithTSFN(ThreadSafeFunction tsfn, int threadId) {
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1));
tsfn.BlockingCall( [=](Napi::Env env, Function callback) {
callback.Call( { Number::New(env, static_cast<double>(threadId))});
});
tsfn.Release();
}

static Value TestWithTSFN(const CallbackInfo& info) {
int threadCount = info[0].As<Number>().Int32Value();
Function cb = info[1].As<Function>();

// We pass the test data to the Finalizer for cleanup. The finalizer is
// responsible for deleting this data as well.
TestData *testData = new TestData(Promise::Deferred::New(info.Env()));

ThreadSafeFunction tsfn = ThreadSafeFunction::New(
info.Env(), cb, "Test", 0, threadCount,
std::function<decltype(FinalizerCallback)>(FinalizerCallback), testData);

for (int i = 0; i < threadCount; ++i) {
// A copy of the ThreadSafeFunction will go to the thread entry point
testData->threads.push_back( std::thread(entryWithTSFN, tsfn, i) );
}

return testData->deferred.Promise();
}

// Task instance created for each new std::thread
class DelayedTSFNTask {
public:
// Each instance has its own tsfn
ThreadSafeFunction tsfn;

// Thread-safety
std::mutex mtx;
std::condition_variable cv;

// Entry point for std::thread
void entryDelayedTSFN(int threadId) {
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk);
tsfn.BlockingCall([=](Napi::Env env, Function callback) {
callback.Call({Number::New(env, static_cast<double>(threadId))});
});
tsfn.Release();
};
};

struct TestDataDelayed {

TestDataDelayed(Promise::Deferred &&deferred)
: deferred(std::move(deferred)){};
~TestDataDelayed() { taskInsts.clear(); };
// Native Promise returned to JavaScript
Promise::Deferred deferred;

// List of threads created for test. This list only ever accessed via main
// thread.
std::vector<std::thread> threads = {};

// List of DelayedTSFNThread instances
std::vector<std::unique_ptr<DelayedTSFNTask>> taskInsts = {};

ThreadSafeFunction tsfn = ThreadSafeFunction();
};

void FinalizerCallbackDelayed(Napi::Env env, TestDataDelayed *finalizeData) {
for (size_t i = 0; i < finalizeData->threads.size(); ++i) {
finalizeData->threads[i].join();
}
finalizeData->deferred.Resolve(Boolean::New(env, true));
delete finalizeData;
}

static Value TestDelayedTSFN(const CallbackInfo &info) {
int threadCount = info[0].As<Number>().Int32Value();
Function cb = info[1].As<Function>();

TestDataDelayed *testData =
new TestDataDelayed(Promise::Deferred::New(info.Env()));

testData->tsfn =
ThreadSafeFunction::New(info.Env(), cb, "Test", 0, threadCount,
std::function<decltype(FinalizerCallbackDelayed)>(
FinalizerCallbackDelayed),
testData);

for (int i = 0; i < threadCount; ++i) {
testData->taskInsts.push_back(
std::unique_ptr<DelayedTSFNTask>(new DelayedTSFNTask()));
testData->threads.push_back(std::thread(&DelayedTSFNTask::entryDelayedTSFN,
testData->taskInsts.back().get(),
i));
}
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1));

for (auto &task : testData->taskInsts) {
std::lock_guard<std::mutex> lk(task->mtx);
task->tsfn = testData->tsfn;
task->cv.notify_all();
}

return testData->deferred.Promise();
}

void entryAcquire(ThreadSafeFunction tsfn, int threadId) {
tsfn.Acquire();
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1));
tsfn.BlockingCall( [=](Napi::Env env, Function callback) {
callback.Call( { Number::New(env, static_cast<double>(threadId))});
});
tsfn.Release();
}

static Value CreateThread(const CallbackInfo& info) {
TestData* testData = static_cast<TestData*>(info.Data());
ThreadSafeFunction tsfn = testData->tsfn;
int threadId = testData->threads.size();
// A copy of the ThreadSafeFunction will go to the thread entry point
testData->threads.push_back( std::thread(entryAcquire, tsfn, threadId) );
return Number::New(info.Env(), threadId);
}

static Value StopThreads(const CallbackInfo& info) {
TestData* testData = static_cast<TestData*>(info.Data());
ThreadSafeFunction tsfn = testData->tsfn;
tsfn.Release();
return info.Env().Undefined();
}

static Value TestAcquire(const CallbackInfo& info) {
Function cb = info[0].As<Function>();
Napi::Env env = info.Env();

// We pass the test data to the Finalizer for cleanup. The finalizer is
// responsible for deleting this data as well.
TestData *testData = new TestData(Promise::Deferred::New(info.Env()));

testData->tsfn = ThreadSafeFunction::New(
env, cb, "Test", 0, 1,
std::function<decltype(FinalizerCallback)>(FinalizerCallback), testData);

Object result = Object::New(env);
result["createThread"] = Function::New( env, CreateThread, "createThread", testData);
result["stopThreads"] = Function::New( env, StopThreads, "stopThreads", testData);
result["promise"] = testData->deferred.Promise();

return result;
}
}

Object InitThreadSafeFunctionSum(Env env) {
Object exports = Object::New(env);
exports["testDelayedTSFN"] = Function::New(env, TestDelayedTSFN);
exports["testWithTSFN"] = Function::New(env, TestWithTSFN);
exports["testAcquire"] = Function::New(env, TestAcquire);
return exports;
}

#endif
Loading