Skip to content

Commit

Permalink
worker: refactor thread life cycle management
Browse files Browse the repository at this point in the history
The current mechanism of uses two async handles, one owned by the
creator of the worker thread to terminate a running worker,
and another one employed by the worker to interrupt its creator on its
natural termination. The force termination piggybacks on the message-
passing mechanism to inform the worker to quiesce.

Also there are few flags that represent the other thread's state /
request state because certain code path is shared by multiple
control flows, and there are certain code path where the async
handles may not have come to life.

Refactor into a LoopStopper abstraction that exposes routines to
install a handle as well as to save a state.

Refs: nodejs#21283
  • Loading branch information
gireeshpunathil committed Mar 1, 2019
1 parent 5843058 commit fbf740e
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 95 deletions.
16 changes: 0 additions & 16 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -584,13 +584,6 @@ void MessagePort::OnMessage() {
// Get the head of the message queue.
Mutex::ScopedLock lock(data_->mutex_);

if (stop_event_loop_) {
Debug(this, "MessagePort stops loop as requested");
CHECK(!data_->receiving_messages_);
uv_stop(env()->event_loop());
break;
}

Debug(this, "MessagePort has message, receiving = %d",
static_cast<int>(data_->receiving_messages_));

Expand Down Expand Up @@ -740,15 +733,6 @@ void MessagePort::Stop() {
data_->receiving_messages_ = false;
}

void MessagePort::StopEventLoop() {
Mutex::ScopedLock lock(data_->mutex_);
data_->receiving_messages_ = false;
stop_event_loop_ = true;

Debug(this, "Received StopEventLoop request");
TriggerAsync();
}

void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
MessagePort* port;
Expand Down
4 changes: 0 additions & 4 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,6 @@ class MessagePort : public HandleWrap {
void Start();
// Stop processing messages on this port as a receiving end.
void Stop();
// Stop processing messages on this port as a receiving end,
// and stop the event loop that this port is associated with.
void StopEventLoop();

static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand Down Expand Up @@ -206,7 +203,6 @@ class MessagePort : public HandleWrap {
inline uv_async_t* async();

std::unique_ptr<MessagePortData> data_ = nullptr;
bool stop_event_loop_ = false;

friend class MessagePortData;
};
Expand Down
139 changes: 77 additions & 62 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,46 @@ void WaitForWorkerInspectorToStop(Environment* child) {

} // anonymous namespace

void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
Mutex::ScopedLock lock(mutex_);
env_ = env;
async_ = new uv_async_t;
if (data != nullptr) async_->data = data;
CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
}

void AsyncRequest::Uninstall() {
Mutex::ScopedLock lock(mutex_);
if (async_ != nullptr)
env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
}

void AsyncRequest::Stop() {
Mutex::ScopedLock lock(mutex_);
stop_ = true;
if (async_ != nullptr) uv_async_send(async_);
}

void AsyncRequest::SetStopped(bool flag) {
Mutex::ScopedLock lock(mutex_);
stop_ = flag;
}

bool AsyncRequest::IsStopped() {
Mutex::ScopedLock lock(mutex_);
return stop_;
}

uv_async_t* AsyncRequest::GetHandle() {
Mutex::ScopedLock lock(mutex_);
return async_;
}

void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
Mutex::ScopedLock lock(mutex_);
if (async_ != nullptr) tracker->TrackField("async_request", *async_);
}

