Skip to content

Commit b03845b

Browse files
ronagmcollina
authored andcommitted
stream: make finished call the callback if the stream is closed
Make stream.finished callback invoked if stream is already closed/destroyed. PR-URL: nodejs#28748 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Trivikram Kamat <trivikr.dev@gmail.com>
1 parent d62d2b4 commit b03845b

4 files changed

Lines changed: 266 additions & 30 deletions

File tree

lib/internal/streams/async_iterator.js

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -112,16 +112,6 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
112112
return() {
113113
return new Promise((resolve, reject) => {
114114
const stream = this[kStream];
115-
116-
// TODO(ronag): Remove this check once finished() handles
117-
// already ended and/or destroyed streams.
118-
const ended = stream.destroyed || stream.readableEnded ||
119-
(stream._readableState && stream._readableState.endEmitted);
120-
if (ended) {
121-
resolve(createIterResult(undefined, true));
122-
return;
123-
}
124-
125115
finished(stream, (err) => {
126116
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
127117
reject(err);

lib/internal/streams/end-of-stream.js

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,43 +28,49 @@ function eos(stream, opts, callback) {
2828

2929
callback = once(callback);
3030

31+
const onerror = (err) => {
32+
callback.call(stream, err);
33+
};
34+
35+
let writableFinished = stream.writableFinished ||
36+
(stream._writableState && stream._writableState.finished);
37+
let readableEnded = stream.readableEnded ||
38+
(stream._readableState && stream._readableState.endEmitted);
39+
40+
if (writableFinished || readableEnded || stream.destroyed ||
41+
stream.aborted) {
42+
if (opts.error !== false) stream.on('error', onerror);
43+
// A destroy(err) call emits error in nextTick.
44+
process.nextTick(callback.bind(stream));
45+
return () => {
46+
stream.removeListener('error', onerror);
47+
};
48+
}
49+
3150
let readable = opts.readable || (opts.readable !== false && stream.readable);
3251
let writable = opts.writable || (opts.writable !== false && stream.writable);
3352

3453
const onlegacyfinish = () => {
3554
if (!stream.writable) onfinish();
3655
};
3756

38-
var writableEnded = stream._writableState && stream._writableState.finished;
3957
const onfinish = () => {
4058
writable = false;
41-
writableEnded = true;
59+
writableFinished = true;
4260
if (!readable) callback.call(stream);
4361
};
4462

45-
var readableEnded = stream.readableEnded ||
46-
(stream._readableState && stream._readableState.endEmitted);
4763
const onend = () => {
4864
readable = false;
4965
readableEnded = true;
5066
if (!writable) callback.call(stream);
5167
};
5268

53-
const onerror = (err) => {
54-
callback.call(stream, err);
55-
};
56-
5769
const onclose = () => {
58-
let err;
5970
if (readable && !readableEnded) {
60-
if (!stream._readableState || !stream._readableState.ended)
61-
err = new ERR_STREAM_PREMATURE_CLOSE();
62-
return callback.call(stream, err);
63-
}
64-
if (writable && !writableEnded) {
65-
if (!stream._writableState || !stream._writableState.ended)
66-
err = new ERR_STREAM_PREMATURE_CLOSE();
67-
return callback.call(stream, err);
71+
callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
72+
} else if (writable && !writableFinished) {
73+
callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
6874
}
6975
};
7076

test/parallel/test-http-client-finished.js

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,109 @@ const { finished } = require('stream');
2525
.end();
2626
}));
2727
}
28+
29+
{
30+
// Test abort before finished.
31+
32+
const server = http.createServer(function(req, res) {
33+
});
34+
35+
server.listen(0, common.mustCall(function() {
36+
const req = http.request({
37+
port: this.address().port
38+
}, common.mustNotCall());
39+
req.abort();
40+
finished(req, common.mustCall(() => {
41+
server.close();
42+
}));
43+
}));
44+
}
45+
46+
{
47+
// Test abort after request.
48+
49+
const server = http.createServer(function(req, res) {
50+
});
51+
52+
server.listen(0, common.mustCall(function() {
53+
const req = http.request({
54+
port: this.address().port
55+
}).end();
56+
finished(req, (err) => {
57+
common.expectsError({
58+
type: Error,
59+
code: 'ERR_STREAM_PREMATURE_CLOSE'
60+
})(err);
61+
finished(req, common.mustCall(() => {
62+
server.close();
63+
}));
64+
});
65+
req.abort();
66+
}));
67+
}
68+
69+
{
70+
// Test abort before end.
71+
72+
const server = http.createServer(function(req, res) {
73+
res.write('test');
74+
});
75+
76+
server.listen(0, common.mustCall(function() {
77+
const req = http.request({
78+
port: this.address().port
79+
}).on('response', common.mustCall((res) => {
80+
req.abort();
81+
finished(res, common.mustCall(() => {
82+
finished(res, common.mustCall(() => {
83+
server.close();
84+
}));
85+
}));
86+
})).end();
87+
}));
88+
}
89+
90+
{
91+
// Test destroy before end.
92+
93+
const server = http.createServer(function(req, res) {
94+
res.write('test');
95+
});
96+
97+
server.listen(0, common.mustCall(function() {
98+
http.request({
99+
port: this.address().port
100+
}).on('response', common.mustCall((res) => {
101+
// TODO(ronag): Bug? Won't emit 'close' unless read.
102+
res.on('data', () => {});
103+
res.destroy();
104+
finished(res, common.mustCall(() => {
105+
finished(res, common.mustCall(() => {
106+
server.close();
107+
}));
108+
}));
109+
})).end();
110+
}));
111+
}
112+
113+
{
114+
// Test finish after end.
115+
116+
const server = http.createServer(function(req, res) {
117+
res.end('asd');
118+
});
119+
120+
server.listen(0, common.mustCall(function() {
121+
http.request({
122+
port: this.address().port
123+
}).on('response', common.mustCall((res) => {
124+
// TODO(ronag): Bug? Won't emit 'close' unless read.
125+
res.on('data', () => {});
126+
finished(res, common.mustCall(() => {
127+
finished(res, common.mustCall(() => {
128+
server.close();
129+
}));
130+
}));
131+
})).end();
132+
}));
133+
}

test/parallel/test-stream-finished.js

Lines changed: 137 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,18 @@ const { promisify } = require('util');
9797
}));
9898
}
9999

