Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
workers: initial implementation
  • Loading branch information
petkaantonov committed Feb 13, 2016
commit 7ece8d71e0de3dea24a5cbb3fac2e410e1e48f76
216 changes: 216 additions & 0 deletions lib/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
'use strict';

if (!process.features.experimental_workers) {
throw new Error('Experimental workers are not enabled');
}

const util = require('util');
const assert = require('assert');
const EventEmitter = require('events');
const WorkerContextBinding = process.binding('worker_context');
const JSONStringify = function(value) {
if (value === undefined) value = null;
return JSON.stringify(value);
};
const JSONParse = JSON.parse;
const EMPTY_ARRAY = [];

const workerContextSymbol = Symbol('workerContext');
const installEventsSymbol = Symbol('installEvents');
const checkAliveSymbol = Symbol('checkAlive');
const initSymbol = WorkerContextBinding.initSymbol;

const builtinErrorTypes = new Map([
Error, SyntaxError, RangeError, URIError, TypeError, EvalError, ReferenceError
].map(function(Type) {
return [Type.name, Type];
}));

const Worker = WorkerContextBinding.JsConstructor;
util.inherits(Worker, EventEmitter);

Worker.prototype[initSymbol] = function(entryModulePath, options) {
if (typeof entryModulePath !== 'string')
throw new TypeError('entryModulePath must be a string');
EventEmitter.call(this);
options = Object(options);
const keepAlive =
options.keepAlive === undefined ? true : !!options.keepAlive;
const evalCode = !!options.eval;
const userData = JSONStringify(options.data);
this[workerContextSymbol] =
new WorkerContextBinding.WorkerContext(entryModulePath, {
keepAlive: keepAlive,
userData: userData,
eval: evalCode
});
this[installEventsSymbol]();
};

Worker.prototype[installEventsSymbol] = function() {
const workerObject = this;
const workerContext = this[workerContextSymbol];

const onerror = function(payload) {
var ErrorConstructor = builtinErrorTypes.get(payload.builtinType);
if (typeof ErrorConstructor !== 'function')
ErrorConstructor = Error;
const error = new ErrorConstructor(payload.message);
error.stack = payload.stack;
util._extend(error, payload.additionalProperties);
workerObject.emit('error', error);
};

workerContext._onexit = function(exitCode) {
workerObject[workerContextSymbol] = null;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe use arrow functions for these events so you don't have to do const workerObject = this; ?

workerObject.emit('exit', exitCode);
};

workerContext._onmessage = function(payload, messageType) {
payload = JSONParse(payload);
switch (messageType) {
case WorkerContextBinding.USER:
return workerObject.emit('message', payload);
case WorkerContextBinding.EXCEPTION:
return onerror(payload);
default:
assert.fail('unreachable');
}
};
};

Worker.prototype[checkAliveSymbol] = function() {
if (!this[workerContextSymbol])
throw new RangeError('this worker has been terminated');
};

Worker.prototype.postMessage = function(payload) {
this[checkAliveSymbol]();
this[workerContextSymbol].postMessage(JSONStringify(payload),
EMPTY_ARRAY,
WorkerContextBinding.USER);
};

Worker.prototype.terminate = function(callback) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be naive, but maybe we could use the same vocab as child_process?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tflanagan depends on what you want to be closer to: node child_process or Web Workers.

this[checkAliveSymbol]();
const context = this[workerContextSymbol];
this[workerContextSymbol] = null;
if (typeof callback === 'function') {
this.once('exit', function(exitCode) {
callback(null, exitCode);
});
}
context.terminate();
};

Worker.prototype.ref = function() {
this[checkAliveSymbol]();
this[workerContextSymbol].ref();
};

Worker.prototype.unref = function() {
this[checkAliveSymbol]();
this[workerContextSymbol].unref();
};

if (process.isWorkerThread) {
const postMessage = function(payload, transferList, type) {
if (!Array.isArray(transferList))
throw new TypeError('transferList must be an array');

WorkerContextBinding.workerWrapper._postMessage(JSONStringify(payload),
transferList,
type);
};
const workerFatalError = function(er) {
const defaultStack = null;
const defaultMessage = '[toString() conversion failed]';
const defaultBuiltinType = 'Error';

var message = defaultMessage;
var builtinType = defaultBuiltinType;
var stack = defaultStack;
var additionalProperties = {};

// As this is a fatal error handler we cannot throw any new errors here
// but should still try to extract as much info as possible from the
// cause.
if (er instanceof Error) {
// .name can be a getter that throws.
try {
builtinType = er.name;
} catch (ignore) {}

if (typeof builtinType !== 'string')
builtinType = defaultBuiltinType;

// .stack can be a getter that throws.
try {
stack = er.stack;
} catch (ignore) {}

if (typeof stack !== 'string')
stack = defaultStack;

try {
// Get inherited enumerable properties.
// .name, .stack and .message are all non-enumerable
for (var key in er)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be const

additionalProperties[key] = er[key];
// The message delivery must always succeed, otherwise the real cause
// of this fatal error is masked.
JSONStringify(additionalProperties);
} catch (e) {
additionalProperties = {};
}
}

try {
// .message can be a getter that throws or the call to toString may fail.
if (er instanceof Error) {
message = er.message;
if (typeof message !== 'string')
message = '' + er;
} else {
message = '' + er;
}
} catch (e) {
message = defaultMessage;
}

postMessage({
message: message,
stack: stack,
additionalProperties: additionalProperties,
builtinType: builtinType
}, EMPTY_ARRAY, WorkerContextBinding.EXCEPTION);
};

util._extend(Worker, EventEmitter.prototype);
EventEmitter.call(Worker);

WorkerContextBinding.workerWrapper._onmessage =
function(payload, messageType) {
payload = JSONParse(payload);
switch (messageType) {
case WorkerContextBinding.USER:
return Worker.emit('message', payload);
default:
assert.fail('unreachable');
}
};

Worker.postMessage = function(payload) {
postMessage(payload, EMPTY_ARRAY, WorkerContextBinding.USER);
};

Object.defineProperty(Worker, '_workerFatalError', {
configurable: true,
writable: false,
enumerable: false,
value: workerFatalError
});
}


