Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
worker: allow transferring/cloning generic BaseObjects
Extend support for transferring objects à la `MessagePort` to other
types of `BaseObject` subclasses, as well as implement cloning
support for cases in which destructive transferring is not needed
or optional.

PR-URL: #33772
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
  • Loading branch information
addaleax committed Jun 19, 2020
commit 3f52842a6517ebd32240192fbce44904d1ec7f26
5 changes: 3 additions & 2 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -1584,8 +1584,9 @@ is thrown if a required option is missing.
<a id="ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST"></a>
### `ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`

A `MessagePort` was found in the object passed to a `postMessage()` call,
but not provided in the `transferList` for that call.
An object that needs to be explicitly listed in the `transferList` argument
was found in the object passed to a `postMessage()` call, but not provided in
the `transferList` for that call. Usually, this is a `MessagePort`.

<a id="ERR_MISSING_PASSPHRASE"></a>
### `ERR_MISSING_PASSPHRASE`
Expand Down
38 changes: 37 additions & 1 deletion src/base_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class Environment;
template <typename T, bool kIsWeak>
class BaseObjectPtrImpl;

namespace worker {
class TransferData;
}

class BaseObject : public MemoryRetainer {
public:
enum InternalFields { kSlot, kInternalFieldCount };
Expand Down Expand Up @@ -101,7 +105,39 @@ class BaseObject : public MemoryRetainer {
static v8::Local<v8::FunctionTemplate> GetConstructorTemplate(
Environment* env);

protected:
// Interface for transferring BaseObject instances using the .postMessage()
// method of MessagePorts (and, by extension, Workers).
// GetTransferMode() returns a transfer mode that indicates how to deal with
// the current object:
// - kUntransferable:
// No transfer is possible, either because this type of BaseObject does
// not know how to be transfered, or because it is not in a state in
// which it is possible to do so (e.g. because it has already been
// transfered).
// - kTransferable:
// This object can be transfered in a destructive fashion, i.e. will be
// rendered unusable on the sending side of the channel in the process
// of being transfered. (In C++ this would be referred to as movable but
// not copyable.) Objects of this type need to be listed in the
// `transferList` argument of the relevant postMessage() call in order to
// make sure that they are not accidentally destroyed on the sending side.
// TransferForMessaging() will be called to get a representation of the
// object that is used for subsequent deserialization.
// - kCloneable:
// This object can be cloned without being modified.
// CloneForMessaging() will be called to get a representation of the
// object that is used for subsequent deserialization, unless the
// object is listed in transferList, in which case TransferForMessaging()
// is attempted first.
enum class TransferMode {
kUntransferable,
kTransferable,
kCloneable
};
virtual TransferMode GetTransferMode() const;
virtual std::unique_ptr<worker::TransferData> TransferForMessaging();
virtual std::unique_ptr<worker::TransferData> CloneForMessaging() const;

virtual inline void OnGCCollect();

private:
Expand Down
164 changes: 117 additions & 47 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ using v8::ValueSerializer;
using v8::WasmModuleObject;

namespace node {

BaseObject::TransferMode BaseObject::GetTransferMode() const {
return BaseObject::TransferMode::kUntransferable;
}

std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
return CloneForMessaging();
}

std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
return {};
}

namespace worker {

Message::Message(MallocedBuffer<char>&& buffer)
Expand All @@ -55,21 +68,20 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
DeserializerDelegate(
Message* m,
Environment* env,
const std::vector<MessagePort*>& message_ports,
const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
const std::vector<CompiledWasmModule>& wasm_modules)
: message_ports_(message_ports),
: host_objects_(host_objects),
shared_array_buffers_(shared_array_buffers),
wasm_modules_(wasm_modules) {}

MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
// Currently, only MessagePort hosts objects are supported, so identifying
// by the index in the message's MessagePort array is sufficient.
// Identifying the index in the message's BaseObject array is sufficient.
uint32_t id;
if (!deserializer->ReadUint32(&id))
return MaybeLocal<Object>();
CHECK_LE(id, message_ports_.size());
return message_ports_[id]->object(isolate);
CHECK_LE(id, host_objects_.size());
return host_objects_[id]->object(isolate);
}

MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
Expand All @@ -88,7 +100,7 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
ValueDeserializer* deserializer = nullptr;

private:
const std::vector<MessagePort*>& message_ports_;
const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
const std::vector<CompiledWasmModule>& wasm_modules_;
};
Expand All @@ -102,22 +114,25 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
EscapableHandleScope handle_scope(env->isolate());
Context::Scope context_scope(context);

