Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
esm/worker.js
  • Loading branch information
GeoffreyBooth committed Sep 18, 2023
commit 9b3f8cc1b3a94c0203fc27009757679a7a8c592b
36 changes: 36 additions & 0 deletions lib/internal/modules/esm/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ const {
const { initializeHooks } = require('internal/modules/esm/utils');
const { isMarkedAsUntransferable } = require('internal/buffer');

/**
* Transfers an ArrayBuffer, TypedArray, or DataView to a worker thread.
* @param {boolean} hasError - Whether an error occurred during transfer.
* @param {ArrayBuffer | TypedArray | DataView} source - The data to transfer.
*/
function transferArrayBuffer(hasError, source) {
if (hasError || source == null) { return; }
let arrayBuffer;
Expand All @@ -47,6 +52,11 @@ function transferArrayBuffer(hasError, source) {
}
}

/**
* Wraps a message with a status and body, and serializes the body if necessary.
* @param {string} status - The status of the message.
* @param {unknown} body - The body of the message.
*/
function wrapMessage(status, body) {
if (status === 'success' || body === null ||
(typeof body !== 'object' &&
Expand All @@ -73,6 +83,14 @@ function wrapMessage(status, body) {
};
}

/**
* Initializes a worker thread for a customized module loader.
* @param {SharedArrayBuffer} lock - The lock used to synchronize communication between the worker and the main thread.
* @param {MessagePort} syncCommPort - The message port used for synchronous communication between the worker and the
* main thread.
* @param {(err: Error, origin?: string) => void} errorHandler - The function to use for uncaught exceptions.
* @returns {Promise<void>} A promise that resolves when the worker thread has been initialized.
*/
async function customizedModuleWorker(lock, syncCommPort, errorHandler) {
let hooks;
let initializationError;
Expand Down Expand Up @@ -114,6 +132,9 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) {
AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);

let immediate;
/**
* Checks for messages on the syncCommPort and handles them asynchronously.
*/
function checkForMessages() {
immediate = setImmediate(checkForMessages).unref();
// We need to let the event loop tick a few times to give the main thread a chance to send
Expand Down Expand Up @@ -147,6 +168,13 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) {
setImmediate(() => {});
});

/**
* Handles incoming messages from the main thread or other workers.
* @param {object} options - The options object.
* @param {string} options.method - The name of the hook.
* @param {Array} options.args - The arguments to pass to the method.
* @param {MessagePort} options.port - The message port to use for communication.
*/
async function handleMessage({ method, args, port }) {
// Each potential exception needs to be caught individually so that the correct error is sent to
// the main thread.
Expand Down Expand Up @@ -205,11 +233,19 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) {
}

/**
* Initializes a worker thread for a module with customized hooks.
* ! Run everything possible within this function so errors get reported.
* @param {{lock: SharedArrayBuffer}} workerData - The lock used to synchronize with the main thread.
* @param {MessagePort} syncCommPort - The communication port used to communicate with the main thread.
*/
module.exports = function setupModuleWorker(workerData, syncCommPort) {
const lock = new Int32Array(workerData.lock);

/**
* Handles errors that occur in the worker thread.
* @param {Error} err - The error that occurred.
* @param {string} [origin='unhandledRejection'] - The origin of the error.
*/
function errorHandler(err, origin = 'unhandledRejection') {
AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
Expand Down