Skip to content

Commit 2c96024

Browse files
committed
stream: first error wins and cannot be overriden
The first stream error is the only one that gets emitted as 'error' or forwarded in callbacks. Also it cannot be override by _destroy. Refs: #30979
1 parent 67ed526 commit 2c96024

File tree

8 files changed

+94
-53
lines changed

8 files changed

+94
-53
lines changed

lib/_stream_readable.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,8 @@ function ReadableState(options, stream, isDuplex) {
140140
// Has it been destroyed
141141
this.destroyed = false;
142142

143-
// Indicates whether the stream has errored.
144-
this.errored = false;
143+
// Indicates whether the stream has errored. Contains the error.
144+
this.errored = null;
145145

146146
// Crypto is kind of old and crusty. Historically, its default string
147147
// encoding is 'binary' so we have to make this configurable.

lib/_stream_writable.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,8 @@ function WritableState(options, stream, isDuplex) {
171171
// Indicates whether the stream has errored. When true all write() calls
172172
// should return false. This is needed since when autoDestroy
173173
// is disabled we need a way to tell whether the stream has failed.
174-
this.errored = false;
174+
// Contains the error.
175+
this.errored = null;
175176

176177
// Count buffered requests
177178
this.bufferedRequestCount = 0;
@@ -488,7 +489,12 @@ function onwrite(stream, er) {
488489
state.writelen = 0;
489490

490491
if (er) {
491-
state.errored = true;
492+
if (!state.errored) {
493+
state.errored = er;
494+
}
495+
if (stream._readableState && !stream._readableState.errored) {
496+
stream._readableState.errored = er;
497+
}
492498
if (sync) {
493499
process.nextTick(onwriteError, stream, state, er, cb);
494500
} else {

lib/internal/streams/destroy.js

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,19 @@ function destroy(err, cb) {
88
const w = this._writableState;
99

1010
if (err) {
11-
if (w) {
12-
w.errored = true;
11+
if (w && !w.errored) {
12+
w.errored = err;
1313
}
14-
if (r) {
15-
r.errored = true;
14+
if (r && !r.errored) {
15+
r.errored = err;
1616
}
1717
}
1818

1919
if ((w && w.destroyed) || (r && r.destroyed)) {
2020
if (cb) {
21-
cb(err);
22-
} else if (err) {
23-
process.nextTick(emitErrorNT, this, err);
21+
// TODO(ronag): Wait until 'close' is emitted (if not already emitted).
22+
cb((w && w.errored) || (r && r.errored));
2423
}
25-
2624
return this;
2725
}
2826

@@ -38,32 +36,30 @@ function destroy(err, cb) {
3836

3937
this._destroy(err || null, (err) => {
4038
if (err) {
41-
if (w) {
42-
w.errored = true;
39+
if (w && !w.errored) {
40+
w.errored = err;
4341
}
44-
if (r) {
45-
r.errored = true;
42+
if (r && !r.errored) {
43+
r.errored = err;
4644
}
4745
}
4846

4947
if (cb) {
5048
// Invoke callback before scheduling emitClose so that callback
5149
// can schedule before.
52-
cb(err);
50+
cb((w && w.errored) || (r && r.errored));
5351
// Don't emit 'error' if passed a callback.
5452
process.nextTick(emitCloseNT, this);
55-
} else if (err) {
56-
process.nextTick(emitErrorCloseNT, this, err);
5753
} else {
58-
process.nextTick(emitCloseNT, this);
54+
process.nextTick(emitErrorCloseNT, this);
5955
}
6056
});
6157

6258
return this;
6359
}
6460

65-
function emitErrorCloseNT(self, err) {
66-
emitErrorNT(self, err);
61+
function emitErrorCloseNT(self) {
62+
emitErrorNT(self);
6763
emitCloseNT(self);
6864
}
6965

@@ -76,10 +72,16 @@ function emitCloseNT(self) {
7672
}
7773
}
7874

79-
function emitErrorNT(self, err) {
75+
function emitErrorNT(self) {
8076
const r = self._readableState;
8177
const w = self._writableState;
8278

79+
const err = (w && w.errored) || (r && r.errored);
80+
81+
if (!err) {
82+
return;
83+
}
84+
8385
if ((w && w.errorEmitted) || (r && r.errorEmitted)) {
8486
return;
8587
}
@@ -100,7 +102,7 @@ function undestroy() {
100102

101103
if (r) {
102104
r.destroyed = false;
103-
r.errored = false;
105+
r.errored = null;
104106
r.reading = false;
105107
r.ended = false;
106108
r.endEmitted = false;
@@ -109,7 +111,7 @@ function undestroy() {
109111

110112
if (w) {
111113
w.destroyed = false;
112-
w.errored = false;
114+
w.errored = null;
113115
w.ended = false;
114116
w.ending = false;
115117
w.finalCalled = false;
@@ -132,13 +134,13 @@ function errorOrDestroy(stream, err) {
132134
if ((r && r.autoDestroy) || (w && w.autoDestroy))
133135
stream.destroy(err);
134136
else if (err) {
135-
if (w) {
136-
w.errored = true;
137+
if (w && !w.errored) {
138+
w.errored = err;
137139
}
138-
if (r) {
139-
r.errored = true;
140+
if (r && !r.errored) {
141+
r.errored = err;
140142
}
141-
emitErrorNT(stream, err);
143+
emitErrorNT(stream);
142144
}
143145
}
144146

test/parallel/test-stream-auto-destroy.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,27 +86,29 @@ const assert = require('assert');
8686
{
8787
const r = new stream.Readable({
8888
read() {
89-
r2.emit('error', new Error('fail'));
89+
r2.destroy(new Error('fail'));
9090
}
9191
});
9292
const r2 = new stream.Readable({
9393
autoDestroy: true,
9494
destroy: common.mustCall((err, cb) => cb())
9595
});
96+
r2.on('error', common.mustCall());
9697

9798
r.pipe(r2);
9899
}
99100

100101
{
101102
const r = new stream.Readable({
102103
read() {
103-
w.emit('error', new Error('fail'));
104+
w.destroy(new Error('fail'));
104105
}
105106
});
106107
const w = new stream.Writable({
107108
autoDestroy: true,
108109
destroy: common.mustCall((err, cb) => cb())
109110
});
111+
w.on('error', common.mustCall());
110112

111113
r.pipe(w);
112114
}

test/parallel/test-stream-duplex-destroy.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ const assert = require('assert');
7676
duplex.on('end', common.mustNotCall('no end event'));
7777
duplex.on('finish', common.mustNotCall('no finish event'));
7878

79-
// Error is swallowed by the custom _destroy
80-
duplex.on('error', common.mustNotCall('no error event'));
79+
// Error is NOT swallowed by the custom _destroy
80+
duplex.on('error', common.mustCall());
8181
duplex.on('close', common.mustCall());
8282

8383
duplex.destroy(expected);

test/parallel/test-stream-readable-destroy.js

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ const assert = require('assert');
6969

7070
read.on('end', common.mustNotCall('no end event'));
7171

72-
// Error is swallowed by the custom _destroy
73-
read.on('error', common.mustNotCall('no error event'));
72+
// Error is NOT swallowed by the custom _destroy
73+
read.on('error', common.mustCall());
7474
read.on('close', common.mustCall());
7575

7676
read.destroy(expected);
@@ -134,13 +134,13 @@ const assert = require('assert');
134134
read.on('error', common.mustCall((err) => {
135135
assert.strictEqual(ticked, true);
136136
assert.strictEqual(read._readableState.errorEmitted, true);
137-
assert.strictEqual(read._readableState.errored, true);
137+
assert.strictEqual(read._readableState.errored, expected);
138138
assert.strictEqual(err, expected);
139139
}));
140140

141141
read.destroy();
142142
assert.strictEqual(read._readableState.errorEmitted, false);
143-
assert.strictEqual(read._readableState.errored, true);
143+
assert.strictEqual(read._readableState.errored, expected);
144144
assert.strictEqual(read.destroyed, true);
145145
ticked = true;
146146
}
@@ -190,15 +190,15 @@ const assert = require('assert');
190190
// destroy(err, callback);
191191
read.on('error', common.mustNotCall());
192192

193-
assert.strictEqual(read._readableState.errored, false);
193+
assert.strictEqual(read._readableState.errored, null);
194194
assert.strictEqual(read._readableState.errorEmitted, false);
195195

196196
read.destroy(expected, common.mustCall(function(err) {
197-
assert.strictEqual(read._readableState.errored, true);
197+
assert.strictEqual(read._readableState.errored, expected);
198198
assert.strictEqual(err, expected);
199199
}));
200200
assert.strictEqual(read._readableState.errorEmitted, false);
201-
assert.strictEqual(read._readableState.errored, true);
201+
assert.strictEqual(read._readableState.errored, expected);
202202
ticked = true;
203203
}
204204

@@ -223,14 +223,15 @@ const assert = require('assert');
223223

224224
readable.destroy();
225225
assert.strictEqual(readable.destroyed, true);
226-
assert.strictEqual(readable._readableState.errored, false);
226+
assert.strictEqual(readable._readableState.errored, null);
227227
assert.strictEqual(readable._readableState.errorEmitted, false);
228228

229229
// Test case where `readable.destroy()` is called again with an error before
230230
// the `_destroy()` callback is called.
231-
readable.destroy(new Error('kaboom 2'));
231+
const expected = new Error('kaboom 2');
232+
readable.destroy(expected);
232233
assert.strictEqual(readable._readableState.errorEmitted, false);
233-
assert.strictEqual(readable._readableState.errored, true);
234+
assert.strictEqual(readable._readableState.errored, expected);
234235

235236
ticked = true;
236237
}

test/parallel/test-stream-transform-destroy.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ const assert = require('assert');
7272
transform.on('close', common.mustCall());
7373
transform.on('finish', common.mustNotCall('no finish event'));
7474

75-
// Error is swallowed by the custom _destroy
76-
transform.on('error', common.mustNotCall('no error event'));
75+
// Error is NOT swallowed by the custom _destroy
76+
transform.on('error', common.mustCall());
7777

7878
transform.destroy(expected);
7979
}

test/parallel/test-stream-writable-destroy.js

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,33 @@ const assert = require('assert');
8383
write.on('finish', common.mustNotCall('no finish event'));
8484
write.on('close', common.mustCall());
8585

86-
// Error is swallowed by the custom _destroy
87-
write.on('error', common.mustNotCall('no error event'));
86+
// Error is NOT swallowed by the custom _destroy
87+
write.on('error', common.mustCall((err) => {
88+
assert.strictEqual(err, expected);
89+
}));
90+
91+
write.destroy(expected);
92+
assert.strictEqual(write.destroyed, true);
93+
}
94+
95+
{
96+
const write = new Writable({
97+
write(chunk, enc, cb) { cb(); },
98+
destroy: common.mustCall(function(err, cb) {
99+
assert.strictEqual(err, expected);
100+
cb(new Error('not this error'));
101+
})
102+
});
103+
104+
const expected = new Error('kaboom');
105+
106+
write.on('finish', common.mustNotCall('no finish event'));
107+
write.on('close', common.mustCall());
108+
109+
// Error is NOT overriden by the custom _destroy
110+
write.on('error', common.mustCall((err) => {
111+
assert.strictEqual(err, expected);
112+
}));
88113

89114
write.destroy(expected);
90115
assert.strictEqual(write.destroyed, true);
@@ -167,9 +192,10 @@ const assert = require('assert');
167192
assert.strictEqual(write._writableState.errorEmitted, true);
168193
}));
169194

170-
write.destroy(new Error('kaboom 1'));
195+
const expected = new Error('kaboom 1');
196+
write.destroy(expected);
171197
write.destroy(new Error('kaboom 2'));
172-
assert.strictEqual(write._writableState.errored, true);
198+
assert.strictEqual(write._writableState.errored, expected);
173199
assert.strictEqual(write._writableState.errorEmitted, false);
174200
assert.strictEqual(write.destroyed, true);
175201
ticked = true;
@@ -198,14 +224,15 @@ const assert = require('assert');
198224

199225
writable.destroy();
200226
assert.strictEqual(writable.destroyed, true);
201-
assert.strictEqual(writable._writableState.errored, false);
227+
assert.strictEqual(writable._writableState.errored, null);
202228
assert.strictEqual(writable._writableState.errorEmitted, false);
203229

204230
// Test case where `writable.destroy()` is called again with an error before
205231
// the `_destroy()` callback is called.
206-
writable.destroy(new Error('kaboom 2'));
232+
const expected = new Error('kaboom 2');
233+
writable.destroy(expected);
207234
assert.strictEqual(writable._writableState.errorEmitted, false);
208-
assert.strictEqual(writable._writableState.errored, true);
235+
assert.strictEqual(writable._writableState.errored, expected);
209236

210237
ticked = true;
211238
}
@@ -249,6 +276,9 @@ const assert = require('assert');
249276
write.destroy(expected, common.mustCall(function(err) {
250277
assert.strictEqual(err, expected);
251278
}));
279+
write.on('error', common.mustCall((err) => {
280+
assert.strictEqual(err, expected);
281+
}));
252282
}
253283

254284
{

0 commit comments

Comments
 (0)