Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
feat: mplex is all here
Browse files Browse the repository at this point in the history
  • Loading branch information
daviddias committed Feb 19, 2018
1 parent 0df73ef commit 48a6075
Show file tree
Hide file tree
Showing 9 changed files with 1,180 additions and 31 deletions.
16 changes: 11 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,29 @@
},
"homepage": "https://github.com/libp2p/js-libp2p-multiplex#readme",
"devDependencies": {
"aegir": "^12.2.0",
"aegir": "^13.0.1",
"chai": "^4.1.2",
"dirty-chai": "^2.0.1",
"interface-stream-muxer": "~0.5.9",
"libp2p-tcp": "^0.11.1",
"libp2p-tcp": "^0.11.5",
"libp2p-websockets": "~0.10.4",
"pre-commit": "^1.2.2",
"pull-pair": "^1.1.0"
},
"dependencies": {
"async": "^2.6.0",
"multiplex": "dignifiedquire/multiplex",
"chunky": "0.0.0",
"concat-stream": "^1.6.0",
"debug": "^3.1.0",
"duplexify": "^3.5.3",
"pull-catch": "^1.0.0",
"pull-stream": "^3.6.1",
"pull-stream-to-stream": "^1.3.4",
"pump": "^2.0.0",
"stream-to-pull-stream": "^1.7.2"
"pump": "^3.0.0",
"readable-stream": "^2.3.4",
"stream-to-pull-stream": "^1.7.2",
"through2": "^2.0.3",
"varint": "^5.0.0"
},
"contributors": [
"David Dias <daviddias.p@gmail.com>",
Expand Down
File renamed without changes.
12 changes: 5 additions & 7 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
'use strict'

const Multiplex = require('multiplex')
const toStream = require('pull-stream-to-stream')

const MULTIPLEX_CODEC = require('./multiplex-codec')
const MplexCore = require('./internals')
const MULTIPLEX_CODEC = require('./codec')
const Muxer = require('./muxer')

const pump = require('pump')

function create (rawConn, isListener) {
const stream = toStream(rawConn)

// Cleanup and destroy the connection when it ends
// as the converted stream doesn't emit 'close'
// but .destroy will trigger a 'close' event.
// Cleanup and destroy the connection when it ends as the converted stream
// doesn't emit 'close' but .destroy will trigger a 'close' event.
stream.on('end', () => stream.destroy())

const mpx = new Multiplex({
const mpx = new MplexCore({
halfOpen: true,
initiator: !isListener
})
Expand Down
194 changes: 194 additions & 0 deletions src/internals/channel.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
'use strict'
/* @flow */

const EventEmitter = require('events').EventEmitter
const stream = require('readable-stream')
const debug = require('debug')

/* :: import type Multiplex from './index'
export type ChannelOpts = {
chunked?: bool,
halfOpen?: bool,
lazy?: bool
}
*/

class Channel extends stream.Duplex {
constructor (name/* : Buffer | string */, plex/* : Multiplex */, opts/* : ChannelOpts = {} */) {
const halfOpen = Boolean(opts.halfOpen)
super({
allowHalfOpen: halfOpen
})

this.name = name
this.log = debug('mplex:channel:' + this.name.toString())
this.channel = 0
this.initiator = false
this.chunked = Boolean(opts.chunked)
this.halfOpen = halfOpen
this.destroyed = false
this.finalized = false

this._multiplex = plex
this._dataHeader = 0
this._opened = false
this._awaitDrain = 0
this._lazy = Boolean(opts.lazy)

let finished = false
let ended = false
this.log('open, halfOpen: ' + this.halfOpen)

this.once('end', () => {
this.log('end')
this._read() // trigger drain

if (this.destroyed) {
return
}

ended = true
if (finished) {
this._finalize()
} else if (!this.halfOpen) {
this.end()
}
})

this.once('finish', function onfinish () {
if (this.destroyed) {
return
}

if (!this._opened) {
return this.once('open', onfinish)
}

if (this._lazy && this.initiator) {
this._open()
}

this._multiplex._send(
this.channel << 3 | (this.initiator ? 4 : 3),
null
)

finished = true

if (ended) {
this._finalize()
}
})
}

destroy (err/* : Error */) {
this._destroy(err, true)
}

_destroy (err/* : Error */, local/* : bool */) {
this.log('_destroy:' + (local ? 'local' : 'remote'))
if (this.destroyed) {
this.log('already destroyed')
return
}

this.destroyed = true

const hasErrorListeners = EventEmitter.listenerCount(this, 'error') > 0

if (err && (!local || hasErrorListeners)) {
this.emit('error', err)
}

this.emit('close')

if (local && this._opened) {
if (this._lazy && this.initiator) {
this._open()
}

const msg = err ? new Buffer(err.message) : null
try {
this._multiplex._send(
this.channel << 3 | (this.initiator ? 6 : 5),
msg
)
} catch (e) {}
}

this._finalize()
}

_finalize () {
if (this.finalized) {
return
}

this.finalized = true
this.emit('finalize')
}

_write (data/* : Buffer */, enc/* : string */, cb/* : () => void */) {
this.log('write: ', data.length)
if (!this._opened) {
this.once('open', () => {
this._write(data, enc, cb)
})
return
}

if (this.destroyed) {
cb()
return
}

if (this._lazy && this.initiator) {
this._open()
}

const drained = this._multiplex._send(
this._dataHeader,
data
)

if (drained) {
cb()
return
}

this._multiplex._ondrain.push(cb)
}

_read () {
if (this._awaitDrain) {
const drained = this._awaitDrain
this._awaitDrain = 0
this._multiplex._onchanneldrain(drained)
}
}

_open () {
let buf = null
if (Buffer.isBuffer(this.name)) {
buf = this.name
} else if (this.name !== this.channel.toString()) {
buf = new Buffer(this.name)
}

this._lazy = false
this._multiplex._send(this.channel << 3 | 0, buf)
}

open (channel/* : number */, initiator/* : bool */) {
this.log('open: ' + channel)
this.channel = channel
this.initiator = initiator
this._dataHeader = channel << 3 | (initiator ? 2 : 1)
this._opened = true
if (!this._lazy && this.initiator) this._open()
this.emit('open')
}
}

module.exports = Channel
Loading

0 comments on commit 48a6075

Please sign in to comment.