Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
http: invoke callback with ERR_STREAM_DESTROYED if the socket is dest…
…royed

Fixes: #36673
Refs: #29227 (comment)
  • Loading branch information
Lxxyx committed Jan 12, 2021
commit 2a0ecd6eb584ea33487a6f74632f9558d85d518e
129 changes: 54 additions & 75 deletions lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ const { Buffer } = require('buffer');
const common = require('_http_common');
const checkIsHttpToken = common._checkIsHttpToken;
const checkInvalidHeaderChar = common._checkInvalidHeaderChar;
const {
defaultTriggerAsyncIdScope,
symbols: { async_id_symbol }
} = require('internal/async_hooks');
const {
codes: {
ERR_HTTP_HEADERS_SENT,
Expand Down Expand Up @@ -341,17 +337,21 @@ OutgoingMessage.prototype._send = function _send(data, encoding, callback) {
OutgoingMessage.prototype._writeRaw = _writeRaw;
function _writeRaw(data, encoding, callback) {
const conn = this.socket;
if (conn && conn.destroyed) {
// The socket was destroyed. If we're still trying to write to it,
// then we haven't gotten the 'close' event yet.
return false;
}

if (typeof encoding === 'function') {
callback = encoding;
encoding = null;
}

if (conn?.destroyed) {
if (typeof callback === 'function') {
process.nextTick(callback, new ERR_STREAM_DESTROYED('write'));
}
// The socket was destroyed. If we're still trying to write to it,
// then we haven't gotten the 'close' event yet.
return false;
}

if (conn && conn._httpMessage === this && conn.writable) {
// There might be pending data in the this.output buffer.
if (this.outputData.length) {
Expand Down Expand Up @@ -689,23 +689,6 @@ OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
return ret;
};

function onError(msg, err, callback) {
const triggerAsyncId = msg.socket ? msg.socket[async_id_symbol] : undefined;
defaultTriggerAsyncIdScope(triggerAsyncId,
process.nextTick,
emitErrorNt,
msg,
err,
callback);
}

function emitErrorNt(msg, err, callback) {
callback(err);
if (typeof msg.emit === 'function' && !msg._closed) {
msg.emit('error', err);
}
}

function write_(msg, chunk, encoding, callback, fromEnd) {
if (typeof callback !== 'function')
callback = nop;
Expand All @@ -730,11 +713,8 @@ function write_(msg, chunk, encoding, callback, fromEnd) {
}

if (err) {
if (!msg.destroyed) {
onError(msg, err, callback);
} else {
process.nextTick(callback, err);
}
process.nextTick(callback, err);
msg.destroy(err);
return false;
}

Expand Down Expand Up @@ -804,62 +784,65 @@ OutgoingMessage.prototype.addTrailers = function addTrailers(headers) {
}
};

function onFinish(outmsg) {
if (outmsg && outmsg.socket && outmsg.socket._hadError) return;
outmsg.emit('finish');
function onFinish(err) {
if (err || this.socket?._hadError) return;
this.emit('finish');
}

OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
OutgoingMessage.prototype.end = function end(chunk, encoding, cb) {
if (typeof chunk === 'function') {
callback = chunk;
cb = chunk;
chunk = null;
encoding = null;
} else if (typeof encoding === 'function') {
callback = encoding;
cb = encoding;
encoding = null;
}

if (chunk) {
if (this.finished) {
onError(this,
new ERR_STREAM_WRITE_AFTER_END(),
typeof callback !== 'function' ? nop : callback);
return this;
}
if (this.socket) {
this.socket.cork();
}

if (this.socket) {
this.socket.cork();
}
if (chunk !== null && chunk !== undefined)
this.write(chunk, encoding);

write_(this, chunk, encoding, null, true);
} else if (this.finished) {
if (typeof callback === 'function') {
if (!this.writableFinished) {
this.on('finish', callback);
} else {
callback(new ERR_STREAM_ALREADY_FINISHED('end'));
}
let err;
if (!this.finished) {
if (!this._header) {
this._contentLength = 0;
this._implicitHeader();
}
return this;
} else if (!this._header) {
if (this.socket) {
this.socket.cork();

const finish = FunctionPrototypeBind(onFinish, this);

if (this._hasBody && this.chunkedEncoding) {
this._send('0\r\n' + this._trailer + '\r\n', 'latin1', finish);
} else {
// Force a flush, HACK.
this._send('', 'latin1', finish);
}

this._contentLength = 0;
this._implicitHeader();
this.finished = true; // aka. WritableState.ended
} else if (this.writableFinished) {
err = new ERR_STREAM_ALREADY_FINISHED('end');
} else if (this.destroyed) {
err = new ERR_STREAM_DESTROYED('end');
}

if (typeof callback === 'function')
this.once('finish', callback);

const finish = FunctionPrototypeBind(onFinish, undefined, this);
if (typeof cb === 'function') {
if (err || this.writableFinished) {
process.nextTick(cb, err);
} else {
// TODO (fix): What if error? See kOnFinished in writable.js.
this.once('finish', cb);
}
}

if (this._hasBody && this.chunkedEncoding) {
this._send('0\r\n' + this._trailer + '\r\n', 'latin1', finish);
} else {
// Force a flush, HACK.
this._send('', 'latin1', finish);
if (err) {
if (this.socket) {
this.socket.uncork();
}
return this;
}

if (this.socket) {
Expand All @@ -869,14 +852,10 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
}
this[kCorked] = 0;

this.finished = true;

// There is the first message on the outgoing queue, and we've sent
// everything to the socket.
debug('outgoing message end.');
if (this.outputData.length === 0 &&
this.socket &&
this.socket._httpMessage === this) {
if (this.outputData.length === 0 && this.socket?._httpMessage === this) {
this._finish();
}

Expand Down
61 changes: 61 additions & 0 deletions test/parallel/test-http-outgoing-socket-destroyed.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
'use strict';

const common = require('../common');
const { createServer, request } = require('http');

{
const server = createServer((req, res) => {
server.close();

req.socket.destroy();

res.write('hello', common.expectsError({
code: 'ERR_STREAM_DESTROYED'
}));
});

server.listen(0, common.mustCall(() => {
const req = request({
host: 'localhost',
port: server.address().port
});

req.on('response', common.mustNotCall());
req.on('error', common.expectsError({
code: 'ECONNRESET'
}));

req.end();
}));
}

{
const server = createServer((req, res) => {
res.write('hello');
req.resume();

const onError = common.expectsError({
code: 'ERR_STREAM_DESTROYED'
});

res.on('close', () => {
res.write('world', common.mustCall((err) => {
onError(err);
server.close();
}));
});
});

server.listen(0, common.mustCall(() => {
const req = request({
host: 'localhost',
port: server.address().port
});

req.on('response', common.mustCall((res) => {
res.socket.destroy();
}));

req.end();
}));
}