Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
fixup: comments and minor cleanup
  • Loading branch information
BridgeAR committed Feb 17, 2018
commit 556e7c7de6fd5e9012f018574cd3cd3a4e04d4f8
10 changes: 7 additions & 3 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ There are four fundamental stream types within Node.js:
* [Transform][] - Duplex streams that can modify or transform the data as it
is written and read (for example [`zlib.createDeflate()`][]).

Additionally this module inclues a utility function [pump][].
Additionally this module includes the utility function [pump][].

### Object Mode

Expand Down Expand Up @@ -1247,9 +1247,13 @@ implementors should not override this method, but instead implement
The default implementation of `_destroy` for `Transform` also emit `'close'`.

#### Class Method: stream.pump(...streams[, callback])
<!-- YAML
added: REPLACEME
-->

* two or more streams to pipe between
* optional callback
* `...streams` {Stream} Two or more streams to pipe between.
* `callback` {Function} A callback function that takes an optional error
argument.

A class method to pipe between streams forwarding errors and properly cleaning
up.
Expand Down
74 changes: 20 additions & 54 deletions lib/_stream_pump.js
Original file line number Diff line number Diff line change
@@ -1,45 +1,20 @@
// The MIT License (MIT)
//
// Copyright (c) 2014 Mathias Buus
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why was the license removed?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'use strict';

function noop() {}

function isRequest(stream) {
return stream.setHeader && isFn(stream.abort);
return stream.setHeader && typeof stream.abort === 'function';
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need a test that exercises http request and fs.

I think you can inline the typeof fs === 'function' check.


function isChildProcess(stream) {
return stream.stdio &&
Array.isArray(stream.stdio) && stream.stdio.length === 3;
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need a test for child processes as well.


function isFn(fn) {
return typeof fn === 'function';
}

function eos(stream, opts, _callback) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might also expose this, as it is another must-have.

if (typeof opts === 'function') return eos(stream, null, opts);
if (!opts) opts = {};

let callbackCalled = false;
const callback = (err) => {
if (!_callback || callbackCalled) {
Expand All @@ -48,12 +23,11 @@ function eos(stream, opts, _callback) {
callbackCalled = true;
_callback.call(stream, err);
};

const ws = stream._writableState;
const rs = stream._readableState;
let readable = opts.readable ||
(opts.readable !== false && stream.readable);
let writable = opts.writable ||
(opts.writable !== false && stream.writable);
let readable = opts.readable || opts.readable !== false && stream.readable;
let writable = opts.writable || opts.writable !== false && stream.writable;

const onlegacyfinish = () => {
if (!stream.writable) onfinish();
Expand All @@ -71,18 +45,15 @@ function eos(stream, opts, _callback) {

const onexit = (exitCode) => {
if (exitCode) {
callback(new Error('exited with error code: ' + exitCode));
callback(new Error(`Exited with error code: ${exitCode}`));
} else {
callback();
callback(null);
}
};

const onclose = () => {
if (readable && !(rs && rs.ended))
return callback(new Error('premature close'));

if (writable && !(ws && ws.ended))
return callback(new Error('premature close'));
if (readable && !(rs && rs.ended) || writable && !(ws && ws.ended))
return callback(new Error('Premature close'));
};

const onrequest = () =>
Expand Down Expand Up @@ -120,39 +91,35 @@ function eos(stream, opts, _callback) {
};
}

function destroyer(stream, reading, writing, _callback) {
function destroyer(stream, readable, writable, _callback) {
let callbackCalled = false;
const callback = (err) => {
if (callbackCalled) return;

callbackCalled = true;

return _callback(err);
};
let closed = false;
stream.on('close', () => {
closed = true;
});

eos(stream, {readable: reading, writable: writing}, (err) => {
eos(stream, { readable, writable }, (err) => {
if (err) return callback(err);
closed = true;
callback();
});

var destroyed = false;
let destroyed = false;
return (err) => {
if (closed) return;
if (destroyed) return;
if (closed || destroyed) return;
destroyed = true;

if (isRequest(stream))
return stream.abort();
// request.destroy just do .end - .abort is what we want

if (isFn(stream.destroy)) return stream.destroy(err);
if (typeof stream.destroy === 'function') return stream.destroy(err);

callback(err || new Error('stream was destroyed'));
callback(err || new Error('Stream was destroyed'));
};
}

Expand All @@ -161,27 +128,26 @@ const callErr = (err) => (fn) => fn(err);
const pipe = (from, to) => from.pipe(to);

function pump(...streams) {
const callback = isFn(streams[streams.length - 1] || noop) &&
streams.pop() || noop;
const callback = streams.pop() || noop;

if (Array.isArray(streams[0])) streams = streams[0];

if (streams.length < 2)
throw new Error('pump requires two streams per minimum');
throw new Error('Pump requires two streams per minimum.');

let error;
let firstError;
const destroys = streams.map((stream, i) => {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer not to close over this  here, as it might be the global Stream  object.

var reading = i < streams.length - 1;
var writing = i > 0;
return destroyer(stream, reading, writing, (err) => {
if (!error) error = err;
if (!firstError) firstError = err;

if (err) destroys.forEach(callErr(err));

if (reading) return;

destroys.forEach(call);
callback(error);
callback(firstError);
});
});

Expand Down