Skip to content

Commit a64b316

Browse files
committed
Fix signal option not rejecting when task is aborted while queued
Fixes #241
1 parent 3bd13ea commit a64b316

4 files changed

Lines changed: 507 additions & 15 deletions

File tree

source/index.ts

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
6363
timeout?: number;
6464
}>();
6565

66+
readonly #queueAbortListenerCleanupFunctions = new Set<() => void>();
67+
6668
/**
6769
Get or set the default timeout for all tasks. Can be changed at runtime.
6870
@@ -452,7 +454,11 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
452454
// Create a unique symbol for tracking this task
453455
const taskSymbol = Symbol(`task-${options.id}`);
454456

455-
this.#queue.enqueue(async () => {
457+
let cleanupQueueAbortHandler = () => undefined;
458+
const run = async () => {
459+
// Task is now running — remove the queued-state abort listener
460+
cleanupQueueAbortHandler();
461+
456462
this.#pending++;
457463

458464
// Track this running task
@@ -522,7 +528,44 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
522528
this.#next();
523529
});
524530
}
525-
}, options);
531+
};
532+
533+
this.#queue.enqueue(run, options);
534+
535+
const removeQueuedTask = () => {
536+
if (this.#queue instanceof PriorityQueue) {
537+
this.#queue.remove(run);
538+
return;
539+
}
540+
541+
this.#queue.remove?.(options.id!); // Intentionally best-effort: queued abort removal is only supported for queue classes that implement `.remove()`.
542+
};
543+
544+
// Handle abort while task is waiting in the queue
545+
if (options.signal) {
546+
const {signal} = options;
547+
548+
const queueAbortHandler = () => {
549+
cleanupQueueAbortHandler();
550+
removeQueuedTask();
551+
reject(signal.reason);
552+
this.#tryToStartAnother();
553+
this.emit('next');
554+
};
555+
556+
cleanupQueueAbortHandler = () => {
557+
signal.removeEventListener('abort', queueAbortHandler);
558+
this.#queueAbortListenerCleanupFunctions.delete(cleanupQueueAbortHandler);
559+
};
560+
561+
if (signal.aborted) {
562+
queueAbortHandler();
563+
return;
564+
}
565+
566+
signal.addEventListener('abort', queueAbortHandler, {once: true});
567+
this.#queueAbortListenerCleanupFunctions.add(cleanupQueueAbortHandler);
568+
}
526569

527570
this.emit('add');
528571

@@ -571,6 +614,10 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
571614
Clear the queue.
572615
*/
573616
clear(): void {
617+
for (const cleanupQueueAbortHandler of this.#queueAbortListenerCleanupFunctions) {
618+
cleanupQueueAbortHandler();
619+
}
620+
574621
this.#queue = new this.#queueClass();
575622

576623
// Clear interval timer since queue is now empty (consistent with #tryToStartAnother)

source/priority-queue.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,22 @@ export default class PriorityQueue implements Queue<RunFunction, PriorityQueueOp
4040
this.enqueue(item!.run, {priority, id});
4141
}
4242

43+
remove(id: string): void;
44+
remove(run: RunFunction): void;
45+
remove(idOrRun: string | RunFunction): void {
46+
const index = this.#queue.findIndex((element: Readonly<PriorityQueueOptions & {run: RunFunction}>) => {
47+
if (typeof idOrRun === 'string') {
48+
return element.id === idOrRun;
49+
}
50+
51+
return element.run === idOrRun;
52+
});
53+
54+
if (index !== -1) {
55+
this.#queue.splice(index, 1);
56+
}
57+
}
58+
4359
dequeue(): RunFunction | undefined {
4460
const item = this.#queue.shift();
4561
return item?.run;

source/queue.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,6 @@ export type Queue<Element, Options> = {
66
dequeue: () => Element | undefined;
77
enqueue: (run: Element, options?: Partial<Options>) => void;
88
setPriority: (id: string, priority: number) => void;
9+
// TODO: Make this required in the next major version.
10+
remove?: (id: string) => void;
911
};

0 commit comments

Comments
 (0)