Skip to content

Commit

Permalink
worker: support MessagePort passing in messages
Browse files Browse the repository at this point in the history
Support passing `MessagePort` instances through other `MessagePort`s,
as expected by the `MessagePort` spec.

Thanks to Stephen Belanger for reviewing this change in its original PR.

Refs: ayojs/ayo#106

PR-URL: #20876
Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Shingo Inoue <leko.noor@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Tiancheng "Timothy" Gu <timothygu99@gmail.com>
Reviewed-By: John-David Dalton <john.david.dalton@gmail.com>
Reviewed-By: Gus Caplan <me@gus.host>
  • Loading branch information
addaleax authored and targos committed Jun 13, 2018
1 parent 337be58 commit f447acd
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 6 deletions.
12 changes: 12 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,12 @@ An operation outside the bounds of a `Buffer` was attempted.
An attempt has been made to create a `Buffer` larger than the maximum allowed
size.

<a id="ERR_CANNOT_TRANSFER_OBJECT"></a>
### ERR_CANNOT_TRANSFER_OBJECT

The value passed to `postMessage()` contained an object that is not supported
for transferring.

<a id="ERR_CANNOT_WATCH_SIGINT"></a>
### ERR_CANNOT_WATCH_SIGINT

Expand Down Expand Up @@ -1304,6 +1310,12 @@ strict compliance with the API specification (which in some cases may accept
`func(undefined)` and `func()` are treated identically, and the
[`ERR_INVALID_ARG_TYPE`][] error code may be used instead.

<a id="ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST"></a>
### ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST

A `MessagePort` was found in the object passed to a `postMessage()` call,
but not provided in the `transferList` for that call.

<a id="ERR_MISSING_MODULE"></a>
### ERR_MISSING_MODULE

Expand Down
2 changes: 1 addition & 1 deletion doc/api/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ the [HTML structured clone algorithm][]. In particular, it may contain circular
references and objects like typed arrays that the `JSON` API is not able
to stringify.

`transferList` may be a list of `ArrayBuffer` objects.
`transferList` may be a list of `ArrayBuffer` and `MessagePort` objects.
After transferring, they will not be usable on the sending side of the channel
anymore (even if they are not contained in `value`).

Expand Down
7 changes: 6 additions & 1 deletion src/node_errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace node {
#define ERRORS_WITH_CODE(V) \
V(ERR_BUFFER_OUT_OF_BOUNDS, RangeError) \
V(ERR_BUFFER_TOO_LARGE, Error) \
V(ERR_CANNOT_TRANSFER_OBJECT, TypeError) \
V(ERR_CLOSED_MESSAGE_PORT, Error) \
V(ERR_CONSTRUCT_CALL_REQUIRED, Error) \
V(ERR_INDEX_OUT_OF_RANGE, RangeError) \
Expand All @@ -27,6 +28,7 @@ namespace node {
V(ERR_INVALID_TRANSFER_OBJECT, TypeError) \
V(ERR_MEMORY_ALLOCATION_FAILED, Error) \
V(ERR_MISSING_ARGS, TypeError) \
V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, TypeError) \
V(ERR_MISSING_MODULE, Error) \
V(ERR_STRING_TOO_LONG, Error) \

Expand All @@ -51,11 +53,14 @@ namespace node {
// Errors with predefined static messages

#define PREDEFINED_ERROR_MESSAGES(V) \
V(ERR_CANNOT_TRANSFER_OBJECT, "Cannot transfer object of unsupported type")\
V(ERR_CLOSED_MESSAGE_PORT, "Cannot send data on closed MessagePort") \
V(ERR_CONSTRUCT_CALL_REQUIRED, "Cannot call constructor without `new`") \
V(ERR_INDEX_OUT_OF_RANGE, "Index out of range") \
V(ERR_INVALID_TRANSFER_OBJECT, "Found invalid object in transferList") \
V(ERR_MEMORY_ALLOCATION_FAILED, "Failed to allocate memory")
V(ERR_MEMORY_ALLOCATION_FAILED, "Failed to allocate memory") \
V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, \
"MessagePort was found in message but not listed in transferList")

#define V(code, message) \
inline v8::Local<v8::Value> code(v8::Isolate* isolate) { \
Expand Down
80 changes: 76 additions & 4 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,27 @@ namespace {
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
class DeserializerDelegate : public ValueDeserializer::Delegate {
public:
DeserializerDelegate(Message* m, Environment* env)
: env_(env), msg_(m) {}
DeserializerDelegate(Message* m,
Environment* env,
const std::vector<MessagePort*>& message_ports)
: env_(env), msg_(m), message_ports_(message_ports) {}

MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
// Currently, only MessagePort hosts objects are supported, so identifying
// by the index in the message's MessagePort array is sufficient.
uint32_t id;
if (!deserializer->ReadUint32(&id))
return MaybeLocal<Object>();
CHECK_LE(id, message_ports_.size());
return message_ports_[id]->object();
};

ValueDeserializer* deserializer = nullptr;

private:
Environment* env_;
Message* msg_;
const std::vector<MessagePort*>& message_ports_;
};

} // anonymous namespace
Expand All @@ -58,7 +71,23 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
EscapableHandleScope handle_scope(env->isolate());
Context::Scope context_scope(context);

DeserializerDelegate delegate(this, env);
// Create all necessary MessagePort handles.
std::vector<MessagePort*> ports(message_ports_.size());
for (uint32_t i = 0; i < message_ports_.size(); ++i) {
ports[i] = MessagePort::New(env,
context,
std::move(message_ports_[i]));
if (ports[i] == nullptr) {
for (MessagePort* port : ports) {
// This will eventually release the MessagePort object itself.
port->Close();
}
return MaybeLocal<Value>();
}
}
message_ports_.clear();

DeserializerDelegate delegate(this, env, ports);
ValueDeserializer deserializer(
env->isolate(),
reinterpret_cast<const uint8_t*>(main_message_buf_.data),
Expand All @@ -83,6 +112,10 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
deserializer.ReadValue(context).FromMaybe(Local<Value>()));
}

void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
message_ports_.emplace_back(std::move(data));
}

