Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tsfn: Implement copy constructor #546

Merged
merged 13 commits into from
Nov 1, 2019
35 changes: 18 additions & 17 deletions napi-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3970,31 +3970,32 @@ inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
}

inline ThreadSafeFunction::ThreadSafeFunction()
: _tsfn(new napi_threadsafe_function(nullptr), _d) {
: _tsfn() {
}

inline ThreadSafeFunction::ThreadSafeFunction(
napi_threadsafe_function tsfn)
: _tsfn(new napi_threadsafe_function(tsfn), _d) {
: _tsfn(tsfn) {
}

inline ThreadSafeFunction::ThreadSafeFunction(ThreadSafeFunction&& other)
: _tsfn(std::move(other._tsfn)) {
other._tsfn.reset();
}

inline ThreadSafeFunction::ThreadSafeFunction(const ThreadSafeFunction& other)
: _tsfn(other._tsfn) {
}

inline ThreadSafeFunction& ThreadSafeFunction::operator =(
ThreadSafeFunction&& other) {
if (*_tsfn != nullptr) {
Error::Fatal("ThreadSafeFunction::operator =",
"You cannot assign a new TSFN because existing one is still alive.");
return *this;
}
_tsfn = std::move(other._tsfn);
other._tsfn.reset();
_tsfn = other._tsfn;
return *this;
}

inline ThreadSafeFunction::operator napi_threadsafe_function() const {
return _tsfn;
}

inline napi_status ThreadSafeFunction::BlockingCall() const {
return CallInternal(nullptr, napi_tsfn_blocking);
}
Expand Down Expand Up @@ -4034,21 +4035,21 @@ inline napi_status ThreadSafeFunction::NonBlockingCall(
}

inline napi_status ThreadSafeFunction::Acquire() const {
return napi_acquire_threadsafe_function(*_tsfn);
return napi_acquire_threadsafe_function(_tsfn);
}

inline napi_status ThreadSafeFunction::Release() {
return napi_release_threadsafe_function(*_tsfn, napi_tsfn_release);
return napi_release_threadsafe_function(_tsfn, napi_tsfn_release);
}

inline napi_status ThreadSafeFunction::Abort() {
return napi_release_threadsafe_function(*_tsfn, napi_tsfn_abort);
return napi_release_threadsafe_function(_tsfn, napi_tsfn_abort);
}

inline ThreadSafeFunction::ConvertibleContext
ThreadSafeFunction::GetContext() const {
void* context;
napi_get_threadsafe_function_context(*_tsfn, &context);
napi_get_threadsafe_function_context(_tsfn, &context);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Abort if failed here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the change doesn't change the behavior here, we could address this issue in another PR. Opened issue here #581.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@legendecas I agree, can you open an issue to make sure we fix it later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mhdawson the issue is above, and PR for fix (which is also conflicting with this PR) is #583

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KevinEady - oops should have read more carefullly. Thanks.

return ConvertibleContext({ context });
}

Expand All @@ -4071,10 +4072,10 @@ inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,

ThreadSafeFunction tsfn;
auto* finalizeData = new details::ThreadSafeFinalize<ContextType, Finalizer,
FinalizerDataType>({ data, finalizeCallback, tsfn._tsfn.get() });
FinalizerDataType>({ data, finalizeCallback, &tsfn._tsfn });
napi_status status = napi_create_threadsafe_function(env, callback, resource,
Value::From(env, resourceName), maxQueueSize, initialThreadCount,
finalizeData, wrapper, context, CallJS, tsfn._tsfn.get());
finalizeData, wrapper, context, CallJS, &tsfn._tsfn);
if (status != napi_ok) {
delete finalizeData;
NAPI_THROW_IF_FAILED(env, status, ThreadSafeFunction());
Expand All @@ -4087,7 +4088,7 @@ inline napi_status ThreadSafeFunction::CallInternal(
CallbackWrapper* callbackWrapper,
napi_threadsafe_function_call_mode mode) const {
napi_status status = napi_call_threadsafe_function(
*_tsfn, callbackWrapper, mode);
_tsfn, callbackWrapper, mode);
if (status != napi_ok && callbackWrapper != nullptr) {
delete callbackWrapper;
}
Expand Down
10 changes: 4 additions & 6 deletions napi.h
Original file line number Diff line number Diff line change
Expand Up @@ -1987,8 +1987,11 @@ namespace Napi {
ThreadSafeFunction(napi_threadsafe_function tsFunctionValue);

ThreadSafeFunction(ThreadSafeFunction&& other);
ThreadSafeFunction(const ThreadSafeFunction& other);
ThreadSafeFunction& operator=(ThreadSafeFunction&& other);

operator napi_threadsafe_function() const;

// This API may be called from any thread.
napi_status BlockingCall() const;

Expand Down Expand Up @@ -2053,13 +2056,8 @@ namespace Napi {
napi_value jsCallback,
void* context,
void* data);
struct Deleter {
// napi_threadsafe_function is managed by Node.js, leave it alone.
void operator()(napi_threadsafe_function*) const {};
};

std::unique_ptr<napi_threadsafe_function, Deleter> _tsfn;
Deleter _d;
napi_threadsafe_function _tsfn;
};
#endif

Expand Down
2 changes: 2 additions & 0 deletions test/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Object InitObjectDeprecated(Env env);
Object InitPromise(Env env);
#if (NAPI_VERSION > 3)
Object InitThreadSafeFunctionPtr(Env env);
Object InitThreadSafeFunctionSum(Env env);
Object InitThreadSafeFunction(Env env);
#endif
Object InitTypedArray(Env env);
Expand Down Expand Up @@ -83,6 +84,7 @@ Object Init(Env env, Object exports) {
exports.Set("promise", InitPromise(env));
#if (NAPI_VERSION > 3)
exports.Set("threadsafe_function_ptr", InitThreadSafeFunctionPtr(env));
exports.Set("threadsafe_function_sum", InitThreadSafeFunctionSum(env));
exports.Set("threadsafe_function", InitThreadSafeFunction(env));
#endif
exports.Set("typedarray", InitTypedArray(env));
Expand Down
1 change: 1 addition & 0 deletions test/binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
'object/set_property.cc',
'promise.cc',
'threadsafe_function/threadsafe_function_ptr.cc',
'threadsafe_function/threadsafe_function_sum.cc',
'threadsafe_function/threadsafe_function.cc',
'typedarray.cc',
'objectwrap.cc',
Expand Down
1 change: 1 addition & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ let testModules = [
'object/set_property',
'promise',
'threadsafe_function/threadsafe_function_ptr',
'threadsafe_function/threadsafe_function_sum',
'threadsafe_function/threadsafe_function',
'typedarray',
'typedarray-bigint',
Expand Down
155 changes: 155 additions & 0 deletions test/threadsafe_function/threadsafe_function_sum.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#include "napi.h"
#include <thread>
#include <future>

#if (NAPI_VERSION > 3)

using namespace Napi;
using namespace std;

namespace {

struct TestData {
// Native Promise returned to JavaScript
Promise::Deferred deferred;

// List of threads created for test. This list only ever accessed via main
// thread.
vector<thread> threads = {};

ThreadSafeFunction tsfn = ThreadSafeFunction();
};

void FinalizerCallback(Napi::Env env, TestData* finalizeData){
for (size_t i = 0; i < finalizeData->threads.size(); ++i) {
finalizeData->threads[i].join();
}
finalizeData->deferred.Resolve(Boolean::New(env,true));
delete finalizeData;
}

/**
* See threadsafe_function_sum.js for descriptions of the tests in this file
*/

void entryWithTSFN(ThreadSafeFunction tsfn, int threadId) {
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1));
tsfn.BlockingCall( [=](Napi::Env env, Function callback) {
callback.Call( { Number::New(env, static_cast<double>(threadId))});
});
tsfn.Release();
}

static Value TestWithTSFN(const CallbackInfo& info) {
int threadCount = info[0].As<Number>().Int32Value();
Function cb = info[1].As<Function>();

// We pass the test data to the Finalizer for cleanup. The finalizer is
// responsible for deleting this data as well.
TestData *testData = new TestData({
Promise::Deferred::New(info.Env())
});

ThreadSafeFunction tsfn = ThreadSafeFunction::New(
info.Env(), cb, "Test", 0, threadCount,
std::function<decltype(FinalizerCallback)>(FinalizerCallback), testData);

for (int i = 0; i < threadCount; ++i) {
testData->threads.push_back( thread(entryWithTSFN, tsfn, i) );
}

return testData->deferred.Promise();
}


void entryDelayedTSFN(std::future<ThreadSafeFunction> tsfnFuture, int threadId) {
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1));
ThreadSafeFunction tsfn = tsfnFuture.get();
tsfn.BlockingCall( [=](Napi::Env env, Function callback) {
callback.Call( { Number::New(env, static_cast<double>(threadId))});
});
tsfn.Release();
}

static Value TestDelayedTSFN(const CallbackInfo& info) {
int threadCount = info[0].As<Number>().Int32Value();
Function cb = info[1].As<Function>();

TestData *testData = new TestData({
Promise::Deferred::New(info.Env())
});

vector< std::promise<ThreadSafeFunction> > tsfnPromises;

for (int i = 0; i < threadCount; ++i) {
tsfnPromises.emplace_back();
testData->threads.push_back( thread(entryDelayedTSFN, tsfnPromises[i].get_future(), i) );
}

testData->tsfn = ThreadSafeFunction::New(
info.Env(), cb, "Test", 0, threadCount,
std::function<decltype(FinalizerCallback)>(FinalizerCallback), testData);

for (int i = 0; i < threadCount; ++i) {
tsfnPromises[i].set_value(testData->tsfn);
}

return testData->deferred.Promise();
}

void entryAcquire(ThreadSafeFunction tsfn, int threadId) {
tsfn.Acquire();
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1));
tsfn.BlockingCall( [=](Napi::Env env, Function callback) {
callback.Call( { Number::New(env, static_cast<double>(threadId))});
});
tsfn.Release();
}

