Skip to content

Commit

Permalink
feat: correct encoding/decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov authored and jacobheun committed Feb 7, 2019
1 parent e9f9917 commit 2e608fd
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 18 deletions.
3 changes: 2 additions & 1 deletion src/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class Channel extends EE {
endedLocal: this._endedLocal,
endedRemote: this._endedRemote,
initiator: this._initiator,
data: data || ''
data: (data && data.toString()) || ''
})
}

Expand Down Expand Up @@ -101,6 +101,7 @@ class Channel extends EE {
push (data) {
this._log('push', data)
this._msgs.push(data)
log('buffer', this._msgs.buffer)
}

// close for reading
Expand Down
21 changes: 16 additions & 5 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,23 @@ class Plex extends EE {
this._initiator = !!initiator
this._chanId = this._initiator ? 1 : 0
this._channels = {}
this._destroyed = false

this._log = (name, data) => {
log({
src: 'channel.js',
src: 'index.js',
op: name,
channel: this._name,
id: this._id,
localEnded: this._endedLocal,
remoteEnded: this._endedRemote,
initiator: this._initiator,
data: data || ''
data: (data && data.toString()) || ''
})
}

this._chandata = pushable((err) => {
if (this._destroyed) { return }
this.destroy(err || new Error('Underlying stream has been closed'))
})

Expand All @@ -51,7 +53,7 @@ class Plex extends EE {
utils.decode(),
(read) => {
const next = (end, data) => {
if (end === true) { return }
if (end === true || this._destroyed) { return }
if (end) { return this.destroy(end) }
this._handle(data)
return read(null, next)
Expand All @@ -75,19 +77,28 @@ class Plex extends EE {
.keys(this._channels)
.forEach((id) => {
const chan = this._channels[id]
chan.reset(err)
if (err) {
chan.reset(err)
} else {
chan.close()
}
delete this._channels[id]
})

this._destroyed = true
this._chandata.end(err) // close source

if (err) {
return setImmediate(() => this.emit('error', err))
}

this.emit('close')
this.emit('close', err)
}

push (data) {
log('push', data)
this._chandata.push(data)
log('buffer', this._chandata.buffer)
}

_nextChanId () {
Expand Down
35 changes: 23 additions & 12 deletions src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,40 @@ const cat = require('pull-cat')
const through = require('pull-through')

exports.encode = () => {
return through(function (msg) {
const data = Buffer.concat([
Buffer.from(varint.encode(msg[0] << 3 | msg[1])),
Buffer.from(varint.encode(Buffer.byteLength(msg[2]))),
Buffer.from(msg[2])
])
this.queue(data)
})
return pull(
through(function (msg) {
const data = Buffer.concat([
Buffer.from(varint.encode(msg[0] << 3 | msg[1])),
Buffer.from(varint.encode(Buffer.byteLength(msg[2]))),
Buffer.from(msg[2])
])
this.queue(data)
})
)
}

exports.decode = () => {
return through(function (msg) {
const decode = (msg) => {
let offset = 0
const h = varint.decode(msg)
offset += varint.decode.bytes
const length = varint.decode(msg.slice(offset))
const length = varint.decode(msg, offset)
offset += varint.decode.bytes
const decoded = {
id: h >> 3,
type: h & 7,
data: msg.slice(offset /*, length*/) // somehow length gets offset and truncates the buffer
data: msg.slice(offset, offset + length)
}

this.queue(decoded)
return [msg.slice(offset + length), decoded]
}

return through(function (msg) {
let offset = 0
let decoded
while (msg.length) {
[msg, decoded] = decode(msg)
this.queue(decoded)
}
})
}
31 changes: 31 additions & 0 deletions test/utils.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,35 @@ describe('utils', () => {
})
)
})

it('encodes several msgs into buffer', () => {
pull(
pull.values([
[17, 0, Buffer.from('17')],
[19, 0, Buffer.from('19')],
[21, 0, Buffer.from('21')]
]),
utils.encode(),
pull.collect((err, data) => {
expect(err).to.not.exist()
expect(Buffer.concat(data)).to.be.eql(Buffer.from('88010231379801023139a801023231', 'hex'))
})
)
})

it('decodes msgs from buffer', () => {
pull(
pull.values([Buffer.from('88010231379801023139a801023231', 'hex')]),
utils.decode(),
pull.collect((err, data) => {
expect(err).to.not.exist()
expect(data).to.be.deep.eql([
{ id: 17, type: 0, data: Buffer.from('17') },
{ id: 19, type: 0, data: Buffer.from('19') },
{ id: 21, type: 0, data: Buffer.from('21') }
])
})
)
})

})

0 comments on commit 2e608fd

Please sign in to comment.