Skip to content

Commit 77d3f55

Browse files
committed
stream: reject pull() reads on abort
Make pull() race pending source reads against the provided AbortSignal so aborting can reject a pending next() even when the source is waiting before yielding data. Fixes: #63497 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5
1 parent a7d5446 commit 77d3f55

2 files changed

Lines changed: 109 additions & 6 deletions

File tree

lib/internal/streams/iter/pull.js

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ const {
1313
ArrayPrototypePush,
1414
ArrayPrototypeSlice,
1515
PromisePrototypeThen,
16+
PromiseWithResolvers,
17+
SafePromiseRace,
18+
SafePromisePrototypeFinally,
1619
SymbolAsyncIterator,
1720
SymbolIterator,
1821
TypedArrayPrototypeGetByteLength,
@@ -607,6 +610,77 @@ async function* applyValidatedStatefulAsyncTransform(source, transform, options)
607610
options.signal?.throwIfAborted();
608611
}
609612

613+
/**
614+
* Read one item from an async iterator, rejecting early if the signal aborts.
615+
* @param {AsyncIterator} iterator - The iterator to read from.
616+
* @param {AbortSignal|undefined} signal - Optional abort signal.
617+
* @returns {Promise<IteratorResult<Uint8Array[]>>|IteratorResult<Uint8Array[]>}
618+
*/
619+
function abortableNext(iterator, signal) {
620+
if (signal === undefined) {
621+
return iterator.next();
622+
}
623+
624+
signal.throwIfAborted();
625+
626+
const next = iterator.next();
627+
const { promise, reject } = PromiseWithResolvers();
628+
const onAbort = () => reject(signal.reason);
629+
signal.addEventListener('abort', onAbort, { __proto__: null, once: true });
630+
if (signal.aborted) {
631+
onAbort();
632+
}
633+
634+
return SafePromisePrototypeFinally(SafePromiseRace([next, promise]), () => {
635+
signal.removeEventListener('abort', onAbort);
636+
});
637+
}
638+
639+
/**
640+
* Wrap an async source so each pending read is abort-aware.
641+
* @param {AsyncIterable<Uint8Array[]>} source - The source to read from.
642+
* @param {AbortSignal|undefined} signal - Optional abort signal.
643+
* @returns {AsyncIterable<Uint8Array[]>}
644+
*/
645+
function yieldAbortable(source, signal) {
646+
if (signal === undefined) {
647+
return source;
648+
}
649+
650+
return {
651+
__proto__: null,
652+
async *[SymbolAsyncIterator]() {
653+
const iterator = source[SymbolAsyncIterator]();
654+
let completed = false;
655+
let aborted = false;
656+
657+
try {
658+
while (true) {
659+
const { done, value } = await abortableNext(iterator, signal);
660+
if (done) {
661+
completed = true;
662+
return;
663+
}
664+
signal.throwIfAborted();
665+
yield value;
666+
}
667+
} catch (error) {
668+
aborted = signal.aborted;
669+
throw error;
670+
} finally {
671+
if (!completed && typeof iterator.return === 'function') {
672+
const result = iterator.return();
673+
if (aborted) {
674+
PromisePrototypeThen(result, undefined, () => {});
675+
} else {
676+
await result;
677+
}
678+
}
679+
}
680+
},
681+
};
682+
}
683+
610684
/**
611685
* Create an async pipeline from source through transforms.
612686
* @yields {Uint8Array[]}
@@ -615,17 +689,14 @@ async function* createAsyncPipeline(source, transforms, signal) {
615689
// Check for abort
616690
signal?.throwIfAborted();
617691

618-
const normalized = source;
619-
620692
// Fast path: no transforms, just yield normalized source directly
621693
if (transforms.length === 0) {
622-
for await (const batch of normalized) {
623-
signal?.throwIfAborted();
624-
yield batch;
625-
}
694+
yield* yieldAbortable(source, signal);
626695
return;
627696
}
628697

698+
const normalized = yieldAbortable(source, signal);
699+
629700
// Create internal controller for transform cancellation.
630701
// Note: if signal was already aborted, we threw above - no need to check here.
631702
const controller = new AbortController();

test/parallel/test-stream-iter-pull-async.js

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,36 @@ async function testPullSignalAbortMidIteration() {
156156
await assert.rejects(() => iter.next(), { name: 'AbortError' });
157157
}
158158

159+
async function testPullSignalAbortWhileSourceNextPending() {
160+
const source = {
161+
async *[Symbol.asyncIterator]() {
162+
await new Promise(() => {});
163+
},
164+
};
165+
const ac = new AbortController();
166+
const iter = pull(source, { signal: ac.signal })[Symbol.asyncIterator]();
167+
const next = iter.next();
168+
ac.abort();
169+
await assert.rejects(next, { name: 'AbortError' });
170+
}
171+
172+
async function testPullSignalAbortWithTransformWhileSourceNextPending() {
173+
const source = {
174+
async *[Symbol.asyncIterator]() {
175+
await new Promise(() => {});
176+
},
177+
};
178+
const ac = new AbortController();
179+
const iter = pull(
180+
source,
181+
(chunks) => chunks,
182+
{ signal: ac.signal },
183+
)[Symbol.asyncIterator]();
184+
const next = iter.next();
185+
ac.abort();
186+
await assert.rejects(next, { name: 'AbortError' });
187+
}
188+
159189
// Pull consumer break (return()) cleans up transform signal
160190
async function testPullConsumerBreakCleanup() {
161191
let signalAborted = false;
@@ -351,6 +381,8 @@ async function testTransformOptionsNotShared() {
351381
testPullSourceError(),
352382
testTapCallbackError(),
353383
testPullSignalAbortMidIteration(),
384+
testPullSignalAbortWhileSourceNextPending(),
385+
testPullSignalAbortWithTransformWhileSourceNextPending(),
354386
testPullConsumerBreakCleanup(),
355387
testPullTransformReturnsPromise(),
356388
testPullTransformYieldsStrings(),

0 commit comments

Comments
 (0)