@@ -35,7 +35,7 @@ const {
3535const { exitCodes : { kUnsettledTopLevelAwait } } = internalBinding ( 'errors' ) ;
3636const { URL } = require ( 'internal/url' ) ;
3737const { canParse : URLCanParse } = internalBinding ( 'url' ) ;
38- const { receiveMessageOnPort } = require ( 'worker_threads' ) ;
38+ const { receiveMessageOnPort, isMainThread } = require ( 'worker_threads' ) ;
3939const {
4040 isAnyArrayBuffer,
4141 isArrayBufferView,
@@ -481,6 +481,7 @@ class HooksProxy {
481481 * The InternalWorker instance, which lets us communicate with the loader thread.
482482 */
483483 #worker;
484+ #portToHooksThread;
484485
485486 /**
486487 * The last notification ID received from the worker. This is used to detect
@@ -499,26 +500,40 @@ class HooksProxy {
499500 #isReady = false ;
500501
501502 constructor ( ) {
502- const { InternalWorker } = require ( 'internal/worker' ) ;
503+ const { InternalWorker, hooksPort } = require ( 'internal/worker' ) ;
503504 MessageChannel ??= require ( 'internal/worker/io' ) . MessageChannel ;
504505
505506 const lock = new SharedArrayBuffer ( SHARED_MEMORY_BYTE_LENGTH ) ;
506507 this . #lock = new Int32Array ( lock ) ;
507508
508- this . #worker = new InternalWorker ( loaderWorkerId , {
509- stderr : false ,
510- stdin : false ,
511- stdout : false ,
512- trackUnmanagedFds : false ,
513- workerData : {
514- lock,
515- } ,
516- } ) ;
517- this . #worker. unref ( ) ; // ! Allows the process to eventually exit.
518- this . #worker. on ( 'exit' , process . exit ) ;
509+ if ( isMainThread ) {
510+ // Main thread is the only one that creates the internal single hooks worker
511+ this . #worker = new InternalWorker ( loaderWorkerId , {
512+ stderr : false ,
513+ stdin : false ,
514+ stdout : false ,
515+ trackUnmanagedFds : false ,
516+ workerData : {
517+ lock,
518+ } ,
519+ } ) ;
520+ this . #worker. unref ( ) ; // ! Allows the process to eventually exit.
521+ this . #worker. on ( 'exit' , process . exit ) ;
522+ this . #portToHooksThread = this . #worker;
523+ } else {
524+ this . #portToHooksThread = hooksPort ;
525+ }
519526 }
520527
521528 waitForWorker ( ) {
529+ // There is one Hooks instance for each worker thread. But only one of these Hooks instances
530+ // has an InternalWorker. That was the Hooks instance created for the main thread.
531+ // It means for all Hooks instances that are not on the main thread => they are ready because they
532+ // delegate to the single InternalWorker anyway.
533+ if ( ! isMainThread ) {
534+ return ;
535+ }
536+
522537 if ( ! this . #isReady) {
523538 const { kIsOnline } = require ( 'internal/worker' ) ;
524539 if ( ! this . #worker[ kIsOnline ] ) {
@@ -535,6 +550,23 @@ class HooksProxy {
535550 }
536551 }
537552
553+ #postMessageToWorker( method , type , transferList , ...args ) {
554+ this . waitForWorker ( ) ;
555+ MessageChannel ??= require ( 'internal/worker/io' ) . MessageChannel ;
556+ const { port1 : fromHooksThread , port2 : toHooksThread } = new MessageChannel ( ) ;
557+
558+ // Pass work to the worker.
559+ debug ( `post ${ type } message to worker` , { method, args, transferList } ) ;
560+ const usedTransferList = [ toHooksThread ] ;
561+ if ( transferList ) {
562+ ArrayPrototypePushApply ( usedTransferList , transferList ) ;
563+ }
564+ this . #portToHooksThread. postMessage (
565+ { __proto__ : null , method, args, lock : this . #lock, port : toHooksThread } , usedTransferList ) ;
566+
567+ return fromHooksThread ;
568+ }
569+
538570 /**
539571 * Invoke a remote method asynchronously.
540572 * @param {string } method Method to invoke
@@ -543,22 +575,7 @@ class HooksProxy {
543575 * @returns {Promise<any> }
544576 */
545577 async makeAsyncRequest ( method , transferList , ...args ) {
546- this . waitForWorker ( ) ;
547-
548- MessageChannel ??= require ( 'internal/worker/io' ) . MessageChannel ;
549- const asyncCommChannel = new MessageChannel ( ) ;
550-
551- // Pass work to the worker.
552- debug ( 'post async message to worker' , { method, args, transferList } ) ;
553- const finalTransferList = [ asyncCommChannel . port2 ] ;
554- if ( transferList ) {
555- ArrayPrototypePushApply ( finalTransferList , transferList ) ;
556- }
557- this . #worker. postMessage ( {
558- __proto__ : null ,
559- method, args,
560- port : asyncCommChannel . port2 ,
561- } , finalTransferList ) ;
578+ const fromHooksThread = this . #postMessageToWorker( method , 'Async' , transferList , ...args ) ;
562579
563580 if ( this . #numberOfPendingAsyncResponses++ === 0 ) {
564581 // On the next lines, the main thread will await a response from the worker thread that might
@@ -567,7 +584,11 @@ class HooksProxy {
567584 // However we want to keep the process alive until the worker thread responds (or until the
568585 // event loop of the worker thread is also empty), so we ref the worker until we get all the
569586 // responses back.
570- this . #worker. ref ( ) ;
587+ if ( this . #worker) {
588+ this . #worker. ref ( ) ;
589+ } else {
590+ this . #portToHooksThread. ref ( ) ;
591+ }
571592 }
572593
573594 let response ;
@@ -576,18 +597,25 @@ class HooksProxy {
576597 await AtomicsWaitAsync ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION , this . #workerNotificationLastId) . value ;
577598 this . #workerNotificationLastId = AtomicsLoad ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION ) ;
578599
579- response = receiveMessageOnPort ( asyncCommChannel . port1 ) ;
600+ response = receiveMessageOnPort ( fromHooksThread ) ;
580601 } while ( response == null ) ;
581602 debug ( 'got async response from worker' , { method, args } , this . #lock) ;
582603
583604 if ( -- this . #numberOfPendingAsyncResponses === 0 ) {
584605 // We got all the responses from the worker, its job is done (until next time).
585- this . #worker. unref ( ) ;
606+ if ( this . #worker) {
607+ this . #worker. unref ( ) ;
608+ } else {
609+ this . #portToHooksThread. unref ( ) ;
610+ }
586611 }
587612
588- const body = this . #unwrapMessage( response ) ;
589- asyncCommChannel . port1 . close ( ) ;
590- return body ;
613+ if ( response . message . status === 'exit' ) {
614+ process . exit ( response . message . body ) ;
615+ }
616+
617+ fromHooksThread . close ( ) ;
618+ return this . #unwrapMessage( response ) ;
591619 }
592620
593621 /**
@@ -598,11 +626,7 @@ class HooksProxy {
598626 * @returns {any }
599627 */
600628 makeSyncRequest ( method , transferList , ...args ) {
601- this . waitForWorker ( ) ;
602-
603- // Pass work to the worker.
604- debug ( 'post sync message to worker' , { method, args, transferList } ) ;
605- this . #worker. postMessage ( { __proto__ : null , method, args } , transferList ) ;
629+ const fromHooksThread = this . #postMessageToWorker( method , 'Sync' , transferList , ...args ) ;
606630
607631 let response ;
608632 do {
@@ -611,14 +635,15 @@ class HooksProxy {
611635 AtomicsWait ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION , this . #workerNotificationLastId) ;
612636 this . #workerNotificationLastId = AtomicsLoad ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION ) ;
613637
614- response = this . #worker . receiveMessageSync ( ) ;
638+ response = receiveMessageOnPort ( fromHooksThread ) ;
615639 } while ( response == null ) ;
616640 debug ( 'got sync response from worker' , { method, args } ) ;
617641 if ( response . message . status === 'never-settle' ) {
618642 process . exit ( kUnsettledTopLevelAwait ) ;
619643 } else if ( response . message . status === 'exit' ) {
620644 process . exit ( response . message . body ) ;
621645 }
646+ fromHooksThread . close ( ) ;
622647 return this . #unwrapMessage( response ) ;
623648 }
624649
0 commit comments