From 2abfb2e039a57fe21d70e4ea1e7792590b6884eb Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Thu, 14 May 2026 20:44:36 -0700 Subject: [PATCH 1/3] stream: add sync iterable fast path to pipeTo Avoid normalizing sync iterable sources through from() when pipeTo() has no transforms or signal and the writer can accept sync writes. This keeps writes incremental while preserving async fallback for values that still need it. Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 --- lib/internal/streams/iter/pull.js | 60 +++++++++++++++++-- test/parallel/test-stream-iter-pipeto.js | 76 ++++++++++++++++++++++++ 2 files changed, 131 insertions(+), 5 deletions(-) diff --git a/lib/internal/streams/iter/pull.js b/lib/internal/streams/iter/pull.js index b4a7678237f465..b17c8ee93cfdee 100644 --- a/lib/internal/streams/iter/pull.js +++ b/lib/internal/streams/iter/pull.js @@ -8,6 +8,7 @@ const { ArrayBufferIsView, + ArrayIsArray, ArrayPrototypePush, ArrayPrototypeSlice, PromisePrototypeThen, @@ -38,7 +39,9 @@ const { fromSync, isSyncIterable, isAsyncIterable, + isPrimitiveChunk, isUint8ArrayBatch, + normalizeAsyncValue, } = require('internal/streams/iter/from'); const { @@ -51,7 +54,10 @@ const { } = require('internal/streams/iter/utils'); const { + kValidatedSource, kValidatedTransform, + toAsyncStreamable, + toStreamable, } = require('internal/streams/iter/types'); // ============================================================================= @@ -114,6 +120,22 @@ function parsePipeToArgs(args, requiredMethod) { }; } +function canUseSyncIterablePipeToFastPath(source, transforms, signal) { + if (signal !== undefined || + transforms.length !== 0 || + isPrimitiveChunk(source) || + ArrayIsArray(source) || + source?.[kValidatedSource] || + !isSyncIterable(source) || + isAsyncIterable(source)) { + return false; + } + + // Preserve from()'s top-level protocol precedence for custom iterables. + return typeof source[toAsyncStreamable] !== 'function' && + typeof source[toStreamable] !== 'function'; +} + // ============================================================================= // Transform Output Flattening // ============================================================================= @@ -820,12 +842,13 @@ async function pipeTo(source, ...args) { // Check for abort signal?.throwIfAborted(); - // Normalize source via from() - const normalized = from(source); + const hasWriteSync = typeof writer.writeSync === 'function'; + const useSyncIterableFastPath = + hasWriteSync && canUseSyncIterablePipeToFastPath(source, transforms, signal); + const normalized = useSyncIterableFastPath ? undefined : from(source); let totalBytes = 0; const hasWritev = typeof writer.writev === 'function'; - const hasWriteSync = typeof writer.writeSync === 'function'; const hasWritevSync = typeof writer.writevSync === 'function'; const hasEndSync = typeof writer.endSync === 'function'; // Async fallback for writeBatch when sync write fails partway through. @@ -876,8 +899,35 @@ async function pipeTo(source, ...args) { } try { - // Fast path: no transforms - iterate normalized source directly - if (transforms.length === 0) { + if (useSyncIterableFastPath) { + // Avoid from()'s async sync-iterable batching path. This keeps writes + // incremental for synchronous sources while preserving async + // normalization for non-primitive yielded values. + for (const value of source) { + if (isUint8ArrayBatch(value)) { + if (value.length > 0) { + const p = writeBatch(value); + if (p) await p; + } + continue; + } + if (isUint8Array(value)) { + const p = writeBatch([value]); + if (p) await p; + continue; + } + + const batch = []; + for await (const chunk of normalizeAsyncValue(value)) { + ArrayPrototypePush(batch, chunk); + } + if (batch.length > 0) { + const p = writeBatch(batch); + if (p) await p; + } + } + } else if (transforms.length === 0) { + // Fast path: no transforms - iterate normalized source directly if (signal) { for await (const batch of normalized) { signal.throwIfAborted(); diff --git a/test/parallel/test-stream-iter-pipeto.js b/test/parallel/test-stream-iter-pipeto.js index 9845a5ba254efb..bc6fa9d4984233 100644 --- a/test/parallel/test-stream-iter-pipeto.js +++ b/test/parallel/test-stream-iter-pipeto.js @@ -219,6 +219,79 @@ async function testPipeToSyncMinimalWriter() { assert.strictEqual(chunks.length > 0, true); } +async function testPipeToSyncIterableFastPathWritesIncrementally() { + let pulled = 0; + let firstWritePulled = 0; + const chunks = []; + function* source() { + for (let i = 0; i < 3; i++) { + pulled++; + yield new Uint8Array([0x61 + i]); + } + } + const writer = { + write: common.mustNotCall(), + writeSync(chunk) { + if (firstWritePulled === 0) { + firstWritePulled = pulled; + } + chunks.push(chunk); + return true; + }, + }; + + const totalBytes = await pipeTo(source(), writer); + assert.strictEqual(totalBytes, 3); + assert.strictEqual(firstWritePulled, 1); + assert.deepStrictEqual(chunks, [ + new Uint8Array([0x61]), + new Uint8Array([0x62]), + new Uint8Array([0x63]), + ]); +} + +async function testPipeToSyncIterableFastPathWriteFallback() { + const asyncWrites = []; + const writer = { + writeSync(chunk) { + return chunk[0] !== 0x62; + }, + async write(chunk) { + asyncWrites.push(chunk); + }, + }; + function* source() { + yield new Uint8Array([0x61]); + yield new Uint8Array([0x62]); + yield new Uint8Array([0x63]); + } + + const totalBytes = await pipeTo(source(), writer); + assert.strictEqual(totalBytes, 3); + assert.deepStrictEqual(asyncWrites, [new Uint8Array([0x62])]); +} + +async function testPipeToSyncIterableFastPathAsyncValue() { + const chunks = []; + const writer = { + write: common.mustNotCall(), + writeSync(chunk) { + chunks.push(chunk); + return true; + }, + }; + function* source() { + yield Promise.resolve('a'); + yield new Uint8Array([0x62]); + } + + const totalBytes = await pipeTo(source(), writer); + assert.strictEqual(totalBytes, 2); + const result = new TextDecoder().decode( + new Uint8Array(chunks.reduce((acc, c) => [...acc, ...c], []))); + assert.strictEqual(result, 'ab'); +} + Promise.all([ testPipeToSync(), testPipeTo(), @@ -234,4 +307,7 @@ Promise.all([ testPipeToSyncPreventClose(), testPipeToMinimalWriter(), testPipeToSyncMinimalWriter(), + testPipeToSyncIterableFastPathWritesIncrementally(), + testPipeToSyncIterableFastPathWriteFallback(), + testPipeToSyncIterableFastPathAsyncValue(), ]).then(common.mustCall()); From 004031e0c46e56d911cfbc62516c60d2c8ccf81c Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Thu, 14 May 2026 20:54:30 -0700 Subject: [PATCH 2/3] benchmark: add pipeTo sync source throughput case --- benchmark/streams/iter-throughput-pipeto.js | 27 ++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/benchmark/streams/iter-throughput-pipeto.js b/benchmark/streams/iter-throughput-pipeto.js index 117a78aead1088..819d5e22a8a272 100644 --- a/benchmark/streams/iter-throughput-pipeto.js +++ b/benchmark/streams/iter-throughput-pipeto.js @@ -6,7 +6,7 @@ const common = require('../common.js'); const { Readable, Writable, pipeline } = require('stream'); const bench = common.createBenchmark(main, { - api: ['classic', 'webstream', 'iter', 'iter-sync'], + api: ['classic', 'webstream', 'iter', 'iter-sync-source', 'iter-sync'], datasize: [1024 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024], n: [5], }, { @@ -26,6 +26,8 @@ function main({ api, datasize, n }) { return benchWebStream(chunk, datasize, n, totalOps); case 'iter': return benchIter(chunk, datasize, n, totalOps); + case 'iter-sync-source': + return benchIterSyncSource(chunk, datasize, n, totalOps); case 'iter-sync': return benchIterSync(chunk, datasize, n, totalOps); } @@ -101,6 +103,29 @@ function benchIter(chunk, datasize, n, totalOps) { })(); } +function benchIterSyncSource(chunk, datasize, n, totalOps) { + const { pipeTo } = require('stream/iter'); + + async function run() { + let remaining = datasize; + function* source() { + while (remaining > 0) { + const size = Math.min(remaining, chunk.length); + remaining -= size; + yield size === chunk.length ? chunk : chunk.subarray(0, size); + } + } + const writer = { write() {}, writeSync() { return true; } }; + await pipeTo(source(), writer); + } + + (async () => { + bench.start(); + for (let i = 0; i < n; i++) await run(); + bench.end(totalOps); + })(); +} + function benchIterSync(chunk, datasize, n, totalOps) { const { pipeToSync } = require('stream/iter'); From 42d7184961676b475dfeb53226da1ce09a1e7c6f Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Fri, 15 May 2026 08:16:24 -0700 Subject: [PATCH 3/3] stream: use ArrayFromAsync in pipeTo normalization --- lib/internal/streams/iter/pull.js | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/internal/streams/iter/pull.js b/lib/internal/streams/iter/pull.js index b17c8ee93cfdee..86a2e8c9393410 100644 --- a/lib/internal/streams/iter/pull.js +++ b/lib/internal/streams/iter/pull.js @@ -8,6 +8,7 @@ const { ArrayBufferIsView, + ArrayFromAsync, ArrayIsArray, ArrayPrototypePush, ArrayPrototypeSlice, @@ -917,10 +918,7 @@ async function pipeTo(source, ...args) { continue; } - const batch = []; - for await (const chunk of normalizeAsyncValue(value)) { - ArrayPrototypePush(batch, chunk); - } + const batch = await ArrayFromAsync(normalizeAsyncValue(value)); if (batch.length > 0) { const p = writeBatch(batch); if (p) await p;