Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
stream: make stream.Readable implement the toAsyncStreamable protocol
Signed-off-by: James M Snell <jasnell@gmail.com>
Assisted-by: Opencode/Opus 4.6
  • Loading branch information
jasnell committed Apr 3, 2026
commit 0becf762fcd366df3671fb27f7904a47b2656130
7 changes: 7 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -2882,6 +2882,13 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream.
A stream method was called that cannot complete because the stream was
destroyed using `stream.destroy()`.

<a id="ERR_STREAM_ITER_MISSING_FLAG"></a>

### `ERR_STREAM_ITER_MISSING_FLAG`

A `stream/iter` API was used without enabling the `--experimental-stream-iter`
CLI flag.

<a id="ERR_STREAM_NULL_VALUES"></a>

### `ERR_STREAM_NULL_VALUES`
Expand Down
2 changes: 2 additions & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -1770,6 +1770,8 @@ E('ERR_STREAM_ALREADY_FINISHED',
Error);
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
// Thrown when a stream/iter API (e.g. Readable's toAsyncStreamable) is used
// without the --experimental-stream-iter CLI flag enabled.
E('ERR_STREAM_ITER_MISSING_FLAG',
  'The stream/iter API requires the --experimental-stream-iter flag', TypeError);
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
Expand Down
21 changes: 19 additions & 2 deletions lib/internal/streams/iter/from.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const {
} = require('internal/util/types');

const {
kTrustedSource,
toStreamable,
toAsyncStreamable,
} = require('internal/streams/iter/types');
Expand Down Expand Up @@ -483,6 +484,11 @@ function from(input) {
throw new ERR_INVALID_ARG_TYPE('input', 'a non-null value', input);
}

// Fast path: trusted source already yields valid Uint8Array[] batches
if (input[kTrustedSource]) {
return input;
}

// Check for primitives first (ByteInput)
if (isPrimitiveChunk(input)) {
const chunk = primitiveToUint8Array(input);
Expand Down Expand Up @@ -531,11 +537,22 @@ function from(input) {
// Check toAsyncStreamable protocol (takes precedence over toStreamable and
// iteration protocols)
if (typeof input[toAsyncStreamable] === 'function') {
const result = input[toAsyncStreamable]();
// Synchronous trusted source (e.g. Readable batched iterator)
if (result?.[kTrustedSource]) {
return result;
}
return {
__proto__: null,
async *[SymbolAsyncIterator]() {
const result = await input[toAsyncStreamable]();
yield* from(result)[SymbolAsyncIterator]();
// The result may be a Promise. Check trusted on both the Promise
// itself (if tagged) and the resolved value.
const resolved = await result;
if (resolved?.[kTrustedSource]) {
yield* resolved[SymbolAsyncIterator]();
return;
}
yield* from(resolved)[SymbolAsyncIterator]();
},
};
}
Expand Down
10 changes: 10 additions & 0 deletions lib/internal/streams/iter/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,19 @@ const drainableProtocol = SymbolFor('Stream.drainableProtocol');
*/
const kTrustedTransform = Symbol('kTrustedTransform');

/**
 * Internal sentinel for trusted sources. An async iterable with
 * [kTrustedSource] = true signals that it already yields valid
 * Uint8Array[] batches - no normalizeAsyncSource wrapper needed.
 * from() will return such sources directly, skipping all normalization.
 * This is NOT a public protocol symbol - it uses Symbol() not Symbol.for().
 * Because it is a unique (non-registry) symbol, only code that imports it
 * from this module can tag a source as trusted; user code cannot forge it.
 */
const kTrustedSource = Symbol('kTrustedSource');
Comment thread
jasnell marked this conversation as resolved.
Outdated

module.exports = {
broadcastProtocol,
drainableProtocol,
kTrustedSource,
kTrustedTransform,
shareProtocol,
shareSyncProtocol,
Expand Down
145 changes: 145 additions & 0 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const {
Symbol,
SymbolAsyncDispose,
SymbolAsyncIterator,
SymbolFor,
SymbolSpecies,
TypedArrayPrototypeSet,
} = primordials;
Expand All @@ -52,6 +53,8 @@ const {
} = require('internal/streams/add-abort-signal');
const { eos } = require('internal/streams/end-of-stream');

const { getOptionValue } = require('internal/options');

let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
debug = fn;
});
Expand Down Expand Up @@ -82,6 +85,7 @@ const {
ERR_INVALID_ARG_TYPE,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_OUT_OF_RANGE,
ERR_STREAM_ITER_MISSING_FLAG,
ERR_STREAM_PUSH_AFTER_EOF,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
ERR_UNKNOWN_ENCODING,
Expand Down Expand Up @@ -1796,3 +1800,144 @@ Readable.wrap = function(src, options) {
},
}).wrap(src);
};