module.exports = Worker;
14 changes: 13 additions & 1 deletion node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
'lib/v8.js',
'lib/vm.js',
'lib/zlib.js',
'lib/worker.js',
'lib/internal/child_process.js',
'lib/internal/cluster.js',
'lib/internal/freelist.js',
Expand Down Expand Up @@ -139,6 +140,7 @@
'src/node_watchdog.cc',
'src/node_zlib.cc',
'src/node_i18n.cc',
'src/notification-channel.cc',
'src/persistent-handle-cleanup.cc',
'src/pipe_wrap.cc',
'src/signal_wrap.cc',
Expand All @@ -152,6 +154,7 @@
'src/process_wrap.cc',
'src/udp_wrap.cc',
'src/uv.cc',
'src/worker.cc',
# headers to make for a more pleasant IDE experience
'src/async-wrap.h',
'src/async-wrap-inl.h',
Expand All @@ -176,8 +179,10 @@
'src/node_wrap.h',
'src/node_revert.h',
'src/node_i18n.h',
'src/notification-channel.h',
'src/persistent-handle-cleanup.h',
'src/pipe_wrap.h',
'src/producer-consumer-queue.h',
'src/tty_wrap.h',
'src/tcp_wrap.h',
'src/udp_wrap.h',
Expand All @@ -192,6 +197,7 @@
'src/util-inl.h',
'src/util.cc',
'src/string_search.cc',
'src/worker.h',
'deps/http_parser/http_parser.h',
'deps/v8/include/v8.h',
'deps/v8/include/v8-debug.h',
Expand All @@ -210,6 +216,11 @@
'V8_DEPRECATION_WARNINGS=1',
],

'xcode_settings': {
'OTHER_LDFLAGS': [
'-stdlib=libc++',
],
},

'conditions': [
[ 'node_tag!=""', {
Expand Down Expand Up @@ -682,7 +693,8 @@
'dependencies': [
'deps/gtest/gtest.gyp:gtest',
'deps/v8/tools/gyp/v8.gyp:v8',
'deps/v8/tools/gyp/v8.gyp:v8_libplatform'
'deps/v8/tools/gyp/v8.gyp:v8_libplatform',
'deps/uv/uv.gyp:libuv',
],
'include_dirs': [
'src',
Expand Down
29 changes: 22 additions & 7 deletions src/async-wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ using v8::Isolate;
using v8::Local;
using v8::Object;
using v8::RetainedObjectInfo;
using v8::TryCatch;
using v8::Value;

namespace node {
Expand Down Expand Up @@ -175,6 +174,8 @@ void LoadAsyncWrapperInfo(Environment* env) {
Local<Value> AsyncWrap::MakeCallback(const Local<Function> cb,
int argc,
Local<Value>* argv) {
if (!env()->CanCallIntoJs())
return Undefined(env()->isolate());
CHECK(env()->context() == env()->isolate()->GetCurrentContext());

Local<Function> pre_fn = env()->async_hooks_pre_function();
Expand All @@ -200,22 +201,33 @@ Local<Value> AsyncWrap::MakeCallback(const Local<Function> cb,
Local<Value> enter_v = domain->Get(env()->enter_string());
if (enter_v->IsFunction()) {
if (enter_v.As<Function>()->Call(domain, 0, nullptr).IsEmpty()) {
FatalError("node::AsyncWrap::MakeCallback",
if (env()->CanCallIntoJs())
FatalError("node::AsyncWrap::MakeCallback",
"domain enter callback threw, please report this");
else
return Undefined(env()->isolate());
}
}
}

if (ran_init_callback() && !pre_fn.IsEmpty()) {
if (pre_fn->Call(context, 0, nullptr).IsEmpty())
FatalError("node::AsyncWrap::MakeCallback", "pre hook threw");
if (pre_fn->Call(context, 0, nullptr).IsEmpty()) {
if (env()->CanCallIntoJs())
FatalError("node::AsyncWrap::MakeCallback", "pre hook threw");
else
return Undefined(env()->isolate());
}
}

Local<Value> ret = cb->Call(context, argc, argv);

if (ran_init_callback() && !post_fn.IsEmpty()) {
if (post_fn->Call(context, 0, nullptr).IsEmpty())
FatalError("node::AsyncWrap::MakeCallback", "post hook threw");
if (post_fn->Call(context, 0, nullptr).IsEmpty()) {
if (env()->CanCallIntoJs())
FatalError("node::AsyncWrap::MakeCallback", "post hook threw");
else
return Undefined(env()->isolate());
}
}

if (ret.IsEmpty()) {
Expand All @@ -226,8 +238,11 @@ Local<Value> AsyncWrap::MakeCallback(const Local<Function> cb,
Local<Value> exit_v = domain->Get(env()->exit_string());
if (exit_v->IsFunction()) {
if (exit_v.As<Function>()->Call(domain, 0, nullptr).IsEmpty()) {
FatalError("node::AsyncWrap::MakeCallback",
if (env()->CanCallIntoJs())
FatalError("node::AsyncWrap::MakeCallback",
"domain exit callback threw, please report this");
else
return Undefined(env()->isolate());
}
}
}
Expand Down
Loading