From 6d7975181d42e2f43d717d2e2d59a66291382910 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Sat, 21 Apr 2018 02:35:19 -0600 Subject: [PATCH] feat: performance --- profile.js | 2 +- src/channel.js | 32 +++++++++++++++----------------- src/coder.js | 45 ++++++++++++++++++++++++--------------------- src/index.js | 16 +++++++--------- src/utils.js | 46 ---------------------------------------------- 5 files changed, 47 insertions(+), 94 deletions(-) delete mode 100644 src/utils.js diff --git a/profile.js b/profile.js index 40d909e..aeac4a7 100644 --- a/profile.js +++ b/profile.js @@ -79,7 +79,7 @@ function marker (n, done) { } } -spawn(1000, 10000, (err) => { +spawn(1000, 1000, (err) => { if (err) { throw err } diff --git a/src/channel.js b/src/channel.js index 7eab9ab..00fae5d 100644 --- a/src/channel.js +++ b/src/channel.js @@ -26,6 +26,18 @@ class Channel extends EE { this._endedLocal = false // local stream ended this._reset = false + this.MSG = this._initiator + ? consts.type.OUT_MESSAGE + : consts.type.IN_MESSAGE + + this.END = this._initiator + ? consts.type.OUT_CLOSE + : consts.type.IN_CLOSE + + this.RESET = this._initiator + ? consts.type.OUT_RESET + : consts.type.IN_RESET + this._log = (name, data) => { log({ op: name, @@ -132,10 +144,6 @@ class Channel extends EE { if (this.open) { return } // chan already open let name - if (this._name && !Buffer.isBuffer(this._name)) { - name = Buffer.from(this._name) - } - this.open = true this._plex.push([ this._id, @@ -151,14 +159,9 @@ class Channel extends EE { this.openChan() } - if (!Buffer.isBuffer(data)) { - data = Buffer.from(data) - } this._plex.push([ this._id, - this._initiator - ? consts.type.OUT_MESSAGE - : consts.type.IN_MESSAGE, + this.MSG, data ]) } @@ -172,10 +175,7 @@ class Channel extends EE { this._plex.push([ this._id, - this._initiator - ? consts.type.OUT_CLOSE - : consts.type.IN_CLOSE, - Buffer.from([0]) + this.END ]) } @@ -188,9 +188,7 @@ class Channel extends EE { this._plex.push([ this._id, - this._initiator - ? consts.type.OUT_RESET - : consts.type.IN_RESET + this.RESET ]) } } diff --git a/src/coder.js b/src/coder.js index 48a5ef7..f4ea5b4 100644 --- a/src/coder.js +++ b/src/coder.js @@ -10,19 +10,25 @@ const debug = require('debug') const log = debug('pull-plex:coder') log.err = debug('pull-plex:coder:err') +let pool = Buffer.allocUnsafe(100 * 1024) +let used = 0 +const empty = Buffer.alloc(0) exports.encode = () => { - return pull( - through(function (msg) { - const seq = [Buffer.from(varint.encode(msg[0] << 3 | msg[1]))] - const len = msg[2] ? Buffer.byteLength(msg[2]) : 0 - seq.push(Buffer.from(varint.encode(len))) // send empty body - this.queue(Buffer.concat(seq)) // send header + return through(function (msg) { + const oldUsed = used + varint.encode(msg[0] << 3 | msg[1], pool, used) + used += varint.encode.bytes + varint.encode(varint.encode(msg[2] ? Buffer.byteLength(msg[2]) : 0), pool, used) + used += varint.encode.bytes + this.queue(pool.slice(oldUsed, used)) // send header - if (len) { - this.queue(msg[2]) - } - }) - ) + if (pool.length - used < 100) { + pool = Buffer.allocUnsafe(10 * 1024) + used = 0 + } + + this.queue(msg[2] || empty) + }) } let States = { @@ -40,10 +46,9 @@ exports.decode = () => { try { let offset = 0 let length = 0 - let buff = msg.slice() - const h = varint.decode(buff) // no bl[x] accessor :( + const h = varint.decode(msg) offset += varint.decode.bytes - length = varint.decode(buff, offset) + length = varint.decode(msg, offset) offset += varint.decode.bytes const message = { id: h >> 3, @@ -68,21 +73,19 @@ exports.decode = () => { let left = length - msg.length if (left < 0) { left = 0 } if (msg.length > 0) { - const buff = msg.slice(0, length - left) - data.append(buff) - msg = msg.slice(buff.length) + data.append(msg.slice(0, length - left)) } if (left <= 0) { state = States.PARSING } - return [left, msg, data] + return [left, msg.slice(length - left), data] } return through(function (msg) { - while (msg.length) { + while (msg && msg.length) { if (States.PARSING === state) { if (!buffer) { - buffer = new BufferList(msg) + buffer = Buffer.from(msg) } else { - buffer = buffer.append(msg) + buffer = Buffer.concat([buffer, msg]) } [msg, message, length] = decode(buffer) diff --git a/src/index.js b/src/index.js index 48921cc..10cdc70 100644 --- a/src/index.js +++ b/src/index.js @@ -154,8 +154,6 @@ class Mplex extends EE { } _newStream (id, initiator, open, name, list) { - this._log('_newStream', Array.prototype.slice.call(arguments)) - if (this.chanSize >= this._maxChannels) { this.emit('error', new Error('max channels exceeded')) return @@ -191,13 +189,13 @@ class Mplex extends EE { _addChan (id, chan, list) { chan.once('close', () => { const chan = list.get(id) - this._log('deleting channel', JSON.stringify({ - channel: this._name, - id: id, - endedLocal: chan._endedLocal, - endedRemote: chan._endedRemote, - initiator: chan._initiator - })) + // this._log('deleting channel', JSON.stringify({ + // channel: this._name, + // id: id, + // endedLocal: chan._endedLocal, + // endedRemote: chan._endedRemote, + // initiator: chan._initiator + // })) list.delete(id) }) diff --git a/src/utils.js b/src/utils.js deleted file mode 100644 index f399bfb..0000000 --- a/src/utils.js +++ /dev/null @@ -1,46 +0,0 @@ -'use strict' - -const pull = require('pull-stream') -const varint = require('varint') -const lp = require('pull-length-prefixed') -const cat = require('pull-cat') -const through = require('pull-through') - -exports.encodeMsg = (id, type, data, cb) => { - return pull( - cat([ - pull.values([varint.encode(id << 3 | type)]), - pull( - pull.values([Buffer.from(data)]), - lp.encode() - ) - ]), - pull.flatten(), - pull.collect((err, data) => { - if (err) { return cb(err) } - cb(null, Buffer.from(data)) - }) - ) -} - -exports.decodeMsg = (msg, cb) => { - return pull( - cat([ - pull( - pull.values([msg.slice(0, 1)]), - through(function (h) { - const header = varint.decode(h) - this.queue({ id: header >> 3, type: header & 7 }) - }) - ), - pull( - pull.values([msg.slice(1)]), - lp.decode() - ) - ]), - pull.collect((err, data) => { - if (err) { return cb(err) } - cb(null, data) - }) - ) -}