Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
worker: implement worker.moveMessagePortToContext()
This enables using `MessagePort`s in different `vm.Context`s,
aiding with the isolation that the `vm` module seeks to provide.

Refs: ayojs/ayo#111

PR-URL: #26497
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
addaleax committed Mar 15, 2019
commit 23bf4ce91bb1cff44c8fb88a3aef796adec39cc0
27 changes: 27 additions & 0 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,30 @@ if (isMainThread) {
}
```

## worker.moveMessagePortToContext(port, contextifiedSandbox)
<!-- YAML
added: REPLACEME
-->

* `port` {MessagePort} The message port which will be transferred.
* `contextifiedSandbox` {Object} A [contextified][] object as returned by the
`vm.createContext()` method.

* Returns: {MessagePort}

Transfer a `MessagePort` to a different [`vm`][] Context. The original `port`
object will be rendered unusable, and the returned `MessagePort` instance will
take its place.

The returned `MessagePort` will be an object in the target context, and will
inherit from its global `Object` class. Objects passed to the
[`port.onmessage()`][] listener will also be created in the target context
and inherit from its global `Object` class.

However, the created `MessagePort` will no longer inherit from
[`EventEmitter`][], and only [`port.onmessage()`][] can be used to receive
events using it.

## worker.parentPort
<!-- YAML
added: v10.5.0
Expand Down Expand Up @@ -583,6 +607,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
[`Worker`]: #worker_threads_class_worker
[`cluster` module]: cluster.html
[`port.on('message')`]: #worker_threads_event_message
[`port.onmessage()`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/onmessage
[`port.postMessage()`]: #worker_threads_port_postmessage_value_transferlist
[`process.abort()`]: process.html#process_process_abort
[`process.chdir()`]: process.html#process_process_chdir_directory
Expand All @@ -600,6 +625,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
[`require('worker_threads').threadId`]: #worker_threads_worker_threadid
[`require('worker_threads').workerData`]: #worker_threads_worker_workerdata
[`trace_events`]: tracing.html
[`vm`]: vm.html
[`worker.on('message')`]: #worker_threads_event_message_1
[`worker.postMessage()`]: #worker_threads_worker_postmessage_value_transferlist
[`worker.terminate()`]: #worker_threads_worker_terminate_callback
Expand All @@ -610,4 +636,5 @@ active handle in the event system. If the worker is already `unref()`ed calling
[Web Workers]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API
[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort
[child processes]: child_process.html
[contextified]: vm.html#vm_what_does_it_mean_to_contextify_an_object
[v8.serdes]: v8.html#v8_serialization_api
2 changes: 2 additions & 0 deletions lib/internal/worker/io.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const {
MessagePort,
MessageChannel,
drainMessagePort,
moveMessagePortToContext,
stopMessagePort
} = internalBinding('messaging');
const { threadId } = internalBinding('worker');
Expand Down Expand Up @@ -233,6 +234,7 @@ module.exports = {
kIncrementsPortRef,
kWaitingStreams,
kStdioWantsMoreDataCallback,
moveMessagePortToContext,
MessagePort,
MessageChannel,
setupPortReferencing,
Expand Down
4 changes: 3 additions & 1 deletion lib/worker_threads.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ const {

const {
MessagePort,
MessageChannel
MessageChannel,
moveMessagePortToContext,
} = require('internal/worker/io');

module.exports = {
isMainThread,
MessagePort,
MessageChannel,
moveMessagePortToContext,
threadId,
Worker,
parentPort: null,
Expand Down
39 changes: 36 additions & 3 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

#include "async_wrap-inl.h"
#include "debug_utils.h"
#include "node_contextify.h"
#include "node_buffer.h"
#include "node_errors.h"
#include "node_process.h"
#include "util.h"

using node::contextify::ContextifyContext;
using v8::Array;
using v8::ArrayBuffer;
using v8::ArrayBufferCreationMode;
Expand Down Expand Up @@ -760,6 +762,35 @@ void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
port->OnMessage();
}

void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
if (!args[0]->IsObject() ||
!env->message_port_constructor_template()->HasInstance(args[0])) {
return THROW_ERR_INVALID_ARG_TYPE(env,
"First argument needs to be a MessagePort instance");
}
MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
CHECK_NOT_NULL(port);

Local<Value> context_arg = args[1];
ContextifyContext* context_wrapper;
if (!context_arg->IsObject() ||
(context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
env, context_arg.As<Object>())) == nullptr) {
return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
}

std::unique_ptr<MessagePortData> data;
if (!port->IsDetached())
data = port->Detach();

Context::Scope context_scope(context_wrapper->context());
MessagePort* target =
MessagePort::New(env, context_wrapper->context(), std::move(data));
if (target != nullptr)
args.GetReturnValue().Set(target->object());
}

void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
Entangle(a, b->data_.get());
}
Expand Down Expand Up @@ -816,9 +847,9 @@ static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
MessagePort* port2 = MessagePort::New(env, context);
MessagePort::Entangle(port1, port2);

args.This()->Set(env->context(), env->port1_string(), port1->object())
args.This()->Set(context, env->port1_string(), port1->object())
.FromJust();
args.This()->Set(env->context(), env->port2_string(), port2->object())
args.This()->Set(context, env->port2_string(), port2->object())
.FromJust();
}

Expand All @@ -833,7 +864,7 @@ static void InitMessaging(Local<Object> target,
FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
templ->SetClassName(message_channel_string);
target->Set(env->context(),
target->Set(context,
message_channel_string,
templ->GetFunction(context).ToLocalChecked()).FromJust();
}
Expand All @@ -847,6 +878,8 @@ static void InitMessaging(Local<Object> target,
// the browser equivalents do not provide them.
env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
env->SetMethod(target, "moveMessagePortToContext",
MessagePort::MoveToContext);
}

} // anonymous namespace
Expand Down
5 changes: 5 additions & 0 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,17 @@ class MessagePort : public HandleWrap {
// Stop processing messages on this port as a receiving end.
void Stop();

/* constructor */
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
/* prototype methods */
static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args);

/* static */
static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args);

// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
static void Entangle(MessagePort* a, MessagePort* b);
Expand Down
69 changes: 69 additions & 0 deletions test/parallel/test-worker-message-port-move.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/* global port */
'use strict';
const common = require('../common');
const assert = require('assert');
const vm = require('vm');
const {
MessagePort, MessageChannel, moveMessagePortToContext
} = require('worker_threads');

const context = vm.createContext();
const { port1, port2 } = new MessageChannel();
context.port = moveMessagePortToContext(port1, context);
context.global = context;
Object.assign(context, {
global: context,
assert,
MessagePort,
MessageChannel
});

vm.runInContext('(' + function() {
{
assert(port.postMessage instanceof Function);
assert(port.constructor instanceof Function);
for (let obj = port; obj !== null; obj = Object.getPrototypeOf(obj)) {
for (const key of Object.getOwnPropertyNames(obj)) {
if (typeof obj[key] === 'object' && obj[key] !== null) {
assert(obj[key] instanceof Object);
} else if (typeof obj[key] === 'function') {
assert(obj[key] instanceof Function);
}
}
}

assert(!(port instanceof MessagePort));
assert.strictEqual(port.onmessage, undefined);
port.onmessage = function({ data }) {
assert(data instanceof Object);
port.postMessage(data);
};
port.start();
}

{
let threw = false;
try {
port.postMessage(global);
} catch (e) {
assert.strictEqual(e.constructor.name, 'DOMException');
assert(e instanceof Object);
assert(e instanceof Error);
threw = true;
}
assert(threw);
}

{
const newDummyPort = new (port.constructor)();
assert(!(newDummyPort instanceof MessagePort));
assert(newDummyPort.close instanceof Function);
newDummyPort.close();
}
} + ')()', context);

port2.on('message', common.mustCall((msg) => {
assert(msg instanceof Object);
port2.close();
}));
port2.postMessage({});