-
-
Notifications
You must be signed in to change notification settings - Fork 35.4k
stream: add stream.pipeline and stream.onEnd #19828
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,96 @@ | ||
| // Ported from https://github.com/mafintosh/end-of-stream with | ||
| // permission from the author, Mathias Buus (@mafintosh). | ||
|
|
||
| 'use strict'; | ||
|
|
||
| const { | ||
| ERR_STREAM_PREMATURE_CLOSE | ||
| } = require('internal/errors').codes; | ||
|
|
||
| function noop() {} | ||
|
|
||
| function isRequest(stream) { | ||
| return stream.setHeader && typeof stream.abort === 'function'; | ||
| } | ||
|
|
||
| function once(callback) { | ||
| let called = false; | ||
| return function(err) { | ||
| if (called) return; | ||
| called = true; | ||
| callback.call(this, err); | ||
| }; | ||
| } | ||
|
|
||
| function eos(stream, opts, callback) { | ||
| if (typeof opts === 'function') return eos(stream, null, opts); | ||
| if (!opts) opts = {}; | ||
|
|
||
| callback = once(callback || noop); | ||
|
|
||
| const ws = stream._writableState; | ||
| const rs = stream._readableState; | ||
| let readable = opts.readable || (opts.readable !== false && stream.readable); | ||
| let writable = opts.writable || (opts.writable !== false && stream.writable); | ||
|
|
||
| const onlegacyfinish = () => { | ||
| if (!stream.writable) onfinish(); | ||
| }; | ||
|
|
||
| const onfinish = () => { | ||
| writable = false; | ||
| if (!readable) callback.call(stream); | ||
| }; | ||
|
|
||
| const onend = () => { | ||
| readable = false; | ||
| if (!writable) callback.call(stream); | ||
| }; | ||
|
|
||
| const onerror = (err) => { | ||
| callback.call(stream, err); | ||
| }; | ||
|
|
||
| const onclose = () => { | ||
| if (readable && !(rs && rs.ended)) { | ||
| return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); | ||
| } | ||
| if (writable && !(ws && ws.ended)) { | ||
| return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); | ||
| } | ||
| }; | ||
|
|
||
| const onrequest = () => { | ||
| stream.req.on('finish', onfinish); | ||
| }; | ||
|
|
||
| if (isRequest(stream)) { | ||
| stream.on('complete', onfinish); | ||
| stream.on('abort', onclose); | ||
| if (stream.req) onrequest(); | ||
| else stream.on('request', onrequest); | ||
| } else if (writable && !ws) { // legacy streams | ||
| stream.on('end', onlegacyfinish); | ||
| stream.on('close', onlegacyfinish); | ||
| } | ||
|
|
||
| stream.on('end', onend); | ||
| stream.on('finish', onfinish); | ||
| if (opts.error !== false) stream.on('error', onerror); | ||
| stream.on('close', onclose); | ||
|
|
||
| return function() { | ||
| stream.removeListener('complete', onfinish); | ||
| stream.removeListener('abort', onclose); | ||
| stream.removeListener('request', onrequest); | ||
| if (stream.req) stream.req.removeListener('finish', onfinish); | ||
| stream.removeListener('end', onlegacyfinish); | ||
| stream.removeListener('close', onlegacyfinish); | ||
| stream.removeListener('finish', onfinish); | ||
| stream.removeListener('end', onend); | ||
| stream.removeListener('error', onerror); | ||
| stream.removeListener('close', onclose); | ||
| }; | ||
| } | ||
|
|
||
| module.exports = eos; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,95 @@ | ||
| // Ported from https://github.com/mafintosh/pump with | ||
| // permission from the author, Mathias Buus (@mafintosh). | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if the original code has a copyright/license, then it should likely either be included here or embedded in the LICENSE file.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It does, but I did mention here I'm fine waiving copyright, mafintosh/pump#17 (comment)
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (I'm fine with whatever is easiest btw) |
||
|
|
||
| 'use strict'; | ||
|
|
||
| const eos = require('internal/streams/end-of-stream'); | ||
|
|
||
| const { | ||
| ERR_MISSING_ARGS, | ||
| ERR_STREAM_DESTROYED | ||
| } = require('internal/errors').codes; | ||
|
|
||
| function once(callback) { | ||
| let called = false; | ||
| return function(err) { | ||
| if (called) return; | ||
| called = true; | ||
| callback(err); | ||
| }; | ||
| } | ||
|
|
||
| function noop() {} | ||
|
|
||
| function isRequest(stream) { | ||
| return stream.setHeader && typeof stream.abort === 'function'; | ||
| } | ||
|
|
||
| function destroyer(stream, reading, writing, callback) { | ||
| callback = once(callback); | ||
|
|
||
| let closed = false; | ||
| stream.on('close', () => { | ||
| closed = true; | ||
| }); | ||
|
|
||
| eos(stream, { readable: reading, writable: writing }, (err) => { | ||
| if (err) return callback(err); | ||
| closed = true; | ||
| callback(); | ||
| }); | ||
|
|
||
| let destroyed = false; | ||
| return (err) => { | ||
| if (closed) return; | ||
| if (destroyed) return; | ||
| destroyed = true; | ||
|
|
||
| // request.destroy just do .end - .abort is what we want | ||
| if (isRequest(stream)) return stream.abort(); | ||
| if (typeof stream.destroy === 'function') return stream.destroy(); | ||
|
|
||
| callback(err || new ERR_STREAM_DESTROYED('pipe')); | ||
| }; | ||
| } | ||
|
|
||
| function call(fn) { | ||
| fn(); | ||
| } | ||
|
|
||
| function pipe(from, to) { | ||
| return from.pipe(to); | ||
| } | ||
|
|
||
| function popCallback(streams) { | ||
| if (!streams.length) return noop; | ||
| if (typeof streams[streams.length - 1] !== 'function') return noop; | ||
| return streams.pop(); | ||
| } | ||
|
|
||
| function pipeline(...streams) { | ||
| const callback = popCallback(streams); | ||
|
|
||
| if (Array.isArray(streams[0])) streams = streams[0]; | ||
|
|
||
| if (streams.length < 2) { | ||
| throw new ERR_MISSING_ARGS('streams'); | ||
| } | ||
|
|
||
| let error; | ||
| const destroys = streams.map(function(stream, i) { | ||
| const reading = i < streams.length - 1; | ||
| const writing = i > 0; | ||
| return destroyer(stream, reading, writing, function(err) { | ||
| if (!error) error = err; | ||
| if (err) destroys.forEach(call); | ||
| if (reading) return; | ||
| destroys.forEach(call); | ||
| callback(error); | ||
| }); | ||
| }); | ||
|
|
||
| return streams.reduce(pipe); | ||
| } | ||
|
|
||
| module.exports = pipeline; | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit:
`stream.onEnd` and `stream.pipeline`->`stream.onEnd()` and `stream.pipeline()`?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be
stream.finished()and notstream.onEnd(), since the former is what is currently used in this PR.