Skip to content

Commit

Permalink
feat: performance
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov authored and jacobheun committed Feb 7, 2019
1 parent d428fb6 commit 6d79751
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 94 deletions.
2 changes: 1 addition & 1 deletion profile.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ function marker (n, done) {
}
}

spawn(1000, 10000, (err) => {
spawn(1000, 1000, (err) => {
if (err) {
throw err
}
Expand Down
32 changes: 15 additions & 17 deletions src/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ class Channel extends EE {
this._endedLocal = false // local stream ended
this._reset = false

this.MSG = this._initiator
? consts.type.OUT_MESSAGE
: consts.type.IN_MESSAGE

this.END = this._initiator
? consts.type.OUT_CLOSE
: consts.type.IN_CLOSE

this.RESET = this._initiator
? consts.type.OUT_RESET
: consts.type.IN_RESET

this._log = (name, data) => {
log({
op: name,
Expand Down Expand Up @@ -132,10 +144,6 @@ class Channel extends EE {
if (this.open) { return } // chan already open

let name
if (this._name && !Buffer.isBuffer(this._name)) {
name = Buffer.from(this._name)
}

this.open = true
this._plex.push([
this._id,
Expand All @@ -151,14 +159,9 @@ class Channel extends EE {
this.openChan()
}

if (!Buffer.isBuffer(data)) {
data = Buffer.from(data)
}
this._plex.push([
this._id,
this._initiator
? consts.type.OUT_MESSAGE
: consts.type.IN_MESSAGE,
this.MSG,
data
])
}
Expand All @@ -172,10 +175,7 @@ class Channel extends EE {

this._plex.push([
this._id,
this._initiator
? consts.type.OUT_CLOSE
: consts.type.IN_CLOSE,
Buffer.from([0])
this.END
])
}

Expand All @@ -188,9 +188,7 @@ class Channel extends EE {

this._plex.push([
this._id,
this._initiator
? consts.type.OUT_RESET
: consts.type.IN_RESET
this.RESET
])
}
}
Expand Down
45 changes: 24 additions & 21 deletions src/coder.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,25 @@ const debug = require('debug')
const log = debug('pull-plex:coder')
log.err = debug('pull-plex:coder:err')

let pool = Buffer.allocUnsafe(100 * 1024)
let used = 0
const empty = Buffer.alloc(0)
exports.encode = () => {
return pull(
through(function (msg) {
const seq = [Buffer.from(varint.encode(msg[0] << 3 | msg[1]))]
const len = msg[2] ? Buffer.byteLength(msg[2]) : 0
seq.push(Buffer.from(varint.encode(len))) // send empty body
this.queue(Buffer.concat(seq)) // send header
return through(function (msg) {
const oldUsed = used
varint.encode(msg[0] << 3 | msg[1], pool, used)
used += varint.encode.bytes
varint.encode(varint.encode(msg[2] ? Buffer.byteLength(msg[2]) : 0), pool, used)
used += varint.encode.bytes
this.queue(pool.slice(oldUsed, used)) // send header

if (len) {
this.queue(msg[2])
}
})
)
if (pool.length - used < 100) {
pool = Buffer.allocUnsafe(10 * 1024)
used = 0
}

this.queue(msg[2] || empty)
})
}

let States = {
Expand All @@ -40,10 +46,9 @@ exports.decode = () => {
try {
let offset = 0
let length = 0
let buff = msg.slice()
const h = varint.decode(buff) // no bl[x] accessor :(
const h = varint.decode(msg)
offset += varint.decode.bytes
length = varint.decode(buff, offset)
length = varint.decode(msg, offset)
offset += varint.decode.bytes
const message = {
id: h >> 3,
Expand All @@ -68,21 +73,19 @@ exports.decode = () => {
let left = length - msg.length
if (left < 0) { left = 0 }
if (msg.length > 0) {
const buff = msg.slice(0, length - left)
data.append(buff)
msg = msg.slice(buff.length)
data.append(msg.slice(0, length - left))
}
if (left <= 0) { state = States.PARSING }
return [left, msg, data]
return [left, msg.slice(length - left), data]
}

return through(function (msg) {
while (msg.length) {
while (msg && msg.length) {
if (States.PARSING === state) {
if (!buffer) {
buffer = new BufferList(msg)
buffer = Buffer.from(msg)
} else {
buffer = buffer.append(msg)
buffer = Buffer.concat([buffer, msg])
}

[msg, message, length] = decode(buffer)
Expand Down
16 changes: 7 additions & 9 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,6 @@ class Mplex extends EE {
}

_newStream (id, initiator, open, name, list) {
this._log('_newStream', Array.prototype.slice.call(arguments))

if (this.chanSize >= this._maxChannels) {
this.emit('error', new Error('max channels exceeded'))
return
Expand Down Expand Up @@ -191,13 +189,13 @@ class Mplex extends EE {
_addChan (id, chan, list) {
chan.once('close', () => {
const chan = list.get(id)
this._log('deleting channel', JSON.stringify({
channel: this._name,
id: id,
endedLocal: chan._endedLocal,
endedRemote: chan._endedRemote,
initiator: chan._initiator
}))
// this._log('deleting channel', JSON.stringify({
// channel: this._name,
// id: id,
// endedLocal: chan._endedLocal,
// endedRemote: chan._endedRemote,
// initiator: chan._initiator
// }))
list.delete(id)
})

Expand Down
46 changes: 0 additions & 46 deletions src/utils.js

This file was deleted.

0 comments on commit 6d79751

Please sign in to comment.