// Create all necessary MessagePort handles.
std::vector<MessagePort*> ports(message_ports_.size());
for (uint32_t i = 0; i < message_ports_.size(); ++i) {
ports[i] = MessagePort::New(env,
context,
std::move(message_ports_[i]));
if (ports[i] == nullptr) {
for (MessagePort* port : ports) {
// This will eventually release the MessagePort object itself.
if (port != nullptr)
port->Close();
// Create all necessary objects for transferables, e.g. MessagePort handles.
std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
for (uint32_t i = 0; i < transferables_.size(); ++i) {
TransferData* data = transferables_[i].get();
host_objects[i] = data->Deserialize(
env, context, std::move(transferables_[i]));
if (!host_objects[i]) {
for (BaseObjectPtr<BaseObject> object : host_objects) {
if (!object) continue;

// Since creating one of the objects failed, we don't want to have the
// other objects lying around in memory. We act as if the object has
// been garbage-collected.
object->Detach();
}
return MaybeLocal<Value>();
}
}
message_ports_.clear();
transferables_.clear();

std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
// Attach all transferred SharedArrayBuffers to their new Isolate.
Expand All @@ -130,7 +145,7 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
shared_array_buffers_.clear();

DeserializerDelegate delegate(
this, env, ports, shared_array_buffers, wasm_modules_);
this, env, host_objects, shared_array_buffers, wasm_modules_);
ValueDeserializer deserializer(
env->isolate(),
reinterpret_cast<const uint8_t*>(main_message_buf_.data),
Expand All @@ -157,8 +172,8 @@ void Message::AddSharedArrayBuffer(
shared_array_buffers_.emplace_back(std::move(backing_store));
}

void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
message_ports_.emplace_back(std::move(data));
void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
transferables_.emplace_back(std::move(data));
}

uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
Expand Down Expand Up @@ -224,8 +239,8 @@ class SerializerDelegate : public ValueSerializer::Delegate {
}

Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
if (env_->message_port_constructor_template()->HasInstance(object)) {
return WriteMessagePort(Unwrap<MessagePort>(object));
if (env_->base_object_ctor_template()->HasInstance(object)) {
return WriteHostObject(Unwrap<BaseObject>(object));
}

ThrowDataCloneError(env_->clone_unsupported_type_str());
Expand Down Expand Up @@ -257,32 +272,61 @@ class SerializerDelegate : public ValueSerializer::Delegate {
void Finish() {
// Only close the MessagePort handles and actually transfer them
// once we know that serialization succeeded.
for (MessagePort* port : ports_) {
port->Close();
msg_->AddMessagePort(port->Detach());
for (uint32_t i = 0; i < host_objects_.size(); i++) {
BaseObject* host_object = host_objects_[i];
std::unique_ptr<TransferData> data;
if (i < first_cloned_object_index_)
data = host_object->TransferForMessaging();
if (!data)
data = host_object->CloneForMessaging();
CHECK(data);
msg_->AddTransferable(std::move(data));
}
}

inline void AddHostObject(BaseObject* host_object) {
// Make sure we have not started serializing the value itself yet.
CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
host_objects_.push_back(host_object);
}

ValueSerializer* serializer = nullptr;

private:
Maybe<bool> WriteMessagePort(MessagePort* port) {
for (uint32_t i = 0; i < ports_.size(); i++) {
if (ports_[i] == port) {
Maybe<bool> WriteHostObject(BaseObject* host_object) {
for (uint32_t i = 0; i < host_objects_.size(); i++) {
if (host_objects_[i] == host_object) {
serializer->WriteUint32(i);
return Just(true);
}
}

THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
return Nothing<bool>();
BaseObject::TransferMode mode = host_object->GetTransferMode();
if (mode == BaseObject::TransferMode::kUntransferable) {
ThrowDataCloneError(env_->clone_unsupported_type_str());
return Nothing<bool>();
} else if (mode == BaseObject::TransferMode::kTransferable) {
// TODO(addaleax): This message code is too specific. Fix that in a
// semver-major follow-up.
THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
return Nothing<bool>();
}

CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
uint32_t index = host_objects_.size();
if (first_cloned_object_index_ == SIZE_MAX)
first_cloned_object_index_ = index;
serializer->WriteUint32(index);
host_objects_.push_back(host_object);
return Just(true);
}

Environment* env_;
Local<Context> context_;
Message* msg_;
std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
std::vector<MessagePort*> ports_;
std::vector<BaseObject*> host_objects_;
size_t first_cloned_object_index_ = SIZE_MAX;

friend class worker::Message;
};
Expand Down Expand Up @@ -344,8 +388,7 @@ Maybe<bool> Message::Serialize(Environment* env,
array_buffers.push_back(ab);
serializer.TransferArrayBuffer(id, ab);
continue;
} else if (env->message_port_constructor_template()
->HasInstance(entry)) {
} else if (env->base_object_ctor_template()->HasInstance(entry)) {
// Check if the source MessagePort is being transferred.
if (!source_port.IsEmpty() && entry == source_port) {
ThrowDataCloneException(
Expand All @@ -354,26 +397,34 @@ Maybe<bool> Message::Serialize(Environment* env,
"Transfer list contains source port"));
return Nothing<bool>();
}
MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
if (port == nullptr || port->IsDetached()) {
BaseObject* host_object = Unwrap<BaseObject>(entry.As<Object>());
if (env->message_port_constructor_template()->HasInstance(entry) &&
(host_object == nullptr ||
static_cast<MessagePort*>(host_object)->IsDetached())) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"MessagePort in transfer list is already detached"));
return Nothing<bool>();
}
if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
delegate.ports_.end()) {
if (std::find(delegate.host_objects_.begin(),
delegate.host_objects_.end(),
host_object) != delegate.host_objects_.end()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"Transfer list contains duplicate MessagePort"));
String::Concat(env->isolate(),
FIXED_ONE_BYTE_STRING(
env->isolate(),
"Transfer list contains duplicate "),
entry.As<Object>()->GetConstructorName()));
return Nothing<bool>();
}
delegate.ports_.push_back(port);
continue;
if (host_object != nullptr && host_object->GetTransferMode() !=
BaseObject::TransferMode::kUntransferable) {
delegate.AddHostObject(host_object);
continue;
}
}

THROW_ERR_INVALID_TRANSFER_OBJECT(env);
Expand Down Expand Up @@ -406,7 +457,7 @@ Maybe<bool> Message::Serialize(Environment* env,
void Message::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackField("array_buffers_", array_buffers_);
tracker->TrackField("shared_array_buffers", shared_array_buffers_);
tracker->TrackField("message_ports", message_ports_);
tracker->TrackField("transferables", transferables_);
}

MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
Expand Down Expand Up @@ -672,6 +723,25 @@ std::unique_ptr<MessagePortData> MessagePort::Detach() {
return std::move(data_);
}

BaseObject::TransferMode MessagePort::GetTransferMode() const {
if (IsDetached())
return BaseObject::TransferMode::kUntransferable;
return BaseObject::TransferMode::kTransferable;
}

std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
Close();
return Detach();
}

BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
Environment* env,
Local<Context> context,
std::unique_ptr<TransferData> self) {
return BaseObjectPtr<MessagePort> { MessagePort::New(
env, context,
static_unique_pointer_cast<MessagePortData>(std::move(self))) };
}

Maybe<bool> MessagePort::PostMessage(Environment* env,
Local<Value> message_v,
Expand Down Expand Up @@ -699,8 +769,8 @@ Maybe<bool> MessagePort::PostMessage(Environment* env,

// Check if the target port is posted to itself.
if (data_->sibling_ != nullptr) {
for (const auto& port_data : msg.message_ports()) {
if (data_->sibling_ == port_data.get()) {
for (const auto& transferable : msg.transferables()) {
if (data_->sibling_ == transferable.get()) {
doomed = true;
ProcessEmitWarning(env, "The target port was posted to itself, and "
"the communication channel was lost");
Expand Down
Loading