Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

Commit

Permalink
fix: handling of ipfs addresses in available transports
Browse files Browse the repository at this point in the history
and some refactoring into multiple files
  • Loading branch information
dignifiedquire committed May 11, 2016
1 parent 163624c commit 8e1413b
Show file tree
Hide file tree
Showing 7 changed files with 387 additions and 328 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,4 @@
"Richard Littauer <richard.littauer@gmail.com>",
"dignifiedquire <dignifiedquire@gmail.com>"
]
}
}
65 changes: 65 additions & 0 deletions src/connection.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
'use strict'

const connHandler = require('./default-handler')
const identify = require('./identify')

module.exports = function connection (swarm) {
return {
addUpgrade () {},

addStreamMuxer (muxer) {
// for dialing
swarm.muxers[muxer.multicodec] = muxer

// for listening
swarm.handle(muxer.multicodec, (conn) => {
const muxedConn = muxer(conn, true)

var peerIdForConn

muxedConn.on('stream', (conn) => {
function gotId () {
if (peerIdForConn) {
conn.peerId = peerIdForConn
connHandler(swarm.protocols, conn)
} else {
setTimeout(gotId, 100)
}
}

if (swarm.identify) {
return gotId()
}

connHandler(swarm.protocols, conn)
})

// if identify is enabled, attempt to do it for muxer reuse
if (swarm.identify) {
identify.exec(conn, muxedConn, swarm._peerInfo, (err, pi) => {
if (err) {
return console.log('Identify exec failed', err)
}

peerIdForConn = pi.id
swarm.muxedConns[pi.id.toB58String()] = {}
swarm.muxedConns[pi.id.toB58String()].muxer = muxedConn
swarm.muxedConns[pi.id.toB58String()].conn = conn // to be able to extract addrs

swarm.emit('peer-mux-established', pi)

muxedConn.on('close', () => {
delete swarm.muxedConns[pi.id.toB58String()]
swarm.emit('peer-mux-closed', pi)
})
})
}
})
},

reuse () {
swarm.identify = true
swarm.handle(identify.multicodec, identify.handler(swarm._peerInfo, swarm))
}
}
}
18 changes: 18 additions & 0 deletions src/default-handler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
'use strict'

const multistream = require('multistream-select')

// incomming connection handler
module.exports = function connHandler (protocols, conn) {
var msS = new multistream.Select()

Object.keys(protocols).forEach((protocol) => {
if (!protocol) {
return
}

msS.addHandler(protocol, protocols[protocol])
})

msS.handle(conn)
}
161 changes: 161 additions & 0 deletions src/dial.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
'use strict'

const multistream = require('multistream-select')
const DuplexPassThrough = require('duplex-passthrough')

const connHandler = require('./default-handler')

module.exports = function dial (swarm) {
return (pi, protocol, callback) => {
if (typeof protocol === 'function') {
callback = protocol
protocol = null
}

if (!callback) {
callback = function noop () {}
}

const pt = new DuplexPassThrough()

const b58Id = pi.id.toB58String()

if (!swarm.muxedConns[b58Id]) {
if (!swarm.conns[b58Id]) {
attemptDial(pi, (err, conn) => {
if (err) {
return callback(err)
}
gotWarmedUpConn(conn)
})
} else {
const conn = swarm.conns[b58Id]
swarm.conns[b58Id] = undefined
gotWarmedUpConn(conn)
}
} else {
if (!protocol) {
return callback()
}
gotMuxer(swarm.muxedConns[b58Id].muxer)
}

return pt

function gotWarmedUpConn (conn) {
attemptMuxerUpgrade(conn, (err, muxer) => {
if (!protocol) {
if (err) {
swarm.conns[b58Id] = conn
}
return callback()
}

if (err) {
// couldn't upgrade to Muxer, it is ok
protocolHandshake(conn, protocol, callback)
} else {
gotMuxer(muxer)
}
})
}

function gotMuxer (muxer) {
openConnInMuxedConn(muxer, (conn) => {
protocolHandshake(conn, protocol, callback)
})
}

function attemptDial (pi, cb) {
const tKeys = swarm.availableTransports(pi)

if (tKeys.length === 0) {
return cb(new Error('No available tranport to dial to'))
}

nextTransport(tKeys.shift())

function nextTransport (key) {
const multiaddrs = pi.multiaddrs.slice()
swarm.transport.dial(key, multiaddrs, (err, conn) => {
if (err) {
if (tKeys.length === 0) {
return cb(new Error('Could not dial in any of the transports'))
}
return nextTransport(tKeys.shift())
}
cb(null, conn)
})
}
}

function attemptMuxerUpgrade (conn, cb) {
const muxers = Object.keys(swarm.muxers)
if (muxers.length === 0) {
return cb(new Error('no muxers available'))
}

// 1. try to handshake in one of the muxers available
// 2. if succeeds
// - add the muxedConn to the list of muxedConns
// - add incomming new streams to connHandler

nextMuxer(muxers.shift())

function nextMuxer (key) {
var msI = new multistream.Interactive()
msI.handle(conn, function () {
msI.select(key, (err, conn) => {
if (err) {
if (muxers.length === 0) {
cb(new Error('could not upgrade to stream muxing'))
} else {
nextMuxer(muxers.shift())
}
return
}

const muxedConn = swarm.muxers[key](conn, false)
swarm.muxedConns[b58Id] = {}
swarm.muxedConns[b58Id].muxer = muxedConn
swarm.muxedConns[b58Id].conn = conn

swarm.emit('peer-mux-established', pi)

muxedConn.on('close', () => {
delete swarm.muxedConns[pi.id.toB58String()]
swarm.emit('peer-mux-closed', pi)
})

// in case identify is on
muxedConn.on('stream', (conn) => {
conn.peerId = pi.id
connHandler(swarm.protocols, conn)
})

cb(null, muxedConn)
})
})
}
}

function openConnInMuxedConn (muxer, cb) {
cb(muxer.newStream())
}

function protocolHandshake (conn, protocol, cb) {
var msI = new multistream.Interactive()
msI.handle(conn, function () {
msI.select(protocol, (err, conn) => {
if (err) {
return callback(err)
}

pt.wrapStream(conn)
pt.peerId = pi.id
callback(null, pt)
})
})
}
}
}
Loading

0 comments on commit 8e1413b

Please sign in to comment.