Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions benchmark/streams/iter-from-batching.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Measures batching behavior for stream/iter from() and fromSync()
// with plain synchronous Uint8Array iterables.
'use strict';

const common = require('../common.js');
const { closeSync, openSync, writeSync, writevSync } = require('fs');
const { devNull } = require('os');

// Benchmark matrix: two drive methods, a spread of chunk counts, a fixed
// 16-byte chunk size, and two iteration counts (filtered below).
const bench = common.createBenchmark(main, {
  method: ['from-first-batch', 'from-sync-writev'],
  chunks: [256, 4096, 16384],
  chunkSize: [16],
  n: [100, 1000],
}, {
  flags: ['--experimental-stream-iter'],
  combinationFilter({ method, chunks, n }) {
    // n === 1 is reserved for the smoke-test configuration below.
    if (n === 1) {
      return true;
    }
    // Keep the matrix small: high iteration count for the cheap first-batch
    // scenario, lower count (and no largest chunk run) for the writev path.
    return method === 'from-first-batch' ?
      n === 1000 :
      n === 100 && chunks !== 16384;
  },
  test: {
    chunks: 256,
    chunkSize: 16,
    n: 1,
  },
});

/**
 * Benchmark entry point: dispatches to the implementation for the
 * configured method.
 * @param {{ method: string, chunks: number, chunkSize: number, n: number }} conf
 */
function main({ method, chunks, chunkSize, n }) {
  if (method === 'from-first-batch') {
    return benchFromFirstBatch(chunks, chunkSize, n);
  }
  if (method === 'from-sync-writev') {
    return benchFromSyncWritev(chunks, chunkSize, n);
  }
}

// Yields the same Uint8Array `chunks` times — no per-iteration allocation,
// so the benchmark measures batching, not chunk construction.
function* source(chunks, chunk) {
  let remaining = chunks;
  while (remaining > 0) {
    remaining--;
    yield chunk;
  }
}

/**
 * Times how long from() takes to deliver its first batch from a plain sync
 * generator, repeated n times.
 *
 * Fix: the async IIFE was a floating promise — a thrown validation error
 * ("expected a batch"/"expected chunks") surfaced as an unhandled rejection
 * rather than a clean benchmark failure. The rejection is now rethrown on
 * the next tick so the process crashes with the original stack.
 * @param {number} chunks - Chunks the source yields per run.
 * @param {number} chunkSize - Byte length of each chunk.
 * @param {number} n - Number of timed iterations.
 */
function benchFromFirstBatch(chunks, chunkSize, n) {
  const { from } = require('stream/iter');
  const chunk = new Uint8Array(chunkSize);
  let seen = 0;

  (async () => {
    bench.start();
    for (let i = 0; i < n; i++) {
      const iterator = from(source(chunks, chunk))[Symbol.asyncIterator]();
      const { value, done } = await iterator.next();
      if (done || value.length === 0) {
        throw new Error('expected a batch');
      }
      seen += value.length;
    }
    bench.end(n);
    if (seen === 0) {
      throw new Error('expected chunks');
    }
  })().catch((err) => {
    // Fail loudly instead of leaving an unhandled rejection behind.
    process.nextTick(() => { throw err; });
  });
}

/**
 * Drives pipeToSync() from a sync Uint8Array generator into a
 * writev-capable sink backed by the null device, then validates the byte
 * and chunk totals.
 * @param {number} chunks - Chunks the source yields per run.
 * @param {number} chunkSize - Byte length of each chunk.
 * @param {number} n - Number of timed iterations.
 */
function benchFromSyncWritev(chunks, chunkSize, n) {
  const { pipeToSync } = require('stream/iter');
  const chunk = new Uint8Array(chunkSize);
  const expected = chunks * chunkSize * n;
  const fd = openSync(devNull, 'w');
  let seen = 0;
  let total = 0;
  const writer = {
    // Inside these shorthand methods the bare writeSync/writevSync names
    // still resolve to the fs imports, not to the methods themselves.
    writeSync(data) {
      writeSync(fd, data);
      seen++;
    },
    writevSync(batch) {
      writevSync(fd, batch);
      seen += batch.length;
    },
  };

  try {
    bench.start();
    let i = 0;
    while (i < n) {
      total += pipeToSync(source(chunks, chunk), writer);
      i++;
    }
    bench.end(chunks * n);
  } finally {
    closeSync(fd);
  }

  if (total !== expected || seen !== chunks * n) {
    throw new Error('unexpected chunk count');
  }
}
75 changes: 57 additions & 18 deletions lib/internal/streams/iter/from.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const {
toUint8Array,
} = require('internal/streams/iter/utils');