Worker::Worker(Environment* env,
Local<Object> wrap,
const std::string& url,
Expand Down Expand Up @@ -97,9 +137,8 @@ Worker::Worker(Environment* env,
Debug(this, "Preparation for worker %llu finished", thread_id_);
}

bool Worker::is_stopped() const {
Mutex::ScopedLock stopped_lock(stopped_mutex_);
return stopped_;
bool Worker::is_stopped() {
return thread_stopper_.IsStopped();
}

// This class contains data that is only relevant to the child thread itself,
Expand Down Expand Up @@ -207,6 +246,8 @@ void Worker::Run() {
Context::Scope context_scope(env_->context());
if (child_port != nullptr)
child_port->Close();
thread_stopper_.Uninstall();
thread_stopper_.SetStopped(true);
env_->stop_sub_worker_contexts();
env_->RunCleanup();
RunAtExit(env_.get());
Expand All @@ -215,10 +256,7 @@ void Worker::Run() {
WaitForWorkerInspectorToStop(env_.get());
#endif

{
Mutex::ScopedLock stopped_lock(stopped_mutex_);
stopped_ = true;
}
env_->RunCleanup();

// This call needs to be made while the `Environment` is still alive
// because we assume that it is available for async tracking in the
Expand All @@ -227,11 +265,12 @@ void Worker::Run() {
}
});

if (thread_stopper_.IsStopped()) return;
{
HandleScope handle_scope(isolate_);
Local<Context> context = NewContext(isolate_);
if (is_stopped()) return;

if (thread_stopper_.IsStopped()) return;
CHECK(!context.IsEmpty());
Context::Scope context_scope(context);
{
Expand All @@ -253,6 +292,14 @@ void Worker::Run() {
Debug(this, "Created Environment for worker with id %llu", thread_id_);

if (is_stopped()) return;
thread_stopper_.Install(env_.get(), env_.get(), [](uv_async_t* handle) {
Environment* env_ = static_cast<Environment*>(handle->data);
uv_stop(env_->event_loop());
});
uv_unref(reinterpret_cast<uv_handle_t*>(thread_stopper_.GetHandle()));

Debug(this, "Created Environment for worker with id %llu", thread_id_);
if (thread_stopper_.IsStopped()) return;
{
HandleScope handle_scope(isolate_);
Mutex::ScopedLock lock(mutex_);
Expand All @@ -268,7 +315,7 @@ void Worker::Run() {
Debug(this, "Created message port for worker %llu", thread_id_);
}

if (is_stopped()) return;
if (thread_stopper_.IsStopped()) return;
{
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
StartWorkerInspector(env_.get(),
Expand All @@ -289,22 +336,21 @@ void Worker::Run() {
Debug(this, "Loaded environment for worker %llu", thread_id_);
}

if (is_stopped()) return;
if (thread_stopper_.IsStopped()) return;
{
SealHandleScope seal(isolate_);
bool more;
env_->performance_state()->Mark(
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
do {
if (is_stopped()) break;
if (thread_stopper_.IsStopped()) break;
uv_run(&data.loop_, UV_RUN_DEFAULT);
if (is_stopped()) break;
if (thread_stopper_.IsStopped()) break;

platform_->DrainTasks(isolate_);

more = uv_loop_alive(&data.loop_);
if (more && !is_stopped())
continue;
if (more && !thread_stopper_.IsStopped()) continue;

EmitBeforeExit(env_.get());

Expand All @@ -319,7 +365,7 @@ void Worker::Run() {

{
int exit_code;
bool stopped = is_stopped();
bool stopped = thread_stopper_.IsStopped();
if (!stopped)
exit_code = EmitExit(env_.get());
Mutex::ScopedLock lock(mutex_);
Expand All @@ -341,34 +387,11 @@ void Worker::JoinThread() {
thread_joined_ = true;

env()->remove_sub_worker_context(this);

if (thread_exit_async_) {
env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) {
delete async;
});

if (scheduled_on_thread_stopped_)
OnThreadStopped();
}
OnThreadStopped();
on_thread_finished_.Uninstall();
}

void Worker::OnThreadStopped() {
{
Mutex::ScopedLock lock(mutex_);
scheduled_on_thread_stopped_ = false;

Debug(this, "Worker %llu thread stopped", thread_id_);

{
Mutex::ScopedLock stopped_lock(stopped_mutex_);
CHECK(stopped_);
}

parent_port_ = nullptr;
}

JoinThread();

{
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
Expand All @@ -391,7 +414,7 @@ Worker::~Worker() {
Mutex::ScopedLock lock(mutex_);
JoinThread();

CHECK(stopped_);
CHECK(thread_stopper_.IsStopped());
CHECK(thread_joined_);

// This has most likely already happened within the worker thread -- this
Expand Down Expand Up @@ -480,16 +503,15 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
Mutex::ScopedLock lock(w->mutex_);

w->env()->add_sub_worker_context(w);
w->stopped_ = false;
w->thread_joined_ = false;
w->thread_stopper_.SetStopped(false);

w->thread_exit_async_.reset(new uv_async_t);
w->thread_exit_async_->data = w;
CHECK_EQ(uv_async_init(w->env()->event_loop(),
w->thread_exit_async_.get(),
[](uv_async_t* handle) {
static_cast<Worker*>(handle->data)->OnThreadStopped();
}), 0);
w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) {
Worker* w_ = static_cast<Worker*>(handle->data);
CHECK(w_->thread_stopper_.IsStopped());
w_->parent_port_ = nullptr;
w_->JoinThread();
});

uv_thread_options_t thread_options;
thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
Expand All @@ -505,9 +527,7 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
w->Run();

Mutex::ScopedLock lock(w->mutex_);
CHECK(w->thread_exit_async_);
w->scheduled_on_thread_stopped_ = true;
uv_async_send(w->thread_exit_async_.get());
w->on_thread_finished_.Stop();
}, static_cast<void*>(w)), 0);
}

Expand All @@ -523,28 +543,23 @@ void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
if (w->thread_exit_async_)
uv_ref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
uv_ref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
}

void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
if (w->thread_exit_async_)
uv_unref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
uv_unref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
}

void Worker::Exit(int code) {
Mutex::ScopedLock lock(mutex_);
Mutex::ScopedLock stopped_lock(stopped_mutex_);

Debug(this, "Worker %llu called Exit(%d)", thread_id_, code);

if (!stopped_) {
stopped_ = true;
if (!thread_stopper_.IsStopped()) {
exit_code_ = code;
if (child_port_ != nullptr)
child_port_->StopEventLoop();
Debug(this, "Received StopEventLoop request");
thread_stopper_.Stop();
if (isolate_ != nullptr)
isolate_->TerminateExecution();
}
Expand Down
39 changes: 26 additions & 13 deletions src/node_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,35 @@

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "node_messaging.h"
#include <unordered_map>
#include "node_messaging.h"
#include "uv.h"

namespace node {
namespace worker {

class WorkerThreadData;

class AsyncRequest : public MemoryRetainer {
public:
AsyncRequest() : stop_(true) {}
void Install(Environment* env, void* data, uv_async_cb target);
void Uninstall();
void Stop();
void SetStopped(bool flag);
bool IsStopped();
uv_async_t* GetHandle();
void MemoryInfo(MemoryTracker* tracker) const override;
SET_MEMORY_INFO_NAME(AsyncRequest)
SET_SELF_SIZE(AsyncRequest)

private:
Environment* env_;
uv_async_t* async_ = nullptr;
mutable Mutex mutex_;
bool stop_ = true;
};

// A worker thread, as represented in its parent thread.
class Worker : public AsyncWrap {
public:
Expand All @@ -34,14 +55,13 @@ class Worker : public AsyncWrap {
tracker->TrackFieldWithSize(
"isolate_data", sizeof(IsolateData), "IsolateData");
tracker->TrackFieldWithSize("env", sizeof(Environment), "Environment");
tracker->TrackField("thread_exit_async", *thread_exit_async_);
tracker->TrackField("parent_port", parent_port_);
}

SET_MEMORY_INFO_NAME(Worker)
SET_SELF_SIZE(Worker)

bool is_stopped() const;
bool is_stopped();

static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void StartThread(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand All @@ -67,16 +87,6 @@ class Worker : public AsyncWrap {
// This mutex protects access to all variables listed below it.
mutable Mutex mutex_;

// Currently only used for telling the parent thread that the child
// thread exited.
std::unique_ptr<uv_async_t> thread_exit_async_;
bool scheduled_on_thread_stopped_ = false;

// This mutex only protects stopped_. If both locks are acquired, this needs
// to be the latter one.
mutable Mutex stopped_mutex_;
bool stopped_ = true;

bool thread_joined_ = true;
int exit_code_ = 0;
uint64_t thread_id_ = -1;
Expand All @@ -96,6 +106,9 @@ class Worker : public AsyncWrap {
// instance refers to it via its [kPort] property.
MessagePort* parent_port_ = nullptr;

AsyncRequest thread_stopper_;
AsyncRequest on_thread_finished_;

friend class WorkerThreadData;
};

Expand Down

0 comments on commit fbf740e

Please sign in to comment.