Skip to content

Commit

Permalink
Merge pull request libp2p#79 from libp2p/update-transports
Browse files Browse the repository at this point in the history
update the transports
  • Loading branch information
daviddias authored Jun 23, 2016
2 parents edee20b + 0ff3a0a commit ca75c3c
Show file tree
Hide file tree
Showing 11 changed files with 165 additions and 110 deletions.
3 changes: 3 additions & 0 deletions .aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ module.exports = {
'../vendor/forge.bundle.js'
)
}
},
externals: {
'simple-websocket-server': '{}'
}
}
}
17 changes: 9 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@
},
"repository": {
"type": "git",
"url": "https://github.com/diasdavid/js-libp2p-swarm.git"
"url": "https://github.com/libp2p/js-libp2p-swarm.git"
},
"keywords": [
"IPFS"
],
"author": "David Dias <daviddias@ipfs.io>",
"license": "MIT",
"bugs": {
"url": "https://github.com/diasdavid/js-libp2p-swarm/issues"
"url": "https://github.com/libp2p/js-libp2p-swarm/issues"
},
"homepage": "https://github.com/diasdavid/js-libp2p-swarm",
"homepage": "https://github.com/libp2p/js-libp2p-swarm",
"pre-commit": [
"lint",
"test"
Expand All @@ -43,10 +43,10 @@
"gulp": "^3.9.1",
"istanbul": "^0.4.3",
"libp2p-multiplex": "^0.2.1",
"libp2p-spdy": "^0.6.1",
"libp2p-tcp": "^0.6.1",
"libp2p-webrtc-star": "^0.2.0",
"libp2p-websockets": "^0.6.1",
"libp2p-spdy": "^0.6.3",
"libp2p-tcp": "^0.7.1",
"libp2p-webrtc-star": "^0.3.1",
"libp2p-websockets": "^0.7.0",
"pre-commit": "^1.1.2",
"stream-pair": "^1.0.3",
"webrtcsupport": "^2.2.0"
Expand All @@ -56,6 +56,7 @@
"bl": "^1.1.2",
"browserify-zlib": "github:ipfs/browserify-zlib",
"duplexify": "^3.4.3",
"interface-connection": "^0.1.3",
"ip-address": "^5.8.0",
"length-prefixed-stream": "^1.5.0",
"lodash.contains": "^2.4.3",
Expand All @@ -74,4 +75,4 @@
"Richard Littauer <richard.littauer@gmail.com>",
"dignifiedquire <dignifiedquire@gmail.com>"
]
}
}
6 changes: 3 additions & 3 deletions src/connection.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const connHandler = require('./default-handler')
const protocolMuxer = require('./protocol-muxer')
const identify = require('./identify')

