diff --git a/benchmark/streams/iter-throughput-pipeto.js b/benchmark/streams/iter-throughput-pipeto.js index 117a78aead1088..819d5e22a8a272 100644 --- a/benchmark/streams/iter-throughput-pipeto.js +++ b/benchmark/streams/iter-throughput-pipeto.js @@ -6,7 +6,7 @@ const common = require('../common.js'); const { Readable, Writable, pipeline } = require('stream'); const bench = common.createBenchmark(main, { - api: ['classic', 'webstream', 'iter', 'iter-sync'], + api: ['classic', 'webstream', 'iter', 'iter-sync-source', 'iter-sync'], datasize: [1024 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024], n: [5], }, { @@ -26,6 +26,8 @@ function main({ api, datasize, n }) { return benchWebStream(chunk, datasize, n, totalOps); case 'iter': return benchIter(chunk, datasize, n, totalOps); + case 'iter-sync-source': + return benchIterSyncSource(chunk, datasize, n, totalOps); case 'iter-sync': return benchIterSync(chunk, datasize, n, totalOps); } @@ -101,6 +103,29 @@ function benchIter(chunk, datasize, n, totalOps) { })(); } +function benchIterSyncSource(chunk, datasize, n, totalOps) { + const { pipeTo } = require('stream/iter'); + + async function run() { + let remaining = datasize; + function* source() { + while (remaining > 0) { + const size = Math.min(remaining, chunk.length); + remaining -= size; + yield size === chunk.length ? chunk : chunk.subarray(0, size); + } + } + const writer = { write() {}, writeSync() { return true; } }; + await pipeTo(source(), writer); + } + + (async () => { + bench.start(); + for (let i = 0; i < n; i++) await run(); + bench.end(totalOps); + })(); +} + function benchIterSync(chunk, datasize, n, totalOps) { const { pipeToSync } = require('stream/iter'); diff --git a/lib/internal/streams/iter/pull.js b/lib/internal/streams/iter/pull.js index b4a7678237f465..86a2e8c9393410 100644 --- a/lib/internal/streams/iter/pull.js +++ b/lib/internal/streams/iter/pull.js @@ -8,6 +8,8 @@ const { ArrayBufferIsView, + ArrayFromAsync, + ArrayIsArray, ArrayPrototypePush, ArrayPrototypeSlice, PromisePrototypeThen, @@ -38,7 +40,9 @@ const { fromSync, isSyncIterable, isAsyncIterable, + isPrimitiveChunk, isUint8ArrayBatch, + normalizeAsyncValue, } = require('internal/streams/iter/from'); const { @@ -51,7 +55,10 @@ const { } = require('internal/streams/iter/utils'); const { + kValidatedSource, kValidatedTransform, + toAsyncStreamable, + toStreamable, } = require('internal/streams/iter/types'); // ============================================================================= @@ -114,6 +121,22 @@ function parsePipeToArgs(args, requiredMethod) { }; } +function canUseSyncIterablePipeToFastPath(source, transforms, signal) { + if (signal !== undefined || + transforms.length !== 0 || + isPrimitiveChunk(source) || + ArrayIsArray(source) || + source?.[kValidatedSource] || + !isSyncIterable(source) || + isAsyncIterable(source)) { + return false; + } + + // Preserve from()'s top-level protocol precedence for custom iterables. + return typeof source[toAsyncStreamable] !== 'function' && + typeof source[toStreamable] !== 'function'; +} + // ============================================================================= // Transform Output Flattening // ============================================================================= @@ -820,12 +843,13 @@ async function pipeTo(source, ...args) { // Check for abort signal?.throwIfAborted(); - // Normalize source via from() - const normalized = from(source); + const hasWriteSync = typeof writer.writeSync === 'function'; + const useSyncIterableFastPath = + hasWriteSync && canUseSyncIterablePipeToFastPath(source, transforms, signal); + const normalized = useSyncIterableFastPath ? undefined : from(source); let totalBytes = 0; const hasWritev = typeof writer.writev === 'function'; - const hasWriteSync = typeof writer.writeSync === 'function'; const hasWritevSync = typeof writer.writevSync === 'function'; const hasEndSync = typeof writer.endSync === 'function'; // Async fallback for writeBatch when sync write fails partway through. @@ -876,8 +900,32 @@ async function pipeTo(source, ...args) { } try { - // Fast path: no transforms - iterate normalized source directly - if (transforms.length === 0) { + if (useSyncIterableFastPath) { + // Avoid from()'s async sync-iterable batching path. This keeps writes + // incremental for synchronous sources while preserving async + // normalization for non-primitive yielded values. + for (const value of source) { + if (isUint8ArrayBatch(value)) { + if (value.length > 0) { + const p = writeBatch(value); + if (p) await p; + } + continue; + } + if (isUint8Array(value)) { + const p = writeBatch([value]); + if (p) await p; + continue; + } + + const batch = await ArrayFromAsync(normalizeAsyncValue(value)); + if (batch.length > 0) { + const p = writeBatch(batch); + if (p) await p; + } + } + } else if (transforms.length === 0) { + // Fast path: no transforms - iterate normalized source directly if (signal) { for await (const batch of normalized) { signal.throwIfAborted(); diff --git a/test/parallel/test-stream-iter-pipeto.js b/test/parallel/test-stream-iter-pipeto.js index 9845a5ba254efb..bc6fa9d4984233 100644 --- a/test/parallel/test-stream-iter-pipeto.js +++ b/test/parallel/test-stream-iter-pipeto.js @@ -219,6 +219,79 @@ async function testPipeToSyncMinimalWriter() { assert.strictEqual(chunks.length > 0, true); } +async function testPipeToSyncIterableFastPathWritesIncrementally() { + let pulled = 0; + let firstWritePulled = 0; + const chunks = []; + function* source() { + for (let i = 0; i < 3; i++) { + pulled++; + yield new Uint8Array([0x61 + i]); + } + } + const writer = { + write: common.mustNotCall(), + writeSync(chunk) { + if (firstWritePulled === 0) { + firstWritePulled = pulled; + } + chunks.push(chunk); + return true; + }, + }; + + const totalBytes = await pipeTo(source(), writer); + assert.strictEqual(totalBytes, 3); + assert.strictEqual(firstWritePulled, 1); + assert.deepStrictEqual(chunks, [ + new Uint8Array([0x61]), + new Uint8Array([0x62]), + new Uint8Array([0x63]), + ]); +} + +async function testPipeToSyncIterableFastPathWriteFallback() { + const asyncWrites = []; + const writer = { + writeSync(chunk) { + return chunk[0] !== 0x62; + }, + async write(chunk) { + asyncWrites.push(chunk); + }, + }; + function* source() { + yield new Uint8Array([0x61]); + yield new Uint8Array([0x62]); + yield new Uint8Array([0x63]); + } + + const totalBytes = await pipeTo(source(), writer); + assert.strictEqual(totalBytes, 3); + assert.deepStrictEqual(asyncWrites, [new Uint8Array([0x62])]); +} + +async function testPipeToSyncIterableFastPathAsyncValue() { + const chunks = []; + const writer = { + write: common.mustNotCall(), + writeSync(chunk) { + chunks.push(chunk); + return true; + }, + }; + function* source() { + yield Promise.resolve('a'); + yield new Uint8Array([0x62]); + } + + const totalBytes = await pipeTo(source(), writer); + assert.strictEqual(totalBytes, 2); + const result = new TextDecoder().decode( + new Uint8Array(chunks.reduce((acc, c) => [...acc, ...c], []))); + assert.strictEqual(result, 'ab'); +} + Promise.all([ testPipeToSync(), testPipeTo(), @@ -234,4 +307,7 @@ Promise.all([ testPipeToSyncPreventClose(), testPipeToMinimalWriter(), testPipeToSyncMinimalWriter(), + testPipeToSyncIterableFastPathWritesIncrementally(), + testPipeToSyncIterableFastPathWriteFallback(), + testPipeToSyncIterableFastPathAsyncValue(), ]).then(common.mustCall());