namespace {

// This tells V8 how to serialize objects that it does not understand
Expand All @@ -97,12 +130,43 @@ class SerializerDelegate : public ValueSerializer::Delegate {
env_->isolate()->ThrowException(Exception::Error(message));
}

Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
if (env_->message_port_constructor_template()->HasInstance(object)) {
return WriteMessagePort(Unwrap<MessagePort>(object));
}

THROW_ERR_CANNOT_TRANSFER_OBJECT(env_);
return Nothing<bool>();
}

void Finish() {
// Only close the MessagePort handles and actually transfer them
// once we know that serialization succeeded.
for (MessagePort* port : ports_) {
port->Close();
msg_->AddMessagePort(port->Detach());
}
}

ValueSerializer* serializer = nullptr;

private:
Maybe<bool> WriteMessagePort(MessagePort* port) {
for (uint32_t i = 0; i < ports_.size(); i++) {
if (ports_[i] == port) {
serializer->WriteUint32(i);
return Just(true);
}
}

THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
return Nothing<bool>();
}

Environment* env_;
Local<Context> context_;
Message* msg_;
std::vector<MessagePort*> ports_;

friend class worker::Message;
};
Expand Down Expand Up @@ -131,7 +195,7 @@ Maybe<bool> Message::Serialize(Environment* env,
Local<Value> entry;
if (!transfer_list->Get(context, i).ToLocal(&entry))
return Nothing<bool>();
// Currently, we support ArrayBuffers.
// Currently, we support ArrayBuffers and MessagePorts.
if (entry->IsArrayBuffer()) {
Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
// If we cannot render the ArrayBuffer unusable in this Isolate and
Expand All @@ -144,6 +208,12 @@ Maybe<bool> Message::Serialize(Environment* env,
array_buffers.push_back(ab);
serializer.TransferArrayBuffer(id, ab);
continue;
} else if (env->message_port_constructor_template()
->HasInstance(entry)) {
MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
CHECK_NE(port, nullptr);
delegate.ports_.push_back(port);
continue;
}

THROW_ERR_INVALID_TRANSFER_OBJECT(env);
Expand All @@ -167,6 +237,8 @@ Maybe<bool> Message::Serialize(Environment* env,
contents.ByteLength() });
}

delegate.Finish();

// The serializer gave us a buffer allocated using `malloc()`.
std::pair<uint8_t*, size_t> data = serializer.Release();
main_message_buf_ =
Expand Down
5 changes: 5 additions & 0 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,14 @@ class Message {
v8::Local<v8::Value> input,
v8::Local<v8::Value> transfer_list);

// Internal method of Message that is called once serialization finishes
// and that transfers ownership of `data` to this message.
void AddMessagePort(std::unique_ptr<MessagePortData>&& data);

private:
MallocedBuffer<char> main_message_buf_;
std::vector<MallocedBuffer<char>> array_buffer_contents_;
std::vector<std::unique_ptr<MessagePortData>> message_ports_;

friend class MessagePort;
};
Expand Down
23 changes: 23 additions & 0 deletions test/parallel/test-message-port-message-port-transferring.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Flags: --experimental-worker
'use strict';
const common = require('../common');
const assert = require('assert');

const { MessageChannel } = require('worker');

{
const { port1: basePort1, port2: basePort2 } = new MessageChannel();
const {
port1: transferredPort1, port2: transferredPort2
} = new MessageChannel();

basePort1.postMessage({ transferredPort1 }, [ transferredPort1 ]);
basePort2.on('message', common.mustCall(({ transferredPort1 }) => {
transferredPort1.postMessage('foobar');
transferredPort2.on('message', common.mustCall((msg) => {
assert.strictEqual(msg, 'foobar');
transferredPort1.close(common.mustCall());
basePort1.close(common.mustCall());
}));
}));
}

0 comments on commit f447acd

Please sign in to comment.