From 208a30c2ebae7bb65f8d59699a9daac937e1768f Mon Sep 17 00:00:00 2001
From: Anna Henningsen
Date: Thu, 18 Apr 2019 00:54:59 +0200
Subject: [PATCH 1/3] worker: move `receiving_messages_` field to `MessagePort`

This is a property of the native object associated with the
`MessagePort`, not something that should be set on the conceptual
`MessagePort` that may be transferred around.
---
 src/node_messaging.cc | 11 +++++------
 src/node_messaging.h  |  2 +-
 2 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index c7d0b327003f41..b9212ba272d0fe 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -588,9 +588,9 @@ void MessagePort::OnMessage() {
       Mutex::ScopedLock lock(data_->mutex_);
 
       Debug(this, "MessagePort has message, receiving = %d",
-            static_cast<bool>(data_->receiving_messages_));
+            static_cast<bool>(receiving_messages_));
 
-      if (!data_->receiving_messages_)
+      if (!receiving_messages_)
         break;
       if (data_->incoming_messages_.empty())
         break;
@@ -722,17 +722,16 @@ void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
 }
 
 void MessagePort::Start() {
-  Mutex::ScopedLock lock(data_->mutex_);
   Debug(this, "Start receiving messages");
-  data_->receiving_messages_ = true;
+  receiving_messages_ = true;
+  Mutex::ScopedLock lock(data_->mutex_);
   if (!data_->incoming_messages_.empty())
     TriggerAsync();
 }
 
 void MessagePort::Stop() {
-  Mutex::ScopedLock lock(data_->mutex_);
   Debug(this, "Stop receiving messages");
-  data_->receiving_messages_ = false;
+  receiving_messages_ = false;
 }
 
 void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
diff --git a/src/node_messaging.h b/src/node_messaging.h
index aa2559af2c8061..0a729c141088cb 100644
--- a/src/node_messaging.h
+++ b/src/node_messaging.h
@@ -116,7 +116,6 @@ class MessagePortData : public MemoryRetainer {
   // This mutex protects all fields below it, with the exception of
   // sibling_.
   mutable Mutex mutex_;
-  bool receiving_messages_ = false;
   std::list<Message> incoming_messages_;
   MessagePort* owner_ = nullptr;
   // This mutex protects the sibling_ field and is shared between two entangled
@@ -205,6 +204,7 @@ class MessagePort : public HandleWrap {
   void TriggerAsync();
 
   std::unique_ptr<MessagePortData> data_ = nullptr;
+  bool receiving_messages_ = false;
   uv_async_t async_;
 
   friend class MessagePortData;

From 489d500cc4ae374d8a68b6def8b1423f321899fe Mon Sep 17 00:00:00 2001
From: Anna Henningsen
Date: Wed, 15 May 2019 01:24:57 +0200
Subject: [PATCH 2/3] worker: use special message as MessagePort close command

When a `MessagePort` connected to another `MessagePort` closes, the
latter `MessagePort` will be closed as well. Until now, this has been
done by testing whether the ports are still entangled after processing
messages.

This leaves open a race condition window in which messages sent just
before the closure can be lost when timing is unfortunate. (A
description of the timing is in the test file.)

This can be addressed by using a special message instead, which is the
last message received by a `MessagePort`. This way, all previously sent
messages are processed first.
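
As an illustrative sketch (not part of the patch itself), the user-visible
guarantee this change enforces can be expressed with the public
`worker_threads` API; the message values 'a' and 'b' are made up for the
example:

  'use strict';
  const assert = require('assert');
  const { MessageChannel } = require('worker_threads');

  const { port1, port2 } = new MessageChannel();
  const received = [];
  port1.on('message', (msg) => received.push(msg));
  port1.on('close', () => {
    // With the close command delivered as the last queued message, both
    // payloads must have been observed before the port reports closure.
    assert.deepStrictEqual(received, ['a', 'b']);
  });

  port2.postMessage('a');
  port2.postMessage('b');
  port2.close();
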
Fixes: https://github.com/nodejs/node/issues/22762
---
 src/node_messaging.cc                         | 53 +++++++++----------
 src/node_messaging.h                          | 16 +++---
 ...orker-message-port-message-before-close.js | 38 +++++++++++++
 3 files changed, 71 insertions(+), 36 deletions(-)
 create mode 100644 test/parallel/test-worker-message-port-message-before-close.js

diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index b9212ba272d0fe..98ef42df758fd9 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -40,6 +40,10 @@ namespace worker {
 Message::Message(MallocedBuffer<char>&& buffer)
     : main_message_buf_(std::move(buffer)) {}
 
+bool Message::IsCloseMessage() const {
+  return main_message_buf_.data == nullptr;
+}
+
 namespace {
 
 // This is used to tell V8 how to read transferred host objects, like other
@@ -91,6 +95,8 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
 
 MaybeLocal<Value> Message::Deserialize(Environment* env,
                                        Local<Context> context) {
+  CHECK(!IsCloseMessage());
+
   EscapableHandleScope handle_scope(env->isolate());
   Context::Scope context_scope(context);
 
@@ -395,6 +401,7 @@ Maybe<bool> Message::Serialize(Environment* env,
 
   // The serializer gave us a buffer allocated using `malloc()`.
   std::pair<uint8_t*, size_t> data = serializer.Release();
+  CHECK_NOT_NULL(data.first);
   main_message_buf_ =
       MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
   return Just(true);
@@ -430,11 +437,6 @@ void MessagePortData::AddToIncomingQueue(Message&& message) {
   }
 }
 
-bool MessagePortData::IsSiblingClosed() const {
-  Mutex::ScopedLock lock(*sibling_mutex_);
-  return sibling_ == nullptr;
-}
-
 void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
   CHECK_NULL(a->sibling_);
   CHECK_NULL(b->sibling_);
@@ -443,12 +445,6 @@ void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
   a->sibling_mutex_ = b->sibling_mutex_;
 }
 
-void MessagePortData::PingOwnerAfterDisentanglement() {
-  Mutex::ScopedLock lock(mutex_);
-  if (owner_ != nullptr)
-    owner_->TriggerAsync();
-}
-
 void MessagePortData::Disentangle() {
   // Grab a copy of the sibling mutex, then replace it so that each sibling
   // has its own sibling_mutex_ now.
@@ -462,11 +458,12 @@ void MessagePortData::Disentangle() {
     sibling_ = nullptr;
   }
 
-  // We close MessagePorts after disentanglement, so we trigger the
-  // corresponding uv_async_t to let them know that this happened.
-  PingOwnerAfterDisentanglement();
+  // We close MessagePorts after disentanglement, so we enqueue a corresponding
+  // message and trigger the corresponding uv_async_t to let them know that
+  // this happened.
+  AddToIncomingQueue(Message());
   if (sibling != nullptr) {
-    sibling->PingOwnerAfterDisentanglement();
+    sibling->AddToIncomingQueue(Message());
   }
 }
 
@@ -590,14 +587,25 @@ void MessagePort::OnMessage() {
       Debug(this, "MessagePort has message, receiving = %d",
             static_cast<bool>(receiving_messages_));
 
-      if (!receiving_messages_)
-        break;
-      if (data_->incoming_messages_.empty())
+      // We have nothing to do if:
+      // - There are no pending messages
+      // - We are not intending to receive messages, and the message we would
+      //   receive is not the final "close" message.
+      if (data_->incoming_messages_.empty() ||
+          (!receiving_messages_ &&
+           !data_->incoming_messages_.front().IsCloseMessage())) {
         break;
+      }
+
       received = std::move(data_->incoming_messages_.front());
       data_->incoming_messages_.pop_front();
     }
 
+    if (received.IsCloseMessage()) {
+      Close();
+      return;
+    }
+
     if (!env()->can_call_into_js()) {
       Debug(this, "MessagePort drains queue because !can_call_into_js()");
       // In this case there is nothing to do but to drain the current queue.
@@ -628,15 +636,6 @@ void MessagePort::OnMessage() {
       }
     }
   }
-
-  if (data_ && data_->IsSiblingClosed()) {
-    Close();
-  }
-}
-
-bool MessagePort::IsSiblingClosed() const {
-  CHECK(data_);
-  return data_->IsSiblingClosed();
 }
 
 void MessagePort::OnClose() {
diff --git a/src/node_messaging.h b/src/node_messaging.h
index 0a729c141088cb..08a6798e3cd3c1 100644
--- a/src/node_messaging.h
+++ b/src/node_messaging.h
@@ -17,6 +17,9 @@ class MessagePort;
 // Represents a single communication message.
 class Message : public MemoryRetainer {
  public:
+  // Create a Message with a specific underlying payload, in the format of the
+  // V8 ValueSerializer API. If `payload` is empty, this message indicates
+  // that the receiving message port should close itself.
   explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());
 
   Message(Message&& other) = default;
@@ -24,6 +27,10 @@ class Message : public MemoryRetainer {
   Message& operator=(const Message&) = delete;
   Message(const Message&) = delete;
 
+  // Whether this is a message indicating that the port is to be closed.
+  // This is the last message to be received by a MessagePort.
+  bool IsCloseMessage() const;
+
   // Deserialize the contained JS value. May only be called once, and only
   // after Serialize() has been called (e.g. by another thread).
   v8::MaybeLocal<v8::Value> Deserialize(Environment* env,
@@ -89,10 +96,6 @@ class MessagePortData : public MemoryRetainer {
   // This may be called from any thread.
   void AddToIncomingQueue(Message&& message);
 
-  // Returns true if and only this MessagePort is currently not entangled
-  // with another message port.
-  bool IsSiblingClosed() const;
-
   // Turns `a` and `b` into siblings, i.e. connects the sending side of one
   // to the receiving side of the other. This is not thread-safe.
   static void Entangle(MessagePortData* a, MessagePortData* b);
@@ -109,10 +112,6 @@ class MessagePortData : public MemoryRetainer {
   SET_SELF_SIZE(MessagePortData)
 
  private:
-  // After disentangling this message port, the owner handle (if any)
-  // is asynchronously triggered, so that it can close down naturally.
-  void PingOwnerAfterDisentanglement();
-
   // This mutex protects all fields below it, with the exception of
   // sibling_.
   mutable Mutex mutex_;
@@ -178,7 +177,6 @@ class MessagePort : public HandleWrap {
   // messages.
   std::unique_ptr<MessagePortData> Detach();
 
-  bool IsSiblingClosed() const;
   void Close(
       v8::Local<v8::Function> close_callback = v8::Local<v8::Function>()) override;
 
diff --git a/test/parallel/test-worker-message-port-message-before-close.js b/test/parallel/test-worker-message-port-message-before-close.js
new file mode 100644
index 00000000000000..ecaad9c8767a93
--- /dev/null
+++ b/test/parallel/test-worker-message-port-message-before-close.js
@@ -0,0 +1,38 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const { once } = require('events');
+const { Worker, MessageChannel } = require('worker_threads');
+
+// This is a regression test for the race condition underlying
+// https://github.com/nodejs/node/issues/22762.
+// It ensures that all messages sent before a MessagePort#close() call are
+// received. Previously, what could happen was a race condition like this:
+// - Thread 1 sends message A
+// - Thread 2 begins receiving/emitting message A
+// - Thread 1 sends message B
+// - Thread 1 closes its side of the channel
+// - Thread 2 finishes receiving/emitting message A
+// - Thread 2 sees that the port should be closed
+// - Thread 2 closes the port, discarding message B in the process.
+
+async function test() {
+  const worker = new Worker(`
+  require('worker_threads').parentPort.on('message', ({ port }) => {
+    port.postMessage('firstMessage');
+    port.postMessage('lastMessage');
+    port.close();
+  });
+  `, { eval: true });
+
+  for (let i = 0; i < 10000; i++) {
+    const { port1, port2 } = new MessageChannel();
+    worker.postMessage({ port: port2 }, [ port2 ]);
+    await once(port1, 'message');  // 'firstMessage'
+    assert.deepStrictEqual(await once(port1, 'message'), ['lastMessage']);
+  }
+
+  worker.terminate();
+}
+
+test().then(common.mustCall());

From 5e0e3df0776c7f65407b3ab531bb74e89a33edaa Mon Sep 17 00:00:00 2001
From: Anna Henningsen
Date: Wed, 15 May 2019 01:34:52 +0200
Subject: [PATCH 3/3] test: un-mark worker syntax error tests as flaky

These tests should be fixed now.
---
 test/parallel/parallel.status | 2 --
 1 file changed, 2 deletions(-)

diff --git a/test/parallel/parallel.status b/test/parallel/parallel.status
index c0a3f2a4c0000f..965e378bfd952b 100644
--- a/test/parallel/parallel.status
+++ b/test/parallel/parallel.status
@@ -14,8 +14,6 @@ test-tls-enable-trace-cli: PASS,FLAKY
 
 [$system==win32]
 test-http2-pipe: PASS,FLAKY
-test-worker-syntax-error: PASS,FLAKY
-test-worker-syntax-error-file: PASS,FLAKY
 # https://github.com/nodejs/node/issues/23277
 test-worker-memory: PASS,FLAKY
 # https://github.com/nodejs/node/issues/20750
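
Stepping back from the individual patches, the mechanism introduced in
PATCH 2/3 boils down to treating "close" as an ordinary queue entry rather
than as out-of-band state checked after message processing. A minimal sketch
of that idea in JavaScript follows; the `FakePort` class and its method names
are invented for illustration and do not correspond to the C++ implementation:

  'use strict';
  // A sentinel entry (null) stands in for the close command; because it
  // travels through the same queue as regular messages, it can never
  // overtake messages that were enqueued before it.
  class FakePort {
    constructor() { this.queue = []; this.closed = false; }
    postMessage(msg) { this.queue.push(msg); }
    close() { this.queue.push(null); }
    drain(onMessage) {
      while (this.queue.length > 0 && !this.closed) {
        const msg = this.queue.shift();
        if (msg === null) { this.closed = true; break; }  // cf. IsCloseMessage()
        onMessage(msg);
      }
    }
  }

  const port = new FakePort();
  port.postMessage('firstMessage');
  port.postMessage('lastMessage');
  port.close();
  port.drain((msg) => console.log('received', msg));  // both messages print
  console.log('closed:', port.closed);                // true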