diff --git a/lib/http.js b/lib/http.js index 0376f3dc..a96b4c2c 100644 --- a/lib/http.js +++ b/lib/http.js @@ -234,6 +234,7 @@ function IncomingMessage(stream) { this.httpVersion = '2.0'; this.httpVersionMajor = 2; this.httpVersionMinor = 0; + this._closed = false; // * `this.headers` will store the regular headers (and none of the special colon headers) this.headers = {}; @@ -243,6 +244,17 @@ function IncomingMessage(stream) { // * Other metadata is filled in when the headers arrive. stream.once('headers', this._onHeaders.bind(this)); stream.once('end', this._onEnd.bind(this)); + + // * Forwarding error and close event from the stream + stream.once('error', function() { + this._closed = true; + this.emit('error'); + }.bind(this)); + + stream.once('close', function() { + this._closed = true; + this.emit('close'); + }.bind(this)); } IncomingMessage.prototype = Object.create(PassThrough.prototype, { constructor: { value: IncomingMessage } }); @@ -423,6 +435,7 @@ function Server(options) { } }); this._server.on('request', this.emit.bind(this, 'request')); + this._server.on('listening', this.emit.bind(this, 'listening')); } // HTTP2 over plain TCP @@ -645,13 +658,25 @@ function OutgoingResponse(stream) { this.stream = stream; this.statusCode = 200; this.sendDate = true; + this._closed = false; this.stream.once('headers', this._onRequestHeaders.bind(this)); + + // * Forwarding error and close event from the stream + this.stream.on('error', function(err) { + this._closed = true; + this.emit('error', err); + }.bind(this)); + + this.stream.on('close', function() { + this._closed = true; + this.emit('close'); + }.bind(this)); } OutgoingResponse.prototype = Object.create(OutgoingMessage.prototype, { constructor: { value: OutgoingResponse } }); OutgoingResponse.prototype.writeHead = function writeHead(statusCode, reasonPhrase, headers) { - if (this.headersSent) { + if (this.headersSent || this._closed) { return; } @@ -685,11 +710,19 @@ OutgoingResponse.prototype._implicitHeaders = function _implicitHeaders() { }; OutgoingResponse.prototype.write = function write() { + if (this._closed) { + return; + } + this._implicitHeaders(); return OutgoingMessage.prototype.write.apply(this, arguments); }; OutgoingResponse.prototype.end = function end() { + if (this._closed) { + return; + } + this._implicitHeaders(); return OutgoingMessage.prototype.end.apply(this, arguments); }; @@ -699,6 +732,10 @@ OutgoingResponse.prototype._onRequestHeaders = function _onRequestHeaders(header }; OutgoingResponse.prototype.push = function push(options) { + if (this._closed) { + return null; + } + if (typeof options === 'string') { options = url.parse(options); } @@ -727,6 +764,11 @@ OutgoingResponse.prototype.altsvc = function altsvc(host, port, protocolID, maxA if (origin === undefined) { origin = ""; } + + if (this._closed && origin === "") { + return; + } + this.stream.altsvc(host, port, protocolID, maxAge, origin); }; @@ -996,6 +1038,10 @@ OutgoingRequest.prototype._start = function _start(stream, options) { response.once('ready', this.emit.bind(this, 'response', response)); this.stream.on('promise', this._onPromise.bind(this)); + + // * Forwarding error and close event from the stream + this.stream.on('error', this.emit.bind(this, 'error')); + this.stream.on('close', this.emit.bind(this, 'close')); }; OutgoingRequest.prototype._fallback = function _fallback(request) { diff --git a/lib/protocol/connection.js b/lib/protocol/connection.js index be14b331..0f72a807 100644 --- a/lib/protocol/connection.js +++ b/lib/protocol/connection.js @@ -18,19 +18,22 @@ exports.Connection = Connection; // // * **Event: 'error' (type)**: signals a connection level error made by the other end // -// * **Event: 'peerError' (type)**: signals the receipt of a GOAWAY frame that contains an error -// code other than NO_ERROR +// * **Event: 'goaway' ([type])**: signals the receipt of a GOAWAY frame with an optional error code // // * **Event: 'stream' (stream)**: signals that there's an incoming stream // -// * **createStream(): stream**: initiate a new stream +// * **createStream(): stream**: initiate a new stream (or return null if it's not possible) // // * **set(settings, callback)**: change the value of one or more settings according to the // key-value pairs of `settings`. The callback is called after the peer acknowledged the changes. // // * **ping([callback])**: send a ping and call callback when the answer arrives // -// * **close([error])**: close the stream with an error code +// * **goaway([error])**: forbids creation of new stream for the remote endpoint and possibly +// indicates an error +// +// * **close([error])**: send GOAWAY, and then close the underlying stream when there's no more +// active stream // Constructor // ----------- @@ -96,6 +99,8 @@ Connection.prototype = Object.create(Flow.prototype, { constructor: { value: Con var Stream = require('./stream').Stream; +var MAX_STREAM_ID = require('./framer').MAX_STREAM_ID; + // Initialization: Connection.prototype._initializeStreamManagement = function _initializeStreamManagement(firstStreamId) { // * streams are stored in two data structures: @@ -109,13 +114,17 @@ Connection.prototype._initializeStreamManagement = function _initializeStreamMan this._lastIncomingStream = 0; // * Calling `_writeControlFrame` when there's an incoming stream with 0 as stream ID - this._streamIds[0] = { upstream: { write: this._writeControlFrame.bind(this) } }; + this._streamIds[0] = { upstream: { + write: this._writeControlFrame.bind(this), + end: function noop() {} + } }; // * By default, the number of concurrent outbound streams is not limited. The `_streamLimit` can // be set by the SETTINGS_MAX_CONCURRENT_STREAMS setting. this._streamSlotsFree = Infinity; this._streamLimit = Infinity; this.on('RECEIVING_SETTINGS_MAX_CONCURRENT_STREAMS', this._updateStreamLimit); + this.on('stream_count_change', this._onStreamCountChange) }; // `_writeControlFrame` is called when there's an incoming frame in the `_control` stream. It @@ -142,7 +151,7 @@ Connection.prototype._updateStreamLimit = function _updateStreamLimit(newStreamL } }; -Connection.prototype._changeStreamCount = function _changeStreamCount(change) { +Connection.prototype._onStreamCountChange = function _onStreamCountChange(change) { if (change) { this._log.trace({ free: this._streamSlotsFree, change: change }, 'Changing active stream count.'); @@ -163,10 +172,20 @@ Connection.prototype._changeStreamCount = function _changeStreamCount(change) { // Allocating an ID to a stream Connection.prototype._allocateId = function _allocateId(stream, id) { + // * Can't allocate a new ID if the remote end doesn't want new incoming streams, or we reached + // the maximum stream ID + if ((id === undefined) && (this._goneawayRemote)) { + return undefined + } + // * initiated stream without definite ID if (id === undefined) { id = this._nextStreamId; this._nextStreamId += 2; + if (this._nextStreamId > MAX_STREAM_ID) { + this._log.trace('Reached maxmimum outbound stream ID. Going into closed_remote state.'); + this._goneawayRemote = true; + } } // * incoming stream with a legitim ID (larger than any previous and different parity than ours) @@ -231,7 +250,9 @@ Connection.prototype._createIncomingStream = function _createIncomingStream(id) this._log.debug({ stream_id: id }, 'New incoming stream.'); var stream = new Stream(this._log); - this._allocateId(stream, id); + if (!this._allocateId(stream, id)) { + return null; + } this._allocatePriority(stream); this.emit('stream', stream, id); @@ -240,6 +261,11 @@ Connection.prototype._createIncomingStream = function _createIncomingStream(id) // Creating an *outbound* stream Connection.prototype.createStream = function createStream() { + if (this._goneawayRemote) { + this._log.trace('Trying to create outbound stream after the other end closed the connection'); + return null; + } + this._log.trace('Creating new outbound stream.'); // * Receiving is enabled immediately, and an ID gets assigned to the stream @@ -254,6 +280,7 @@ Connection.prototype.createStream = function createStream() { Connection.prototype._initializeMultiplexing = function _initializeMultiplexing() { this.on('window_update', this.emit.bind(this, 'wakeup')); + this.on('finish', this._onFinish); this._sendScheduled = false; this._firstFrameReceived = false; }; @@ -261,11 +288,6 @@ Connection.prototype._initializeMultiplexing = function _initializeMultiplexing( // The `_send` method is a virtual method of the [Flow class](flow.html) that has to be implemented // by child classes. It reads frames from streams and pushes them to the output buffer. Connection.prototype._send = function _send(immediate) { - // * Do not do anything if the connection is already closed - if (this._closed) { - return; - } - // * Collapsing multiple calls in a turn into a single deferred call if (immediate) { this._sendScheduled = false; @@ -312,6 +334,17 @@ priority_loop: if (frame.stream === undefined) { frame.stream = stream.id || this._allocateId(stream); + if (frame.stream === undefined) { + this._log.trace({ s: stream, frame: frame }, 'Unsuccessful ID allocation, ' + + 'terminating stream with REFUSED_STREAM.'); + stream.write({ + type: 'RST_STREAM', + flags: {}, + stream: undefined, + error: 'REFUSED_STREAM' + }); + continue; + } } if (frame.type === 'PUSH_PROMISE') { @@ -321,7 +354,7 @@ priority_loop: this._log.trace({ s: stream, frame: frame }, 'Forwarding outgoing frame'); var moreNeeded = this.push(frame); - this._changeStreamCount(frame.count_change); + this.emit('stream_count_change', frame.count_change); assert(moreNeeded !== null); // The frame shouldn't be unforwarded if (moreNeeded === false) { @@ -345,7 +378,14 @@ priority_loop: // The `_receive` method is another virtual method of the [Flow class](flow.html) that has to be // implemented by child classes. It forwards the given frame to the appropriate stream: Connection.prototype._receive = function _receive(frame, done) { - this._log.trace({ frame: frame }, 'Forwarding incoming frame'); + this._log.trace({ frame: frame }, 'Incoming frame'); + + // * ignoring incoming frames that would initiate new stream if the connection is being closed + if (this._goneawayLocal && !(frame.stream in this._streamIds)) { + this._log.trace({ frame: frame }, 'Ignoring incoming frame for a new stream since the ' + + 'connection is being closed by us'); + return; + } // * first frame needs to be checked by the `_onFirstFrameReceived` method if (!this._firstFrameReceived) { @@ -359,14 +399,20 @@ Connection.prototype._receive = function _receive(frame, done) { // * or creates one if it's not in `this.streams` if (!stream) { stream = this._createIncomingStream(frame.stream); + if (!stream) { + return done(); + } } // * in case of PUSH_PROMISE, replaces the promised stream id with a new incoming stream if (frame.type === 'PUSH_PROMISE') { frame.promised_stream = this._createIncomingStream(frame.promised_stream); + if (!frame.promised_stream) { + return done(); + } } - frame.count_change = this._changeStreamCount.bind(this); + frame.count_change = this.emit.bind(this, 'stream_count_change'); // * and writes it to the `stream`'s `upstream` stream.upstream.write(frame); @@ -477,7 +523,12 @@ Connection.prototype._initializeLifecycleManagement = function _initializeLifecy this._pings = {}; this.on('PING', this._receivePing); this.on('GOAWAY', this._receiveGoaway); + this._goneawayLocal = false; + this._goneawayRemote = false; + this._closing = false; this._closed = false; + this._activeStreamCount = 0; + this.on('stream_count_change', this._onStreamCountChange2); }; // Generating a string of length 16 with random hexadecimal digits @@ -536,14 +587,11 @@ Connection.prototype._receivePing = function _receivePing(frame) { } }; -// Terminating the connection -Connection.prototype.close = function close(error) { - if (this._closed) { - this._log.warn('Trying to close an already closed connection'); - return; - } - - this._log.debug({ error: error }, 'Closing the connection'); +// Sending GOAWAY, which instructs the other side not to initiate new stream anymore. This can +// be called multiple times if a new error comes up. +Connection.prototype.goaway = function goaway(error) { + this._log.debug({ error: error }, 'Sending GOAWAY'); + this._goneawayLocal = true; this.push({ type: 'GOAWAY', flags: {}, @@ -551,16 +599,61 @@ Connection.prototype.close = function close(error) { last_stream: this._lastIncomingStream, error: error || 'NO_ERROR' }); - this.push(null); - this._closed = true; }; +// Receiving GOAWAY forbids initiating new streams Connection.prototype._receiveGoaway = function _receiveGoaway(frame) { - this._log.debug({ error: frame.error }, 'Other end closed the connection'); - this.push(null); + this._log.debug({ error: frame.error }, 'Received GOAWAY'); + this._goneawayRemote = true; + + // * Frames that are were in flight when the remote endpoint sent the GOAWAY must be terminated. + // We simulate this by creating a syntetic RST_STREAM with REFUSED_STREAM error code that + // indicates that the stream was not processed by the other end at all. + for (var id in this._streamIds) { + if (id > frame.last_stream) { + this.write({ + type: 'RST_STREAM', + flags: {}, + stream: id, + error: 'REFUSED_STREAM' + }); + } + } + + // * Emitting en event to inform the user about the GOAWAY frame + this.emit('goaway', (frame.error === 'NO_ERROR') ? undefined : frame.error); +}; + +// If the other end closes the TCP connection, this means: +// * It will never send a frame again so we can call `end()` on every stream (the finish event is +// called when everything is flushed to the streams so it is safe to call `end()`) +// * If there are streams that expected more frames, then they will handle this by emitting error +// * Generally, HTTP2 is unusable with a half closed underlying stream, so we close too +Connection.prototype._onFinish = function _onFinish() { + for (var id in this._streamIds) { + this._streamIds[id].upstream.end(); + } this._closed = true; - if (frame.error !== 'NO_ERROR') { - this.emit('peerError', frame.error); + this.push(null); +}; + +// Sending a GOAWAY, and closing the connection when there's no more connection +Connection.prototype.close = function close(error) { + if (!this._closing && !this._closed) { + this.goaway(error); + this._closing = true; + this._onStreamCountChange2(0); + } +}; + +// Counting the active streams +Connection.prototype._onStreamCountChange2 = function _onStreamCountChange2(change) { + this._activeStreamCount += change; + if (this._closing && (this._activeStreamCount === 0) && !this._sendScheduled) { + this._log.debug({}, 'No more active streams, closing underlying connection stream.'); + this._closing = false; + this._closed = true; + this.push(null); } }; diff --git a/lib/protocol/endpoint.js b/lib/protocol/endpoint.js index a218db04..2a9269c2 100644 --- a/lib/protocol/endpoint.js +++ b/lib/protocol/endpoint.js @@ -41,7 +41,8 @@ exports.Endpoint = Endpoint; // // * **createStream(): Stream**: initiate a new stream (forwarded to the underlying Connection) // -// * **close([error])**: close the connection with an error code +// * **close([error])**: send GOAWAY, and then close the underlying stream when there's no more +// active stream // Constructor // ----------- @@ -194,6 +195,9 @@ Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, sett this._decompressor.setTableSizeLimit.bind(this._decompressor)); this._connection.on('RECEIVING_SETTINGS_HEADER_TABLE_SIZE', this._compressor.setTableSizeLimit.bind(this._compressor)); + + this.on('finish', this._onFinish); + this._serializer.on('end', this._onEnd.bind(this)); }; var noread = {}; @@ -213,6 +217,14 @@ Endpoint.prototype._write = function _write(chunk, encoding, done) { this._deserializer.write(chunk, encoding, done); }; +Endpoint.prototype._onFinish = function _onFinish() { + this._deserializer.end(); +}; + +Endpoint.prototype._onEnd = function _onFinish() { + this.push(null); +}; + // Management // -------------- @@ -247,6 +259,10 @@ Endpoint.prototype.close = function close(error) { this._connection.close(error); }; +Endpoint.prototype.goaway = function close(error) { + this._connection.goaway(error); +}; + // Bunyan serializers // ------------------ diff --git a/lib/protocol/framer.js b/lib/protocol/framer.js index 2ac98ab0..455f4d8b 100644 --- a/lib/protocol/framer.js +++ b/lib/protocol/framer.js @@ -13,6 +13,7 @@ var logData = Boolean(process.env.HTTP2_LOG_DATA); var MAX_PAYLOAD_SIZE = 16384; var WINDOW_UPDATE_PAYLOAD_SIZE = 4; +exports.MAX_STREAM_ID = Math.pow(2, 31) - 1; // Serializer // ---------- diff --git a/lib/protocol/stream.js b/lib/protocol/stream.js index a174af20..6ec920cc 100644 --- a/lib/protocol/stream.js +++ b/lib/protocol/stream.js @@ -148,7 +148,7 @@ Stream.prototype.reset = function reset(error) { type: 'RST_STREAM', flags: {}, stream: this.id, - error: error + error: error || 'NO_ERROR' }); } }; @@ -219,6 +219,7 @@ Stream.prototype._initializeDataFlow = function _initializeDataFlow() { this.upstream._receive = this._receive.bind(this); this.upstream.write = this._writeUpstream.bind(this); this.upstream.on('error', this.emit.bind(this, 'error')); + this.upstream.on('finish', this._onUpstreamFinish.bind(this)); this.on('finish', this._finishing); }; @@ -350,6 +351,17 @@ Stream.prototype._finishing = function _finishing() { } }; +// If the other end signals that it will never send a frame again (e.g. half-closing TCP): +// * If we expected more frames then emit error (in practice, this almost always means +// that the other end won't receive our frames so emit error except in CLOSED state) +// * Signal that there will be no more frames +Stream.prototype._onUpstreamFinish = function _onUpstreamFinish() { + if (this.state !== 'CLOSED') { + this.emit('close'); + } + this.push(null); +}; + // [Stream States](http://tools.ietf.org/html/draft-ietf-httpbis-http2-14#section-5.1) // ---------------- // @@ -595,6 +607,12 @@ Stream.prototype._transition = function transition(sending, frame) { frame.promised_stream._initiated = sending; } + // Receiving and RST_STREAM that closes the stream + if (receiving && RST_STREAM && (this.state == 'CLOSED') && (previousState !== 'CLOSED')) { + this.emit('error', frame.error); + this.push(null); + } + // Signaling how sending/receiving this frame changes the active stream count (-1, 0 or +1) if (this._initiated) { var change = (activeState(this.state) - activeState(previousState)); diff --git a/test/connection.js b/test/connection.js index cc544162..4342d4f3 100644 --- a/test/connection.js +++ b/test/connection.js @@ -82,9 +82,9 @@ describe('connection.js', function() { describe('test scenario', function() { var c, s; beforeEach(function() { - c = new Connection(util.log.child({ role: 'client' }), 1, settings); - s = new Connection(util.log.child({ role: 'client' }), 2, settings); - c.pipe(s).pipe(c); + c = new Connection(util.clientLog, 1, settings); + s = new Connection(util.serverLog, 2, settings); + c.pipe(new util.CloneStream()).pipe(s).pipe(new util.CloneStream()).pipe(c); }); describe('connection setup', function() { @@ -224,13 +224,61 @@ describe('connection.js', function() { }); }); }); - describe('closing the connection on one end', function() { - it('should result in closed streams on both ends', function(done) { - done = util.callNTimes(2, done); - c.on('end', done); - s.on('end', done); + describe('receiving GOAWAY', function() { + it('should make creating new streams impossible', function() { + // Before and after the server sends GOAWAY + expect(c.createStream()).to.not.equal(null); + s.goaway(); + expect(c.createStream()).to.equal(null); + // Before and after the client sends GOAWAY + expect(s.createStream()).to.not.equal(null); + c.goaway(); + expect(s.createStream()).to.equal(null); + }); + }); + describe('making a request and then closing the connection', function() { + it('should gracefully end the connection', function(done) { + // Request and response data + var request_headers = { + ':method': 'GET', + ':path': '/' + }; + var request_data = new Buffer(0); + var response_headers = { + ':status': '200' + }; + var response_data = new Buffer('12345678', 'hex'); + + // Setting up server + s.on('stream', function(server_stream) { + server_stream.on('headers', function(headers) { + expect(headers).to.deep.equal(request_headers); + server_stream.headers(response_headers); + server_stream.end(response_data); + }); + }); + + // Sending request + var client_stream = c.createStream(); + client_stream.headers(request_headers); + client_stream.end(request_data); + + // Closing the connection c.close(); + + // Waiting for answer and then for the TCP connection to end gracefully + done = util.callNTimes(4, done); + client_stream.on('headers', function(headers) { + expect(headers).to.deep.equal(response_headers); + done(); + }); + client_stream.on('data', function(data) { + expect(data).to.deep.equal(response_data); + done(); + }); + c.on('finish', done); + s.on('finish', done); }); }); }); diff --git a/test/http.js b/test/http.js index f1e3b238..773adfbc 100644 --- a/test/http.js +++ b/test/http.js @@ -120,7 +120,7 @@ describe('http.js', function() { } else { called = true; } - }, once: util.noop }; + }, once: util.noop, on: util.noop }; var response = new http2.OutgoingResponse(stream); response.writeHead(200); @@ -434,5 +434,45 @@ describe('http.js', function() { }); }); }); + describe('closing client connection socket', function() { + it('should close streams on server', function(done) { + var server = http2.createServer(options, function(request, response) { + response.once('close', done); + response.writeHead(200); + }); + + var agent = new http2.Agent(); + server.listen(1243, function() { + http2.get({ + scheme: 'https:', + host: 'localhost', + port: 1243, + path: '/foo', + agent: agent + }, function(response) { + agent.endpoints['false:localhost:1243'].socket.destroy(); + }); + }); + }); + it('should close streams on client', function(done) { + var server = http2.createServer(options, function(request, response) { + response.writeHead(200); + }); + + var agent = new http2.Agent(); + server.listen(1244, function() { + http2.get({ + scheme: 'https:', + host: 'localhost', + port: 1244, + path: '/foo', + agent: agent + }, function(response) { + response.once('close', done); + agent.endpoints['false:localhost:1244'].socket.destroy(); + }); + }); + }); + }); }); }); diff --git a/test/stream.js b/test/stream.js index 00ece8ca..27dc7c14 100644 --- a/test/stream.js +++ b/test/stream.js @@ -232,8 +232,11 @@ describe('stream.js', function() { [true, false].forEach(function(sending) { var stream = createStream(); stream.state = state; + var errorEmitted = false; + stream.on('error', function() { errorEmitted = true; }); stream._transition(sending, { type: 'RST_STREAM', flags: {} }); expect(stream.state).to.be.equal('CLOSED'); + expect(sending ^ errorEmitted).to.equal(1); }); }); }); diff --git a/test/util.js b/test/util.js index aea8fe70..37b9ad13 100644 --- a/test/util.js +++ b/test/util.js @@ -1,6 +1,7 @@ var path = require('path'); var fs = require('fs'); var spawn = require('child_process').spawn; +var Transform = require('stream').Transform; function noop() {} exports.noop = noop; @@ -87,3 +88,19 @@ exports.shuffleBuffers = function shuffleBuffers(buffers) { return output; } + +function CloneStream() { + Transform.call(this, { objectMode: true }); +} +CloneStream.prototype = Object.create(Transform.prototype); + +CloneStream.prototype._transform = function(object, encoding, done) { + var clone = {}; + for (var name in object) { + clone[name] = object[name]; + } + this.push(clone); + done(); +}; + +exports.CloneStream = CloneStream;