Skip to content

Commit

Permalink
feat: interop with old libp2p-mplex
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov authored and jacobheun committed Feb 7, 2019
1 parent 0341fee commit 0db5abf
Show file tree
Hide file tree
Showing 9 changed files with 585 additions and 62 deletions.
8 changes: 7 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,14 @@
"aegir": "^13.0.6",
"async": "^2.6.0",
"chai-checkmark": "^1.0.1",
"chunky": "0.0.0",
"concat-stream": "^1.6.2",
"libp2p-mplex": "^0.7.0",
"pull-abortable": "^4.1.1",
"pull-generate": "^2.2.0"
"pull-generate": "^2.2.0",
"pull-stream-to-stream": "^1.3.4",
"pump": "^3.0.0",
"through2": "^2.0.3"
},
"repository": {
"type": "git",
Expand Down
2 changes: 1 addition & 1 deletion profile.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ function marker (n, done) {
}


spawn(10000, 10000, (err) => {
spawn(1000, 1000, (err) => {
if (err) {
throw err
}
Expand Down
15 changes: 8 additions & 7 deletions src/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class Channel extends EE {
this._msgs = pushable((err) => {
this._log('source closed', err)
if (this._reset) { return } // don't try closing the channel on reset
if (err) { setImmediate(() => this.emit('error', err)) }

this.endChan()
})
Expand Down Expand Up @@ -110,7 +111,7 @@ class Channel extends EE {

reset (err) {
this._log('reset', err)
this._reset = err || new Error('channel reset!')
this._reset = err || 'channel reset!'
this.close(this._reset)
}

Expand All @@ -135,8 +136,8 @@ class Channel extends EE {
this._plex.push([
this._id,
this._initiator
? consts.type.OUT_MESSAGE
: consts.type.IN_MESSAGE,
? consts.type.IN_MESSAGE
: consts.type.OUT_MESSAGE,
data
])
}
Expand All @@ -151,8 +152,8 @@ class Channel extends EE {
this._plex.push([
this._id,
this._initiator
? consts.type.OUT_CLOSE
: consts.type.IN_CLOSE
? consts.type.IN_CLOSE
: consts.type.OUT_CLOSE
])
}

Expand All @@ -166,8 +167,8 @@ class Channel extends EE {
this._plex.push([
this._id,
this._initiator
? consts.type.OUT_RESET
: consts.type.IN_RESET
? consts.type.IN_RESET
: consts.type.OUT_RESET
])
}
}
Expand Down
56 changes: 39 additions & 17 deletions src/utils.js → src/coder.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,41 +28,63 @@ exports.encode = () => {
)
}

let States = {
PARSING: 0,
READING: 1
}
let state = States.PARSING
exports.decode = () => {
const decode = (msg) => {
let offset = 0
const h = varint.decode(msg)
offset += varint.decode.bytes
let length
let data
let length, data
try {
length = varint.decode(msg, offset)
offset += varint.decode.bytes

if (length > msg.length) {
throw new Error('partial buffer, need more data')
}

data = msg.slice(offset, offset + length)
} catch (err) {
log.err(err)
} // ignore if data is empty
log.err(err) // ignore if data is empty
}

const decoded = {
const message = {
id: h >> 3,
type: h & 7,
data
data: Buffer.alloc(length) // instead of allocating a new buff use a mem pool here
}

return [msg.slice(offset + length), decoded]
state = States.READING
return [msg.slice(offset), message, length]
}

const read = (msg, data, length) => {
let left = length - msg.length
if (msg.length > 0) {
const buff = left > 0 ? msg.slice() : msg.slice(0, length)
buff.copy(data)
msg = msg.slice(buff.length)
}
if (left <= 0) { state = States.PARSING }
return [left, msg, data]
}

let offset = 0
let message = {}
let length = 0
return through(function (msg) {
let offset = 0
let decoded
while (msg.length) {
[msg, decoded] = decode(msg)
this.queue(decoded)
if (States.PARSING === state) {
[msg, message, length] = decode(msg)
}

if (States.READING === state) {
[length, msg, message.data] = read(msg, message.data, length)
if (length <= 0 && States.PARSING === state) {
this.queue(message)
offset = 0
message = {}
length = 0
}
}
}
})
}
43 changes: 28 additions & 15 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const EE = require('events')

const Channel = require('./channel')
const consts = require('./consts')
const utils = require('./utils')
const coder = require('./coder')

const debug = require('debug')

Expand All @@ -17,6 +17,12 @@ log.err = debug('pull-plex:err')
class Plex extends EE {
constructor (initiator, onChan) {
super()

if (typeof initiator === 'function') {
onChan = initiator
initiator = true
}

this._initiator = !!initiator
this._chanId = this._initiator ? 1 : 0
this._channels = {}
Expand Down Expand Up @@ -45,11 +51,11 @@ class Plex extends EE {

this.source = pull(
this._chandata,
utils.encode()
coder.encode()
)

this.sink = pull(
utils.decode(),
coder.decode(),
(read) => {
const next = (end, data) => {
if (this._endedLocal) { return }
Expand All @@ -76,15 +82,15 @@ class Plex extends EE {
setImmediate(() => this.emit('error', err))
}

err = err || new Error('Underlying stream has been closed')
err = err || 'Underlying stream has been closed'
this._endedLocal = true

// propagate close to channels
Object
.keys(this._channels)
.forEach((id) => {
const chan = this._channels[id]
chan.close(err)
if (chan) { return chan.close(err) }
})

this.emit('close')
Expand All @@ -95,7 +101,7 @@ class Plex extends EE {
}

reset (err) {
err = err || new Error('Underlying stream has been closed')
err = err || 'Underlying stream has been closed'
this._chandata.end(err)
this.close(err)
}
Expand All @@ -107,7 +113,9 @@ class Plex extends EE {
}

_nextChanId () {
return this._chanId += 2
const id = this._chanId
this._chanId += 2
return id
}

createStream (name) {
Expand All @@ -127,19 +135,29 @@ class Plex extends EE {
open = false
}

id = id || this._nextChanId(initiator)
id = typeof id === 'number' ? id : this._nextChanId(initiator)
name = typeof name === 'number' ? name.toString() : name
name = name == null ? id.toString() : name
name = !name.length ? id.toString() : name
const chan = new Channel(id,
name || id.toString(),
name,
this,
initiator,
open || false)

chan.once('close', () => {
this._log('deleting channel', JSON.stringify({
channel: this._name,
id: id,
endedLocal: this._channels[id]._endedLocal,
endedRemote: this._channels[id]._endedRemote,
initiator: this._channels[id]._initiator
}))
delete this._channels[id]
})

if (this._channels[id]) {
return this.emit('error', new Error(`channel with id ${id} already exist!`))
return this.emit('error', `channel with id ${id} already exist!`)
}

this._channels[id] = chan
Expand All @@ -151,11 +169,6 @@ class Plex extends EE {
const { id, type, data } = msg
switch (type) {
case consts.type.NEW: {
if (!this._initiator && (id & 1) !== 1) {
return this.emit('error',
new Error(`Initiator can't have even id's!`))
}

const chan = this._newStream(id, this._initiator, true, data.toString())
setImmediate(() => this.emit('stream', chan))
return
Expand Down
5 changes: 0 additions & 5 deletions test/channel.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ const pushable = require('pull-pushable')
const abortable = require('pull-abortable')

const Plex = require('../src')
const utils = require('../src/utils')
const consts = require('../src/consts')

const series = require('async/series')

function closeAndWait (stream) {
pull(
Expand Down Expand Up @@ -94,7 +90,6 @@ describe('channel', () => {
pull(plex2, p[1], plex2)

const chan1 = plex1.createStream('stream 1')

plex2.on('stream', (stream) => {
pull(
stream,
Expand Down
16 changes: 8 additions & 8 deletions test/utils.spec.js → test/coder.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ chai.use(dirtyChai)

const pull = require('pull-stream')

const utils = require('../src/utils')
const coder = require('../src/coder')

describe('utils', () => {
describe('coder', () => {
it('encodes header', () => {
pull(
pull.values([[17, 0, Buffer.from('17')]]),
utils.encode(),
coder.encode(),
pull.collect((err, data) => {
expect(err).to.not.exist()
expect(data[0]).to.be.eql(Buffer.from('8801023137', 'hex'))
Expand All @@ -27,7 +27,7 @@ describe('utils', () => {
it('decodes header', () => {
pull(
pull.values([Buffer.from('8801023137', 'hex')]),
utils.decode(),
coder.decode(),
pull.collect((err, data) => {
expect(err).to.not.exist()
expect(data[0]).to.be.eql({ id: 17, type: 0, data: Buffer.from('17') })
Expand All @@ -42,7 +42,7 @@ describe('utils', () => {
[19, 0, Buffer.from('19')],
[21, 0, Buffer.from('21')]
]),
utils.encode(),
coder.encode(),
pull.collect((err, data) => {
expect(err).to.not.exist()
expect(Buffer.concat(data)).to.be.eql(Buffer.from('88010231379801023139a801023231', 'hex'))
Expand All @@ -53,7 +53,7 @@ describe('utils', () => {
it('decodes msgs from buffer', () => {
pull(
pull.values([Buffer.from('88010231379801023139a801023231', 'hex')]),
utils.decode(),
coder.decode(),
pull.collect((err, data) => {
expect(err).to.not.exist()
expect(data).to.be.deep.eql([
Expand All @@ -68,7 +68,7 @@ describe('utils', () => {
it('encodes zero length body msg', () => {
pull(
pull.values([[17, 0]]),
utils.encode(),
coder.encode(),
pull.collect((err, data) => {
expect(err).to.not.exist()
expect(data[0]).to.be.eql(Buffer.from('880100', 'hex'))
Expand All @@ -79,7 +79,7 @@ describe('utils', () => {
it('decodes zero length body msg', () => {
pull(
pull.values([Buffer.from('880100', 'hex')]),
utils.decode(),
coder.decode(),
pull.collect((err, data) => {
expect(err).to.not.exist()
expect(data[0]).to.be.eql({ id: 17, type: 0, data: Buffer.alloc(0) })
Expand Down
Loading

0 comments on commit 0db5abf

Please sign in to comment.