diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..02f1c4e --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +package-lock.json +yarn.lock +docs + +node_modules/** +.vscode/** diff --git a/package.json b/package.json new file mode 100644 index 0000000..3545b05 --- /dev/null +++ b/package.json @@ -0,0 +1,43 @@ +{ + "name": "mplex", + "version": "0.0.1", + "description": "multiplex implementation of https://github.com/libp2p/mplex", + "main": "src/index.js", + "scripts": { + "lint": "aegir lint", + "build": "aegir build", + "test": "aegir test -t node -t browser", + "test:node": "aegir test -t node", + "test:browser": "aegir test -t browser", + "release": "aegir release -t node -t browser", + "release-minor": "aegir release --type minor -t node -t browser", + "release-major": "aegir release --type major -t node -t browser", + "coverage": "aegir coverage", + "coverage-publish": "aegir coverage --provider coveralls" + }, + "author": "", + "license": "ISC", + "devDependencies": { + "aegir": "^13.0.6" + }, + "dependencies": { + "async": "^2.6.0", + "chai": "^4.1.2", + "debug": "^3.1.0", + "dirty-chai": "^2.0.1", + "interface-connection": "^0.3.2", + "pull-batch": "^1.0.0", + "pull-cat": "^1.1.11", + "pull-defer": "^0.2.2", + "pull-handshake": "^1.1.4", + "pull-length-prefixed": "^1.3.0", + "pull-looper": "^1.0.0", + "pull-many": "^1.0.8", + "pull-offset-limit": "^1.1.1", + "pull-pair": "^1.1.0", + "pull-pushable": "^2.2.0", + "pull-stream": "^3.6.2", + "pull-through": "^1.0.18", + "varint": "^5.0.0" + } +} diff --git a/src/channel.js b/src/channel.js new file mode 100644 index 0000000..a2e6a1e --- /dev/null +++ b/src/channel.js @@ -0,0 +1,167 @@ +'use strict' + +const pushable = require('pull-pushable') + +const consts = require('./consts') +const utils = require('./utils') +const EE = require('events') + +const debug = require('debug') + +const log = debug('pull-mplex') +log.err = debug('pull-mplex:err') + +class Channel extends EE { + constructor (id, name, plex, initiator, open) { + super() + this._id = id + this._name = name || this._id.toString() + this._plex = plex + this._open = open + this._initiator = initiator + this._msgs = pushable((err) => { + this._endedLocal = err || true + this.emit('end', err) + }) + this._cb = null // queue cb for async data + this._endedRemote = false // remote stream ended + this._endedLocal = false // local stream ended + + this._log = (name, data) => { + log({ + op: name, + channel: this._name, + id: this._id, + localEnded: this._endedLocal, + remoteEnded: this._endedRemote, + initiator: this._initiator, + data: data || '' + }) + } + + this._log('new channel', this._name) + + this.source = this._msgs + + this.sink = (read) => { + const next = (end, data) => { + this._log('sink', data) + + // stream already ended + if (this._endedLocal && this._endedRemote) { return } + + this._endedLocal = end || false + + // source ended, close the stream + if (end === true) { + this.endChan((err) => { + if (err) { log.err(err) } + }) + return + } + + // source errored, reset stream + if (end) { return this.emit('error', err) } + + // just send + return this.sendMsg(data, (err) => { + read(err, next) + }) + } + + read(null, next) + } + } + + get open () { + return this._open + } + + set open (open) { + this._open = open + } + + push (data) { + this._log('push', data) + this._msgs.push(data) + // this._drain() + } + + end (err) { + this._log('end') + this._msgs.end(err) + this._endedRemote = err || true + } + + openChan (cb) { + this._log('openChan') + + utils.encodeMsg(this._id, + consts.NEW, + this._name, + (err, data) => { + if (err) { + log.err(err) + return cb(err) + } + + this._plex.push(data) + this.open = true + cb() + }) + } + + sendMsg (data, cb) { + this._log('sendMsg', data) + + if (!this.open) { + return this.openChan((err) => { + if (err) { + log.err(err) + return cb(err) + } + + this.sendMsg(data, cb) + }) + } + + utils.encodeMsg(this._id, + this._initiator + ? consts.type.OUT_MESSAGE + : consts.type.IN_MESSAGE, + data, + (err, data) => { + if (err) { + log.err(err) + return cb(err) + } + + this._plex.push(data) + cb() + }) + } + + endChan (cb) { + this._log('endChan') + + if (!this.open) { + return cb() + } + + utils.encodeMsg(this._id, + this._initiator + ? consts.type.OUT_CLOSE + : consts.type.IN_CLOSE, + '', + (err, data) => { + if (err) { + log.err(err) + return cb(err) + } + this._plex.push(data) + cb() + }) + } +} + +module.exports = Channel diff --git a/src/consts.js b/src/consts.js new file mode 100644 index 0000000..af11626 --- /dev/null +++ b/src/consts.js @@ -0,0 +1,9 @@ +exports.type = { + NEW: 0, + OUT_MESSAGE: 1, + IN_MESSAGE: 2, + OUT_CLOSE: 3, + IN_CLOSE: 4, + OUT_RESET: 5, + IN_RESET: 6 +} diff --git a/src/index.js b/src/index.js new file mode 100644 index 0000000..59dbc4d --- /dev/null +++ b/src/index.js @@ -0,0 +1,131 @@ +'use strict' + +const pushable = require('pull-pushable') + +const EE = require('events') + +const Channel = require('./channel') +const consts = require('./consts') +const utils = require('./utils') + +class Mplex extends EE { + constructor (initiator) { + super() + this._initiator = initiator || false + this._chanId = this._initiator ? 0 : 1 + this._channels = {} + this._chandata = pushable() + this._cb = null + + this.source = this._chandata + + this.sink = (read) => { + const next = (end, data) => { + // if (end) { + // // propagate close to channels + // Object + // .keys(this._channels) + // .forEach((id) => { + // this._channels[id].end(end) + // delete this._channels[id] + // }) + // } + + if (end === true) { return } + if (end) { return this.emit('error', end) } + return this._handle(data, (err) => { + read(err, next) + }) + } + + read(null, next) + } + } + + push (data) { + this._chandata.push(data) + // this._drain() + } + + nextChanId (initiator) { + let inc = 1 + if (initiator) { inc += 1 } + this._chanId += inc + 1 + return this._chanId + } + + newStream (name) { + return this._newStream(null, this._initiator, false, name) + } + + _newStream (id, initiator, open, name) { + if (typeof initiator === 'string') { + name = initiator + initiator = false + open = false + } + + if (typeof open === 'string') { + name = open + open = false + } + + id = id || this.nextChanId(initiator) + const chan = new Channel(id, + name || id.toString(), + this, + initiator, + open || false) + this._channels[id] = chan + return chan + } + + _handle (msg, cb) { + utils.decodeMsg(msg, (err, _data) => { + if (err) { return cb(err) } + const { id, type } = _data[0] + const data = _data[1] + switch (type) { + case consts.type.NEW: { + if (this._initiator && (id & 1) !== 1) { + return this.emit( + 'error', + new Error(`stream initiator can't have even ids`)) + } + + const chan = this._newStream(id, this._initiator, true, data.toString()) + chan.once('end', () => { + delete this._channels[id] + }) + setImmediate(() => this.emit('stream', chan)) + return cb() + } + + case consts.type.OUT_MESSAGE: + case consts.type.IN_MESSAGE: { + const chan = this._channels[id] + if (chan) { + chan.push(data) + } + return cb() + } + + case consts.type.OUT_CLOSE: + case consts.type.IN_CLOSE: { + const chan = this._channels[id] + if (chan) { + chan.end() + } + return cb() + } + + case consts.type.OUT_RESET: + case consts.type.IN_RESET: { + return cb() + } + } + }) + } +} + +module.exports = Mplex diff --git a/src/utils.js b/src/utils.js new file mode 100644 index 0000000..f399bfb --- /dev/null +++ b/src/utils.js @@ -0,0 +1,46 @@ +'use strict' + +const pull = require('pull-stream') +const varint = require('varint') +const lp = require('pull-length-prefixed') +const cat = require('pull-cat') +const through = require('pull-through') + +exports.encodeMsg = (id, type, data, cb) => { + return pull( + cat([ + pull.values([varint.encode(id << 3 | type)]), + pull( + pull.values([Buffer.from(data)]), + lp.encode() + ) + ]), + pull.flatten(), + pull.collect((err, data) => { + if (err) { return cb(err) } + cb(null, Buffer.from(data)) + }) + ) +} + +exports.decodeMsg = (msg, cb) => { + return pull( + cat([ + pull( + pull.values([msg.slice(0, 1)]), + through(function (h) { + const header = varint.decode(h) + this.queue({ id: header >> 3, type: header & 7 }) + }) + ), + pull( + pull.values([msg.slice(1)]), + lp.decode() + ) + ]), + pull.collect((err, data) => { + if (err) { return cb(err) } + cb(null, data) + }) + ) +} diff --git a/test/channel.spec.js b/test/channel.spec.js new file mode 100644 index 0000000..c0ce268 --- /dev/null +++ b/test/channel.spec.js @@ -0,0 +1,204 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 5] */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const pull = require('pull-stream') +const pair = require('pull-pair/duplex') + +const Mplex = require('../src') +const utils = require('../src/utils') +const consts = require('../src/consts') + +const series = require('async/series') + +describe('channel', () => { + it('should be writable', (done) => { + const plex = new Mplex(false) + + plex.on('stream', (stream) => { + pull(pull.values([Buffer.from('hellooooooooooooo')]), stream) + }) + + utils.encodeMsg(3, + consts.type.NEW, + Buffer.from('chan1'), + (err, msg) => { + expect(err).to.not.exist() + pull( + pull.values([msg]), + plex, + pull.drain((_data) => { + expect(err).to.not.exist() + utils.decodeMsg(_data, (err, data) => { + expect(err).to.not.exist() + const { id, type } = data[0] + expect(id).to.eql(3) + expect(type).to.eql(consts.type.IN_MESSAGE) + expect(data[1]).to.deep.eql(Buffer.from('hellooooooooooooo')) + done() + }) + }) + ) + }) + }) + + it('should be readable', (done) => { + const plex = new Mplex() + + plex.on('stream', (stream) => { + pull( + stream, + // drain, because otherwise we have to send an explicit close + pull.drain((data) => { + expect(data).to.deep.eql(Buffer.from('hellooooooooooooo')) + done() + }) + ) + }) + + series([ + (cb) => utils.encodeMsg(3, + consts.type.NEW, + Buffer.from('chan1'), cb), + (cb) => utils.encodeMsg(3, + consts.type.IN_MESSAGE, + Buffer.from('hellooooooooooooo'), + cb) + ], (err, msgs) => { + expect(err).to.not.exist() + pull( + pull.values(msgs), + plex + ) + }) + }) + + it('initiator should be able to send data', (done) => { + const p = pair() + + const plex1 = new Mplex(true) + const plex2 = new Mplex(false) + + pull(plex1, p[0], plex1) + pull(plex2, p[1], plex2) + + const stream = plex1._newStream(plex1.nextChanId(true), true, 'stream 1') + pull( + pull.values([Buffer.from('hello from plex1!!')]), + stream + ) + + plex2.on('stream', (stream) => { + pull( + stream, + pull.collect((err, data) => { + expect(err).to.not.exist() + expect(data[0]).to.deep.eql(Buffer.from('hello from plex1!!')) + done() + }) + ) + }) + }) + + it('receiver should be able to send data', (done) => { + const p = pair() + + const plex1 = new Mplex() + const plex2 = new Mplex() + + pull(plex1, p[0], plex1) + pull(plex2, p[1], plex2) + + const id = plex1.nextChanId(true) + const chan1 = plex1._newStream(id, true, true, 'stream 1') + + const chan2 = plex2._newStream(id, false, true, 'stream 2') + + pull( + pull.values([Buffer.from('hello from plex2!!')]), + chan2 + ) + + pull( + chan1, + pull.collect((err, data) => { + expect(err).to.not.exist() + expect(data[0]).to.deep.eql(Buffer.from('hello from plex2!!')) + done() + }) + ) + }) + + it('sending close msg finalizes stream', (done) => { + const plex = new Mplex() + + plex.on('stream', (stream) => { + pull( + stream, + pull.collect((err, data) => { + expect(err).to.not.exist() + expect(stream._endedRemote).to.be.ok() + expect(stream._endedLocal).to.be.ok() + expect(data[0]).to.deep.eql(Buffer.from('hellooooooooooooo')) + done() + }) + ) + }) + + series([ + (cb) => utils.encodeMsg(3, + consts.type.NEW, + Buffer.from('chan1'), cb), + (cb) => utils.encodeMsg(3, + consts.type.IN_MESSAGE, + Buffer.from('hellooooooooooooo'), + cb), + (cb) => utils.encodeMsg(3, + consts.type.IN_CLOSE, + Buffer.from([]), + cb) + ], (err, msgs) => { + expect(err).to.not.exist() + pull( + pull.values(msgs), + plex + ) + }) + }) + + it('closing channel should allow reading but not writing', (done) => { + const p = pair() + + const plex1 = new Mplex(true) + const plex2 = new Mplex(false) + + pull(plex1, p[0], plex1) + pull(plex2, p[1], plex2) + + const chan1 = plex1.newStream('stream 1') + + pull( + pull.values([Buffer.from('hello')]), + chan1, + pull.through(d => console.dir(d.toString())), + pull.collect((err, data) => { + expect(err).to.not.exist() + expect(data[0]).to.deep.eql(Buffer.from('hello')) + done() + }) + ) + + plex2.on('stream', (stream) => { + pull( + stream, + pull.through(d => console.dir(d.toString())), + stream + ) + }) + }) +})