Skip to content

Commit

Permalink
feat: getting interface-stream-muxer tests to pass
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov authored and jacobheun committed Feb 7, 2019
1 parent e7409ec commit 8dccda1
Show file tree
Hide file tree
Showing 6 changed files with 387 additions and 133 deletions.
13 changes: 9 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "mplex",
"name": "pull-plex",
"version": "0.0.1",
"description": "multiplex implementation of https://github.com/libp2p/mplex",
"main": "src/index.js",
Expand All @@ -15,10 +15,15 @@
"coverage": "aegir coverage",
"coverage-publish": "aegir coverage --provider coveralls"
},
"author": "",
"license": "ISC",
"author": "dryajov@gmail.com",
"license": "MIT",
"devDependencies": {
"aegir": "^13.0.6"
"aegir": "^13.0.6",
"pull-abortable": "^4.1.1"
},
"repository": {
"type": "git",
"url": "git+https://github.com/dryajov/pull-plex.git"
},
"dependencies": {
"async": "^2.6.0",
Expand Down
84 changes: 68 additions & 16 deletions src/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ const EE = require('events')

const debug = require('debug')

const log = debug('pull-mplex')
log.err = debug('pull-mplex:err')
const log = debug('pull-plex')
log.err = debug('pull-plex:err')

class Channel extends EE {
constructor (id, name, plex, initiator, open) {
Expand All @@ -21,14 +21,16 @@ class Channel extends EE {
this._initiator = initiator
this._endedRemote = false // remote stream ended
this._endedLocal = false // local stream ended
this._reset = false

this._log = (name, data) => {
log({
src: 'channel.js',
op: name,
channel: this._name,
id: this._id,
localEnded: this._endedLocal,
remoteEnded: this._endedRemote,
endedLocal: this._endedLocal,
endedRemote: this._endedRemote,
initiator: this._initiator,
data: data || ''
})
Expand All @@ -37,31 +39,43 @@ class Channel extends EE {
this._log('new channel', this._name)

this._msgs = pushable((err) => {
if (err) { log.err(err) }
setImmediate(() => this.emit('end', err))
this._log('source closed', err)
if (this._reset) { return } // don't try closing the channel on reset
this.endChan((err) => {
if (err) { setImmediate(() => this.emit('error', err)) }
})
})

this.source = this._msgs
this._source = this._msgs

this.sink = (read) => {
const next = (end, data) => {
this._log('sink', data)

// stream already ended
if (this._endedLocal && this._endedRemote) { return }
if (this._endedLocal) { return }

this._endedLocal = end || false

// source ended, close the stream
if (end === true) {
this.endChan((err) => {
if (err) { log.err(err) }
if (err) {
log.err(err)
setImmediate(() => this.emit('error', err))
}
})
return
}

// source errored, reset stream
if (end) { return this.emit('error', err) }
if (end || this._reset) {
this.resetChan(() => {
setImmediate(() => this.emit('error', end || this._reset))
this.reset()
})
return
}

// just send
return this.sendMsg(data, (err) => {
Expand All @@ -73,6 +87,10 @@ class Channel extends EE {
}
}

get source () {
return this._source
}

get id () {
return this._id
}
Expand All @@ -85,33 +103,45 @@ class Channel extends EE {
this._open = open
}

get name () {
return this._name
}

push (data) {
this._log('push', data)
this._msgs.push(data)
// this._drain()
}

end (err) {
this._log('end')
this._msgs.end(err)
// close for reading
close (err) {
this._log('close', err)
this.emit('close', err)
this._endedRemote = err || true
this._msgs.end(err)
}

reset (err) {
this._log('reset', err)
this._reset = err || new Error('channel reset!')
this.close(this._reset)
}

openChan (cb) {
this._log('openChan')

this.open = true // avoid duplicate open msgs
utils.encodeMsg(this._id,
consts.NEW,
this._name,
(err, data) => {
if (err) {
log.err(err)
this.open = false
return cb(err)
}

this._plex.push(data)
this.open = true
cb()
cb(null, this)
})
}

Expand Down Expand Up @@ -166,6 +196,28 @@ class Channel extends EE {
cb()
})
}

resetChan (cb) {
this._log('endChan')

if (!this.open) {
return cb()
}

utils.encodeMsg(this._id,
this._initiator
? consts.type.OUT_RESET
: consts.type.IN_RESET,
'',
(err, data) => {
if (err) {
log.err(err)
return cb(err)
}
this._plex.push(data)
cb()
})
}
}

module.exports = Channel
65 changes: 49 additions & 16 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,45 @@ const Channel = require('./channel')
const consts = require('./consts')
const utils = require('./utils')

const debug = require('debug')

const log = debug('pull-plex')
log.err = debug('pull-plex:err')

class Mplex extends EE {
constructor (initiator) {
constructor (initiator, onChan) {
super()
this._initiator = initiator || false
this._chanId = this._initiator ? 0 : 1
this._initiator = initiator || true
this._chanId = 1
this._channels = {}

this._log = (name, data) => {
log({
src: 'channel.js',
op: name,
channel: this._name,
id: this._id,
localEnded: this._endedLocal,
remoteEnded: this._endedRemote,
initiator: this._initiator,
data: data || ''
})
}

this._chandata = pushable((err) => {
setImmediate(() => this.emit('close'))
this.destroy(err || new Error('Underlying stream has been closed'))
})

if (onChan) {
this.on('stream', (chan) => onChan(chan, chan.id))
}

this.source = this._chandata

this.sink = (read) => {
const next = (end, data) => {
if (end === true) { return }
if (end) { return this.emit('error', end) }
if (end) { return this.destroy(end) }
return this._handle(data, (err) => {
read(err, next)
})
Expand All @@ -33,29 +56,36 @@ class Mplex extends EE {
}
}

end () {
destroy (err) {
// propagate close to channels
Object
.keys(this._channels)
.forEach((id) => {
this._channels[id].end(end)
const chan = this._channels[id]
chan.close(err)
delete this._channels[id]
})

if (err) {
return setImmediate(() => this.emit('error', err))
}

this.emit('close')
}

push (data) {
this._chandata.push(data)
// this._drain()
}

nextChanId (initiator) {
let inc = 1
if (initiator) { inc += 1 }
if (initiator) { inc = 1 }
this._chanId += inc + 1

return this._chanId
}

newStream (name) {
createStream (name) {
return this._newStream(null, this._initiator, false, name)
}

Expand All @@ -78,7 +108,7 @@ class Mplex extends EE {
initiator,
open || false)

chan.once('end', () => {
chan.once('close', () => {
delete this._channels[id]
})

Expand All @@ -93,10 +123,9 @@ class Mplex extends EE {
const data = _data[1]
switch (type) {
case consts.type.NEW: {
if (this._initiator && (id & 1) !== 1) {
return this.emit(
'error',
new Error(`stream initiator can't have even ids`))
if (!this._initiator && (id & 1) !== 1) {
return this.emit('error',
new Error(`Initiator can't have even id's!`))
}

const chan = this._newStream(id, this._initiator, true, data.toString())
Expand All @@ -117,13 +146,17 @@ class Mplex extends EE {
case consts.type.IN_CLOSE: {
const chan = this._channels[id]
if (chan) {
chan.end()
chan.close()
}
return cb()
}

case consts.type.OUT_RESET:
case consts.type.IN_RESET: {
const chan = this._channels[id]
if (chan) {
chan.reset()
}
return cb()
}
}
Expand Down
25 changes: 10 additions & 15 deletions src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,19 @@ exports.encodeMsg = (id, type, data, cb) => {
}

exports.decodeMsg = (msg, cb) => {
let h = null
return pull(
cat([
pull(
pull.values([msg.slice(0, 1)]),
through(function (h) {
const header = varint.decode(h)
this.queue({ id: header >> 3, type: header & 7 })
this.queue(null)
})
),
pull(
pull.values([msg.slice(1)]),
lp.decode()
)
]),
pull.values([msg]),
through(function (buf) {
const header = varint.decode(buf)
h = { id: header >> 3, type: header & 7 }
this.queue(buf.slice(varint.decode.bytes))
this.queue(null)
}),
lp.decode(),
pull.collect((err, data) => {
if (err) { return cb(err) }
cb(null, data)
cb(null, [h, data[0]])
})
)
}
Loading

0 comments on commit 8dccda1

Please sign in to comment.