// Efficient interop with the stream/iter API via toAsyncStreamable protocol.
// Provides a batched async iterator that drains the internal buffer into
// Uint8Array[] batches, avoiding the per-chunk Promise overhead of the
// standard Symbol.asyncIterator path.
//
// The flag cannot be checked at module load time (readable.js loads during
// bootstrap before options are available). Instead, toAsyncStreamable is
// always defined but lazily initializes on first call - throwing if the
// flag is not set, or installing the real implementation if it is.
{
const toAsyncStreamable = SymbolFor('Stream.toAsyncStreamable');
let kTrustedSource;
let normalizeAsyncValue;
let isU8;

// Maximum chunks to drain into a single batch. Bounds peak memory when
// _read() synchronously pushes many chunks into the buffer.
const MAX_DRAIN_BATCH = 128;

// Resolve the stream/iter dependencies on first use. Throws
// ERR_STREAM_ITER_MISSING_FLAG when --experimental-stream-iter is not
// enabled; otherwise caches the required bindings in the enclosing scope
// so every subsequent call is a cheap no-op.
function lazyInit() {
  if (kTrustedSource !== undefined) {
    return;  // Already initialized.
  }
  const enabled = getOptionValue('--experimental-stream-iter');
  if (!enabled) {
    throw new ERR_STREAM_ITER_MISSING_FLAG();
  }
  const iterTypes = require('internal/streams/iter/types');
  kTrustedSource = iterTypes.kTrustedSource;
  const iterFrom = require('internal/streams/iter/from');
  normalizeAsyncValue = iterFrom.normalizeAsyncValue;
  isU8 = require('internal/util/types').isUint8Array;
}

// Convert a drained batch of raw chunks from an object-mode or
// string-encoded Readable into an array of Uint8Array values.
// Returns null when normalization produced no output at all.
async function normalizeBatch(raw) {
  const out = [];
  for (const value of raw) {
    if (isU8(value)) {
      out.push(value);
      continue;
    }
    // normalizeAsyncValue may await for async protocols (e.g.
    // toAsyncStreamable on yielded objects). Stream events during
    // the suspension are queued, not lost - errors will surface
    // on the next loop iteration after this yield completes.
    for await (const normalized of normalizeAsyncValue(value)) {
      out.push(normalized);
    }
  }
  return out.length === 0 ? null : out;
}

// Batched async iterator for Readable streams. Same mechanism as
// createAsyncIterator (same event setup, same stream.read() to
// trigger _read(), same teardown) but drains all currently buffered
// chunks into a single Uint8Array[] batch per yield, amortizing the
// Promise/microtask cost across multiple chunks.
//
// When normalize is provided (object-mode / encoded streams), each
// drained batch is passed through it to convert chunks to Uint8Array.
// When normalize is null (byte-mode), chunks are already Buffers
// (Uint8Array subclass) and are yielded directly.
// Batched async iterator over a Readable. `normalize` is either a
// batch-normalizing async function (object-mode / encoded streams) or
// null (byte-mode, chunks are yielded as-is).
//
// `error` acts as a tri-state sentinel:
//   undefined -> stream still live
//   null      -> stream ended cleanly (eos fired without error)
//   truthy    -> stream failed; the aggregated error to throw
async function* createBatchedAsyncIterator(stream, normalize) {
  let callback = nop;

  // `next` plays two roles:
  //  - as the 'readable' event listener, `this === stream`: data arrived,
  //    so wake a pending `await new Promise(next)` by invoking `callback`;
  //  - as a Promise executor, `this` is not the stream: stash `resolve`
  //    in `callback` so a later event can wake us.
  function next(resolve) {
    if (this === stream) {
      callback();
      callback = nop;
    } else {
      callback = resolve;
    }
  }

  stream.on('readable', next);

  let error;
  // eos() fires once on end/close/error; record the outcome and wake any
  // pending waiter so the loop can observe it.
  const cleanup = eos(stream, { writable: false }, (err) => {
    error = err ? aggregateTwoErrors(error, err) : null;
    callback();
    callback = nop;
  });

  try {
    while (true) {
      const chunk = stream.destroyed ? null : stream.read();
      if (chunk !== null) {
        // Drain any additional already-buffered chunks into the same
        // batch. The first read() may trigger _read() which
        // synchronously pushes more data into the buffer. We drain
        // that buffered data without issuing unbounded _read() calls -
        // once state.length hits 0 or MAX_DRAIN_BATCH is reached, we
        // stop and yield what we have.
        const batch = [chunk];
        while (batch.length < MAX_DRAIN_BATCH &&
               stream._readableState.length > 0) {
          const c = stream.read();
          if (c === null) break;
          batch.push(c);
        }
        if (normalize !== null) {
          // normalize may return null when it produced no output;
          // skip the yield in that case.
          const result = await normalize(batch);
          if (result !== null) {
            yield result;
          }
        } else {
          yield batch;
        }
      } else if (error) {
        throw error;
      } else if (error === null) {
        // Clean end-of-stream.
        return;
      } else {
        // No data, not finished: park until 'readable' or eos wakes us.
        await new Promise(next);
      }
    }
  } catch (err) {
    error = aggregateTwoErrors(error, err);
    throw error;
  } finally {
    // Consumer break/throw while the stream is live (error === undefined),
    // or autoDestroy: tear the stream down. Otherwise just detach.
    if (error === undefined || stream._readableState.autoDestroy) {
      destroyImpl.destroyer(stream, null);
    } else {
      stream.off('readable', next);
      cleanup();
    }
  }
}

// Protocol entry point: produce a trusted batched async iterator over this
// Readable. Throws ERR_STREAM_ITER_MISSING_FLAG unless the
// --experimental-stream-iter flag is enabled (checked lazily on first call).
Readable.prototype[toAsyncStreamable] = function() {
  lazyInit();
  const { objectMode, encoding } = this._readableState;
  // Byte-mode streams already buffer Uint8Array chunks and need no
  // per-batch normalization; object-mode / encoded streams do.
  let normalize = null;
  if (objectMode || encoding) {
    normalize = normalizeBatch;
  }
  const iterator = createBatchedAsyncIterator(this, normalize);
  iterator[kTrustedSource] = true;
  iterator.stream = this;
  return iterator;
};
}
44 changes: 44 additions & 0 deletions test/parallel/test-stream-iter-readable-interop-disabled.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
'use strict';