static Value CreateThread(const CallbackInfo& info) {
TestData* testData = static_cast<TestData*>(info.Data());
ThreadSafeFunction tsfn = testData->tsfn;
int threadId = testData->threads.size();
testData->threads.push_back( thread(entryAcquire, tsfn, threadId) );
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it would be useful to highlight with a comment that this is where the copy constructor is being tested. Since the main change is to add the copy constructor it would be good to make it obvious in the test that it is being used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment

return Number::New(info.Env(), threadId);
}

static Value StopThreads(const CallbackInfo& info) {
TestData* testData = static_cast<TestData*>(info.Data());
ThreadSafeFunction tsfn = testData->tsfn;
tsfn.Release();
return info.Env().Undefined();
}

static Value TestAcquire(const CallbackInfo& info) {
Function cb = info[0].As<Function>();
Napi::Env env = info.Env();

// We pass the test data to the Finalizer for cleanup. The finalizer is
// responsible for deleting this data as well.
TestData *testData = new TestData({
Promise::Deferred::New(env)
});

testData->tsfn = ThreadSafeFunction::New(
env, cb, "Test", 0, 1,
std::function<decltype(FinalizerCallback)>(FinalizerCallback), testData);

Object result = Object::New(env);
result["createThread"] = Function::New( env, CreateThread, "createThread", testData);
result["stopThreads"] = Function::New( env, StopThreads, "stopThreads", testData);
result["promise"] = testData->deferred.Promise();

return result;
}
}

