diff --git a/benchmark/streams/iter-from-batching.js b/benchmark/streams/iter-from-batching.js new file mode 100644 index 00000000000000..0e09537b9eae11 --- /dev/null +++ b/benchmark/streams/iter-from-batching.js @@ -0,0 +1,100 @@ +// Measures batching behavior for stream/iter from() and fromSync() +// with plain synchronous Uint8Array iterables. +'use strict'; + +const common = require('../common.js'); +const { closeSync, openSync, writeSync, writevSync } = require('fs'); +const { devNull } = require('os'); + +const bench = common.createBenchmark(main, { + method: ['from-first-batch', 'from-sync-writev'], + chunks: [256, 4096, 16384], + chunkSize: [16], + n: [100, 1000], +}, { + flags: ['--experimental-stream-iter'], + combinationFilter({ method, chunks, n }) { + if (n === 1) { + return true; + } + if (method === 'from-first-batch') { + return n === 1000; + } + return n === 100 && chunks !== 16384; + }, + test: { + chunks: 256, + chunkSize: 16, + n: 1, + }, +}); + +function main({ method, chunks, chunkSize, n }) { + switch (method) { + case 'from-first-batch': + return benchFromFirstBatch(chunks, chunkSize, n); + case 'from-sync-writev': + return benchFromSyncWritev(chunks, chunkSize, n); + } +} + +function* source(chunks, chunk) { + for (let i = 0; i < chunks; i++) { + yield chunk; + } +} + +function benchFromFirstBatch(chunks, chunkSize, n) { + const { from } = require('stream/iter'); + const chunk = new Uint8Array(chunkSize); + let seen = 0; + + (async () => { + bench.start(); + for (let i = 0; i < n; i++) { + const iterator = from(source(chunks, chunk))[Symbol.asyncIterator](); + const { value, done } = await iterator.next(); + if (done || value.length === 0) { + throw new Error('expected a batch'); + } + seen += value.length; + } + bench.end(n); + if (seen === 0) { + throw new Error('expected chunks'); + } + })(); +} + +function benchFromSyncWritev(chunks, chunkSize, n) { + const { pipeToSync } = require('stream/iter'); + const chunk = new Uint8Array(chunkSize); + const expected = chunks * chunkSize * n; + let seen = 0; + let total = 0; + const fd = openSync(devNull, 'w'); + const writer = { + writeSync(chunk) { + writeSync(fd, chunk); + seen++; + }, + writevSync(batch) { + writevSync(fd, batch); + seen += batch.length; + }, + }; + + try { + bench.start(); + for (let i = 0; i < n; i++) { + total += pipeToSync(source(chunks, chunk), writer); + } + bench.end(chunks * n); + } finally { + closeSync(fd); + } + + if (total !== expected || seen !== chunks * n) { + throw new Error('unexpected chunk count'); + } +} diff --git a/lib/internal/streams/iter/from.js b/lib/internal/streams/iter/from.js index 5ac802a00f2d08..1efe83e9a04162 100644 --- a/lib/internal/streams/iter/from.js +++ b/lib/internal/streams/iter/from.js @@ -47,7 +47,7 @@ const { toUint8Array, } = require('internal/streams/iter/utils'); -// Maximum number of chunks to yield per batch from from(Uint8Array[]). +// Maximum number of chunks to yield per batch from from()/fromSync(). // Bounds peak memory when arrays flow through transforms, which must // allocate output for the entire batch at once. const FROM_BATCH_SIZE = 128; @@ -190,33 +190,66 @@ function isUint8ArrayBatch(value) { return true; } +function* yieldBoundedBatch(batch) { + if (batch.length === 0) { + return; + } + if (batch.length <= FROM_BATCH_SIZE) { + yield batch; + return; + } + for (let i = 0; i < batch.length; i += FROM_BATCH_SIZE) { + yield ArrayPrototypeSlice(batch, i, i + FROM_BATCH_SIZE); + } +} + /** * Normalize a sync streamable source, yielding batches of Uint8Array. * @param {Iterable} source * @yields {Uint8Array[]} */ function* normalizeSyncSource(source) { + let batch = []; + for (const value of source) { // Fast path 1: value is already a Uint8Array[] batch if (isUint8ArrayBatch(value)) { - if (value.length > 0) { - yield value; + if (batch.length > 0) { + yield batch; + batch = []; } + yield* yieldBoundedBatch(value); continue; } // Fast path 2: value is a single Uint8Array (very common) if (isUint8Array(value)) { - yield [value]; + ArrayPrototypePush(batch, value); + if (batch.length === FROM_BATCH_SIZE) { + yield batch; + batch = []; + } continue; } // Slow path: normalize the value - const batch = []; - for (const chunk of normalizeSyncValue(value)) { - ArrayPrototypePush(batch, chunk); - } if (batch.length > 0) { yield batch; + batch = []; + } + let valueBatch = []; + for (const chunk of normalizeSyncValue(value)) { + ArrayPrototypePush(valueBatch, chunk); + if (valueBatch.length === FROM_BATCH_SIZE) { + yield valueBatch; + valueBatch = []; + } } + if (valueBatch.length > 0) { + yield valueBatch; + } + } + + if (batch.length > 0) { + yield batch; } } @@ -329,36 +362,42 @@ async function* normalizeAsyncSource(source) { return; } - // Fall back to sync iteration - batch all sync values together + // Fall back to sync iteration - batch sync values together with a bound. if (isSyncIterable(source)) { - const batch = []; + let batch = []; for (const value of source) { // Fast path 1: value is already a Uint8Array[] batch if (isUint8ArrayBatch(value)) { // Flush any accumulated batch first if (batch.length > 0) { - yield ArrayPrototypeSlice(batch); - batch.length = 0; - } - if (value.length > 0) { - yield value; + yield batch; + batch = []; } + yield* yieldBoundedBatch(value); continue; } // Fast path 2: value is a single Uint8Array (very common) if (isUint8Array(value)) { ArrayPrototypePush(batch, value); + if (batch.length === FROM_BATCH_SIZE) { + yield batch; + batch = []; + } continue; } // Slow path: normalize the value - must flush and yield individually if (batch.length > 0) { - yield ArrayPrototypeSlice(batch); - batch.length = 0; + yield batch; + batch = []; } - const asyncBatch = []; + let asyncBatch = []; for await (const chunk of normalizeAsyncValue(value)) { ArrayPrototypePush(asyncBatch, chunk); + if (asyncBatch.length === FROM_BATCH_SIZE) { + yield asyncBatch; + asyncBatch = []; + } } if (asyncBatch.length > 0) { yield asyncBatch; diff --git a/test/parallel/test-stream-iter-from-coverage.js b/test/parallel/test-stream-iter-from-coverage.js index c4f622a56bd7fa..eb75bec4826206 100644 --- a/test/parallel/test-stream-iter-from-coverage.js +++ b/test/parallel/test-stream-iter-from-coverage.js @@ -31,6 +31,22 @@ async function testFromSyncSubBatching() { assert.strictEqual(totalChunks, 200); } +// fromSync: generic sync iterables of Uint8Array use bounded batches +async function testFromSyncIterableSubBatching() { + function* gen() { + for (let i = 0; i < 200; i++) { + yield new Uint8Array([i & 0xFF]); + } + } + const batches = []; + for (const batch of fromSync(gen())) { + batches.push(batch); + } + assert.strictEqual(batches.length, 2); + assert.strictEqual(batches[0].length, 128); + assert.strictEqual(batches[1].length, 72); +} + // from: Uint8Array[] with > 128 elements triggers sub-batching (async) async function testFromAsyncSubBatching() { const bigBatch = Array.from({ length: 200 }, @@ -44,6 +60,22 @@ async function testFromAsyncSubBatching() { assert.strictEqual(batches[1].length, 72); } +// from: sync iterables use bounded batches instead of one unbounded batch +async function testFromAsyncSyncIterableSubBatching() { + function* gen() { + for (let i = 0; i < 200; i++) { + yield new Uint8Array([i & 0xFF]); + } + } + const batches = []; + for await (const batch of from(gen())) { + batches.push(batch); + } + assert.strictEqual(batches.length, 2); + assert.strictEqual(batches[0].length, 128); + assert.strictEqual(batches[1].length, 72); +} + // Exact boundary: 128 elements → single batch (no split) async function testFromSubBatchingBoundary() { const exactBatch = Array.from({ length: 128 }, @@ -133,7 +165,9 @@ async function testFromSyncInvalidYield() { Promise.all([ testFromSyncSubBatching(), + testFromSyncIterableSubBatching(), testFromAsyncSubBatching(), + testFromAsyncSyncIterableSubBatching(), testFromSubBatchingBoundary(), testFromSubBatchingBoundaryPlus1(), testFromSyncDataViewInGenerator(), diff --git a/test/parallel/test-stream-iter-from-sync.js b/test/parallel/test-stream-iter-from-sync.js index a9ce0bd575abdc..d3a5cf671e10d8 100644 --- a/test/parallel/test-stream-iter-from-sync.js +++ b/test/parallel/test-stream-iter-from-sync.js @@ -66,9 +66,10 @@ function testFromSyncGenerator() { for (const batch of readable) { batches.push(batch); } - assert.strictEqual(batches.length, 2); + assert.strictEqual(batches.length, 1); + assert.strictEqual(batches[0].length, 2); assert.deepStrictEqual(batches[0][0], new Uint8Array([1, 2])); - assert.deepStrictEqual(batches[1][0], new Uint8Array([3, 4])); + assert.deepStrictEqual(batches[0][1], new Uint8Array([3, 4])); } function testFromSyncNestedIterables() { diff --git a/test/parallel/test-stream-iter-pipeto-writev.js b/test/parallel/test-stream-iter-pipeto-writev.js index 505bdd6d2b2ced..eb5284d5515de6 100644 --- a/test/parallel/test-stream-iter-pipeto-writev.js +++ b/test/parallel/test-stream-iter-pipeto-writev.js @@ -121,6 +121,27 @@ async function testPipeToSyncWritev() { assert.ok(batches.some((b) => b.length > 1)); } +// pipeToSync batches plain Uint8Array chunks for writevSync +async function testPipeToSyncPlainChunksWritev() { + const batches = []; + const writes = []; + const writer = { + writevSync(chunks) { batches.push(chunks); }, + writeSync(chunk) { writes.push(chunk); return true; }, + endSync() { return 0; }, + }; + function* source() { + yield new Uint8Array([1]); + yield new Uint8Array([2]); + yield new Uint8Array([3]); + } + const total = pipeToSync(source(), writer); + assert.strictEqual(total, 3); + assert.strictEqual(batches.length, 1); + assert.strictEqual(batches[0].length, 3); + assert.strictEqual(writes.length, 0); +} + // pipeToSync with writer that has write() and writeSync() — writeSync preferred async function testPipeToSyncWriteFallback() { const syncWrites = []; @@ -143,5 +164,6 @@ Promise.all([ testWriteSyncFailsMidBatch(), testWriteSyncAlwaysFails(), testPipeToSyncWritev(), + testPipeToSyncPlainChunksWritev(), testPipeToSyncWriteFallback(), ]).then(common.mustCall());