// Tests that toAsyncStreamable throws ERR_STREAM_ITER_MISSING_FLAG
// when --experimental-stream-iter is not enabled, and succeeds when
// the flag is present.

const common = require('../common');
const assert = require('assert');
const { spawnPromisified } = common;

// Without the flag, calling the protocol method must fail the child
// process with ERR_STREAM_ITER_MISSING_FLAG on stderr.
async function testToAsyncStreamableWithoutFlag() {
  const { stderr, code } = await spawnPromisified(process.execPath, [
    '-e',
    `
    const { Readable } = require('stream');
    const r = new Readable({ read() {} });
    r[Symbol.for('Stream.toAsyncStreamable')]();
    `,
  ]);
  // Pass stderr as the assertion message so an unexpected success still
  // surfaces the child's output in CI logs.
  assert.notStrictEqual(code, 0, stderr);
  assert.match(stderr, /ERR_STREAM_ITER_MISSING_FLAG/);
}

// With the flag, the call must succeed and return an iterator carrying
// a `stream` back-reference.
async function testToAsyncStreamableWithFlag() {
  const { stderr, code } = await spawnPromisified(process.execPath, [
    '--experimental-stream-iter',
    '-e',
    `
    const { Readable } = require('stream');
    const r = new Readable({
      read() { this.push(Buffer.from('ok')); this.push(null); }
    });
    const sym = Symbol.for('Stream.toAsyncStreamable');
    const iter = r[sym]();
    // Should not throw, and should have stream property
    if (!iter.stream) process.exit(1);
    `,
  ]);
  // Include the child's stderr in the failure message; a bare non-zero
  // exit code is otherwise undiagnosable from CI output.
  assert.strictEqual(code, 0, stderr);
}

Promise.all([
  testToAsyncStreamableWithoutFlag(),
  testToAsyncStreamableWithFlag(),
]).then(common.mustCall());
Loading