fix: correctly read chunked data
dryajov authored and jacobheun committed Feb 7, 2019
1 parent 0db5abf commit 9fd3d28
Showing 4 changed files with 108 additions and 37 deletions.
15 changes: 12 additions & 3 deletions src/channel.js
@@ -14,7 +14,7 @@ class Channel extends EE {
   constructor (id, name, plex, initiator, open) {
     super()
     this._id = id
-    this._name = name || this._id.toString()
+    this._name = name
     this._plex = plex
     this._open = open
     this._initiator = initiator
@@ -118,11 +118,16 @@ class Channel extends EE {
   openChan () {
     this._log('openChan')
 
+    let name
+    if (this._name && !Buffer.isBuffer(this._name)) {
+      name = Buffer.from(this._name)
+    }
+
     this.open = true
     this._plex.push([
       this._id,
       consts.type.NEW,
-      this._name
+      name != this._id.toString() ? name : null
     ])
   }

@@ -133,6 +138,9 @@ class Channel extends EE {
       this.openChan()
     }
 
+    if (!Buffer.isBuffer(data)) {
+      data = Buffer.from(data)
+    }
     this._plex.push([
       this._id,
       this._initiator
@@ -153,7 +161,8 @@
       this._id,
       this._initiator
         ? consts.type.IN_CLOSE
-        : consts.type.OUT_CLOSE
+        : consts.type.OUT_CLOSE,
+      Buffer.from([0])
     ])
   }

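Note on src/channel.js: the channel no longer defaults its name to the stream id. openChan converts a user-supplied name to a Buffer and only sends it when it differs from the id string, outgoing data is coerced to a Buffer before framing, and close frames now carry a one-byte body (Buffer.from([0])). A minimal sketch of the coercion pattern, using hypothetical values channelName and payload that are not part of this commit:

// Illustrative only: the Buffer coercion the updated Channel performs before
// pushing frames into the muxer; channelName and payload are made-up values.
let channelName = 'chat'
if (channelName && !Buffer.isBuffer(channelName)) {
  channelName = Buffer.from(channelName) // names now travel as Buffers
}

let payload = 'hello world'
if (!Buffer.isBuffer(payload)) {
  payload = Buffer.from(payload) // outgoing data is coerced the same way
}
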
61 changes: 35 additions & 26 deletions src/coder.js
@@ -2,8 +2,6 @@

 const pull = require('pull-stream')
 const varint = require('varint')
-const lp = require('pull-length-prefixed')
-const cat = require('pull-cat')
 const through = require('pull-through')

 const debug = require('debug')
@@ -15,15 +13,13 @@ exports.encode = () => {
   return pull(
     through(function (msg) {
       const seq = [Buffer.from(varint.encode(msg[0] << 3 | msg[1]))]
+      const len = msg[2] ? Buffer.byteLength(msg[2]) : 0
+      seq.push(Buffer.from(varint.encode(len))) // send empty body
+      this.queue(Buffer.concat(seq)) // send header

-      if (msg[2]) {
-        seq.push(Buffer.from(varint.encode(Buffer.byteLength(msg[2]))))
-        seq.push(Buffer.from(msg[2]))
-      } else {
-        seq.push(Buffer.from(varint.encode(0)))
+      if (len) {
+        this.queue(msg[2])
       }
-
-      this.queue(Buffer.concat(seq))
     })
   )
 }
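Note on the encode() change above: each frame is still a varint header encoding id << 3 | type, followed by a varint length and the raw body; the difference is that the header and length are now queued immediately and the body is queued as a separate chunk, so a single frame may reach the decoder split in two. A sketch of the resulting bytes for an assumed message [17, 2, Buffer.from('hello')] (the id and type are arbitrary demo values, not taken from this commit):

// Hedged example of the wire layout produced by encode(); id and type are demo values.
const varint = require('varint')

const id = 17
const type = 2
const payload = Buffer.from('hello')

const header = Buffer.from(varint.encode(id << 3 | type)) // <Buffer 8a 01>
const length = Buffer.from(varint.encode(payload.length)) // <Buffer 05>

// encode() queues header + length first, then the payload as its own chunk,
// so the bytes on the wire are:
console.log(Buffer.concat([header, length, payload])) // <Buffer 8a 01 05 68 65 6c 6c 6f>
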
@@ -35,45 +31,57 @@ let States = {
 let state = States.PARSING
 exports.decode = () => {
   const decode = (msg) => {
-    let offset = 0
-    const h = varint.decode(msg)
-    offset += varint.decode.bytes
-    let length, data
     try {
+      let offset = 0
+      let length = 0
+      const h = varint.decode(msg)
+      offset += varint.decode.bytes
       length = varint.decode(msg, offset)
       offset += varint.decode.bytes
+      const message = {
+        id: h >> 3,
+        type: h & 7,
+        data: Buffer.alloc(length) // instead of allocating a new buff use a mem pool here
+      }
+
+      state = States.READING
+      return [msg.slice(offset), message, length]
     } catch (err) {
       log.err(err) // ignore if data is empty
+      return [msg, undefined, undefined]
     }
-
-    const message = {
-      id: h >> 3,
-      type: h & 7,
-      data: Buffer.alloc(length) // instead of allocating a new buff use a mem pool here
-    }
-
-    state = States.READING
-    return [msg.slice(offset), message, length]
   }

+  let pos = 0
   const read = (msg, data, length) => {
     let left = length - msg.length
     if (msg.length > 0) {
-      const buff = left > 0 ? msg.slice() : msg.slice(0, length)
-      buff.copy(data)
+      const buff = msg.slice(0, length - left)
+      pos += buff.copy(data, pos)
       msg = msg.slice(buff.length)
     }
     if (left <= 0) { state = States.PARSING }
     return [left, msg, data]
   }

   let offset = 0
-  let message = {}
+  let message = null
   let length = 0
+  let buffer = null
   return through(function (msg) {
     while (msg.length) {
       if (States.PARSING === state) {
-        [msg, message, length] = decode(msg)
+        if (!buffer) {
+          buffer = Buffer.from(msg)
+        } else {
+          buffer = Buffer.concat([buffer, msg])
+        }
+
+        [msg, message, length] = decode(buffer)
+        if (!message && !length) {
+          return // read more
+        }
+        buffer = null
       }

       if (States.READING === state) {
@@ -83,6 +91,7 @@ exports.decode = () => {
           offset = 0
           message = {}
           length = 0
+          pos = 0
         }
       }
     }
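Note on the decode() changes above, which are the heart of this fix: instead of assuming every incoming chunk starts on a frame boundary, the decoder now accumulates chunks in buffer until a complete header (varint id/type plus varint length) can be parsed, and read tracks its copy position in pos so a body split across chunks is assembled incrementally. A simplified, self-contained sketch of the same buffering idea; demoDecoder and onMessage are hypothetical names, not the library's API:

// Standalone sketch of chunk-safe decoding; not the library's actual interface.
const varint = require('varint')

function demoDecoder (onMessage) {
  let buffer = Buffer.alloc(0)

  return function write (chunk) {
    buffer = Buffer.concat([buffer, chunk])

    while (true) {
      let header, length, offset
      try {
        header = varint.decode(buffer)
        offset = varint.decode.bytes
        length = varint.decode(buffer, offset)
        offset += varint.decode.bytes
      } catch (err) {
        return // header still incomplete, wait for the next chunk
      }

      if (buffer.length - offset < length) {
        return // body still incomplete, wait for the next chunk
      }

      onMessage({
        id: header >> 3,
        type: header & 7,
        data: buffer.slice(offset, offset + length)
      })
      buffer = buffer.slice(offset + length)
    }
  }
}

Feeding the example frame from the encode note into this one byte at a time still yields a single complete message, which mirrors what the reworked decode()/read() pair now handles.
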
8 changes: 4 additions & 4 deletions src/index.js
@@ -24,7 +24,7 @@ class Plex extends EE {
     }

     this._initiator = !!initiator
-    this._chanId = this._initiator ? 1 : 0
+    this._chanId = this._initiator ? 0 : 1
     this._channels = {}
     this._endedRemote = false // remote stream ended
     this._endedLocal = false // local stream ended
@@ -119,6 +119,9 @@
   }

   createStream (name) {
+    if (typeof name === 'number') {
+      name = name.toString()
+    }
     return this._newStream(null, this._initiator, false, name)
   }

@@ -136,9 +139,6 @@
     }

     id = typeof id === 'number' ? id : this._nextChanId(initiator)
-    name = typeof name === 'number' ? name.toString() : name
-    name = name == null ? id.toString() : name
-    name = !name.length ? id.toString() : name
     const chan = new Channel(id,
       name,
       this,
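Note on src/index.js: name normalization moves out of _newStream; createStream now only stringifies numeric names and otherwise passes the value through, and the initiator's channel ids now start at 0 instead of 1. A hypothetical usage sketch (the require path and the arguments passed to createStream are assumptions, not part of this commit):

// Illustrative only; mirrors what createStream() does after this change.
const Plex = require('pull-mplex') // assumed package name
const plex = new Plex(true)

const a = plex.createStream('a-name') // string names pass through unchanged
const b = plex.createStream(42)       // numeric names become the string '42'
const c = plex.createStream()         // no name: no default is applied here anymore
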
61 changes: 57 additions & 4 deletions test/old-mplex-interop.js
@@ -265,7 +265,7 @@ describe('node stream multiplex interop', () => {
     })
   })

-  it('chunks', (done) => {
+  it('new2old: chunks', (done) => {
     let times = 100
     ;(function chunk () {
       const collect = collector(function () {
@@ -276,13 +276,65 @@
         }
       })

-      const plex1 = new MplexCore()
+      const pullPlex = new Plex(true)
+      const plex1 = toStream(pullPlex)
+      const stream1 = toStream(pullPlex.createStream())
+      const stream2 = toStream(pullPlex.createStream())
+
+      const plex2 = new MplexCore(function onStream (stream, id) {
+        stream.pipe(collect())
+      })
+
+      plex1.pipe(through(function (buf, enc, next) {
+        const bufs = chunky(buf)
+        for (let i = 0; i < bufs.length; i++) this.push(bufs[i])
+        next()
+      })).pipe(plex2)
+
+      stream1.write(Buffer.from('hello'))
+      stream2.write(Buffer.from('world'))
+      stream1.end()
+      stream2.end()
+    })()
+
+    function collector (cb) {
+      let pending = 2
+      const results = []
+
+      return function () {
+        return concat(function (data) {
+          results.push(data.toString())
+          if (--pending === 0) {
+            results.sort()
+            expect(results[0].toString()).to.equal('hello')
+            expect(results[1].toString()).to.equal('world')
+            cb()
+          }
+        })
+      }
+    }
+  })
+
+  it('old2new: chunks', (done) => {
+    let times = 100
+    ;(function chunk () {
+      const collect = collector(function () {
+        if (--times === 0) {
+          done()
+        } else {
+          chunk()
+        }
+      })
+
+      const plex1 = new MplexCore({ initiator: true })
       const stream1 = plex1.createStream()
       const stream2 = plex1.createStream()

-      const plex2 = new MplexCore(function onStream (stream, id) {
+      const pullStream = new Plex(false, function onStream (pullStream, id) {
+        const stream = toStream(pullStream)
         stream.pipe(collect())
       })
+      const plex2 = toStream(pullStream)

       plex1.pipe(through(function (buf, enc, next) {
         const bufs = chunky(buf)
@@ -315,7 +367,8 @@
   })

   it('prefinish + corking', (done) => {
-    const plex = new MplexCore()
+    const pullPlex = new Plex(true)
+    const plex = toStream(pullPlex)
     let async = false

     plex.on('prefinish', function () {
