@@ -63,6 +63,8 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
6363 timeout ?: number ;
6464 } > ( ) ;
6565
66+ readonly #queueAbortListenerCleanupFunctions = new Set < ( ) => void > ( ) ;
67+
6668 /**
6769 Get or set the default timeout for all tasks. Can be changed at runtime.
6870
@@ -452,7 +454,11 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
452454 // Create a unique symbol for tracking this task
453455 const taskSymbol = Symbol ( `task-${ options . id } ` ) ;
454456
455- this . #queue. enqueue ( async ( ) => {
457+ let cleanupQueueAbortHandler = ( ) => undefined ;
458+ const run = async ( ) => {
459+ // Task is now running — remove the queued-state abort listener
460+ cleanupQueueAbortHandler ( ) ;
461+
456462 this . #pending++ ;
457463
458464 // Track this running task
@@ -522,7 +528,44 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
522528 this . #next( ) ;
523529 } ) ;
524530 }
525- } , options ) ;
531+ } ;
532+
533+ this . #queue. enqueue ( run , options ) ;
534+
535+ const removeQueuedTask = ( ) => {
536+ if ( this . #queue instanceof PriorityQueue ) {
537+ this . #queue. remove ( run ) ;
538+ return ;
539+ }
540+
541+ this . #queue. remove ?.( options . id ! ) ; // Intentionally best-effort: queued abort removal is only supported for queue classes that implement `.remove()`.
542+ } ;
543+
544+ // Handle abort while task is waiting in the queue
545+ if ( options . signal ) {
546+ const { signal} = options ;
547+
548+ const queueAbortHandler = ( ) => {
549+ cleanupQueueAbortHandler ( ) ;
550+ removeQueuedTask ( ) ;
551+ reject ( signal . reason ) ;
552+ this . #tryToStartAnother( ) ;
553+ this . emit ( 'next' ) ;
554+ } ;
555+
556+ cleanupQueueAbortHandler = ( ) => {
557+ signal . removeEventListener ( 'abort' , queueAbortHandler ) ;
558+ this . #queueAbortListenerCleanupFunctions. delete ( cleanupQueueAbortHandler ) ;
559+ } ;
560+
561+ if ( signal . aborted ) {
562+ queueAbortHandler ( ) ;
563+ return ;
564+ }
565+
566+ signal . addEventListener ( 'abort' , queueAbortHandler , { once : true } ) ;
567+ this . #queueAbortListenerCleanupFunctions. add ( cleanupQueueAbortHandler ) ;
568+ }
526569
527570 this . emit ( 'add' ) ;
528571
@@ -571,6 +614,10 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
571614 Clear the queue.
572615 */
573616 clear ( ) : void {
617+ for ( const cleanupQueueAbortHandler of this . #queueAbortListenerCleanupFunctions) {
618+ cleanupQueueAbortHandler ( ) ;
619+ }
620+
574621 this . #queue = new this . #queueClass( ) ;
575622
576623 // Clear interval timer since queue is now empty (consistent with #tryToStartAnother)
0 commit comments