Skip to content

Commit 327b6e3

Browse files
committed
stream: Don't emit 'end' unless read() called
This solves the problem of calling `readable.pipe(writable)` after the readable stream has already emitted 'end', as often is the case when writing simple HTTP proxies. The spirit of streams2 is that things will work properly, even if you don't set them up right away on the first tick. This approach breaks down, however, because pipe()ing from an ended readable will just do nothing. No more data will ever arrive, and the writable will hang open forever never being ended. However, that does not solve the case of adding a `on('end')` listener after the stream has received the EOF chunk, if it was the first chunk received (and thus, length was 0, and 'end' got emitted). So, with this, we defer the 'end' event emission until the read() function is called. Also, in pipe(), if the source has emitted 'end' already, we call the cleanup/onend function on nextTick. Piping from an already-ended stream is thus the same as piping from a stream that is in the process of ending. Updates many tests that were relying on 'end' coming immediately, even though they never read() from the req. Fix nodejs#4942
1 parent cd2b9f5 commit 327b6e3

15 files changed

Lines changed: 131 additions & 22 deletions

lib/_stream_readable.js

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ function ReadableState(options, stream) {
4949
this.endEmitted = false;
5050
this.reading = false;
5151

52+
// In streams that never have any data, and do push(null) right away,
53+
// the consumer can miss the 'end' event if they do some I/O before
54+
// consuming the stream. So, we don't emit('end') until some reading
55+
// happens.
56+
this.calledRead = false;
57+
5258
// a flag to be able to tell if the onwrite cb is called immediately,
5359
// or on a later tick. We set this to true at first, becuase any
5460
// actions that shouldn't happen until "later" should generally also
@@ -218,6 +224,7 @@ function howMuchToRead(n, state) {
218224
// you can override either this method, or the async _read(n) below.
219225
Readable.prototype.read = function(n) {
220226
var state = this._readableState;
227+
state.calledRead = true;
221228
var nOrig = n;
222229

223230
if (typeof n !== 'number' || n > 0)
@@ -430,13 +437,15 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
430437
}
431438
state.pipesCount += 1;
432439

433-
if ((!pipeOpts || pipeOpts.end !== false) &&
434-
dest !== process.stdout &&
435-
dest !== process.stderr) {
436-
src.once('end', onend);
437-
} else {
438-
src.once('end', cleanup);
439-
}
440+
var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
441+
dest !== process.stdout &&
442+
dest !== process.stderr;
443+
444+
var endFn = doEnd ? onend : cleanup;
445+
if (state.endEmitted)
446+
process.nextTick(endFn);
447+
else
448+
src.once('end', endFn);
440449

441450
dest.on('unpipe', onunpipe);
442451
function onunpipe(readable) {
@@ -853,12 +862,12 @@ function endReadable(stream) {
853862
if (state.length > 0)
854863
throw new Error('endReadable called on non-empty stream');
855864

856-
if (state.endEmitted)
857-
return;
858-
state.ended = true;
859-
state.endEmitted = true;
860-
process.nextTick(function() {
861-
stream.readable = false;
862-
stream.emit('end');
863-
});
865+
if (!state.endEmitted && state.calledRead) {
866+
state.ended = true;
867+
state.endEmitted = true;
868+
process.nextTick(function() {
869+
stream.readable = false;
870+
stream.emit('end');
871+
});
872+
}
864873
}

lib/http.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -416,13 +416,13 @@ IncomingMessage.prototype._addHeaderLine = function(field, value) {
416416
// Call this instead of resume() if we want to just
417417
// dump all the data to /dev/null
418418
IncomingMessage.prototype._dump = function() {
419-
if (this._dumped)
420-
return;
421-
422-
this._dumped = true;
423-
if (this.socket.parser) this.socket.parser.incoming = null;
424-
this.push(null);
425-
readStart(this.socket);
419+
if (!this._dumped) {
420+
this._dumped = true;
421+
if (this.socket.parser) this.socket.parser.incoming = null;
422+
this.push(null);
423+
readStart(this.socket);
424+
this.read();
425+
}
426426
};
427427

428428

test/simple/test-http-allow-req-after-204-res.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ function nextRequest() {
5858
//process.nextTick(nextRequest);
5959
}
6060
});
61+
response.resume();
6162
});
6263
request.end();
6364
}

test/simple/test-http-client-response-domain.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ function test() {
6060
res.on('end', function() {
6161
res.emit('error', new Error('should be caught by domain'));
6262
});
63+
res.resume();
6364
});
6465
req.end();
6566
}

test/simple/test-http-contentLength0.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ s.listen(common.PORT, function() {
3636
var request = http.request({ port: common.PORT }, function(response) {
3737
console.log('STATUS: ' + response.statusCode);
3838
s.close();
39+
response.resume();
3940
});
4041

4142
request.end();

test/simple/test-http-head-request.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ server.listen(common.PORT, function() {
4747
common.error('response end');
4848
gotEnd = true;
4949
});
50+
response.resume();
5051
});
5152
request.end();
5253
});

test/simple/test-http-head-response-has-no-body-end.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ server.on('listening', function() {
5151
server.close();
5252
responseComplete = true;
5353
});
54+
res.resume();
5455
});
5556
common.error('req');
5657
req.end();

test/simple/test-http-head-response-has-no-body.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ server.on('listening', function() {
4848
server.close();
4949
responseComplete = true;
5050
});
51+
res.resume();
5152
});
5253
common.error('req');
5354
req.end();

test/simple/test-http-legacy.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ var server = http.createServer(function(req, res) {
6161
res.end();
6262
responses_sent += 1;
6363
});
64+
req.resume();
6465

6566
//assert.equal('127.0.0.1', res.connection.remoteAddress);
6667
});

test/simple/test-http-max-headers-count.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ server.listen(common.PORT, function() {
7575
server.close();
7676
}
7777
});
78+
res.resume();
7879
});
7980
req.maxHeadersCount = max;
8081
req.end();

0 commit comments

Comments
 (0)