Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion benchmark/streams/iter-throughput-pipeto.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const common = require('../common.js');
const { Readable, Writable, pipeline } = require('stream');

const bench = common.createBenchmark(main, {
api: ['classic', 'webstream', 'iter', 'iter-sync'],
api: ['classic', 'webstream', 'iter', 'iter-sync-source', 'iter-sync'],
datasize: [1024 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024],
n: [5],
}, {
Expand All @@ -26,6 +26,8 @@ function main({ api, datasize, n }) {
return benchWebStream(chunk, datasize, n, totalOps);
case 'iter':
return benchIter(chunk, datasize, n, totalOps);
case 'iter-sync-source':
return benchIterSyncSource(chunk, datasize, n, totalOps);
case 'iter-sync':
return benchIterSync(chunk, datasize, n, totalOps);
}
Expand Down Expand Up @@ -101,6 +103,29 @@ function benchIter(chunk, datasize, n, totalOps) {
})();
}

function benchIterSyncSource(chunk, datasize, n, totalOps) {
  const { pipeTo } = require('stream/iter');

  async function runOnce() {
    let bytesLeft = datasize;
    // Synchronous generator source: yields the shared chunk repeatedly,
    // trimming the final yield when datasize is not a chunk multiple.
    function* produce() {
      while (bytesLeft > 0) {
        const take = Math.min(bytesLeft, chunk.length);
        bytesLeft -= take;
        yield take === chunk.length ? chunk : chunk.subarray(0, take);
      }
    }
    // No-op sink that always accepts synchronously, so the benchmark
    // measures the source-iteration side of pipeTo().
    const sink = { write() {}, writeSync() { return true; } };
    await pipeTo(produce(), sink);
  }

  (async () => {
    bench.start();
    for (let i = 0; i < n; i++) await runOnce();
    bench.end(totalOps);
  })();
}

