Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
http2: reduce duplication with _unrefActive
  • Loading branch information
jasnell committed Nov 22, 2017
commit 365209980da3c759b1424a6889a9cafed600d552
72 changes: 34 additions & 38 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ const kServer = Symbol('server');
const kSession = Symbol('session');
const kState = Symbol('state');
const kType = Symbol('type');
const kUpdateTimer = Symbol('update-timer');

const kDefaultSocketTimeout = 2 * 60 * 1000;
const kRenegTest = /TLS session renegotiation disabled for this socket/;
Expand Down Expand Up @@ -149,7 +150,7 @@ function emit(self, ...args) {
function onSessionHeaders(handle, id, cat, flags, headers) {
const owner = this[kOwner];
const type = owner[kType];
_unrefActive(owner);
owner[kUpdateTimer]();
debug(`Http2Stream ${id} [Http2Session ` +
`${sessionName(type)}]: headers received`);
const streams = owner[kState].streams;
Expand Down Expand Up @@ -229,8 +230,7 @@ function onStreamTrailers() {
// Readable and Writable sides of the Duplex.
function onStreamClose(code) {
const stream = this[kOwner];
_unrefActive(stream);
_unrefActive(stream[kSession]);
stream[kUpdateTimer]();
abort(stream);
const state = stream[kState];
state.rst = true;
Expand All @@ -249,16 +249,15 @@ function afterFDClose(err) {
// Called when an error event needs to be triggered
function onSessionError(error) {
const owner = this[kOwner];
_unrefActive(owner);
owner[kUpdateTimer]();
process.nextTick(emit, owner, 'error', error);
}

// Receives a chunk of data for a given stream and forwards it on
// to the Http2Stream Duplex for processing.
function onStreamRead(nread, buf, handle) {
const stream = handle[kOwner];
_unrefActive(stream);
_unrefActive(stream[kSession]);
stream[kUpdateTimer]();
if (nread >= 0 && !stream.destroyed) {
if (!stream.push(buf)) {
handle.readStop();
Expand All @@ -274,7 +273,7 @@ function onStreamRead(nread, buf, handle) {
function onSettings(ack) {
const owner = this[kOwner];
debug(`Http2Session ${sessionName(owner[kType])}: new settings received`);
_unrefActive(owner);
owner[kUpdateTimer]();
let event = 'remoteSettings';
if (ack) {
if (owner[kState].pendingAck > 0)
Expand All @@ -297,7 +296,7 @@ function onPriority(id, parent, weight, exclusive) {
debug(`Http2Stream ${id} [Http2Session ` +
`${sessionName(owner[kType])}]: priority [parent: ${parent}, ` +
`weight: ${weight}, exclusive: ${exclusive}]`);
_unrefActive(owner);
owner[kUpdateTimer]();
const streams = owner[kState].streams;
const stream = streams.get(id);
const emitter = stream === undefined ? owner : stream;
Expand All @@ -318,7 +317,7 @@ function onFrameError(id, type, code) {
const owner = this[kOwner];
debug(`Http2Session ${sessionName(owner[kType])}: error sending frame type ` +
`${type} on stream ${id}, code: ${code}`);
_unrefActive(owner);
owner[kUpdateTimer]();
const streams = owner[kState].streams;
const stream = streams.get(id);
const emitter = stream !== undefined ? stream : owner;
Expand Down Expand Up @@ -518,7 +517,7 @@ function setupHandle(session, socket, type, options) {
function submitSettings(settings) {
const type = this[kType];
debug(`Http2Session ${sessionName(type)}: submitting settings`);
_unrefActive(this);
this[kUpdateTimer]();
this[kLocalSettings] = undefined;
updateSettingsBuffer(settings);
this[kHandle].settings();
Expand All @@ -528,8 +527,7 @@ function submitSettings(settings) {
// Note: If the silent option is true, the change will be made
// locally with no PRIORITY frame sent.
function submitPriority(options) {
_unrefActive(this);
_unrefActive(this[kSession]);
this[kUpdateTimer]();

// If the parent is the id, do nothing because a
// stream cannot be made to depend on itself.
Expand All @@ -545,8 +543,7 @@ function submitPriority(options) {
// Submit an RST-STREAM frame to be sent to the remote peer.
// This will cause the Http2Stream to be closed.
function submitRstStream(code) {
_unrefActive(this);
_unrefActive(this[kSession]);
this[kUpdateTimer]();

const state = this[kState];
if (state.rst) return;
Expand Down Expand Up @@ -726,6 +723,10 @@ class Http2Session extends EventEmitter {
debug(`Http2Session ${sessionName(type)}: created`);
}

[kUpdateTimer]() {
_unrefActive(this);
}

setNextStreamID(id) {
if (typeof id !== 'number')
throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'id', 'number');
Expand Down Expand Up @@ -979,7 +980,7 @@ class Http2Session extends EventEmitter {
handle.chunksSentSinceLastWrite : null;
if (chunksSentSinceLastWrite !== null &&
chunksSentSinceLastWrite !== handle.updateChunksSent()) {
_unrefActive(this);
this[kUpdateTimer]();
return;
}
}
Expand Down Expand Up @@ -1012,9 +1013,7 @@ class ClientHttp2Session extends Http2Session {
throw new errors.Error('ERR_HTTP2_INVALID_SESSION');
debug(`HttpSession ${sessionName(this[kType])}: initiating request`);

_unrefActive(this);
if (this[kSession])
_unrefActive(this[kSession]);
this[kUpdateTimer]();

assertIsObject(headers, 'headers');
assertIsObject(options, 'options');
Expand Down Expand Up @@ -1112,15 +1111,13 @@ function afterDoStreamWrite(status, handle, req) {
const stream = handle[kOwner];
const session = stream[kSession];

_unrefActive(stream);
stream[kUpdateTimer]();

const { bytes } = req;
stream[kState].writeQueueSize -= bytes;

if (session !== undefined) {
_unrefActive(session);
if (session !== undefined)
session[kState].writeQueueSize -= bytes;
}

if (typeof req.callback === 'function')
req.callback();
Expand Down Expand Up @@ -1217,6 +1214,12 @@ class Http2Stream extends Duplex {
}
}

[kUpdateTimer]() {
_unrefActive(this);
if (this[kSession])
_unrefActive(this[kSession]);
}

[kInit](id, handle) {
this[kID] = id;
this[async_id_symbol] = handle.getAsyncId();
Expand Down Expand Up @@ -1262,8 +1265,7 @@ class Http2Stream extends Duplex {
handle.chunksSentSinceLastWrite : null;
if (chunksSentSinceLastWrite !== null &&
chunksSentSinceLastWrite !== handle.updateChunksSent()) {
_unrefActive(this);
_unrefActive(this[kSession]);
this[kUpdateTimer]();
return;
}
}
Expand Down Expand Up @@ -1306,8 +1308,7 @@ class Http2Stream extends Duplex {
return;
}

_unrefActive(this);
_unrefActive(this[kSession]);
this[kUpdateTimer]();

if (!this[kState].headersSent)
this[kProceed]();
Expand All @@ -1331,8 +1332,7 @@ class Http2Stream extends Duplex {
return;
}

_unrefActive(this);
_unrefActive(this[kSession]);
this[kUpdateTimer]();

if (!this[kState].headersSent)
this[kProceed]();
Expand Down Expand Up @@ -1689,8 +1689,7 @@ class ServerHttp2Stream extends Http2Stream {
debug(`Http2Stream ${this[kID]} [Http2Session ` +
`${sessionName(session[kType])}]: initiating push stream`);

_unrefActive(this);
_unrefActive(this[kSession]);
this[kUpdateTimer]();

if (typeof options === 'function') {
callback = options;
Expand Down Expand Up @@ -1765,8 +1764,7 @@ class ServerHttp2Stream extends Http2Stream {
throw new errors.Error('ERR_HTTP2_INVALID_STREAM');
debug(`Http2Stream ${this[kID]} [Http2Session ` +
`${sessionName(session[kType])}]: initiating response`);
_unrefActive(this);
_unrefActive(this[kSession]);
this[kUpdateTimer]();
const state = this[kState];

if (state.headersSent)
Expand Down Expand Up @@ -1832,8 +1830,7 @@ class ServerHttp2Stream extends Http2Stream {
throw new errors.Error('ERR_HTTP2_INVALID_STREAM');
debug(`Http2Stream ${this[kID]} [Http2Session ` +
`${sessionName(session[kType])}]: initiating response`);
_unrefActive(this);
_unrefActive(this[kSession]);
this[kUpdateTimer]();
const state = this[kState];

if (state.headersSent)
Expand Down Expand Up @@ -1916,8 +1913,7 @@ class ServerHttp2Stream extends Http2Stream {
throw new errors.Error('ERR_HTTP2_INVALID_STREAM');
debug(`Http2Stream ${this[kID]} [Http2Session ` +
`${sessionName(session[kType])}]: initiating response`);
_unrefActive(this);
_unrefActive(this[kSession]);
this[kUpdateTimer]();
const state = this[kState];

if (state.headersSent)
Expand Down Expand Up @@ -1997,7 +1993,7 @@ class ServerHttp2Stream extends Http2Stream {
}
}

_unrefActive(this);
this[kUpdateTimer]();

const headersList = mapToHeaders(headers,
assertValidPseudoHeaderResponse);
Expand Down Expand Up @@ -2050,7 +2046,7 @@ const setTimeout = {
}
} else {
enroll(this, msecs);
_unrefActive(this);
this[kUpdateTimer]();
if (callback !== undefined) {
if (typeof callback !== 'function')
throw new errors.TypeError('ERR_INVALID_CALLBACK');
Expand Down