Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
stream: fix stream.finished on Duplex
finished would incorrectly believe that a Duplex is already
closed if either the readable or writable side has completed.

Fixes: #33130

PR-URL: #33133
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
ronag committed Apr 30, 2020
commit d84f1312915fe45fe0febe888db692c74894c382
16 changes: 12 additions & 4 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,25 @@ function eos(stream, opts, callback) {
if (opts.error !== false) stream.on('error', onerror);
stream.on('close', onclose);

const closed = (wState && wState.closed) || (rState && rState.closed) ||
(wState && wState.errorEmitted) || (rState && rState.errorEmitted) ||
(wState && wState.finished) || (rState && rState.endEmitted) ||
(rState && stream.req && stream.aborted);
const closed = (
(wState && wState.closed) ||
(rState && rState.closed) ||
(wState && wState.errorEmitted) ||
(rState && rState.errorEmitted) ||
(rState && stream.req && stream.aborted) ||
(
(!writable || (wState && wState.finished)) &&
(!readable || (rState && rState.endEmitted))
)
);

if (closed) {
// TODO(ronag): Re-throw error if errorEmitted?
// TODO(ronag): Throw premature close as if finished was called?
// before being closed? i.e. if closed but not errored, ended or finished.
// TODO(ronag): Throw some kind of error? Does it make sense
// to call finished() on a "finished" stream?
// TODO(ronag): willEmitClose?
process.nextTick(() => {
callback();
});
Expand Down
86 changes: 85 additions & 1 deletion test/parallel/test-stream-finished.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
'use strict';

const common = require('../common');
const { Writable, Readable, Transform, finished, Duplex } = require('stream');
const {
Writable,
Readable,
Transform,
finished,
Duplex,
PassThrough
} = require('stream');
const assert = require('assert');
const EE = require('events');
const fs = require('fs');
Expand Down Expand Up @@ -396,3 +403,80 @@ testClosed((opts) => new Writable({ write() {}, ...opts }));
r.destroyed = true;
r.push(null);
}

{
// Regression https://github.com/nodejs/node/issues/33130
const response = new PassThrough();

class HelloWorld extends Duplex {
constructor(response) {
super({
autoDestroy: false
});

this.response = response;
this.readMore = false;

response.once('end', () => {
this.push(null);
});

response.on('readable', () => {
if (this.readMore) {
this._read();
}
});
}

_read() {
const { response } = this;

this.readMore = true;

if (response.readableLength) {
this.readMore = false;
}

let data;
while ((data = response.read()) !== null) {
this.push(data);
}
}
}

const instance = new HelloWorld(response);
instance.setEncoding('utf8');
instance.end();

(async () => {
await EE.once(instance, 'finish');

setImmediate(() => {
response.write('chunk 1');
response.write('chunk 2');
response.write('chunk 3');
response.end();
});

let res = '';
for await (const data of instance) {
res += data;
}

assert.strictEqual(res, 'chunk 1chunk 2chunk 3');
})().then(common.mustCall());
}

{
const p = new PassThrough();
p.end();
finished(p, common.mustNotCall());
}

{
const p = new PassThrough();
p.end();
p.on('finish', common.mustCall(() => {
finished(p, common.mustNotCall());
}));
}