Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
stream: add pipeline and onEnd
  • Loading branch information
mafintosh committed Apr 9, 2018
commit b636f8bfcdf1585d40bca5c58819ed16cbc79433
6 changes: 6 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,12 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream.

An attempt was made to call [`stream.write()`][] with a `null` chunk.

<a id="ERR_STREAM_PREMATURE_CLOSE"></a>
### ERR_STREAM_PREMATURE_CLOSE

An error returned by `stream.onEnd` and `stream.pipeline`, when a stream
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: `stream.onEnd` and `stream.pipeline` -> `stream.onEnd()` and `stream.pipeline()`?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be stream.finished() and not stream.onEnd(), since the former is what is currently used in this PR.

or a pipeline ends non gracefully with no explicit error.

<a id="ERR_STREAM_PUSH_AFTER_EOF"></a>
### ERR_STREAM_PUSH_AFTER_EOF

Expand Down
1 change: 1 addition & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,7 @@ E('ERR_STDOUT_CLOSE', 'process.stdout cannot be closed', Error);
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
E('ERR_STREAM_UNSHIFT_AFTER_END_EVENT',
'stream.unshift() after end event', Error);
Expand Down
96 changes: 96 additions & 0 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Ported from https://github.com/mafintosh/end-of-stream with
// permission from the author, Mathias Buus (@mafintosh).

'use strict';

const {
ERR_STREAM_PREMATURE_CLOSE
} = require('internal/errors').codes;

function noop() {}

function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}

function once(callback) {
let called = false;
return function(err) {
if (called) return;
called = true;
callback.call(this, err);
};
}

function eos(stream, opts, callback) {
if (typeof opts === 'function') return eos(stream, null, opts);
if (!opts) opts = {};

callback = once(callback || noop);

const ws = stream._writableState;
const rs = stream._readableState;
let readable = opts.readable || (opts.readable !== false && stream.readable);
let writable = opts.writable || (opts.writable !== false && stream.writable);

const onlegacyfinish = () => {
if (!stream.writable) onfinish();
};

const onfinish = () => {
writable = false;
if (!readable) callback.call(stream);
};

const onend = () => {
readable = false;
if (!writable) callback.call(stream);
};

const onerror = (err) => {
callback.call(stream, err);
};

const onclose = () => {
if (readable && !(rs && rs.ended)) {
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
}
if (writable && !(ws && ws.ended)) {
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
}
};

const onrequest = () => {
stream.req.on('finish', onfinish);
};

if (isRequest(stream)) {
stream.on('complete', onfinish);
stream.on('abort', onclose);
if (stream.req) onrequest();
else stream.on('request', onrequest);
} else if (writable && !ws) { // legacy streams
stream.on('end', onlegacyfinish);
stream.on('close', onlegacyfinish);
}

stream.on('end', onend);
stream.on('finish', onfinish);
if (opts.error !== false) stream.on('error', onerror);
stream.on('close', onclose);

return function() {
stream.removeListener('complete', onfinish);
stream.removeListener('abort', onclose);
stream.removeListener('request', onrequest);
if (stream.req) stream.req.removeListener('finish', onfinish);
stream.removeListener('end', onlegacyfinish);
stream.removeListener('close', onlegacyfinish);
stream.removeListener('finish', onfinish);
stream.removeListener('end', onend);
stream.removeListener('error', onerror);
stream.removeListener('close', onclose);
};
}

module.exports = eos;
95 changes: 95 additions & 0 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Ported from https://github.com/mafintosh/pump with
// permission from the author, Mathias Buus (@mafintosh).
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the original code has a copyright/license, then it should likely either be included here or embedded in the LICENSE file.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does, but I did mention here I'm fine waiving copyright, mafintosh/pump#17 (comment)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I'm fine with whatever is easiest btw)


'use strict';

const eos = require('internal/streams/end-of-stream');

const {
ERR_MISSING_ARGS,
ERR_STREAM_DESTROYED
} = require('internal/errors').codes;

function once(callback) {
let called = false;
return function(err) {
if (called) return;
called = true;
callback(err);
};
}

function noop() {}

function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}

function destroyer(stream, reading, writing, callback) {
callback = once(callback);

let closed = false;
stream.on('close', () => {
closed = true;
});

eos(stream, { readable: reading, writable: writing }, (err) => {
if (err) return callback(err);
closed = true;
callback();
});

let destroyed = false;
return (err) => {
if (closed) return;
if (destroyed) return;
destroyed = true;

// request.destroy just do .end - .abort is what we want
if (isRequest(stream)) return stream.abort();
if (typeof stream.destroy === 'function') return stream.destroy();

callback(err || new ERR_STREAM_DESTROYED('pipe'));
};
}

function call(fn) {
fn();
}

function pipe(from, to) {
return from.pipe(to);
}

function popCallback(streams) {
if (!streams.length) return noop;
if (typeof streams[streams.length - 1] !== 'function') return noop;
return streams.pop();
}

function pipeline(...streams) {
const callback = popCallback(streams);

if (Array.isArray(streams[0])) streams = streams[0];

if (streams.length < 2) {
throw new ERR_MISSING_ARGS('streams');
}

let error;
const destroys = streams.map(function(stream, i) {
const reading = i < streams.length - 1;
const writing = i > 0;
return destroyer(stream, reading, writing, function(err) {
if (!error) error = err;
if (err) destroys.forEach(call);
if (reading) return;
destroys.forEach(call);
callback(error);
});
});

return streams.reduce(pipe);
}

module.exports = pipeline;
5 changes: 5 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
'use strict';

const { Buffer } = require('buffer');
const pipeline = require('internal/streams/pipeline');
const eos = require('internal/streams/end-of-stream');

// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
Expand All @@ -33,6 +35,9 @@ Stream.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');

Stream.pipeline = pipeline;
Stream.onEnd = eos;

// Backwards-compat with node 0.4.x
Stream.Stream = Stream;

Expand Down
2 changes: 2 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@
'lib/internal/streams/legacy.js',
'lib/internal/streams/destroy.js',
'lib/internal/streams/state.js',
'lib/internal/streams/pipeline.js',
'lib/internal/streams/end-of-stream.js',
'lib/internal/wrap_js_stream.js',
'deps/v8/tools/splaytree.js',
'deps/v8/tools/codemap.js',
Expand Down