Object InitThreadSafeFunctionSum(Env env) {
Object exports = Object::New(env);
exports["testDelayedTSFN"] = Function::New(env, TestDelayedTSFN);
exports["testWithTSFN"] = Function::New(env, TestWithTSFN);
exports["testAcquire"] = Function::New(env, TestAcquire);
return exports;
}

#endif
65 changes: 65 additions & 0 deletions test/threadsafe_function/threadsafe_function_sum.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
'use strict';
const assert = require('assert');
const buildType = process.config.target_defaults.default_configuration;

/**
*
* ThreadSafeFunction Tests: Thread Id Sums
*
* Every native C++ function that utilizes the TSFN will call the registered
* callback with the thread id. Passing Array.prototype.push with a bound array
* will push the thread id to the array. Therefore, starting `N` threads, we
* will expect the sum of all elements in the array to be `(N-1) * (N) / 2` (as
* thread IDs are 0-based)
*
* We check different methods of passing a ThreadSafeFunction around multiple
* threads:
* - `testWithTSFN`: The main thread creates the TSFN. Then, it creates
* threads, passing the TSFN at thread construction. The number of threads is
* static (known at TSFN creation).
* - `testDelayedTSFN`: The main thread creates threads, passing a promise to a
* TSFN at construction. Then, it creates the TSFN, and resolves each
* threads' promise. The number of threads is static.
* - `testAcquire`: The native binding returns a function to start a new. A
* call to this function will return `false` once `N` calls have been made.
* Each thread will acquire its own use of the TSFN, call it, and then
* release.
*/

const THREAD_COUNT = 5;
const EXPECTED_SUM = (THREAD_COUNT - 1) * (THREAD_COUNT) / 2;

module.exports = Promise.all([
test(require(`../build/${buildType}/binding.node`)),
test(require(`../build/${buildType}/binding_noexcept.node`))
]);

/** @param {number[]} N */
const sum = (N) => N.reduce((sum, n) => sum + n, 0);

function test(binding) {
async function check(bindingFunction) {
const calls = [];
const result = await bindingFunction(THREAD_COUNT, Array.prototype.push.bind(calls));
assert.ok(result);
assert.equal(sum(calls), EXPECTED_SUM);
}

async function checkAcquire() {
const calls = [];
const { promise, createThread, stopThreads } = binding.threadsafe_function_sum.testAcquire(Array.prototype.push.bind(calls));
for (let i = 0; i < THREAD_COUNT; i++) {
createThread();
}
stopThreads();
const result = await promise;
assert.ok(result);
assert.equal(sum(calls), EXPECTED_SUM);
}

return Promise.all([
check(binding.threadsafe_function_sum.testDelayedTSFN),
check(binding.threadsafe_function_sum.testWithTSFN),
checkAcquire()
]);
}