Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
const {
ArrayIsArray,
SymbolAsyncIterator,
SymbolIterator
SymbolIterator,
Promise
} = primordials;

let eos;
Expand Down Expand Up @@ -113,6 +114,10 @@ function isStream(obj) {
return isReadable(obj) || isWritable(obj);
}

function isStdio(obj) {
return obj === process.stdout || obj === process.stderr;
}

function isIterable(obj, isAsync) {
if (!obj) return false;
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
Expand Down Expand Up @@ -147,13 +152,29 @@ async function pump(iterable, writable, finish) {
}
let error;
try {
for await (const chunk of iterable) {
if (!writable.write(chunk)) {
if (writable.destroyed) return;
await EE.once(writable, 'drain');
if (!isStdio(writable)) {
for await (const chunk of iterable) {
if (!writable.write(chunk)) {
if (writable.destroyed) return;
await EE.once(writable, 'drain');
}
}
writable.end();
} else {
const errorPromise = new Promise((resolve, reject) => {
writable.on('error', reject);
});
for await (const chunk of iterable) {
await Promise.race([
errorPromise,
new Promise((resolve, reject) => {
writable.write(chunk, null, (err) => {
err ? reject(err) : resolve();
});
})
]);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is likely faster to use the callbacks and manually handle everything. It could be wrapped in a single promise to allow to await the result for simplicity. Alternatively just also handle the finish call separately.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is likely faster to use the callbacks and manually handle everything. It could be wrapped in a single promise to allow to await the result for simplicity

Yes, but that makes the implementation more complicated... at least as far as I can determine.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BridgeAR PTAL

}
}
writable.end();
} catch (err) {
error = err;
} finally {
Expand Down Expand Up @@ -202,7 +223,7 @@ function pipeline(...streams) {
const reading = i < streams.length - 1;
const writing = i > 0;

if (isStream(stream)) {
if (isStream(stream) && !isStdio(stream)) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, !reading, finish));
}
Expand Down Expand Up @@ -263,7 +284,7 @@ function pipeline(...streams) {
destroys.push(destroyer(ret, false, true, true, finish));
}
} else if (isStream(stream)) {
if (isReadable(ret)) {
if (isReadable(ret) && !isStdio(stream)) {
ret.pipe(stream);
} else {
ret = makeAsyncIterable(ret);
Expand Down
27 changes: 27 additions & 0 deletions test/parallel/test-stream-pipeline-process.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
'use strict';

const common = require('../common');
const assert = require('assert');

if (process.argv[2] === 'child') {
const { pipeline } = require('stream');
pipeline(
process.stdin,
process.stdout,
common.mustCall((err) => {
assert.ifError(err);
})
);
} else {
const cp = require('child_process');
cp.exec([
'echo',
'"hello"',
'|',
`"${process.execPath}"`,
`"${__filename}"`,
'child'
].join(' '), common.mustCall((err) => {
assert.ifError(err);
}));
}