// Maximum number of chunks to yield per batch from from(Uint8Array[]).
// Maximum number of chunks to yield per batch from from()/fromSync().
// Bounds peak memory when arrays flow through transforms, which must
// allocate output for the entire batch at once.
const FROM_BATCH_SIZE = 128;
Expand Down Expand Up @@ -190,33 +190,66 @@ function isUint8ArrayBatch(value) {
return true;
}

// Re-yields `batch` in sub-batches of at most FROM_BATCH_SIZE chunks.
// An empty batch yields nothing; a batch already within the bound is
// passed through as-is, avoiding a copy.
function* yieldBoundedBatch(batch) {
  const total = batch.length;
  if (total === 0) {
    return;
  }
  if (total <= FROM_BATCH_SIZE) {
    yield batch;
    return;
  }
  let start = 0;
  while (start < total) {
    yield ArrayPrototypeSlice(batch, start, start + FROM_BATCH_SIZE);
    start += FROM_BATCH_SIZE;
  }
}

/**
* Normalize a sync streamable source, yielding batches of Uint8Array.
* @param {Iterable} source
* @yields {Uint8Array[]}
*/
function* normalizeSyncSource(source) {
let batch = [];

for (const value of source) {
// Fast path 1: value is already a Uint8Array[] batch
if (isUint8ArrayBatch(value)) {
if (value.length > 0) {
yield value;
if (batch.length > 0) {
yield batch;
batch = [];
}
yield* yieldBoundedBatch(value);
continue;
}
// Fast path 2: value is a single Uint8Array (very common)
if (isUint8Array(value)) {
yield [value];
ArrayPrototypePush(batch, value);
if (batch.length === FROM_BATCH_SIZE) {
yield batch;
batch = [];
}
continue;
}
// Slow path: normalize the value
const batch = [];
for (const chunk of normalizeSyncValue(value)) {
ArrayPrototypePush(batch, chunk);
}
if (batch.length > 0) {
yield batch;
batch = [];
}
let valueBatch = [];
for (const chunk of normalizeSyncValue(value)) {
ArrayPrototypePush(valueBatch, chunk);
if (valueBatch.length === FROM_BATCH_SIZE) {
yield valueBatch;
valueBatch = [];
}
}
if (valueBatch.length > 0) {
yield valueBatch;
}
}

if (batch.length > 0) {
yield batch;
}
}

Expand Down Expand Up @@ -329,36 +362,42 @@ async function* normalizeAsyncSource(source) {
return;
}

// Fall back to sync iteration - batch all sync values together
// Fall back to sync iteration - batch sync values together with a bound.
if (isSyncIterable(source)) {
const batch = [];
let batch = [];

for (const value of source) {
// Fast path 1: value is already a Uint8Array[] batch
if (isUint8ArrayBatch(value)) {
// Flush any accumulated batch first
if (batch.length > 0) {
yield ArrayPrototypeSlice(batch);
batch.length = 0;
}
if (value.length > 0) {
yield value;
yield batch;
batch = [];
}
yield* yieldBoundedBatch(value);
continue;
}
// Fast path 2: value is a single Uint8Array (very common)
if (isUint8Array(value)) {
ArrayPrototypePush(batch, value);
if (batch.length === FROM_BATCH_SIZE) {
yield batch;
batch = [];
}
continue;
}
// Slow path: normalize the value - must flush and yield individually
if (batch.length > 0) {
yield ArrayPrototypeSlice(batch);
batch.length = 0;
yield batch;
batch = [];
}
const asyncBatch = [];
let asyncBatch = [];
for await (const chunk of normalizeAsyncValue(value)) {
ArrayPrototypePush(asyncBatch, chunk);
if (asyncBatch.length === FROM_BATCH_SIZE) {
yield asyncBatch;
asyncBatch = [];
}
}
if (asyncBatch.length > 0) {
yield asyncBatch;
Expand Down
34 changes: 34 additions & 0 deletions test/parallel/test-stream-iter-from-coverage.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@ async function testFromSyncSubBatching() {
assert.strictEqual(totalChunks, 200);
}

// fromSync: a generic sync iterable of single Uint8Arrays is split into
// bounded batches (128 + 72) rather than one unbounded batch.
async function testFromSyncIterableSubBatching() {
  function* gen() {
    let i = 0;
    while (i < 200) {
      yield new Uint8Array([i & 0xFF]);
      i++;
    }
  }
  const batches = Array.from(fromSync(gen()));
  assert.strictEqual(batches.length, 2);
  assert.strictEqual(batches[0].length, 128);
  assert.strictEqual(batches[1].length, 72);
}

// from: Uint8Array[] with > 128 elements triggers sub-batching (async)
async function testFromAsyncSubBatching() {
const bigBatch = Array.from({ length: 200 },
Expand All @@ -44,6 +60,22 @@ async function testFromAsyncSubBatching() {
assert.strictEqual(batches[1].length, 72);
}

// from: sync iterables are batched with the same 128-chunk bound on the
// async path (128 + 72), not accumulated into one unbounded batch.
async function testFromAsyncSyncIterableSubBatching() {
  function* gen() {
    let i = 0;
    while (i < 200) {
      yield new Uint8Array([i++ & 0xFF]);
    }
  }
  const batches = [];
  for await (const batch of from(gen())) {
    batches.push(batch);
  }
  assert.deepStrictEqual(
    batches.map((b) => b.length),
    [128, 72],
  );
}

// Exact boundary: 128 elements → single batch (no split)
async function testFromSubBatchingBoundary() {
const exactBatch = Array.from({ length: 128 },
Expand Down Expand Up @@ -133,7 +165,9 @@ async function testFromSyncInvalidYield() {

Promise.all([
testFromSyncSubBatching(),
testFromSyncIterableSubBatching(),
testFromAsyncSubBatching(),
testFromAsyncSyncIterableSubBatching(),
testFromSubBatchingBoundary(),
testFromSubBatchingBoundaryPlus1(),
testFromSyncDataViewInGenerator(),
Expand Down
5 changes: 3 additions & 2 deletions test/parallel/test-stream-iter-from-sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ function testFromSyncGenerator() {
for (const batch of readable) {
batches.push(batch);
}
assert.strictEqual(batches.length, 2);
assert.strictEqual(batches.length, 1);
assert.strictEqual(batches[0].length, 2);
assert.deepStrictEqual(batches[0][0], new Uint8Array([1, 2]));
assert.deepStrictEqual(batches[1][0], new Uint8Array([3, 4]));
assert.deepStrictEqual(batches[0][1], new Uint8Array([3, 4]));
}

function testFromSyncNestedIterables() {
Expand Down
22 changes: 22 additions & 0 deletions test/parallel/test-stream-iter-pipeto-writev.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,27 @@ async function testPipeToSyncWritev() {
assert.ok(batches.some((b) => b.length > 1));
}

// pipeToSync collects plain Uint8Array chunks into a single writevSync
// batch; the scalar writeSync path must not be used.
async function testPipeToSyncPlainChunksWritev() {
  const batches = [];
  const writes = [];
  const writer = {
    writevSync(chunks) {
      batches.push(chunks);
    },
    writeSync(chunk) {
      writes.push(chunk);
      return true;
    },
    endSync() {
      return 0;
    },
  };
  function* source() {
    for (const byte of [1, 2, 3]) {
      yield new Uint8Array([byte]);
    }
  }
  const total = pipeToSync(source(), writer);
  assert.strictEqual(total, 3);
  assert.strictEqual(batches.length, 1);
  assert.strictEqual(batches[0].length, 3);
  assert.strictEqual(writes.length, 0);
}

// pipeToSync with writer that has write() and writeSync() — writeSync preferred
async function testPipeToSyncWriteFallback() {
const syncWrites = [];
Expand All @@ -143,5 +164,6 @@ Promise.all([
testWriteSyncFailsMidBatch(),
testWriteSyncAlwaysFails(),
testPipeToSyncWritev(),
testPipeToSyncPlainChunksWritev(),
testPipeToSyncWriteFallback(),
]).then(common.mustCall());
Loading