Skip to content
This repository has been archived by the owner on Jul 6, 2018. It is now read-only.

Commit

Permalink
http2: eliminate unnecesary closure allocations
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Jul 13, 2017
1 parent 112c9f4 commit 03cd164
Showing 1 changed file with 108 additions and 85 deletions.
193 changes: 108 additions & 85 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ function sessionName(type) {
}
}

// Top level to avoid creating a closure
function emit() {
this.emit.apply(this, arguments);
}

// Called when a new block of headers has been received for a given
// stream. The stream may or may not be new. If the stream is new,
// create the associated Http2Stream instance and emit the 'stream'
Expand Down Expand Up @@ -168,7 +173,7 @@ function onSessionHeaders(id, cat, flags, headers) {
'report this as a bug in Node.js');
}
streams.set(id, stream);
process.nextTick(() => owner.emit('stream', stream, obj, flags));
process.nextTick(emit.bind(owner, 'stream', stream, obj, flags));
} else {
let event;
let status;
Expand Down Expand Up @@ -201,7 +206,7 @@ function onSessionHeaders(id, cat, flags, headers) {
'report this as a bug in Node.js');
}
debug(`[${sessionName(owner[kType])}] emitting stream '${event}' event`);
process.nextTick(() => stream.emit(event, obj, flags));
process.nextTick(emit.bind(stream, event, obj, flags));
}
}

Expand Down Expand Up @@ -252,16 +257,15 @@ function onSessionStreamClose(id, code) {

if (state.fd !== undefined) {
debug(`Closing fd ${state.fd} for stream ${id}`);
fs.close(state.fd, (err) => {
if (err)
process.nextTick(() => stream.emit('error', err));
});
fs.close(state.fd, afterFDClose.bind(stream));
}

setImmediate(() => {
stream.destroy();
debug(`[${sessionName(owner[kType])}] stream ${id} is closed`);
});
setImmediate(stream.destroy.bind(stream));
}

function afterFDClose(err) {
if (err)
process.nextTick(() => this.emit('error', err));
}

// Called when an error event needs to be triggered
Expand Down Expand Up @@ -304,14 +308,21 @@ function onSettings(ack) {
const owner = this[kOwner];
debug(`[${sessionName(owner[kType])}] new settings received`);
_unrefActive(this);
let fn;
let event = 'remoteSettings';
if (ack) {
if (owner[kState].pendingAck > 0)
owner[kState].pendingAck--;
owner[kLocalSettings] = undefined;
process.nextTick(() => owner.emit('localSettings', owner.localSettings));
event = 'localSettings';
} else {
owner[kRemoteSettings] = undefined;
process.nextTick(() => owner.emit('remoteSettings', owner.remoteSettings));
}
// Only emit the event if there are listeners registered
if (owner.listenerCount(event) > 0) {
const settings = event === 'localSettings' ?
owner.localSettings : owner.remoteSettings;
process.nextTick(emit.bind(owner, event, settings));
}
}

Expand All @@ -328,7 +339,15 @@ function onPriority(id, parent, weight, exclusive) {
const stream = streams.get(id);
const emitter = stream === undefined ? owner : stream;
process.nextTick(
() => emitter.emit('priority', id, parent, weight, exclusive));
emit.bind(emitter, 'priority', id, parent, weight, exclusive));
}

function emitFrameError() {
if (!this.emit('frameError', type, code, id)) {
const err = new errors.Error('ERR_HTTP2_FRAME_ERROR', type, code, id);
err.errno = code;
this.emit('error', err);
}
}

// Called by the native layer when an error has occurred sending a
Expand All @@ -341,29 +360,25 @@ function onFrameError(id, type, code) {
const streams = owner[kState].streams;
const stream = streams.get(id);
const emitter = stream !== undefined ? stream : owner;
process.nextTick(() => {
if (!emitter.emit('frameError', type, code, id)) {
const err = new errors.Error('ERR_HTTP2_FRAME_ERROR', type, code, id);
err.errno = code;
emitter.emit('error', err);
}
});
process.nextTick(emitFrameError.bind(emitter));
}

