Skip to content

Commit

Permalink
worker: use special message as MessagePort close command
Browse files Browse the repository at this point in the history
When a `MessagePort` connected to another `MessagePort` closes, the
latter `MessagePort` will be closed as well. Until now, this is done
by testing whether the ports are still entangled after processing
messages. This leaves open a race condition window in which messages
sent just before the closure can be lost when timing is unfortunate.
(A description of the timing is in the test file.)

This can be addressed by using a special message instead, which is
the last message received by a `MessagePort`. This way, all previously
sent messages are processed first.

Fixes: #22762
PR-URL: #27705
Reviewed-By: Rich Trott <rtrott@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
  • Loading branch information
addaleax authored and targos committed May 18, 2019
1 parent b7ed4d7 commit e004d42
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 36 deletions.
53 changes: 26 additions & 27 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ namespace worker {
Message::Message(MallocedBuffer<char>&& buffer)
: main_message_buf_(std::move(buffer)) {}

bool Message::IsCloseMessage() const {
return main_message_buf_.data == nullptr;
}

namespace {

// This is used to tell V8 how to read transferred host objects, like other
Expand Down Expand Up @@ -91,6 +95,8 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {

MaybeLocal<Value> Message::Deserialize(Environment* env,
Local<Context> context) {
CHECK(!IsCloseMessage());

EscapableHandleScope handle_scope(env->isolate());
Context::Scope context_scope(context);

Expand Down Expand Up @@ -395,6 +401,7 @@ Maybe<bool> Message::Serialize(Environment* env,

// The serializer gave us a buffer allocated using `malloc()`.
std::pair<uint8_t*, size_t> data = serializer.Release();
CHECK_NOT_NULL(data.first);
main_message_buf_ =
MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
return Just(true);
Expand Down Expand Up @@ -430,11 +437,6 @@ void MessagePortData::AddToIncomingQueue(Message&& message) {
}
}

bool MessagePortData::IsSiblingClosed() const {
Mutex::ScopedLock lock(*sibling_mutex_);
return sibling_ == nullptr;
}

void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
CHECK_NULL(a->sibling_);
CHECK_NULL(b->sibling_);
Expand All @@ -443,12 +445,6 @@ void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
a->sibling_mutex_ = b->sibling_mutex_;
}

void MessagePortData::PingOwnerAfterDisentanglement() {
Mutex::ScopedLock lock(mutex_);
if (owner_ != nullptr)
owner_->TriggerAsync();
}

void MessagePortData::Disentangle() {
// Grab a copy of the sibling mutex, then replace it so that each sibling
// has its own sibling_mutex_ now.
Expand All @@ -462,11 +458,12 @@ void MessagePortData::Disentangle() {
sibling_ = nullptr;
}

// We close MessagePorts after disentanglement, so we trigger the
// corresponding uv_async_t to let them know that this happened.
PingOwnerAfterDisentanglement();
// We close MessagePorts after disentanglement, so we enqueue a corresponding
// message and trigger the corresponding uv_async_t to let them know that
// this happened.
AddToIncomingQueue(Message());
if (sibling != nullptr) {
sibling->PingOwnerAfterDisentanglement();
sibling->AddToIncomingQueue(Message());
}
}

Expand Down Expand Up @@ -590,14 +587,25 @@ void MessagePort::OnMessage() {
Debug(this, "MessagePort has message, receiving = %d",
static_cast<int>(receiving_messages_));

if (!receiving_messages_)
break;
if (data_->incoming_messages_.empty())
// We have nothing to do if:
// - There are no pending messages
// - We are not intending to receive messages, and the message we would
// receive is not the final "close" message.
if (data_->incoming_messages_.empty() ||
(!receiving_messages_ &&
!data_->incoming_messages_.front().IsCloseMessage())) {
break;
}

received = std::move(data_->incoming_messages_.front());
data_->incoming_messages_.pop_front();
}

if (received.IsCloseMessage()) {
Close();
return;
}

if (!env()->can_call_into_js()) {
Debug(this, "MessagePort drains queue because !can_call_into_js()");
// In this case there is nothing to do but to drain the current queue.
Expand Down Expand Up @@ -628,15 +636,6 @@ void MessagePort::OnMessage() {
}
}
}

if (data_ && data_->IsSiblingClosed()) {
Close();
}
}

bool MessagePort::IsSiblingClosed() const {
CHECK(data_);
return data_->IsSiblingClosed();
}

void MessagePort::OnClose() {
Expand Down
16 changes: 7 additions & 9 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@ class MessagePort;
// Represents a single communication message.
class Message : public MemoryRetainer {
public:
// Create a Message with a specific underlying payload, in the format of the
// V8 ValueSerializer API. If `payload` is empty, this message indicates
// that the receiving message port should close itself.
explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());

Message(Message&& other) = default;
Message& operator=(Message&& other) = default;
Message& operator=(const Message&) = delete;
Message(const Message&) = delete;

// Whether this is a message indicating that the port is to be closed.
// This is the last message to be received by a MessagePort.
bool IsCloseMessage() const;

// Deserialize the contained JS value. May only be called once, and only
// after Serialize() has been called (e.g. by another thread).
v8::MaybeLocal<v8::Value> Deserialize(Environment* env,
Expand Down Expand Up @@ -89,10 +96,6 @@ class MessagePortData : public MemoryRetainer {
// This may be called from any thread.
void AddToIncomingQueue(Message&& message);

// Returns true if and only this MessagePort is currently not entangled
// with another message port.
bool IsSiblingClosed() const;

// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
static void Entangle(MessagePortData* a, MessagePortData* b);
Expand All @@ -109,10 +112,6 @@ class MessagePortData : public MemoryRetainer {
SET_SELF_SIZE(MessagePortData)

private:
// After disentangling this message port, the owner handle (if any)
// is asynchronously triggered, so that it can close down naturally.
void PingOwnerAfterDisentanglement();

// This mutex protects all fields below it, with the exception of
// sibling_.
mutable Mutex mutex_;
Expand Down Expand Up @@ -178,7 +177,6 @@ class MessagePort : public HandleWrap {
// messages.
std::unique_ptr<MessagePortData> Detach();

bool IsSiblingClosed() const;
void Close(
v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()) override;

Expand Down
38 changes: 38 additions & 0 deletions test/parallel/test-worker-message-port-message-before-close.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { once } = require('events');
const { Worker, MessageChannel } = require('worker_threads');

// This is a regression test for the race condition underlying
// https://github.com/nodejs/node/issues/22762.
// It ensures that all messages send before a MessagePort#close() call are
// received. Previously, what could happen was a race condition like this:
// - Thread 1 sends message A
// - Thread 2 begins receiving/emitting message A
// - Thread 1 sends message B
// - Thread 1 closes its side of the channel
// - Thread 2 finishes receiving/emitting message A
// - Thread 2 sees that the port should be closed
// - Thread 2 closes the port, discarding message B in the process.

async function test() {
const worker = new Worker(`
require('worker_threads').parentPort.on('message', ({ port }) => {
port.postMessage('firstMessage');
port.postMessage('lastMessage');
port.close();
});
`, { eval: true });

for (let i = 0; i < 10000; i++) {
const { port1, port2 } = new MessageChannel();
worker.postMessage({ port: port2 }, [ port2 ]);
await once(port1, 'message'); // 'complexObject'
assert.deepStrictEqual(await once(port1, 'message'), ['lastMessage']);
}

worker.terminate();
}

test().then(common.mustCall());

0 comments on commit e004d42

Please sign in to comment.