From 6019060cbb0620d685c8a106a4184475420e922b Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Wed, 15 May 2019 01:24:57 +0200 Subject: [PATCH] worker: use special message as MessagePort close command When a `MessagePort` connected to another `MessagePort` closes, the latter `MessagePort` will be closed as well. Until now, this is done by testing whether the ports are still entangled after processing messages. This leaves open a race condition window in which messages sent just before the closure can be lost when timing is unfortunate. (A description of the timing is in the test file.) This can be addressed by using a special message instead, which is the last message received by a `MessagePort`. This way, all previously sent messages are processed first. Fixes: https://github.com/nodejs/node/issues/22762 PR-URL: https://github.com/nodejs/node/pull/27705 Reviewed-By: Rich Trott Reviewed-By: Colin Ihrig --- src/node_messaging.cc | 53 +++++++++---------- src/node_messaging.h | 16 +++--- ...orker-message-port-message-before-close.js | 38 +++++++++++++ 3 files changed, 71 insertions(+), 36 deletions(-) create mode 100644 test/parallel/test-worker-message-port-message-before-close.js diff --git a/src/node_messaging.cc b/src/node_messaging.cc index b9212ba272d0fe..98ef42df758fd9 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -40,6 +40,10 @@ namespace worker { Message::Message(MallocedBuffer&& buffer) : main_message_buf_(std::move(buffer)) {} +bool Message::IsCloseMessage() const { + return main_message_buf_.data == nullptr; +} + namespace { // This is used to tell V8 how to read transferred host objects, like other @@ -91,6 +95,8 @@ class DeserializerDelegate : public ValueDeserializer::Delegate { MaybeLocal Message::Deserialize(Environment* env, Local context) { + CHECK(!IsCloseMessage()); + EscapableHandleScope handle_scope(env->isolate()); Context::Scope context_scope(context); @@ -395,6 +401,7 @@ Maybe Message::Serialize(Environment* env, // The serializer gave us a buffer allocated using `malloc()`. std::pair data = serializer.Release(); + CHECK_NOT_NULL(data.first); main_message_buf_ = MallocedBuffer(reinterpret_cast(data.first), data.second); return Just(true); @@ -430,11 +437,6 @@ void MessagePortData::AddToIncomingQueue(Message&& message) { } } -bool MessagePortData::IsSiblingClosed() const { - Mutex::ScopedLock lock(*sibling_mutex_); - return sibling_ == nullptr; -} - void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) { CHECK_NULL(a->sibling_); CHECK_NULL(b->sibling_); @@ -443,12 +445,6 @@ void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) { a->sibling_mutex_ = b->sibling_mutex_; } -void MessagePortData::PingOwnerAfterDisentanglement() { - Mutex::ScopedLock lock(mutex_); - if (owner_ != nullptr) - owner_->TriggerAsync(); -} - void MessagePortData::Disentangle() { // Grab a copy of the sibling mutex, then replace it so that each sibling // has its own sibling_mutex_ now. @@ -462,11 +458,12 @@ void MessagePortData::Disentangle() { sibling_ = nullptr; } - // We close MessagePorts after disentanglement, so we trigger the - // corresponding uv_async_t to let them know that this happened. - PingOwnerAfterDisentanglement(); + // We close MessagePorts after disentanglement, so we enqueue a corresponding + // message and trigger the corresponding uv_async_t to let them know that + // this happened. + AddToIncomingQueue(Message()); if (sibling != nullptr) { - sibling->PingOwnerAfterDisentanglement(); + sibling->AddToIncomingQueue(Message()); } } @@ -590,14 +587,25 @@ void MessagePort::OnMessage() { Debug(this, "MessagePort has message, receiving = %d", static_cast(receiving_messages_)); - if (!receiving_messages_) - break; - if (data_->incoming_messages_.empty()) + // We have nothing to do if: + // - There are no pending messages + // - We are not intending to receive messages, and the message we would + // receive is not the final "close" message. + if (data_->incoming_messages_.empty() || + (!receiving_messages_ && + !data_->incoming_messages_.front().IsCloseMessage())) { break; + } + received = std::move(data_->incoming_messages_.front()); data_->incoming_messages_.pop_front(); } + if (received.IsCloseMessage()) { + Close(); + return; + } + if (!env()->can_call_into_js()) { Debug(this, "MessagePort drains queue because !can_call_into_js()"); // In this case there is nothing to do but to drain the current queue. @@ -628,15 +636,6 @@ void MessagePort::OnMessage() { } } } - - if (data_ && data_->IsSiblingClosed()) { - Close(); - } -} - -bool MessagePort::IsSiblingClosed() const { - CHECK(data_); - return data_->IsSiblingClosed(); } void MessagePort::OnClose() { diff --git a/src/node_messaging.h b/src/node_messaging.h index 0a729c141088cb..08a6798e3cd3c1 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -17,6 +17,9 @@ class MessagePort; // Represents a single communication message. class Message : public MemoryRetainer { public: + // Create a Message with a specific underlying payload, in the format of the + // V8 ValueSerializer API. If `payload` is empty, this message indicates + // that the receiving message port should close itself. explicit Message(MallocedBuffer&& payload = MallocedBuffer()); Message(Message&& other) = default; @@ -24,6 +27,10 @@ class Message : public MemoryRetainer { Message& operator=(const Message&) = delete; Message(const Message&) = delete; + // Whether this is a message indicating that the port is to be closed. + // This is the last message to be received by a MessagePort. + bool IsCloseMessage() const; + // Deserialize the contained JS value. May only be called once, and only // after Serialize() has been called (e.g. by another thread). v8::MaybeLocal Deserialize(Environment* env, @@ -89,10 +96,6 @@ class MessagePortData : public MemoryRetainer { // This may be called from any thread. void AddToIncomingQueue(Message&& message); - // Returns true if and only this MessagePort is currently not entangled - // with another message port. - bool IsSiblingClosed() const; - // Turns `a` and `b` into siblings, i.e. connects the sending side of one // to the receiving side of the other. This is not thread-safe. static void Entangle(MessagePortData* a, MessagePortData* b); @@ -109,10 +112,6 @@ class MessagePortData : public MemoryRetainer { SET_SELF_SIZE(MessagePortData) private: - // After disentangling this message port, the owner handle (if any) - // is asynchronously triggered, so that it can close down naturally. - void PingOwnerAfterDisentanglement(); - // This mutex protects all fields below it, with the exception of // sibling_. mutable Mutex mutex_; @@ -178,7 +177,6 @@ class MessagePort : public HandleWrap { // messages. std::unique_ptr Detach(); - bool IsSiblingClosed() const; void Close( v8::Local close_callback = v8::Local()) override; diff --git a/test/parallel/test-worker-message-port-message-before-close.js b/test/parallel/test-worker-message-port-message-before-close.js new file mode 100644 index 00000000000000..ecaad9c8767a93 --- /dev/null +++ b/test/parallel/test-worker-message-port-message-before-close.js @@ -0,0 +1,38 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { once } = require('events'); +const { Worker, MessageChannel } = require('worker_threads'); + +// This is a regression test for the race condition underlying +// https://github.com/nodejs/node/issues/22762. +// It ensures that all messages send before a MessagePort#close() call are +// received. Previously, what could happen was a race condition like this: +// - Thread 1 sends message A +// - Thread 2 begins receiving/emitting message A +// - Thread 1 sends message B +// - Thread 1 closes its side of the channel +// - Thread 2 finishes receiving/emitting message A +// - Thread 2 sees that the port should be closed +// - Thread 2 closes the port, discarding message B in the process. + +async function test() { + const worker = new Worker(` + require('worker_threads').parentPort.on('message', ({ port }) => { + port.postMessage('firstMessage'); + port.postMessage('lastMessage'); + port.close(); + }); + `, { eval: true }); + + for (let i = 0; i < 10000; i++) { + const { port1, port2 } = new MessageChannel(); + worker.postMessage({ port: port2 }, [ port2 ]); + await once(port1, 'message'); // 'complexObject' + assert.deepStrictEqual(await once(port1, 'message'), ['lastMessage']); + } + + worker.terminate(); +} + +test().then(common.mustCall());