Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
fixup
  • Loading branch information
ronag committed Jan 18, 2020
commit f730a54a981897b47e8e1089b26bfcc71d7dd442
2 changes: 2 additions & 0 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ function ReadableState(options, stream, isDuplex) {
// Indicates whether the stream has errored.
this.errored = false;

this.closed = false;

// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.
Expand Down
2 changes: 2 additions & 0 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ function WritableState(options, stream, isDuplex) {
// is disabled we need a way to tell whether the stream has failed.
this.errored = false;

this.closed = false;

// Count buffered requests
this.bufferedRequestCount = 0;

Expand Down
9 changes: 9 additions & 0 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ function emitCloseNT(self) {
const r = self._readableState;
const w = self._writableState;

if (w) {
w.closed = true;
}
if (r) {
r.closed = true;
}

if ((w && w.emitClose) || (r && r.emitClose)) {
self.emit('close');
}
Expand Down Expand Up @@ -103,6 +110,7 @@ function undestroy() {
if (r) {
r.destroyed = false;
r.errored = false;
r.closed = false;
r.reading = false;
r.ended = false;
r.endEmitted = false;
Expand All @@ -112,6 +120,7 @@ function undestroy() {
if (w) {
w.destroyed = false;
w.errored = false;
w.closed = false;
w.ended = false;
w.ending = false;
w.finalCalled = false;
Expand Down
23 changes: 16 additions & 7 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,25 @@ function eos(stream, opts, callback) {
callback.call(stream, err);
};

let writableFinished = stream.writableFinished ||
(stream._writableState && stream._writableState.finished);
let readableEnded = stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
const w = stream._writableState;
const r = stream._readableState;

if (writableFinished || readableEnded || stream.destroyed ||
stream.aborted) {
let writableFinished = stream.writableFinished || (w && w.finished);
let readableEnded = stream.readableEnded || (r && r.endEmitted);

const errorEmitted = (w && w.errorEmitted) || (r && r.errorEmitted);
const closed = (w && w.closed) || (r && r.closed);

if (writableFinished || readableEnded || errorEmitted || closed) {
// TODO(ronag): rethrow if errorEmitted?
// TODO(ronag): premature close if closed but not
// errored, finished or ended?

// Swallow any error past this point.
if (opts.error !== false) stream.on('error', onerror);
// A destroy(err) call emits error in nextTick.

process.nextTick(callback.bind(stream));

return () => {
stream.removeListener('error', onerror);
};
Expand Down
42 changes: 5 additions & 37 deletions test/parallel/test-stream-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,14 @@ const { promisify } = require('util');
{
// Completed if readable-like is ended before.

let ticked = false;
const streamLike = new EE();
streamLike.readableEnded = true;
streamLike.readable = true;
finished(streamLike, common.mustCall());
finished(streamLike, common.mustCall(() => {
assert.strictEqual(ticked, true);
}));
ticked = true;
}

{
Expand All @@ -215,42 +219,6 @@ const { promisify } = require('util');
streamLike.emit('close');
}

{
// Completed if writable-like is destroyed before.

const streamLike = new EE();
streamLike.destroyed = true;
streamLike.writable = true;
finished(streamLike, common.mustCall());
}

{
// Completed if readable-like is aborted before.

const streamLike = new EE();
streamLike.destroyed = true;
streamLike.readable = true;
finished(streamLike, common.mustCall());
}

{
// Completed if writable-like is aborted before.

const streamLike = new EE();
streamLike.aborted = true;
streamLike.writable = true;
finished(streamLike, common.mustCall());
}

{
// Completed if readable-like is aborted before.

const streamLike = new EE();
streamLike.aborted = true;
streamLike.readable = true;
finished(streamLike, common.mustCall());
}

{
// Completed if streamlike is finished before.

Expand Down