Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
worker: move worker thread setup code into the main script
This patch directly inlines `createMessageHandler()` and
`createWorkerFatalExeception()` in the new
`lib/internal/main/worker_thread.js` since the implementation
of the two methods are related to the execution flow of
workers.
  • Loading branch information
joyeecheung committed Jan 29, 2019
commit 857d84f564a25a695789d189e5d3c8cd3616e6ab
116 changes: 102 additions & 14 deletions lib/internal/main/worker_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,118 @@ const {
} = require('internal/bootstrap/pre_execution');

const {
getEnvMessagePort,
threadId
threadId,
getEnvMessagePort
} = internalBinding('worker');

const {
createMessageHandler,
createWorkerFatalExeception
} = require('internal/process/worker_thread_only');
messageTypes: {
// Messages that may be received by workers
LOAD_SCRIPT,
// Messages that may be posted from workers
UP_AND_RUNNING,
ERROR_MESSAGE,
COULD_NOT_SERIALIZE_ERROR,
// Messages that may be either received or posted
STDIO_PAYLOAD,
STDIO_WANTS_MORE_DATA,
},
kStdioWantsMoreDataCallback
} = require('internal/worker/io');

const {
fatalException: originalFatalException
} = require('internal/process/execution');

const publicWorker = require('worker_threads');
const debug = require('util').debuglog('worker');
debug(`[${threadId}] is setting up worker child environment`);

function prepareUserCodeExecution() {
initializeClusterIPC();
initializeESMLoader();
loadPreloadModules();
}
debug(`[${threadId}] is setting up worker child environment`);

// Set up the message port and start listening
const port = getEnvMessagePort();
port.on('message', createMessageHandler(port, prepareUserCodeExecution));
port.start();

port.on('message', (message) => {
if (message.type === LOAD_SCRIPT) {
const {
filename,
doEval,
workerData,
publicPort,
manifestSrc,
manifestURL,
hasStdin
} = message;
if (manifestSrc) {
require('internal/process/policy').setup(manifestSrc, manifestURL);
}
initializeClusterIPC();
initializeESMLoader();
loadPreloadModules();
publicWorker.parentPort = publicPort;
publicWorker.workerData = workerData;

if (!hasStdin)
process.stdin.push(null);

debug(`[${threadId}] starts worker script ${filename} ` +
`(eval = ${eval}) at cwd = ${process.cwd()}`);
port.unref();
port.postMessage({ type: UP_AND_RUNNING });
if (doEval) {
const { evalScript } = require('internal/process/execution');
evalScript('[worker eval]', filename);
} else {
process.argv[1] = filename; // script filename
require('module').runMain();
}
return;
} else if (message.type === STDIO_PAYLOAD) {
const { stream, chunk, encoding } = message;
process[stream].push(chunk, encoding);
return;
} else if (message.type === STDIO_WANTS_MORE_DATA) {
const { stream } = message;
process[stream][kStdioWantsMoreDataCallback]();
return;
}

require('assert').fail(`Unknown worker message type ${message.type}`);
});

// Overwrite fatalException
process._fatalException = createWorkerFatalExeception(port);
process._fatalException = (error) => {
debug(`[${threadId}] gets fatal exception`);
let caught = false;
try {
caught = originalFatalException.call(this, error);
} catch (e) {
error = e;
}
debug(`[${threadId}] fatal exception caught = ${caught}`);

if (!caught) {
let serialized;
try {
const { serializeError } = require('internal/error-serdes');
serialized = serializeError(error);
} catch {}
debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
if (serialized)
port.postMessage({
type: ERROR_MESSAGE,
error: serialized
});
else
port.postMessage({ type: COULD_NOT_SERIALIZE_ERROR });

const { clearAsyncIdStack } = require('internal/async_hooks');
clearAsyncIdStack();

process.exit();
}
};

markBootstrapComplete();

port.start();
107 changes: 1 addition & 106 deletions lib/internal/process/worker_thread_only.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@
// This file contains process bootstrappers that can only be
// run in the worker thread.
const {
getEnvMessagePort,
threadId
getEnvMessagePort
} = internalBinding('worker');

const {
messageTypes,
kStdioWantsMoreDataCallback,
kWaitingStreams,
ReadableWorkerStdio,
WritableWorkerStdio
Expand All @@ -18,15 +15,6 @@ const {
const {
codes: { ERR_WORKER_UNSUPPORTED_OPERATION }
} = require('internal/errors');

let debuglog;
function debug(...args) {
if (!debuglog) {
debuglog = require('util').debuglog('worker');
}
return debuglog(...args);
}

const workerStdio = {};

function initializeWorkerStdio() {
Expand All @@ -43,97 +31,6 @@ function initializeWorkerStdio() {
};
}

function createMessageHandler(port, prepareUserCodeExecution) {
const publicWorker = require('worker_threads');

return function(message) {
if (message.type === messageTypes.LOAD_SCRIPT) {
const {
filename,
doEval,
workerData,
publicPort,
manifestSrc,
manifestURL,
hasStdin
} = message;
if (manifestSrc) {
require('internal/process/policy').setup(manifestSrc, manifestURL);
}
prepareUserCodeExecution();
publicWorker.parentPort = publicPort;
publicWorker.workerData = workerData;

if (!hasStdin)
workerStdio.stdin.push(null);

debug(`[${threadId}] starts worker script ${filename} ` +
`(eval = ${eval}) at cwd = ${process.cwd()}`);
port.unref();
port.postMessage({ type: messageTypes.UP_AND_RUNNING });
if (doEval) {
const { evalScript } = require('internal/process/execution');
evalScript('[worker eval]', filename);
} else {
process.argv[1] = filename; // script filename
require('module').runMain();
}
return;
} else if (message.type === messageTypes.STDIO_PAYLOAD) {
const { stream, chunk, encoding } = message;
workerStdio[stream].push(chunk, encoding);
return;
} else if (message.type === messageTypes.STDIO_WANTS_MORE_DATA) {
const { stream } = message;
workerStdio[stream][kStdioWantsMoreDataCallback]();
return;
}

require('assert').fail(`Unknown worker message type ${message.type}`);
};
}

// XXX(joyeecheung): this has to be returned as an anonymous function
// wrapped in a closure, see the comment of the original
// process._fatalException in lib/internal/process/execution.js
function createWorkerFatalExeception(port) {
const {
fatalException: originalFatalException
} = require('internal/process/execution');

return (error) => {
debug(`[${threadId}] gets fatal exception`);
let caught = false;
try {
caught = originalFatalException.call(this, error);
} catch (e) {
error = e;
}
debug(`[${threadId}] fatal exception caught = ${caught}`);

if (!caught) {
let serialized;
try {
const { serializeError } = require('internal/error-serdes');
serialized = serializeError(error);
} catch {}
debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
if (serialized)
port.postMessage({
type: messageTypes.ERROR_MESSAGE,
error: serialized
});
else
port.postMessage({ type: messageTypes.COULD_NOT_SERIALIZE_ERROR });

const { clearAsyncIdStack } = require('internal/async_hooks');
clearAsyncIdStack();

process.exit();
}
};
}

// The execution of this function itself should not cause any side effects.
function wrapProcessMethods(binding) {
function umask(mask) {
Expand All @@ -150,7 +47,5 @@ function wrapProcessMethods(binding) {

module.exports = {
initializeWorkerStdio,
createMessageHandler,
createWorkerFatalExeception,
wrapProcessMethods
};