Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2807,7 +2807,7 @@ added:

Returns whether the stream is readable.

### `stream.Readable.toWeb(streamReadable)`
### `stream.Readable.toWeb(streamReadable[, options])`

<!-- YAML
added: v17.0.0
Expand All @@ -2816,6 +2816,10 @@ added: v17.0.0
> Stability: 1 - Experimental

* `streamReadable` {stream.Readable}
* `options` {Object}
* `strategy` {Object}
* `highWaterMark` {number}
* `size` {Function}
* Returns: {ReadableStream}

### `stream.Writable.fromWeb(writableStream[, options])`
Expand Down
6 changes: 4 additions & 2 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -1405,8 +1405,10 @@ Readable.fromWeb = function(readableStream, options) {
options);
};

Readable.toWeb = function(streamReadable) {
return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable);
Readable.toWeb = function(streamReadable, options) {
return lazyWebStreams().newReadableStreamFromStreamReadable(
streamReadable,
options);
};

Readable.wrap = function(src, options) {
Expand Down
13 changes: 11 additions & 2 deletions lib/internal/webstreams/adapters.js
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,14 @@ function newStreamWritableFromWritableStream(writableStream, options = kEmptyObj
}

/**
* @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
* @param {Readable} streamReadable
* @param {{
* strategy : QueuingStrategy
* }} [options]
* @returns {ReadableStream}
*/
function newReadableStreamFromStreamReadable(streamReadable) {
function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObject) {
// Not using the internal/streams/utils isReadableNodeStream utility
// here because it will return false if streamReadable is a Duplex
// whose readable option is false. For a Duplex that is not readable,
Expand All @@ -386,7 +390,12 @@ function newReadableStreamFromStreamReadable(streamReadable) {
// back to a minimal strategy that just specifies the highWaterMark
// and no size algorithm. Using a ByteLengthQueuingStrategy here
// is unnecessary.
const strategy =

// If there is a strategy available, use it. Otherwise, fallback to default behavior
const queuingStrategy = options?.strategy;

const strategy = queuingStrategy ?
Comment thread
Warkanlock marked this conversation as resolved.
Outdated
queuingStrategy :
objectMode ?
new CountQueuingStrategy({ highWaterMark }) :
{ highWaterMark };
Expand Down
48 changes: 48 additions & 0 deletions test/parallel/test-stream-readable-strategy-option.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
'use strict';
const common = require('../common');
const { Readable } = require('stream');
const assert = require('assert');

{
// Strategy 2
const streamData = ['a', 'b', 'c', null];

// Fulfill a Readable object
const readable = new Readable({
read: common.mustCall(() => {
process.nextTick(() => {
readable.push(streamData.shift());
});
}, streamData.length),
});

// Use helper to convert it to a Web ReadableStream using ByteLength strategy
const readableStream = Readable.toWeb(readable, {
strategy: new ByteLengthQueuingStrategy({ highWaterMark: 1 }),
});

assert(!readableStream.locked);
readableStream.getReader().read().then(common.mustCall());
}

{
// Strategy 2
const streamData = ['a', 'b', 'c', null];

// Fulfill a Readable object
const readable = new Readable({
read: common.mustCall(() => {
process.nextTick(() => {
readable.push(streamData.shift());
});
}, streamData.length),
});

// Use helper to convert it to a Web ReadableStream using Count strategy
const readableStream = Readable.toWeb(readable, {
strategy: new CountQueuingStrategy({ highWaterMark: 1 }),
});

assert(!readableStream.locked);
readableStream.getReader().read().then(common.mustCall());
}