module.exports = function connection (swarm) {
Expand All @@ -21,7 +21,7 @@ module.exports = function connection (swarm) {
function gotId () {
if (peerIdForConn) {
conn.peerId = peerIdForConn
connHandler(swarm.protocols, conn)
protocolMuxer(swarm.protocols, conn)
} else {
setTimeout(gotId, 100)
}
Expand All @@ -32,7 +32,7 @@ module.exports = function connection (swarm) {
return gotId()
}

connHandler(swarm.protocols, conn)
protocolMuxer(swarm.protocols, conn)
})

// if identify is enabled, attempt to do it for muxer reuse
Expand Down
18 changes: 8 additions & 10 deletions src/dial.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
'use strict'

const multistream = require('multistream-select')
const Duplexify = require('duplexify')
const Connection = require('interface-connection').Connection

const connHandler = require('./default-handler')
const protocolMuxer = require('./protocol-muxer')

module.exports = function dial (swarm) {
return (pi, protocol, callback) => {
Expand All @@ -16,7 +16,7 @@ module.exports = function dial (swarm) {
callback = function noop () {}
}

const pt = new Duplexify()
const proxyConn = new Connection()

const b58Id = pi.id.toB58String()

Expand All @@ -40,7 +40,7 @@ module.exports = function dial (swarm) {
gotMuxer(swarm.muxedConns[b58Id].muxer)
}

return pt
return proxyConn

function gotWarmedUpConn (conn) {
attemptMuxerUpgrade(conn, (err, muxer) => {
Expand Down Expand Up @@ -145,7 +145,7 @@ module.exports = function dial (swarm) {
// in case identify is on
muxedConn.on('stream', (conn) => {
conn.peerId = pi.id
connHandler(swarm.protocols, conn)
protocolMuxer(swarm.protocols, conn)
})

cb(null, muxedConn)
Expand All @@ -168,11 +168,9 @@ module.exports = function dial (swarm) {
if (err) {
return callback(err)
}

pt.setReadable(conn)
pt.setWritable(conn)
pt.peerId = pi.id
callback(null, pt)
proxyConn.setInnerConn(conn)
proxyConn.peerId = pi.id
callback(null, proxyConn)
})
})
}
Expand Down
72 changes: 42 additions & 30 deletions src/identify.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ exports.exec = (rawConn, muxer, pInfo, callback) => {
return callback(err)
}
const msg = idPb.Identify.decode(data)
if (msg.observedAddr.length > 0) {
if (hasObservedAddr(msg)) {
pInfo.multiaddr.addSafe(multiaddr(msg.observedAddr))
}

Expand All @@ -66,23 +66,28 @@ exports.exec = (rawConn, muxer, pInfo, callback) => {
callback(null, otherPInfo)
}))

const obsMultiaddr = rawConn.getObservedAddrs()[0]
rawConn.getObservedAddrs((err, addrs) => {
if (err) {
return
}
const obsMultiaddr = addrs[0]

let publicKey = new Buffer(0)
if (pInfo.id.pubKey) {
publicKey = pInfo.id.pubKey.bytes
}
let publicKey = new Buffer(0)
if (pInfo.id.pubKey) {
publicKey = pInfo.id.pubKey.bytes
}

const msg = idPb.Identify.encode({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: publicKey,
listenAddrs: pInfo.multiaddrs.map((mh) => mh.buffer),
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('')
})
const msg = idPb.Identify.encode({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: publicKey,
listenAddrs: pInfo.multiaddrs.map((mh) => mh.buffer),
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('')
})

encode.write(msg)
encode.end()
encode.write(msg)
encode.end()
})
})
})
}
Expand All @@ -105,29 +110,36 @@ exports.handler = (pInfo, swarm) => {
return
}
const msg = idPb.Identify.decode(data)
if (msg.observedAddr.length > 0) {
if (hasObservedAddr(msg)) {
pInfo.multiaddr.addSafe(multiaddr(msg.observedAddr))
}

const pId = PeerId.createFromPubKey(msg.publicKey)
const conn = swarm.muxedConns[pId.toB58String()].conn
const obsMultiaddr = conn.getObservedAddrs()[0]
conn.getObservedAddrs((err, addrs) => {
if (err) {}
const obsMultiaddr = addrs[0]

let publicKey = new Buffer(0)
if (pInfo.id.pubKey) {
publicKey = pInfo.id.pubKey.bytes
}
let publicKey = new Buffer(0)
if (pInfo.id.pubKey) {
publicKey = pInfo.id.pubKey.bytes
}

const msgSend = idPb.Identify.encode({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: publicKey,
listenAddrs: pInfo.multiaddrs.map((ma) => ma.buffer),
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('')
})
const msgSend = idPb.Identify.encode({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: publicKey,
listenAddrs: pInfo.multiaddrs.map((ma) => ma.buffer),
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('')
})

encode.write(msgSend)
encode.end()
encode.write(msgSend)
encode.end()
})
}))
}
}

function hasObservedAddr (msg) {
return msg.observedAddr && msg.observedAddr.length > 0
}
18 changes: 12 additions & 6 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const contains = require('lodash.contains')
const transport = require('./transport')
const connection = require('./connection')
const dial = require('./dial')
const connHandler = require('./default-handler')
const protocolMuxer = require('./protocol-muxer')

exports = module.exports = Swarm

Expand All @@ -26,12 +26,10 @@ function Swarm (peerInfo) {
this._peerInfo = peerInfo

// transports --

// { key: transport }; e.g { tcp: <tcp> }
this.transports = {}

// connections --

// { peerIdB58: { conn: <conn> }}
this.conns = {}

Expand Down Expand Up @@ -94,7 +92,7 @@ function Swarm (peerInfo) {

// our crypto handshake :)
this.handle('/plaintext/1.0.0', (conn) => {
connHandler(this.protocols, conn)
protocolMuxer(this.protocols, conn)
})

this.unhandle = (protocol, handler) => {
Expand Down Expand Up @@ -122,8 +120,16 @@ function Swarm (peerInfo) {
this.muxedConns[key].muxer.end()
})

parallel(Object.keys(this.transports).map((key) => {
return (cb) => this.transports[key].close(cb)
const transports = this.transports

parallel(Object.keys(transports).map((key) => {
return (cb) => {
parallel(transports[key].listeners.map((listener) => {
return (cb) => {
listener.close(cb)
}
}), cb)
}
}), callback)
}
}
4 changes: 2 additions & 2 deletions src/default-handler.js → src/protocol-muxer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

const multistream = require('multistream-select')

// incomming connection handler
module.exports = function connHandler (protocols, conn) {
module.exports = function protocolMuxer (protocols, conn) {
const ms = new multistream.Listener()

Object.keys(protocols).forEach((protocol) => {
if (!protocol) {
return
Expand Down
Loading

0 comments on commit ca75c3c

Please sign in to comment.