Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

Commit

Permalink
starting to observe
Browse files Browse the repository at this point in the history
  • Loading branch information
pgte committed Feb 16, 2018
1 parent 52967cd commit e0d175d
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 25 deletions.
7 changes: 2 additions & 5 deletions src/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const setImmediate = require('async/setImmediate')

const Circuit = require('libp2p-circuit')

const protocolMuxer = require('./protocol-muxer')
const plaintext = require('./plaintext')

module.exports = function connection (swarm) {
Expand All @@ -25,9 +24,7 @@ module.exports = function connection (swarm) {
swarm.handle(muxer.multicodec, (protocol, conn) => {
const muxedConn = muxer.listener(conn)

muxedConn.on('stream', (conn) => {
protocolMuxer(swarm.protocols, conn)
})
muxedConn.on('stream', swarm.protocolMuxer)

// If identify is enabled
// 1. overload getPeerInfo
Expand Down Expand Up @@ -120,7 +117,7 @@ module.exports = function connection (swarm) {
swarm.handle(tag, (protocol, conn) => {
const myId = swarm._peerInfo.id
const secure = encrypt(myId, conn, undefined, () => {
protocolMuxer(swarm.protocols, secure)
swarm.protocolMuxer(secure)
})
})

Expand Down
4 changes: 1 addition & 3 deletions src/dial.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ const Circuit = require('libp2p-circuit')
const debug = require('debug')
const log = debug('libp2p:swarm:dial')

const protocolMuxer = require('./protocol-muxer')

function dial (swarm) {
return (peer, protocol, callback) => {
if (typeof protocol === 'function') {
Expand Down Expand Up @@ -189,7 +187,7 @@ function dial (swarm) {

// For incoming streams, in case identify is on
muxedConn.on('stream', (conn) => {
protocolMuxer(swarm.protocols, conn)
swarm.protocolMuxer(conn)
})

setImmediate(() => swarm.emit('peer-mux-established', pi))
Expand Down
8 changes: 6 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ const transport = require('./transport')
const connection = require('./connection')
const getPeerInfo = require('./get-peer-info')
const dial = require('./dial')
const protocolMuxer = require('./protocol-muxer')
const ProtocolMuxer = require('./protocol-muxer')
const plaintext = require('./plaintext')
const Observer = require('./observer')
const assert = require('assert')

class Switch extends EE {
Expand Down Expand Up @@ -69,10 +70,13 @@ class Switch extends EE {
})
}

this.observer = Observer()
this.protocolMuxer = ProtocolMuxer(this.protocols, this.observer)

this.handle(this.crypto.tag, (protocol, conn) => {
const peerId = this._peerInfo.id
const wrapped = this.crypto.encrypt(peerId, conn, undefined, () => {})
return protocolMuxer(this.protocols, wrapped)
return this.protocolMuxer(wrapped)
})

// higher level (public) API
Expand Down
36 changes: 23 additions & 13 deletions src/protocol-muxer.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
'use strict'

const multistream = require('multistream-select')
const observeConn = require('./observe-connection')

module.exports = function protocolMuxer (protocols, conn) {
const ms = new multistream.Listener()
module.exports = function protocolMuxer (protocols, observer) {
return (conn) => {
const ms = new multistream.Listener()

Object.keys(protocols).forEach((protocol) => {
if (!protocol) {
return
}
Object.keys(protocols).forEach((protocol) => {
if (!protocol) {
return
}

ms.addHandler(protocol, protocols[protocol].handlerFunc, protocols[protocol].matchFunc)
})
const handler = (protocol, _conn) => {
const conn = observeConn(protocol, _conn, observer)
protocols[protocol].handlerFunc(protocol, conn)
}

ms.handle(conn, (err) => {
if (err) {
// the multistream handshake failed
}
})
// const handler = protocols[protocol].handlerFunc

ms.addHandler(protocol, handler, protocols[protocol].matchFunc)
})

ms.handle(conn, (err) => {
if (err) {
// the multistream handshake failed
}
})
}
}
3 changes: 1 addition & 2 deletions src/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const once = require('once')
const debug = require('debug')
const log = debug('libp2p:swarm:transport')

const protocolMuxer = require('./protocol-muxer')
const LimitDialer = require('./limit-dialer')

// number of concurrent outbound dials to make per peer, same as go-libp2p-swarm
Expand Down Expand Up @@ -58,7 +57,7 @@ module.exports = function (swarm) {
listen (key, options, handler, callback) {
// if no handler is passed, we pass conns to protocolMuxer
if (!handler) {
handler = protocolMuxer.bind(null, swarm.protocols)
handler = swarm.protocolMuxer
}

const multiaddrs = dialables(swarm.transports[key], swarm._peerInfo.multiaddrs.distinct())
Expand Down

0 comments on commit e0d175d

Please sign in to comment.