function emitGoaway(state, code, lastStreamID, buf) {
this.emit('goaway', code, lastStreamID, buf);
// Tear down the session or destroy
if (!state.shuttingDown && !state.shutdown) {
this.shutdown({}, this.destroy.bind(this));
} else {
this.destroy();
}
}

// Called by the native layer when a goaway frame has been received
function onGoawayData(code, lastStreamID, buf) {
const owner = this[kOwner];
const state = owner[kState];
debug(`[${sessionName(owner[kType])}] goaway data received`);
process.nextTick(() => {
owner.emit('goaway', code, lastStreamID, buf);
// Tear down the session or destroy
if (!state.shuttingDown && !state.shutdown) {
owner.shutdown({}, () => { owner.destroy(); });
} else {
owner.destroy();
}
});
process.nextTick(emitGoaway.bind(owner, state, code, lastStreamID, buf));
}

// Returns the padding to use per frame. The selectPadding callback is set
Expand Down Expand Up @@ -524,7 +539,7 @@ function setupHandle(session, socket, type, options) {
options.settings : Object.create(null);

session.settings(settings);
process.nextTick(() => session.emit('connect', session, socket));
process.nextTick(emit.bind(session, 'connect', session, socket))
};
}

Expand Down Expand Up @@ -627,7 +642,7 @@ function doShutdown(options) {
process.nextTick(() => this.emit('error', err));
return;
}
process.nextTick(() => this.emit('shutdown', options));
process.nextTick(emit.bind(this, 'shutdown', options));
debug(`[${sessionName(this[kType])}] shutdown is complete`);
}

Expand Down Expand Up @@ -1207,7 +1222,7 @@ function streamOnSessionConnect() {
debug(`[${sessionName(session[kType])}] session connected. emiting stream ` +
'connect');
this[kState].connecting = false;
process.nextTick(() => this.emit('connect'));
process.nextTick(emit.bind(this, 'connect'));
}

