-
-
Notifications
You must be signed in to change notification settings - Fork 35.4k
http2: add session tracking and graceful server shutdown of http2 server #57586
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
bb71a11
7a852db
6bc96f1
dd751ae
be55b6d
8ee2e3e
90acf08
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
This change adds proper tracking of HTTP / 2 server sessions to ensure they are gracefully closed when the server is shut down.It implements: - A new kSessions symbol for tracking active sessions - Adding/removing sessions from a SafeSet in the server - A closeAllSessions helper function to close active sessions - Updates to Http2Server and Http2SecureServer close methods Breaking Change: any client trying to create new requests on existing connections will not be able to do so once server close is initiated Refs: https://datatracker.ietf.org/doc/html/rfc7540\#section-9.1 Refs: https://nodejs.org/api/http.html\#serverclosecallback
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -251,6 +251,7 @@ const kServer = Symbol('server'); | |
| const kState = Symbol('state'); | ||
| const kType = Symbol('type'); | ||
| const kWriteGeneric = Symbol('write-generic'); | ||
| const kSessions = Symbol('sessions'); | ||
|
|
||
| const { | ||
| kBitfield, | ||
|
|
@@ -1125,9 +1126,13 @@ function emitClose(self, error) { | |
| function cleanupSession(session) { | ||
| const socket = session[kSocket]; | ||
| const handle = session[kHandle]; | ||
| const server = session[kServer]; | ||
| session[kProxySocket] = undefined; | ||
| session[kSocket] = undefined; | ||
| session[kHandle] = undefined; | ||
| if (server) { | ||
| server[kSessions].delete(session); | ||
| } | ||
| session[kNativeFields] = trackAssignmentsTypedArray( | ||
| new Uint8Array(kSessionUint8FieldCount)); | ||
| if (handle) | ||
|
|
@@ -1644,6 +1649,9 @@ class ServerHttp2Session extends Http2Session { | |
| constructor(options, socket, server) { | ||
| super(NGHTTP2_SESSION_SERVER, options, socket); | ||
| this[kServer] = server; | ||
| if (server) { | ||
| server[kSessions].add(this); | ||
| } | ||
| // This is a bit inaccurate because it does not reflect changes to | ||
| // number of listeners made after the session was created. This should | ||
| // not be an issue in practice. Additionally, the 'priority' event on | ||
|
|
@@ -3168,11 +3176,25 @@ function onErrorSecureServerSession(err, socket) { | |
| socket.destroy(err); | ||
| } | ||
|
|
||
| /** | ||
| * This function closes all active sessions gracefully. | ||
| * @param {*} server the underlying server whose sessions to be closed | ||
| */ | ||
| function closeAllSessions(server) { | ||
| const sessions = server[kSessions]; | ||
| if (sessions.size > 0) { | ||
| sessions.forEach((session) => { | ||
| session.close(); | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| class Http2SecureServer extends TLSServer { | ||
| constructor(options, requestListener) { | ||
| options = initializeTLSOptions(options); | ||
| super(options, connectionListener); | ||
| this[kOptions] = options; | ||
| this[kSessions] = new SafeSet(); | ||
| this.timeout = 0; | ||
| this.on('newListener', setupCompat); | ||
| if (options.allowHTTP1 === true) { | ||
|
|
@@ -3205,6 +3227,7 @@ class Http2SecureServer extends TLSServer { | |
| if (this[kOptions].allowHTTP1 === true) { | ||
| httpServerPreClose(this); | ||
| } | ||
| closeAllSessions(this); | ||
| ReflectApply(TLSServer.prototype.close, this, arguments); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same race condition as below. We might also need to do this before
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hey @pimterry could you please review again |
||
| } | ||
|
|
||
|
|
@@ -3220,6 +3243,7 @@ class Http2Server extends NETServer { | |
| options = initializeOptions(options); | ||
| super(options, connectionListener); | ||
| this[kOptions] = options; | ||
| this[kSessions] = new SafeSet(); | ||
| this.timeout = 0; | ||
| this.on('newListener', setupCompat); | ||
| if (typeof requestListener === 'function') | ||
|
|
@@ -3241,6 +3265,11 @@ class Http2Server extends NETServer { | |
| this[kOptions].settings = { ...this[kOptions].settings, ...settings }; | ||
| } | ||
|
|
||
| close() { | ||
| closeAllSessions(this); | ||
| ReflectApply(NETServer.prototype.close, this, arguments); | ||
| } | ||
|
|
||
| async [SymbolAsyncDispose]() { | ||
| return promisify(super.close).call(this); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,217 @@ | ||
| 'use strict'; | ||
| const common = require('../common'); | ||
| if (!common.hasCrypto) { common.skip('missing crypto'); }; | ||
| const fixtures = require('../common/fixtures'); | ||
| // This test ensure that the server will not accept any new request | ||
| // after server close is called. | ||
| const assert = require('assert'); | ||
| const http2 = require('http2'); | ||
|
|
||
| const { test } = require('node:test'); | ||
|
|
||
| /** | ||
| * Create and manage an HTTP/2 client stream with controlled write patterns | ||
| * @param {http2.ClientHttp2Session} client - The HTTP/2 client session | ||
| * @param {string} clientId - Identifier for the client (e.g., '1', '2') | ||
| * @param {number} writeCount - Number of writes to perform | ||
| * @param {number} writeInterval - Interval between writes in ms | ||
| * @returns {object} - Object containing stream, status tracking, and functions | ||
| */ | ||
| function createClientStream(client, clientId, writeCount, writeInterval = 100) { | ||
| let currentWriteCount = 0; | ||
| let intervalId = null; | ||
| let streamClosed = false; | ||
|
|
||
| // Create the request | ||
| const req = client.request({ | ||
| ':path': `/client${clientId}`, | ||
| ':method': 'POST', | ||
| 'client-id': clientId, | ||
| 'content-type': 'text/plain' | ||
| }); | ||
|
|
||
| // Set up event handlers | ||
| req.on('response', (_) => {}); | ||
|
|
||
| req.on('data', (_) => {}); | ||
|
|
||
| req.on('end', () => { | ||
| streamClosed = true; | ||
| }); | ||
|
|
||
| req.on('close', () => { | ||
| streamClosed = true; | ||
| if (intervalId) { | ||
| clearInterval(intervalId); | ||
| intervalId = null; | ||
| } | ||
| }); | ||
|
|
||
| req.on('error', (err) => { | ||
| if (intervalId) { | ||
| clearInterval(intervalId); | ||
| intervalId = null; | ||
| } | ||
| }); | ||
|
|
||
| // Start the write interval | ||
| intervalId = setInterval(() => { | ||
| currentWriteCount++; | ||
| if (currentWriteCount > writeCount) { | ||
| if (intervalId) { | ||
| clearInterval(intervalId); | ||
| intervalId = null; | ||
| } | ||
| req.close(); | ||
| return; | ||
| } | ||
|
|
||
| req.write(`Client ${clientId} write #${currentWriteCount}\n`); | ||
| }, writeInterval); | ||
|
|
||
| // Return object with stream, status tracking, and cleanup function | ||
| return { | ||
| stream: req, | ||
| getWriteCount: () => currentWriteCount, | ||
| isActive: () => !streamClosed && !req.destroyed && !req.closed, | ||
| }; | ||
| } | ||
|
|
||
| // This test start a server and create a client. Client open a request and | ||
| // send 20 writes at interval of 100ms and then close at 2000ms from server start. | ||
| // Server close is fired after 1000ms from server start. | ||
| // Same client open another request after 1500ms from server start and tries to | ||
| // send 10 writes at interval of 100ms but failed to connect as server close is already fired at 1000ms. | ||
| // Request 1 from client is gracefully closed after accepting all 20 writes as it started before server close fired. | ||
| // server successfully closes gracefully after receiving all 20 writes from client and also server refused to accept any new request. | ||
| test('HTTP/2 server close with existing and new requests', async () => { | ||
|
|
||
| // Server setup | ||
| const server = http2.createSecureServer({ | ||
| key: fixtures.readKey('agent1-key.pem'), | ||
| cert: fixtures.readKey('agent1-cert.pem') | ||
| }); | ||
|
|
||
| // Track server events | ||
| let serverStart = 0; | ||
| let serverCloseTime = 0; | ||
| let requestsReceived = 0; | ||
| let writesReceived = 0; | ||
| let req1Complete = false; | ||
| let req2Error = null; | ||
|
|
||
| // Handle streams on the server | ||
| server.on('stream', (stream, headers) => { | ||
| requestsReceived++; | ||
|
|
||
| stream.respond({ | ||
| ':status': 200, | ||
| 'content-type': 'text/plain' | ||
| }); | ||
|
|
||
| // Count writes from clients | ||
| stream.on('data', (chunk) => { | ||
| writesReceived++; | ||
| stream.write(`Echo: ${chunk.toString().trim()}`); | ||
| }); | ||
|
|
||
| stream.on('end', () => { | ||
| stream.end('Server: Stream closed'); | ||
| }); | ||
| }); | ||
|
|
||
| // Start the server | ||
| await new Promise((resolve) => server.listen(0, () => { | ||
| serverStart = Date.now(); | ||
| resolve(); | ||
| })); | ||
| const port = server.address().port; | ||
|
|
||
| // Create client | ||
| const client = http2.connect(`https://localhost:${port}`, { | ||
| rejectUnauthorized: false | ||
| }); | ||
|
|
||
| // Create first request that will start immediately and write 20 times eache write at interval of 100ms | ||
| // The request will be closed at 2000ms after 20 writes | ||
| const request1 = createClientStream(client, '1', 20, 100); | ||
|
|
||
| // wait 1000ms before closing the server | ||
| await new Promise((resolve) => setTimeout(resolve, common.platformTimeout(1000))); | ||
|
|
||
| // close the server | ||
| await new Promise((resolve) => { | ||
| server.close(() => { | ||
| serverCloseTime = Date.now(); | ||
| resolve(); | ||
| }); | ||
| }); | ||
|
|
||
| // Wait 500ms before creating the second request | ||
| await new Promise((resolve) => setTimeout(resolve, common.platformTimeout(500))); | ||
|
|
||
| // Try to create the second request after 1500ms of server start - should fail | ||
| try { | ||
| const request2 = createClientStream(client, '2', 10, 100); | ||
| // If we get here without error, wait to see if an error event happens | ||
| request2.stream.on('error', (err) => { | ||
| req2Error = err; | ||
| }); | ||
|
|
||
| } catch (err) { | ||
| // Should fail synchronously with ERR_HTTP2_INVALID_SESSION | ||
| req2Error = err; | ||
| } | ||
|
|
||
| // Wait for request 1 to complete gracefully (should be around 2000ms) | ||
| await new Promise((resolve) => { | ||
| const checkComplete = () => { | ||
| if (!request1.isActive()) { | ||
| req1Complete = true; | ||
| resolve(); | ||
| } else { | ||
| // Check again in 100ms | ||
| setTimeout(checkComplete, common.platformTimeout(100)); | ||
| } | ||
| }; | ||
|
|
||
| // Set a timeout to prevent hanging if request never completes | ||
| setTimeout(() => { | ||
| resolve(); | ||
| }, common.platformTimeout(1500)); | ||
|
|
||
| checkComplete(); | ||
| }); | ||
|
|
||
| // Ensure client is closed | ||
| client.close(); | ||
|
|
||
| // Wait for cleanup | ||
| await new Promise((resolve) => setTimeout(resolve, common.platformTimeout(200))); | ||
|
|
||
| // Verify test expectations | ||
|
|
||
| // Request 1 should have completed | ||
| assert.ok(req1Complete, 'Request 1 should complete gracefully'); | ||
| assert.ok(request1.getWriteCount() > 0, 'Request 1 should have written data'); | ||
| // Request 1 should have written 20 times and request 2 written 0 times | ||
| assert.strictEqual(writesReceived, 20); | ||
|
|
||
| // Request 2 fails with ERR_HTTP2_INVALID_SESSION because the server | ||
| // fired close at 1000ms which stops accepting any new request. | ||
| // Since Request 2 starts at 1500ms, it fails. | ||
| assert.ok(req2Error, 'Request 2 should have an error'); | ||
| // Request 2 should fail with ERR_HTTP2_INVALID_SESSION | ||
| assert.strictEqual(req2Error.code, 'ERR_HTTP2_INVALID_SESSION'); | ||
|
|
||
| // Server should have received only the first request as 2nd request received after server close fired. | ||
| assert.strictEqual(requestsReceived, 1); | ||
| assert.ok( | ||
| serverCloseTime - serverStart >= 2000, | ||
| `Server should fully close after 2000ms of server start when all streams complete (actual: ${serverCloseTime - serverStart}ms)` | ||
| ); | ||
| assert.ok( | ||
| (serverCloseTime - serverStart) - 2000 < 200, | ||
| `Server should fully close just after all streams complete` | ||
| ); | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please use a
for...ofloop?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah sure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done