-
-
Notifications
You must be signed in to change notification settings - Fork 35.4k
Stream pump #13506
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stream pump #13506
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,45 +1,20 @@ | ||
| // The MIT License (MIT) | ||
| // | ||
| // Copyright (c) 2014 Mathias Buus | ||
| // | ||
| // Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| // of this software and associated documentation files (the "Software"), to deal | ||
| // in the Software without restriction, including without limitation the rights | ||
| // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| // copies of the Software, and to permit persons to whom the Software is | ||
| // furnished to do so, subject to the following conditions: | ||
| // | ||
| // The above copyright notice and this permission notice shall be included in | ||
| // all copies or substantial portions of the Software. | ||
| // | ||
| // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
| // THE SOFTWARE. | ||
|
|
||
| 'use strict'; | ||
|
|
||
| function noop() {} | ||
|
|
||
| function isRequest(stream) { | ||
| return stream.setHeader && isFn(stream.abort); | ||
| return stream.setHeader && typeof stream.abort === 'function'; | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we need a test that exercises http request and fs. I think you can inline the |
||
|
|
||
| function isChildProcess(stream) { | ||
| return stream.stdio && | ||
| Array.isArray(stream.stdio) && stream.stdio.length === 3; | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we need a test for child processes as well. |
||
|
|
||
| function isFn(fn) { | ||
| return typeof fn === 'function'; | ||
| } | ||
|
|
||
| function eos(stream, opts, _callback) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we might also expose this, as it is another must-have. |
||
| if (typeof opts === 'function') return eos(stream, null, opts); | ||
| if (!opts) opts = {}; | ||
|
|
||
| let callbackCalled = false; | ||
| const callback = (err) => { | ||
| if (!_callback || callbackCalled) { | ||
|
|
@@ -48,12 +23,11 @@ function eos(stream, opts, _callback) { | |
| callbackCalled = true; | ||
| _callback.call(stream, err); | ||
| }; | ||
|
|
||
| const ws = stream._writableState; | ||
| const rs = stream._readableState; | ||
| let readable = opts.readable || | ||
| (opts.readable !== false && stream.readable); | ||
| let writable = opts.writable || | ||
| (opts.writable !== false && stream.writable); | ||
| let readable = opts.readable || opts.readable !== false && stream.readable; | ||
| let writable = opts.writable || opts.writable !== false && stream.writable; | ||
|
|
||
| const onlegacyfinish = () => { | ||
| if (!stream.writable) onfinish(); | ||
|
|
@@ -71,18 +45,15 @@ function eos(stream, opts, _callback) { | |
|
|
||
| const onexit = (exitCode) => { | ||
| if (exitCode) { | ||
| callback(new Error('exited with error code: ' + exitCode)); | ||
| callback(new Error(`Exited with error code: ${exitCode}`)); | ||
| } else { | ||
| callback(); | ||
| callback(null); | ||
| } | ||
| }; | ||
|
|
||
| const onclose = () => { | ||
| if (readable && !(rs && rs.ended)) | ||
| return callback(new Error('premature close')); | ||
|
|
||
| if (writable && !(ws && ws.ended)) | ||
| return callback(new Error('premature close')); | ||
| if (readable && !(rs && rs.ended) || writable && !(ws && ws.ended)) | ||
| return callback(new Error('Premature close')); | ||
| }; | ||
|
|
||
| const onrequest = () => | ||
|
|
@@ -120,39 +91,35 @@ function eos(stream, opts, _callback) { | |
| }; | ||
| } | ||
|
|
||
| function destroyer(stream, reading, writing, _callback) { | ||
| function destroyer(stream, readable, writable, _callback) { | ||
| let callbackCalled = false; | ||
| const callback = (err) => { | ||
| if (callbackCalled) return; | ||
|
|
||
| callbackCalled = true; | ||
|
|
||
| return _callback(err); | ||
| }; | ||
| let closed = false; | ||
| stream.on('close', () => { | ||
| closed = true; | ||
| }); | ||
|
|
||
| eos(stream, {readable: reading, writable: writing}, (err) => { | ||
| eos(stream, { readable, writable }, (err) => { | ||
| if (err) return callback(err); | ||
| closed = true; | ||
| callback(); | ||
| }); | ||
|
|
||
| var destroyed = false; | ||
| let destroyed = false; | ||
| return (err) => { | ||
| if (closed) return; | ||
| if (destroyed) return; | ||
| if (closed || destroyed) return; | ||
| destroyed = true; | ||
|
|
||
| if (isRequest(stream)) | ||
| return stream.abort(); | ||
| // request.destroy just do .end - .abort is what we want | ||
|
|
||
| if (isFn(stream.destroy)) return stream.destroy(err); | ||
| if (typeof stream.destroy === 'function') return stream.destroy(err); | ||
|
|
||
| callback(err || new Error('stream was destroyed')); | ||
| callback(err || new Error('Stream was destroyed')); | ||
| }; | ||
| } | ||
|
|
||
|
|
@@ -161,27 +128,26 @@ const callErr = (err) => (fn) => fn(err); | |
| const pipe = (from, to) => from.pipe(to); | ||
|
|
||
| function pump(...streams) { | ||
| const callback = isFn(streams[streams.length - 1] || noop) && | ||
| streams.pop() || noop; | ||
| const callback = streams.pop() || noop; | ||
|
|
||
| if (Array.isArray(streams[0])) streams = streams[0]; | ||
|
|
||
| if (streams.length < 2) | ||
| throw new Error('pump requires two streams per minimum'); | ||
| throw new Error('Pump requires two streams per minimum.'); | ||
|
|
||
| let error; | ||
| let firstError; | ||
| const destroys = streams.map((stream, i) => { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would prefer not to close over |
||
| var reading = i < streams.length - 1; | ||
| var writing = i > 0; | ||
| return destroyer(stream, reading, writing, (err) => { | ||
| if (!error) error = err; | ||
| if (!firstError) firstError = err; | ||
|
|
||
| if (err) destroys.forEach(callErr(err)); | ||
|
|
||
| if (reading) return; | ||
|
|
||
| destroys.forEach(call); | ||
| callback(error); | ||
| callback(firstError); | ||
| }); | ||
| }); | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why was the license removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See: #13506 (comment)