From 2e608fd6246cbc51f2a68ca88f0ab7bb6fa69ddc Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 3 Apr 2018 11:17:06 -0600 Subject: [PATCH] feat: correct encoding/decoding --- src/channel.js | 3 ++- src/index.js | 21 ++++++++++++++++----- src/utils.js | 35 +++++++++++++++++++++++------------ test/utils.spec.js | 31 +++++++++++++++++++++++++++++++ 4 files changed, 72 insertions(+), 18 deletions(-) diff --git a/src/channel.js b/src/channel.js index bd9c078..8945c7d 100644 --- a/src/channel.js +++ b/src/channel.js @@ -31,7 +31,7 @@ class Channel extends EE { endedLocal: this._endedLocal, endedRemote: this._endedRemote, initiator: this._initiator, - data: data || '' + data: (data && data.toString()) || '' }) } @@ -101,6 +101,7 @@ class Channel extends EE { push (data) { this._log('push', data) this._msgs.push(data) + log('buffer', this._msgs.buffer) } // close for reading diff --git a/src/index.js b/src/index.js index 5d750d8..61efc51 100644 --- a/src/index.js +++ b/src/index.js @@ -20,21 +20,23 @@ class Plex extends EE { this._initiator = !!initiator this._chanId = this._initiator ? 1 : 0 this._channels = {} + this._destroyed = false this._log = (name, data) => { log({ - src: 'channel.js', + src: 'index.js', op: name, channel: this._name, id: this._id, localEnded: this._endedLocal, remoteEnded: this._endedRemote, initiator: this._initiator, - data: data || '' + data: (data && data.toString()) || '' }) } this._chandata = pushable((err) => { + if (this._destroyed) { return } this.destroy(err || new Error('Underlying stream has been closed')) }) @@ -51,7 +53,7 @@ class Plex extends EE { utils.decode(), (read) => { const next = (end, data) => { - if (end === true) { return } + if (end === true || this._destroyed) { return } if (end) { return this.destroy(end) } this._handle(data) return read(null, next) @@ -75,19 +77,28 @@ class Plex extends EE { .keys(this._channels) .forEach((id) => { const chan = this._channels[id] - chan.reset(err) + if (err) { + chan.reset(err) + } else { + chan.close() + } delete this._channels[id] }) + this._destroyed = true + this._chandata.end(err) // close source + if (err) { return setImmediate(() => this.emit('error', err)) } - this.emit('close') + this.emit('close', err) } push (data) { + log('push', data) this._chandata.push(data) + log('buffer', this._chandata.buffer) } _nextChanId () { diff --git a/src/utils.js b/src/utils.js index 368e421..537bb0a 100644 --- a/src/utils.js +++ b/src/utils.js @@ -7,29 +7,40 @@ const cat = require('pull-cat') const through = require('pull-through') exports.encode = () => { - return through(function (msg) { - const data = Buffer.concat([ - Buffer.from(varint.encode(msg[0] << 3 | msg[1])), - Buffer.from(varint.encode(Buffer.byteLength(msg[2]))), - Buffer.from(msg[2]) - ]) - this.queue(data) - }) + return pull( + through(function (msg) { + const data = Buffer.concat([ + Buffer.from(varint.encode(msg[0] << 3 | msg[1])), + Buffer.from(varint.encode(Buffer.byteLength(msg[2]))), + Buffer.from(msg[2]) + ]) + this.queue(data) + }) + ) } exports.decode = () => { - return through(function (msg) { + const decode = (msg) => { let offset = 0 const h = varint.decode(msg) offset += varint.decode.bytes - const length = varint.decode(msg.slice(offset)) + const length = varint.decode(msg, offset) offset += varint.decode.bytes const decoded = { id: h >> 3, type: h & 7, - data: msg.slice(offset /*, length*/) // somehow length gets offset and truncates the buffer + data: msg.slice(offset, offset + length) } - this.queue(decoded) + return [msg.slice(offset + length), decoded] + } + + return through(function (msg) { + let offset = 0 + let decoded + while (msg.length) { + [msg, decoded] = decode(msg) + this.queue(decoded) + } }) } diff --git a/test/utils.spec.js b/test/utils.spec.js index fc873f8..7acbb65 100644 --- a/test/utils.spec.js +++ b/test/utils.spec.js @@ -34,4 +34,35 @@ describe('utils', () => { }) ) }) + + it('encodes several msgs into buffer', () => { + pull( + pull.values([ + [17, 0, Buffer.from('17')], + [19, 0, Buffer.from('19')], + [21, 0, Buffer.from('21')] + ]), + utils.encode(), + pull.collect((err, data) => { + expect(err).to.not.exist() + expect(Buffer.concat(data)).to.be.eql(Buffer.from('88010231379801023139a801023231', 'hex')) + }) + ) + }) + + it('decodes msgs from buffer', () => { + pull( + pull.values([Buffer.from('88010231379801023139a801023231', 'hex')]), + utils.decode(), + pull.collect((err, data) => { + expect(err).to.not.exist() + expect(data).to.be.deep.eql([ + { id: 17, type: 0, data: Buffer.from('17') }, + { id: 19, type: 0, data: Buffer.from('19') }, + { id: 21, type: 0, data: Buffer.from('21') } + ]) + }) + ) + }) + }) \ No newline at end of file