Skip to content

Commit

Permalink
feat: initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov authored and jacobheun committed Feb 7, 2019
1 parent e37eedf commit 1843747
Show file tree
Hide file tree
Showing 7 changed files with 606 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package-lock.json
yarn.lock
docs

node_modules/**
.vscode/**
43 changes: 43 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"name": "mplex",
"version": "0.0.1",
"description": "multiplex implementation of https://github.com/libp2p/mplex",
"main": "src/index.js",
"scripts": {
"lint": "aegir lint",
"build": "aegir build",
"test": "aegir test -t node -t browser",
"test:node": "aegir test -t node",
"test:browser": "aegir test -t browser",
"release": "aegir release -t node -t browser",
"release-minor": "aegir release --type minor -t node -t browser",
"release-major": "aegir release --type major -t node -t browser",
"coverage": "aegir coverage",
"coverage-publish": "aegir coverage --provider coveralls"
},
"author": "",
"license": "ISC",
"devDependencies": {
"aegir": "^13.0.6"
},
"dependencies": {
"async": "^2.6.0",
"chai": "^4.1.2",
"debug": "^3.1.0",
"dirty-chai": "^2.0.1",
"interface-connection": "^0.3.2",
"pull-batch": "^1.0.0",
"pull-cat": "^1.1.11",
"pull-defer": "^0.2.2",
"pull-handshake": "^1.1.4",
"pull-length-prefixed": "^1.3.0",
"pull-looper": "^1.0.0",
"pull-many": "^1.0.8",
"pull-offset-limit": "^1.1.1",
"pull-pair": "^1.1.0",
"pull-pushable": "^2.2.0",
"pull-stream": "^3.6.2",
"pull-through": "^1.0.18",
"varint": "^5.0.0"
}
}
167 changes: 167 additions & 0 deletions src/channel.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
'use strict'

const pushable = require('pull-pushable')

const consts = require('./consts')
const utils = require('./utils')
const EE = require('events')

const debug = require('debug')

const log = debug('pull-mplex')
log.err = debug('pull-mplex:err')

class Channel extends EE {
constructor (id, name, plex, initiator, open) {
super()
this._id = id
this._name = name || this._id.toString()
this._plex = plex
this._open = open
this._initiator = initiator
this._msgs = pushable((err) => {
this._endedLocal = err || true
this.emit('end', err)
})
this._cb = null // queue cb for async data
this._endedRemote = false // remote stream ended
this._endedLocal = false // local stream ended

this._log = (name, data) => {
log({
op: name,
channel: this._name,
id: this._id,
localEnded: this._endedLocal,
remoteEnded: this._endedRemote,
initiator: this._initiator,
data: data || ''
})
}

this._log('new channel', this._name)

this.source = this._msgs

this.sink = (read) => {
const next = (end, data) => {
this._log('sink', data)

// stream already ended
if (this._endedLocal && this._endedRemote) { return }

this._endedLocal = end || false

// source ended, close the stream
if (end === true) {
this.endChan((err) => {
if (err) { log.err(err) }
})
return
}

// source errored, reset stream
if (end) { return this.emit('error', err) }

// just send
return this.sendMsg(data, (err) => {
read(err, next)
})
}

read(null, next)
}
}

get open () {
return this._open
}

set open (open) {
this._open = open
}

push (data) {
this._log('push', data)
this._msgs.push(data)
// this._drain()
}

end (err) {
this._log('end')
this._msgs.end(err)
this._endedRemote = err || true
}

openChan (cb) {
this._log('openChan')

utils.encodeMsg(this._id,
consts.NEW,
this._name,
(err, data) => {
if (err) {
log.err(err)
return cb(err)
}

this._plex.push(data)
this.open = true
cb()
})
}

sendMsg (data, cb) {
this._log('sendMsg', data)

if (!this.open) {
return this.openChan((err) => {
if (err) {
log.err(err)
return cb(err)
}

this.sendMsg(data, cb)
})
}

utils.encodeMsg(this._id,
this._initiator
? consts.type.OUT_MESSAGE
: consts.type.IN_MESSAGE,
data,
(err, data) => {
if (err) {
log.err(err)
return cb(err)
}

this._plex.push(data)
cb()
})
}

endChan (cb) {
this._log('endChan')

if (!this.open) {
return cb()
}

utils.encodeMsg(this._id,
this._initiator
? consts.type.OUT_CLOSE
: consts.type.IN_CLOSE,
'',
(err, data) => {
if (err) {
log.err(err)
return cb(err)
}
this._plex.push(data)
cb()
})
}
}

module.exports = Channel
9 changes: 9 additions & 0 deletions src/consts.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
exports.type = {
NEW: 0,
OUT_MESSAGE: 1,
IN_MESSAGE: 2,
OUT_CLOSE: 3,
IN_CLOSE: 4,
OUT_RESET: 5,
IN_RESET: 6
}
131 changes: 131 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
'use strict'

const pushable = require('pull-pushable')

const EE = require('events')

const Channel = require('./channel')
const consts = require('./consts')
const utils = require('./utils')

class Mplex extends EE {
constructor (initiator) {
super()
this._initiator = initiator || false
this._chanId = this._initiator ? 0 : 1
this._channels = {}
this._chandata = pushable()
this._cb = null

this.source = this._chandata

this.sink = (read) => {
const next = (end, data) => {
// if (end) {
// // propagate close to channels
// Object
// .keys(this._channels)
// .forEach((id) => {
// this._channels[id].end(end)
// delete this._channels[id]
// })
// }

if (end === true) { return }
if (end) { return this.emit('error', end) }
return this._handle(data, (err) => {
read(err, next)
})
}

read(null, next)
}
}

push (data) {
this._chandata.push(data)
// this._drain()
}

nextChanId (initiator) {
let inc = 1
if (initiator) { inc += 1 }
this._chanId += inc + 1
return this._chanId
}

newStream (name) {
return this._newStream(null, this._initiator, false, name)
}

_newStream (id, initiator, open, name) {
if (typeof initiator === 'string') {
name = initiator
initiator = false
open = false
}

if (typeof open === 'string') {
name = open
open = false
}

id = id || this.nextChanId(initiator)
const chan = new Channel(id,
name || id.toString(),
this,
initiator,
open || false)
this._channels[id] = chan
return chan
}

_handle (msg, cb) {
utils.decodeMsg(msg, (err, _data) => {
if (err) { return cb(err) }
const { id, type } = _data[0]
const data = _data[1]
switch (type) {
case consts.type.NEW: {
if (this._initiator && (id & 1) !== 1) {
return this.emit(
'error',
new Error(`stream initiator can't have even ids`))
}

const chan = this._newStream(id, this._initiator, true, data.toString())
chan.once('end', () => {
delete this._channels[id]
})
setImmediate(() => this.emit('stream', chan))
return cb()
}

case consts.type.OUT_MESSAGE:
case consts.type.IN_MESSAGE: {
const chan = this._channels[id]
if (chan) {
chan.push(data)
}
return cb()
}

case consts.type.OUT_CLOSE:
case consts.type.IN_CLOSE: {
const chan = this._channels[id]
if (chan) {
chan.end()
}
return cb()
}

case consts.type.OUT_RESET:
case consts.type.IN_RESET: {
return cb()
}
}
})
}
}

module.exports = Mplex
Loading

0 comments on commit 1843747

Please sign in to comment.