Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
streams: add whatwg stream interop
  • Loading branch information
devsnek committed Aug 20, 2018
commit e0117e7963aaf46d5a83056812ef72ff6c558b6a
62 changes: 62 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,23 @@ reader.pipe(writer);
reader.unpipe(writer);
```

##### writable.acquireStandardStream()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about calling these "...StandardStream"... Don't really have a better suggestion right now tho

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will definitely need more documentation about the new objects

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another thing to keep in mind is that the readable-streamsmodule will need to be able to implement support for this at some point. While that lags, these methods will not be implemented and will not be supported in an extremely large number of streams out there in the wild. It might be better to consider exposing these differently.

const { toWHATWGStream } = require('stream')
const myStream = toWHATWGStream(fs.createWriteStream())

This should allow it to work with any object that implements the right methods and events.

Copy link
Copy Markdown
Member Author

@devsnek devsnek Aug 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another thing to keep in mind is that the readable-streamsmodule will need to be able to implement support for this at some point.

i don't really understand this. a polyfill of whatwg streams shouldn't have a concept of node streams.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The readable-streams module provides an independently usable implementation of Node.js streams. This is adding new methods to the base class of require('stream').Readable and require('stream').Writable that will not be present in the implementation provided by the readable-streams module, nor will it be possible to implement those methods for every browser currently supported by readable-streams. Streams created by readable-streams today can be used within Node.js just as if they were the built-in streams.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point is this can be done in a way where this isn't even a problem by not extending the prototype.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure it can, but I'm trying to design a good API, not a easy-for-some-npm-module-to-not-worry-about API.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"good" is entirely subjective here. Others will likely have very different opinions of what is "good".

The "readable-streams" module is not just "some-npm-module", it's the most downloaded module in the npm ecosystem and is part of the Node.js project managed by the Streams WG. Given that, I'm very concerned about implementation decisions that directly impact it.

Yes, we're able to add features to Node.js core before they propagate down to the module, but we must do so carefully and deliberately, weighing each choice on the impact it may have.

Copy link
Copy Markdown
Member Author

@devsnek devsnek Aug 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm hopeful we can come up with something that isn't importing separate functions. I want to have as little friction as possible for people wanting to use whatwg streams, as there are quite a lot of locations where the interop needs to happen. I'm willing to continue bikeshedding this for as long as we need to for something that works for everyone, as it would seem we both have constraints here.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some relevant comments here that are being buried a bit after rebasing... just linking them here so that they don't get lost in the conversation: 1ca4477#r211124305 and 1ca4477#r211131273

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* Returns: {WritableStream} to feed this stream.

```js
const fs = require('fs');

const stream = fs.createWriteStream('file').acquireStandardStream();
const writer = stream.getWriter();
writer.write('hi!');
```

##### writable.cork()
<!-- YAML
added: v0.11.2
Expand Down Expand Up @@ -827,6 +844,22 @@ If both `'readable'` and [`'data'`][] are used at the same time, `'readable'`
takes precedence in controlling the flow, i.e. `'data'` will be emitted
only when [`stream.read()`][stream-read] is called.

##### readable.acquireStandardStream()
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* Returns: {ReadableStream} to fully consume the stream.

```js
const fs = require('fs');