100+
{
101+
const rs = new Readable();
102+
103+
finished(rs, common.mustCall((err) => {
104+
assert(err, 'premature close error');
105+
}));
106+
107+
rs.push(null);
108+
rs.emit('close');
109+
rs.resume();
110+
}
111+
100112
{
101113
const rs = new Readable();
102114

@@ -105,7 +117,9 @@ const { promisify } = require('util');
105117
}));
106118

107119
rs.push(null);
108-
rs.emit('close'); // Should not trigger an error
120+
rs.on('end', common.mustCall(() => {
121+
rs.emit('close'); // Should not trigger an error
122+
}));
109123
rs.resume();
110124
}
111125

@@ -155,8 +169,9 @@ const { promisify } = require('util');
155169
rs.resume();
156170
}
157171

158-
// Test that calling returned function removes listeners
159172
{
173+
// Nothing happens if disposed.
174+
160175
const ws = new Writable({
161176
write(data, env, cb) {
162177
cb();
@@ -168,6 +183,8 @@ const { promisify } = require('util');
168183
}
169184

170185
{
186+
// Nothing happens if disposed.
187+
171188
const rs = new Readable();
172189
const removeListeners = finished(rs, common.mustNotCall());
173190
removeListeners();
@@ -178,9 +195,126 @@ const { promisify } = require('util');
178195
}
179196

180197
{
198+
// Completed if readable-like is ended before.
199+
181200
const streamLike = new EE();
182201
streamLike.readableEnded = true;
183202
streamLike.readable = true;
184-
finished(streamLike, common.mustCall);
203+
finished(streamLike, common.mustCall());
204+
}
205+
206+
{
207+
// Completed if readable-like is never ended.
208+
209+
const streamLike = new EE();
210+
streamLike.readableEnded = false;
211+
streamLike.readable = true;
212+
finished(streamLike, common.expectsError({
213+
code: 'ERR_STREAM_PREMATURE_CLOSE'
214+
}));
215+
streamLike.emit('close');
216+
}
217+
218+
{
219+
// Completed if writable-like is destroyed before.
220+
221+
const streamLike = new EE();
222+
streamLike.destroyed = true;
223+
streamLike.writable = true;
224+
finished(streamLike, common.mustCall());
225+
}
226+
227+
{
228+
// Completed if readable-like is aborted before.
229+
230+
const streamLike = new EE();
231+
streamLike.destroyed = true;
232+
streamLike.readable = true;
233+
finished(streamLike, common.mustCall());
234+
}
235+
236+
{
237+
// Completed if writable-like is aborted before.
238+
239+
const streamLike = new EE();
240+
streamLike.aborted = true;
241+
streamLike.writable = true;
242+
finished(streamLike, common.mustCall());
243+
}
244+
245+
{
246+
// Completed if readable-like is aborted before.
247+
248+
const streamLike = new EE();
249+
streamLike.aborted = true;
250+
streamLike.readable = true;
251+
finished(streamLike, common.mustCall());
252+
}
253+
254+
{
255+
// Completed if streamlike is finished before.
256+
257+
const streamLike = new EE();
258+
streamLike.writableFinished = true;
259+
streamLike.writable = true;
260+
finished(streamLike, common.mustCall());
261+
}
262+
263+
{
264+
// Premature close if stream is not finished.
265+
266+
const streamLike = new EE();
267+
streamLike.writableFinished = false;
268+
streamLike.writable = true;
269+
finished(streamLike, common.expectsError({
270+
code: 'ERR_STREAM_PREMATURE_CLOSE'
271+
}));
272+
streamLike.emit('close');
273+
}
274+
275+
{
276+
// Premature close if stream never emitted 'finish'
277+
// even if writableFinished says something else.
278+
279+
const streamLike = new EE();
280+
streamLike.writable = true;
281+
finished(streamLike, common.expectsError({
282+
code: 'ERR_STREAM_PREMATURE_CLOSE'
283+
}));
284+
streamLike.writableFinished = true;
285+
streamLike.emit('close');
286+
}
287+
288+
289+
{
290+
// Premature close if stream never emitted 'end'
291+
// even if readableEnded says something else.
292+
293+
const streamLike = new EE();
294+
streamLike.readable = true;
295+
finished(streamLike, common.expectsError({
296+
code: 'ERR_STREAM_PREMATURE_CLOSE'
297+
}));
298+
streamLike.readableEnded = true;
185299
streamLike.emit('close');
186300
}
301+
302+
{
303+
// Completes if already finished.
304+
305+
const w = new Writable();
306+
finished(w, common.mustCall(() => {
307+
finished(w, common.mustCall());
308+
}));
309+
w.destroy();
310+
}
311+
312+
{
313+
// Completes if already ended.
314+
315+
const r = new Readable();
316+
finished(r, common.mustCall(() => {
317+
finished(r, common.mustCall());
318+
}));
319+
r.destroy();
320+
}

0 commit comments

Comments
 (0)