From 96510617ccf6e4fee58d18a9181a39e448e2def8 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Sat, 23 May 2026 09:29:56 -0700 Subject: [PATCH] stream: settle pending stream iter reads on return Resolve pending next() calls when stream/iter push and broadcast consumers are returned, so the promises do not remain pending after iterator cleanup. Fixes: https://github.com/nodejs/node/issues/63519 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 --- lib/internal/streams/iter/broadcast.js | 1 + lib/internal/streams/iter/push.js | 8 ++++++++ test/parallel/test-stream-iter-broadcast-basic.js | 13 +++++++++++++ test/parallel/test-stream-iter-push-basic.js | 13 +++++++++++++ 4 files changed, 35 insertions(+) diff --git a/lib/internal/streams/iter/broadcast.js b/lib/internal/streams/iter/broadcast.js index e6a404729d6e0b..aa83c9636598ea 100644 --- a/lib/internal/streams/iter/broadcast.js +++ b/lib/internal/streams/iter/broadcast.js @@ -165,6 +165,7 @@ class BroadcastImpl { function detach() { state.detached = true; + state.resolve?.({ __proto__: null, done: true, value: undefined }); state.resolve = null; state.reject = null; if (self.#deleteConsumer(state)) { diff --git a/lib/internal/streams/iter/push.js b/lib/internal/streams/iter/push.js index 1c367ff02bae71..b6e450e5e4bc59 100644 --- a/lib/internal/streams/iter/push.js +++ b/lib/internal/streams/iter/push.js @@ -423,6 +423,7 @@ class PushQueue { if (this.#consumerState !== 'active') return; this.#consumerState = 'returned'; this.#cleanup(); + this.#resolvePendingReadsDone(); this.#rejectPendingWrites( new ERR_INVALID_STATE.TypeError('Stream closed by consumer')); // If closing, reject the pending end promise @@ -526,6 +527,13 @@ class PushQueue { } } + #resolvePendingReadsDone() { + while (this.#pendingReads.length > 0) { + this.#pendingReads.shift().resolve( + { __proto__: null, value: undefined, done: true }); + } + } + #rejectPendingWrites(error) { while (this.#pendingWrites.length > 0) { this.#pendingWrites.shift().reject(error); diff --git a/test/parallel/test-stream-iter-broadcast-basic.js b/test/parallel/test-stream-iter-broadcast-basic.js index ab2c81304ec2ac..32c1750fb4cbfd 100644 --- a/test/parallel/test-stream-iter-broadcast-basic.js +++ b/test/parallel/test-stream-iter-broadcast-basic.js @@ -161,6 +161,18 @@ async function testCancelWithReason() { assert.strictEqual(result.message, 'cancelled'); } +async function testPendingNextSettlesAfterReturn() { + const { broadcast: bc } = broadcast(); + const iter = bc.push()[Symbol.asyncIterator](); + + const pendingNext = iter.next(); + await iter.return(); + + const result = await pendingNext; + assert.strictEqual(result.done, true); + assert.strictEqual(result.value, undefined); +} + // ============================================================================= // Writer fail detaches consumers // ============================================================================= @@ -254,6 +266,7 @@ Promise.all([ testCancelWithoutReason(), testCancelWithReason(), testCancelWithFalsyReason(), + testPendingNextSettlesAfterReturn(), testFailDetachesConsumers(), testWriterFailIdempotent(), testLateJoinerSeesBufferedData(), diff --git a/test/parallel/test-stream-iter-push-basic.js b/test/parallel/test-stream-iter-push-basic.js index 22d5b26c830a47..713f6cc9b7d29e 100644 --- a/test/parallel/test-stream-iter-push-basic.js +++ b/test/parallel/test-stream-iter-push-basic.js @@ -135,6 +135,18 @@ async function testConsumerBreakWriteSyncReturnsFalse() { assert.strictEqual(writer.desiredSize, null); } +async function testPendingNextSettlesAfterReturn() { + const { readable } = push(); + const iter = readable[Symbol.asyncIterator](); + + const pendingNext = iter.next(); + await iter.return(); + + const result = await pendingNext; + assert.strictEqual(result.done, true); + assert.strictEqual(result.value, undefined); +} + async function testPushWithTransforms() { const upper = (chunks) => { if (chunks === null) return null; @@ -177,6 +189,7 @@ Promise.all([ testAbortSignal(), testPreAbortedSignal(), testConsumerBreakWriteSyncReturnsFalse(), + testPendingNextSettlesAfterReturn(), testPushWithTransforms(), testInvalidBackpressure(), ]).then(common.mustCall());