Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions lib/internal/streams/iter/consumers.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ const {
validateObject,
} = require('internal/validators');

const {
markPromiseAsHandled,
} = internalBinding('util');

const {
from,
fromSync,
Expand Down Expand Up @@ -434,6 +438,20 @@ function merge(...args) {
const ready = [];
let activeCount = normalized.length;
let waitResolve = null;
let onAbort;

if (signal) {
onAbort = () => {
if (waitResolve) {
waitResolve();
waitResolve = null;
}
};
signal.addEventListener('abort', onAbort, {
__proto__: null,
once: true,
});
}

// Called when a source's .next() settles. Pushes the result into
// the ready queue and wakes the consumer if it's waiting.
Expand Down Expand Up @@ -498,27 +516,43 @@ function merge(...args) {
if (activeCount > 0) {
await new Promise((resolve) => {
waitResolve = resolve;
if (signal?.aborted) {
waitResolve = null;
resolve();
}
});
}
}
} catch (err) {
primaryError = err;
} finally {
if (onAbort !== undefined) {
signal.removeEventListener('abort', onAbort);
}
// Clean up: return all iterators. Cleanup errors are not
// swallowed - a broken iterator.return() (e.g., failing to
// release a resource) should be visible to the caller.
await cleanupIterators(iterators, primaryError);
await cleanupIterators(
iterators,
primaryError,
signal?.aborted && primaryError === signal.reason,
);
}
},
};
}

async function cleanupIterators(iterators, primaryError) {
async function cleanupIterators(iterators, primaryError, skipAwaitCleanup) {
let cleanupError;
await SafePromiseAllReturnVoid(iterators, async (iterator) => {
if (iterator.return) {
try {
await iterator.return();
const result = iterator.return();
if (skipAwaitCleanup) {
markPromiseAsHandled(result);
} else {
await result;
}
} catch (err) {
// Keep the first cleanup error encountered.
cleanupError ??= err;
Expand Down
20 changes: 20 additions & 0 deletions test/parallel/test-stream-iter-consumers-merge.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,25 @@ async function testMergeSignalMidIteration() {
await assert.rejects(() => iter.next(), { name: 'AbortError' });
}

async function testMergeSignalDuringPendingMultiSourceRead() {
const ac = new AbortController();

async function* pending() {
await new Promise(() => {});
yield [];
}

const iter = merge(pending(), pending(), {
__proto__: null,
signal: ac.signal,
})[Symbol.asyncIterator]();

const next = iter.next();
ac.abort();

await assert.rejects(next, { name: 'AbortError' });
}

// merge() accepts string sources (normalized via from())
async function testMergeStringSources() {
const batches = [];
Expand Down Expand Up @@ -286,6 +305,7 @@ Promise.all([
testMergeSourceError(),
testMergeConsumerBreak(),
testMergeSignalMidIteration(),
testMergeSignalDuringPendingMultiSourceRead(),
testMergeStringSources(),
testMergeObjectLikeSources(),
testMergeCleanupErrorOnly(),
Expand Down
Loading