From 63a6c32e8098af967062e9673efc1c956cc031ee Mon Sep 17 00:00:00 2001 From: Kevin Eady <8634912+KevinEady@users.noreply.github.com> Date: Wed, 21 Apr 2021 11:55:56 +0200 Subject: [PATCH] test: fix intermittent TSFN crashes PR-URL: https://github.com/nodejs/node-addon-api/pull/974 Reviewed-By: Michael Dawson --- test/binding.cc | 2 +- .../threadsafe_function.cc | 24 ++++++++--- .../typed_threadsafe_function.cc | 42 ++++++++++++------- 3 files changed, 47 insertions(+), 21 deletions(-) diff --git a/test/binding.cc b/test/binding.cc index 2dc089cbe..92f6c77df 100644 --- a/test/binding.cc +++ b/test/binding.cc @@ -121,7 +121,7 @@ Object Init(Env env, Object exports) { exports.Set("threadsafe_function_ptr", InitThreadSafeFunctionPtr(env)); exports.Set("threadsafe_function_sum", InitThreadSafeFunctionSum(env)); exports.Set("threadsafe_function_unref", InitThreadSafeFunctionUnref(env)); - exports.Set("threadsafe_function", InitTypedThreadSafeFunction(env)); + exports.Set("threadsafe_function", InitThreadSafeFunction(env)); exports.Set("typed_threadsafe_function_ctx", InitTypedThreadSafeFunctionCtx(env)); exports.Set("typed_threadsafe_function_existing_tsfn", diff --git a/test/threadsafe_function/threadsafe_function.cc b/test/threadsafe_function/threadsafe_function.cc index e9b16083b..6886eef47 100644 --- a/test/threadsafe_function/threadsafe_function.cc +++ b/test/threadsafe_function/threadsafe_function.cc @@ -1,4 +1,6 @@ #include +#include +#include #include #include "napi.h" @@ -22,6 +24,9 @@ struct ThreadSafeFunctionInfo { bool startSecondary; FunctionReference jsFinalizeCallback; uint32_t maxQueueSize; + bool closeCalledFromJs; + std::mutex protect; + std::condition_variable signal; } tsfnInfo; // Thread data to transmit to JS @@ -65,12 +70,13 @@ static void DataSourceThread() { break; } - if (info->maxQueueSize == 0) { - // Let's make this thread really busy for 200 ms to give the main thread a - // chance to abort. - auto start = std::chrono::high_resolution_clock::now(); - constexpr auto MS_200 = std::chrono::milliseconds(200); - for (; std::chrono::high_resolution_clock::now() - start < MS_200;); + if (info->abort && info->type != ThreadSafeFunctionInfo::NON_BLOCKING) { + // Let's make this thread really busy to give the main thread a chance to + // abort / close. + std::unique_lock lk(info->protect); + while (!info->closeCalledFromJs) { + info->signal.wait(lk); + } } switch (status) { @@ -112,6 +118,11 @@ static Value StopThread(const CallbackInfo& info) { } else { tsfn.Release(); } + { + std::lock_guard _(tsfnInfo.protect); + tsfnInfo.closeCalledFromJs = true; + tsfnInfo.signal.notify_one(); + } return Value(); } @@ -134,6 +145,7 @@ static Value StartThreadInternal(const CallbackInfo& info, tsfnInfo.abort = info[1].As(); tsfnInfo.startSecondary = info[2].As(); tsfnInfo.maxQueueSize = info[3].As().Uint32Value(); + tsfnInfo.closeCalledFromJs = false; tsfn = ThreadSafeFunction::New(info.Env(), info[0].As(), "Test", tsfnInfo.maxQueueSize, 2, &tsfnInfo, JoinTheThreads, threads); diff --git a/test/typed_threadsafe_function/typed_threadsafe_function.cc b/test/typed_threadsafe_function/typed_threadsafe_function.cc index f9896db86..c25268aaf 100644 --- a/test/typed_threadsafe_function/typed_threadsafe_function.cc +++ b/test/typed_threadsafe_function/typed_threadsafe_function.cc @@ -1,4 +1,6 @@ #include +#include +#include #include #include "napi.h" @@ -17,6 +19,9 @@ static struct ThreadSafeFunctionInfo { bool startSecondary; FunctionReference jsFinalizeCallback; uint32_t maxQueueSize; + bool closeCalledFromJs; + std::mutex protect; + std::condition_variable signal; } tsfnInfo; static void TSFNCallJS(Env env, @@ -42,7 +47,7 @@ static int ints[ARRAY_LENGTH]; static void SecondaryThread() { if (tsfn.Release() != napi_ok) { - Error::Fatal("SecondaryThread", "ThreadSafeFunction.Release() failed"); + Error::Fatal("TypedSecondaryThread", "ThreadSafeFunction.Release() failed"); } } @@ -52,7 +57,8 @@ static void DataSourceThread() { if (info->startSecondary) { if (tsfn.Acquire() != napi_ok) { - Error::Fatal("DataSourceThread", "ThreadSafeFunction.Acquire() failed"); + Error::Fatal("TypedDataSourceThread", + "ThreadSafeFunction.Acquire() failed"); } threads[1] = std::thread(SecondaryThread); @@ -75,13 +81,13 @@ static void DataSourceThread() { break; } - if (info->maxQueueSize == 0) { - // Let's make this thread really busy for 200 ms to give the main thread a - // chance to abort. - auto start = std::chrono::high_resolution_clock::now(); - constexpr auto MS_200 = std::chrono::milliseconds(200); - for (; std::chrono::high_resolution_clock::now() - start < MS_200;) - ; + if (info->abort && info->type != ThreadSafeFunctionInfo::NON_BLOCKING) { + // Let's make this thread really busy to give the main thread a chance to + // abort / close. + std::unique_lock lk(info->protect); + while (!info->closeCalledFromJs) { + info->signal.wait(lk); + } } switch (status) { @@ -98,20 +104,22 @@ static void DataSourceThread() { break; default: - Error::Fatal("DataSourceThread", "ThreadSafeFunction.*Call() failed"); + Error::Fatal("TypedDataSourceThread", + "ThreadSafeFunction.*Call() failed"); } } if (info->type == ThreadSafeFunctionInfo::NON_BLOCKING && !queueWasFull) { - Error::Fatal("DataSourceThread", "Queue was never full"); + Error::Fatal("TypedDataSourceThread", "Queue was never full"); } if (info->abort && !queueWasClosing) { - Error::Fatal("DataSourceThread", "Queue was never closing"); + Error::Fatal("TypedDataSourceThread", "Queue was never closing"); } if (!queueWasClosing && tsfn.Release() != napi_ok) { - Error::Fatal("DataSourceThread", "ThreadSafeFunction.Release() failed"); + Error::Fatal("TypedDataSourceThread", + "ThreadSafeFunction.Release() failed"); } } @@ -123,6 +131,11 @@ static Value StopThread(const CallbackInfo& info) { } else { tsfn.Release(); } + { + std::lock_guard _(tsfnInfo.protect); + tsfnInfo.closeCalledFromJs = true; + tsfnInfo.signal.notify_one(); + } return Value(); } @@ -145,6 +158,7 @@ static Value StartThreadInternal(const CallbackInfo& info, tsfnInfo.abort = info[1].As(); tsfnInfo.startSecondary = info[2].As(); tsfnInfo.maxQueueSize = info[3].As().Uint32Value(); + tsfnInfo.closeCalledFromJs = false; tsfn = TSFN::New(info.Env(), info[0].As(), @@ -163,7 +177,7 @@ static Value StartThreadInternal(const CallbackInfo& info, static Value Release(const CallbackInfo& /* info */) { if (tsfn.Release() != napi_ok) { - Error::Fatal("Release", "ThreadSafeFunction.Release() failed"); + Error::Fatal("Release", "TypedThreadSafeFunction.Release() failed"); } return Value(); }