function streamOnceReady() {
Expand Down Expand Up @@ -1318,7 +1333,7 @@ class Http2Stream extends Duplex {

_write(data, encoding, cb) {
if (this[kID] === undefined) {
this.once('ready', () => this._write(data, encoding, cb));
this.once('ready', this._write.bind(this, data, encoding, cb));
return;
}
_unrefActive(this);
Expand All @@ -1341,7 +1356,7 @@ class Http2Stream extends Duplex {

_writev(data, cb) {
if (this[kID] === undefined) {
this.once('ready', () => this._writev(data, cb));
this.once('ready', this._writev.bindthis, data, cb);
return;
}
_unrefActive(this);
Expand All @@ -1368,7 +1383,7 @@ class Http2Stream extends Duplex {

_read(nread) {
if (this[kID] === undefined) {
this.once('ready', () => this._read(nread));
this.once('ready', this._read.bind(this, nread));
return;
}
if (this.destroyed) {
Expand Down Expand Up @@ -1397,7 +1412,7 @@ class Http2Stream extends Duplex {
if (this[kID] === undefined) {
debug(
`[${sessionName(session[kType])}] queuing rstStream for new stream`);
this.once('ready', () => this.rstStream(code));
this.once('ready', this.rstStream.bind(this, code));
return;
}
debug(`[${sessionName(session[kType])}] sending rstStream for stream ` +
Expand Down Expand Up @@ -1438,7 +1453,7 @@ class Http2Stream extends Duplex {
const session = this[kSession];
if (this[kID] === undefined) {
debug(`[${sessionName(session[kType])}] queuing priority for new stream`);
this.once('ready', () => this.priority(options));
this.once('ready', this.priority.bind(this, options));
return;
}
debug(`[${sessionName(session[kType])}] sending priority for stream ` +
Expand Down Expand Up @@ -1479,10 +1494,7 @@ class Http2Stream extends Duplex {
// Unenroll the timer
unenroll(this);

setImmediate(() => {
if (handle !== undefined)
handle.destroyStream(this[kID]);
});
setImmediate(finishStreamDestroy.bind(this, handle));
session[kState].streams.delete(this[kID]);
delete this[kSession];

Expand All @@ -1493,12 +1505,17 @@ class Http2Stream extends Duplex {
const err = new errors.Error('ERR_HTTP2_STREAM_ERROR', code);
process.nextTick(() => this.emit('error', err));
}
process.nextTick(() => this.emit('streamClosed', code));
process.nextTick(emit.bind(this, 'streamClosed', code));
debug(`[${sessionName(session[kType])}] stream ${this[kID]} destroyed`);
callback(err);
}
}

function finishStreamDestroy(handle) {
if (handle !== undefined)
handle.destroyStream(this[kID]);
}

function processHeaders(headers) {
assertIsObject(headers, 'headers');
headers = Object.assign(Object.create(null), headers);
Expand Down Expand Up @@ -1545,6 +1562,53 @@ function processRespondWithFD(fd, headers) {
}
}

function doSendFD(session, options, fd, headers, err, stat) {
if (this.destroyed || session.destroyed) {
abort(this);
return;
}
if (err) {
process.nextTick(() => this.emit('error', err));
return;
}
if (!stat.isFile()) {
err = new errors.Error('ERR_HTTP2_SEND_FILE');
process.nextTick(() => this.emit('error', err));
return;
}

// Set the content-length by default
headers[HTTP2_HEADER_CONTENT_LENGTH] = stat.size;
if (typeof options.statCheck === 'function' &&
options.statCheck.call(this, stat, headers) === false) {
return;
}

const headersList = mapToHeaders(headers,
assertValidPseudoHeaderResponse);
if (!Array.isArray(headersList)) {
throw headersList;
}

processRespondWithFD.call(this, fd, headersList);
}

function afterOpen(session, options, headers, err, fd) {
const state = this[kState];
if (this.destroyed || session.destroyed) {
abort(this);
return;
}
if (err) {
process.nextTick(() => this.emit('error', err));
return;
}
state.fd = fd;

fs.fstat(fd, doSendFD.bind(this, session, options, fd, headers));
}


class ServerHttp2Stream extends Http2Stream {
constructor(session, id, options, headers) {
super(session, options);
Expand Down Expand Up @@ -1795,48 +1859,7 @@ class ServerHttp2Stream extends Http2Stream {
throw new errors.Error('ERR_HTTP2_PAYLOAD_FORBIDDEN', statusCode);
}

fs.open(path, 'r', (err, fd) => {
if (this.destroyed || session.destroyed) {
abort(this);
return;
}
if (err) {
process.nextTick(() => this.emit('error', err));
return;
}
state.fd = fd;

fs.fstat(fd, (err, stat) => {
if (this.destroyed || session.destroyed) {
abort(this);
return;
}
if (err) {
process.nextTick(() => this.emit('error', err));
return;
}
if (!stat.isFile()) {
err = new errors.Error('ERR_HTTP2_SEND_FILE');
process.nextTick(() => this.emit('error', err));
return;
}

// Set the content-length by default
headers[HTTP2_HEADER_CONTENT_LENGTH] = stat.size;
if (typeof options.statCheck === 'function' &&
options.statCheck.call(this, stat, headers) === false) {
return;
}

const headersList = mapToHeaders(headers,
assertValidPseudoHeaderResponse);
if (!Array.isArray(headersList)) {
throw headersList;
}

processRespondWithFD.call(this, fd, headersList);
});
});
fs.open(path, 'r', afterOpen.bind(this, session, options, headers));
}

// Sends a block of informational headers. In theory, the HTTP/2 spec
Expand Down Expand Up @@ -2088,7 +2111,7 @@ function connectionListener(socket) {

socket[kServer] = this;

process.nextTick(() => this.emit('session', session));
process.nextTick(emit.bind(this, 'session', session));
}

function initializeOptions(options) {
Expand Down

0 comments on commit 03cd164

Please sign in to comment.