Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
stream: add the pipeline destoryDestOnError option
  • Loading branch information
rickyes committed Jul 8, 2020
commit 9fc636967e1773a544671dd2f4e7e1f8ea74ff5c
10 changes: 9 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1627,11 +1627,15 @@ const cleanup = finished(rs, (err) => {
});
```

### `stream.pipeline(source[, ...transforms], destination, callback)`
### `stream.pipeline(source[, ...transforms], destination[,options], callback)`
### `stream.pipeline(streams, callback)`

<!-- YAML
added: v10.0.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/
description: Add support for destroyDestOnError options.
- version: v13.10.0
pr-url: https://github.com/nodejs/node/pull/31223
description: Add support for async generators.
Expand All @@ -1652,6 +1656,10 @@ changes:
* `destination` {Stream|Function}
* `source` {AsyncIterable}
* Returns: {AsyncIterable|Promise}
* `options` {Object}
* `options.destroyDestOnError` {Boolean} If value is false, the destination
is not destroyed when a stream error occurs.
**Default:** `true`.
* `callback` {Function} Called when the pipeline is fully done.
* `err` {Error}
* `val` Resolved value of `Promise` returned by `destination`.
Expand Down
24 changes: 20 additions & 4 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ let EE;
let PassThrough;
let createReadableStreamAsyncIterator;

function destroyer(stream, reading, writing, callback) {
function destroyer(stream, reading, writing, destroyDestOnError, callback) {
callback = once(callback);

let finished = false;
Expand Down Expand Up @@ -63,7 +63,12 @@ function destroyer(stream, reading, writing, callback) {
return (err) => {
if (finished) return;
finished = true;
destroyImpl.destroyer(stream, err);
// When the reading is false and the writing is true, it
// can be considered a destination stream.
// If destroyDestOnError is false then stream is not destroyed.
if (!(reading === false && writing && destroyDestOnError === false)) {
destroyImpl.destroyer(stream, err);
}
callback(err || new ERR_STREAM_DESTROYED('pipe'));
};
}
Expand All @@ -77,6 +82,13 @@ function popCallback(streams) {
return streams.pop();
}

function popOptions(streams) {
const opts = streams[streams.length - 1];
if (!isStream(opts) &&
typeof opts === 'object' && opts !== null)
return streams.pop();
}

function isPromise(obj) {
return !!(obj && typeof obj.then === 'function');
}
Expand Down Expand Up @@ -142,13 +154,16 @@ async function pump(iterable, writable, finish) {

function pipeline(...streams) {
const callback = once(popCallback(streams));
const opts = popOptions(streams) || {};

if (ArrayIsArray(streams[0])) streams = streams[0];

if (streams.length < 2) {
throw new ERR_MISSING_ARGS('streams');
}

const destroyDestOnError = opts.destroyDestOnError;

let error;
let value;
const destroys = [];
Expand Down Expand Up @@ -183,7 +198,8 @@ function pipeline(...streams) {

if (isStream(stream)) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, finish));
destroys.push(destroyer(stream, reading, writing,
destroyDestOnError, finish));
}

if (i === 0) {
Expand Down Expand Up @@ -241,7 +257,7 @@ function pipeline(...streams) {
ret = pt;

finishCount++;
destroys.push(destroyer(ret, false, true, finish));
destroys.push(destroyer(ret, false, true, destroyDestOnError, finish));
}
} else if (isStream(stream)) {
if (isReadable(ret)) {
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';

const fs = require('fs');
const common = require('../common');
const {
Stream,
Expand Down