function benchIterSync(chunk, datasize, n, totalOps) {
const { pipeToSync } = require('stream/iter');

Expand Down
58 changes: 53 additions & 5 deletions lib/internal/streams/iter/pull.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

const {
ArrayBufferIsView,
ArrayFromAsync,
ArrayIsArray,
ArrayPrototypePush,
ArrayPrototypeSlice,
PromisePrototypeThen,
Expand Down Expand Up @@ -38,7 +40,9 @@ const {
fromSync,
isSyncIterable,
isAsyncIterable,
isPrimitiveChunk,
isUint8ArrayBatch,
normalizeAsyncValue,
} = require('internal/streams/iter/from');

const {
Expand All @@ -51,7 +55,10 @@ const {
} = require('internal/streams/iter/utils');

const {
kValidatedSource,
kValidatedTransform,
toAsyncStreamable,
toStreamable,
} = require('internal/streams/iter/types');

// =============================================================================
Expand Down Expand Up @@ -114,6 +121,22 @@ function parsePipeToArgs(args, requiredMethod) {
};
}

// Returns true when pipeTo() may iterate `source` directly as a plain
// synchronous iterable instead of normalizing it through from(). The
// checks run in the same order as from()'s own source classification so
// the fast path never claims a source from() would treat specially.
function canUseSyncIterablePipeToFastPath(source, transforms, signal) {
  if (signal !== undefined) return false;
  if (transforms.length !== 0) return false;
  if (isPrimitiveChunk(source)) return false;
  if (ArrayIsArray(source)) return false;
  if (source?.[kValidatedSource]) return false;
  if (!isSyncIterable(source)) return false;
  if (isAsyncIterable(source)) return false;

  // Preserve from()'s top-level protocol precedence for custom iterables.
  if (typeof source[toAsyncStreamable] === 'function') return false;
  return typeof source[toStreamable] !== 'function';
}

// =============================================================================
// Transform Output Flattening
// =============================================================================
Expand Down Expand Up @@ -820,12 +843,13 @@ async function pipeTo(source, ...args) {
// Check for abort
signal?.throwIfAborted();

// Normalize source via from()
const normalized = from(source);
const hasWriteSync = typeof writer.writeSync === 'function';
const useSyncIterableFastPath =
hasWriteSync && canUseSyncIterablePipeToFastPath(source, transforms, signal);
const normalized = useSyncIterableFastPath ? undefined : from(source);

let totalBytes = 0;
const hasWritev = typeof writer.writev === 'function';
const hasWriteSync = typeof writer.writeSync === 'function';
const hasWritevSync = typeof writer.writevSync === 'function';
const hasEndSync = typeof writer.endSync === 'function';
// Async fallback for writeBatch when sync write fails partway through.
Expand Down Expand Up @@ -876,8 +900,32 @@ async function pipeTo(source, ...args) {
}

try {
// Fast path: no transforms - iterate normalized source directly
if (transforms.length === 0) {
if (useSyncIterableFastPath) {
// Avoid from()'s async sync-iterable batching path. This keeps writes
// incremental for synchronous sources while preserving async
// normalization for non-primitive yielded values.
for (const value of source) {
if (isUint8ArrayBatch(value)) {
if (value.length > 0) {
const p = writeBatch(value);
if (p) await p;
}
continue;
}
if (isUint8Array(value)) {
const p = writeBatch([value]);
if (p) await p;
continue;
}

const batch = await ArrayFromAsync(normalizeAsyncValue(value));
if (batch.length > 0) {
const p = writeBatch(batch);
if (p) await p;
}
}
} else if (transforms.length === 0) {
// Fast path: no transforms - iterate normalized source directly
if (signal) {
for await (const batch of normalized) {
signal.throwIfAborted();
Expand Down
76 changes: 76 additions & 0 deletions test/parallel/test-stream-iter-pipeto.js
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,79 @@ async function testPipeToSyncMinimalWriter() {
assert.strictEqual(chunks.length > 0, true);
}

async function testPipeToSyncIterableFastPathWritesIncrementally() {
  // The sync-iterable fast path must write each yielded chunk as it is
  // pulled, rather than draining the whole generator into a batch first.
  let pullCount = 0;
  let pulledAtFirstWrite = 0;
  const received = [];
  function* source() {
    for (let i = 0; i < 3; i++) {
      pullCount++;
      yield new Uint8Array([0x61 + i]);
    }
  }
  const writer = {
    write: common.mustNotCall(),
    writeSync(chunk) {
      if (pulledAtFirstWrite === 0) pulledAtFirstWrite = pullCount;
      received.push(chunk);
      return true;
    },
  };

  const totalBytes = await pipeTo(source(), writer);
  assert.strictEqual(totalBytes, 3);
  // Exactly one value had been pulled when the first write landed.
  assert.strictEqual(pulledAtFirstWrite, 1);
  assert.deepStrictEqual(received, [
    new Uint8Array([0x61]),
    new Uint8Array([0x62]),
    new Uint8Array([0x63]),
  ]);
}

async function testPipeToSyncIterableFastPathWriteFallback() {
  // When writeSync() reports backpressure (returns false), the fast path
  // must retry that chunk — and only that chunk — via the async write().
  const asyncWrites = [];
  const writer = {
    writeSync(chunk) {
      // Refuse only the middle chunk (0x62).
      return chunk[0] !== 0x62;
    },
    async write(chunk) {
      asyncWrites.push(chunk);
    },
  };
  function* source() {
    for (const byte of [0x61, 0x62, 0x63]) {
      yield new Uint8Array([byte]);
    }
  }

  const totalBytes = await pipeTo(source(), writer);
  assert.strictEqual(totalBytes, 3);
  assert.deepStrictEqual(asyncWrites, [new Uint8Array([0x62])]);
}

async function testPipeToSyncIterableFastPathAsyncValue() {
  // A sync iterable may still yield non-Uint8Array values (here a promise
  // of a string); those must take the async normalization path while
  // plain Uint8Arrays keep going through writeSync().
  const received = [];
  const writer = {
    write: common.mustNotCall(),
    writeSync(chunk) {
      received.push(chunk);
      return true;
    },
  };
  function* source() {
    yield Promise.resolve('a');
    yield new Uint8Array([0x62]);
  }

  const totalBytes = await pipeTo(source(), writer);
  assert.strictEqual(totalBytes, 2);
  // Flatten all written chunks and decode; order must be preserved.
  const bytes = [];
  for (const chunk of received) {
    bytes.push(...chunk);
  }
  const result = new TextDecoder().decode(new Uint8Array(bytes));
  assert.strictEqual(result, 'ab');
}

Promise.all([
testPipeToSync(),
testPipeTo(),
Expand All @@ -234,4 +307,7 @@ Promise.all([
testPipeToSyncPreventClose(),
testPipeToMinimalWriter(),
testPipeToSyncMinimalWriter(),
testPipeToSyncIterableFastPathWritesIncrementally(),
testPipeToSyncIterableFastPathWriteFallback(),
testPipeToSyncIterableFastPathAsyncValue(),
]).then(common.mustCall());
Loading