const stream = fs.createReadStream('file').acquireStandardStream();
const reader = stream.getReader();
```

##### readable.destroy([error])
<!-- YAML
added: v8.0.0
Expand Down Expand Up @@ -1266,6 +1299,26 @@ Examples of `Duplex` streams include:
* [zlib streams][zlib]
* [crypto streams][crypto]

##### duplex.acquireStandardStream
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* Returns {Object}
* `readable` {ReadableStream}
* `writable` {WritableStream}

Creates a WHATWG stream pair to represent this duplex stream.

```js
const stream = getDuplexSomehow();
const { readable, writable } = stream.acquireStandardStream();
readable.getReader();
writable.getWriter();
```

#### Class: stream.Transform
<!-- YAML
added: v0.9.4
Expand Down Expand Up @@ -2200,6 +2253,15 @@ by [`stream._transform()`][stream-_transform]. The `'end'` event is emitted
after all data has been output, which occurs after the callback in
[`transform._flush()`][stream-_flush] has been called.

##### transform.acquireStandardStream()
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* Returns: {TransformStream}

#### transform.\_flush(callback)

* `callback` {Function} A callback function (optionally with an error
Expand Down
8 changes: 8 additions & 0 deletions lib/_stream_duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ module.exports = Duplex;
const util = require('util');
const Readable = require('_stream_readable');
const Writable = require('_stream_writable');
const { emitExperimentalWarning } = require('internal/util');

util.inherits(Duplex, Readable);

Expand Down Expand Up @@ -66,6 +67,13 @@ function Duplex(options) {
}
}

Duplex.prototype.acquireStandardStream = function() {
emitExperimentalWarning('Duplex.acquireStandardStream');
const readable = Readable.prototype.acquireStandardStream.call(this);
const writable = Writable.prototype.acquireStandardStream.call(this);
return { readable, writable };
};

Object.defineProperty(Duplex.prototype, 'writableHighWaterMark', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
Expand Down
23 changes: 23 additions & 0 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,29 @@ Readable.prototype[Symbol.asyncIterator] = function() {
return new ReadableAsyncIterator(this);
};

Readable.prototype.acquireStandardStream = function() {
emitExperimentalWarning('Readable.acquireStandardStream');
return new ReadableStream({
start: (controller) => {
this.pause();
this.on('data', (chunk) => {
controller.enqueue(chunk);
this.pause();
});
this.once('end', () => controller.close());
this.once('error', (e) => controller.error(e));
},
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcollina... is this the best way of doing this? Should this use readableinstead?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO we should use the 'readable'  event.

pull: () => {
this.resume();
},
cancel: () => {
this.destroy();
},
}, {
highWaterMark: this.readableHighWaterMark,
});
};

Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
Expand Down
27 changes: 27 additions & 0 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const {
} = require('internal/errors').codes;
const Duplex = require('_stream_duplex');
const util = require('util');
const { emitExperimentalWarning } = require('internal/util');
util.inherits(Transform, Duplex);


Expand Down Expand Up @@ -202,6 +203,32 @@ Transform.prototype._destroy = function(err, cb) {
});
};

Transform.prototype.acquireStandardStream = function() {
emitExperimentalWarning('Transform.acquireStandardStream');
return new TransformStream({
start: (controller) => {
this.on('data', (chunk) => {
controller.enqueue(chunk);
});
this.once('end', () => controller.close());
this.once('error', (e) => controller.error(e));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

listening to the error event with once  is not correct. 'error' could be emitted multiple times by descendants.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whatwg streams only error once

},
transform: (chunk) => {
return new Promise((resolve) => {
const underHighWaterMark = this.write(chunk);
if (!underHighWaterMark) {
this.once('drain', resolve);
} else {
resolve();
}
});
},
flush: () => {
this.end();
},
});
};


function done(stream, er, data) {
if (er)
Expand Down
27 changes: 27 additions & 0 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,33 @@ Writable.prototype.end = function(chunk, encoding, cb) {
return this;
};

Writable.prototype.acquireStandardStream = function() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are errors propagated here? There's nothing listening to the error event.

internalUtil.emitExperimentalWarning('Writable.acquireStandardStream');
return new WritableStream({
start: (controller) => {
this.once('error', (e) => controller.error(e));
},
write: (chunk) => {
return new Promise((resolve) => {
const underHighWaterMark = this.write(chunk);
if (!underHighWaterMark) {
this.once('drain', resolve);
} else {
resolve();
}
});
},
close: (controller) => {
this.end();
},
abort: (reason) => {
this.destroy(reason);
},
}, {
highWaterMark: this.writableHighWaterMark,
});
};

Object.defineProperty(Writable.prototype, 'writableLength', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
Expand Down
4 changes: 4 additions & 0 deletions tools/doc/type-parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ const customTypesMap = {

'readline.Interface': 'readline.html#readline_class_interface',

'ReadableStream': 'https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream',
'WritableStream': 'https://developer.mozilla.org/en-US/docs/Web/API/WritableStream',
'TransformStream': 'https://streams.spec.whatwg.org/#ts-class',

'Stream': 'stream.html#stream_stream',
'stream.Duplex': 'stream.html#stream_class_stream_duplex',
'stream.Readable': 'stream.html#stream_class_stream_readable',
Expand Down