Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

update swarm #19

Merged
merged 1 commit into from
Mar 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
"description": "libp2p swarm implementation in Node.js",
"main": "src/index.js",
"scripts": {
"test": "./node_modules/.bin/lab tests/*-test.js",
"coverage": "./node_modules/.bin/lab -t 88 tests/*-test.js",
"lint": "./node_modules/.bin/standard",
"validate": "npm ls"
"test": "mocha tests/*-test.js",
"coverage": "istanbul cover --print both -- _mocha tests/*-test.js",
"lint": "standard"
},
"repository": {
"type": "git",
Expand All @@ -24,20 +23,20 @@
"homepage": "https://github.com/diasdavid/js-libp2p-swarm",
"pre-commit": [
"lint",
"test",
"coverage"
"test"
],
"engines": {
"node": "^4.0.0"
},
"devDependencies": {
"code": "^1.4.1",
"lab": "^5.13.0",
"chai": "^3.5.0",
"istanbul": "^0.4.2",
"libp2p-spdy": "^0.1.0",
"libp2p-tcp": "^0.1.1",
"precommit-hook": "^3.0.0",
"mocha": "^2.4.5",
"pre-commit": "^1.1.2",
"sinon": "^1.15.4",
"standard": "^4.5.2",
"standard": "^6.0.7",
"stream-pair": "^1.0.3"
},
"dependencies": {
Expand All @@ -49,7 +48,6 @@
"multistream-select": "^0.6.1",
"peer-id": "^0.3.3",
"peer-info": "^0.3.2",
"protocol-buffers-stream": "^1.2.0",
"spdy-stream-muxer": "^0.6.0"
"protocol-buffers-stream": "^1.2.0"
}
}
File renamed without changes.
File renamed without changes.
342 changes: 341 additions & 1 deletion src/index.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,343 @@
var Swarm = require('./swarm')
var multistream = require('multistream-select')
var async = require('async')
var identify = require('./identify')

exports = module.exports = Swarm

function Swarm (peerInfo) {
var self = this

if (!(self instanceof Swarm)) {
throw new Error('Swarm must be called with new')
}

if (!peerInfo) {
throw new Error('You must provide a value for `peerInfo`')
}

self.peerInfo = peerInfo

// peerIdB58: { conn: <conn> }
self.conns = {}

// peerIdB58: {
// muxer: <muxer>,
// socket: socket // so we can extract the info we need for identify
// }
self.muxedConns = {}

// transportName: { transport: transport,
// dialOptions: dialOptions,
// listenOptions: listenOptions,
// listeners: [] }
self.transports = {}

// transportName: listener
self.listeners = {}

// protocolName: handlerFunc
self.protocols = {}

// muxerName: { Muxer: Muxer // Muxer is a constructor
// options: options }
self.muxers = {}

// for connection reuse
self.identify = false

// public interface

self.addTransport = function (name, transport, options, dialOptions, listenOptions, callback) {
// set up the transport and add the list of incoming streams
// add transport to the list of transports

var multiaddr = options.multiaddr
if (multiaddr) {
// no need to pass that to the transports
delete options.multiaddr
}

var listener = transport.createListener(options, listen)

listener.listen(listenOptions, function ready () {
self.transports[name] = {
transport: transport,
options: options,
dialOptions: dialOptions,
listenOptions: listenOptions,
listener: listener
}

// If a known multiaddr is passed, then add to our list of multiaddrs
if (multiaddr) {
self.peerInfo.multiaddrs.push(multiaddr)
}

callback()
})
}

self.addUpgrade = function (ConnUpgrade, options) {}

self.addStreamMuxer = function (name, StreamMuxer, options) {
self.muxers[name] = {
Muxer: StreamMuxer,
options: options
}
}

self.dial = function (peerInfo, options, protocol, callback) {
// 1. check if we have transports we support
// 2. check if we have a conn waiting
// 3. check if we have a stream muxer available

if (typeof protocol === 'function') {
callback = protocol
protocol = undefined
}

// check if a conn is waiting
// if it is and protocol was selected, jump into multistreamHandshake
// if it is and no protocol was selected, do nothing and call and empty callback

if (self.conns[peerInfo.id.toB58String()]) {
if (protocol) {
if (self.muxers['spdy']) {
// TODO upgrade this conn to a muxer
console.log('TODO: upgrade a warm conn to muxer that was added after')
} else {
multistreamHandshake(self.conns[peerInfo.id.toB58String()])
}
self.conns[peerInfo.id.toB58String()] = undefined
delete self.conns[peerInfo.id.toB58String()]
return
} else {
return callback()
}
}

// check if a stream muxer for this peer is available
if (self.muxedConns[peerInfo.id.toB58String()]) {
if (protocol) {
return openMuxedStream(self.muxedConns[peerInfo.id.toB58String()].muxer)
} else {
return callback()
}
}

// Creating a new conn with this peer routine

// TODO - check if there is a preference in protocol to use on
// options.protocol
var supportedTransports = Object.keys(self.transports)
var multiaddrs = peerInfo.multiaddrs.filter(function (multiaddr) {
return multiaddr.protoNames().some(function (proto) {
return supportedTransports.indexOf(proto) >= 0
})
})

if (!multiaddrs.length) {
callback(new Error("The swarm doesn't support any of the peer transports"))
return
}

var conn

async.eachSeries(multiaddrs, function (multiaddr, next) {
if (conn) {
return next()
}

var transportName = getTransportNameForMultiaddr(multiaddr)
var transport = self.transports[transportName]
var dialOptions = clone(transport.dialOptions)
dialOptions.ready = connected

var connTry = transport.transport.dial(multiaddr, dialOptions)

connTry.once('error', function (err) {
if (err) {
return console.log(err) // TODO handle error
}
next() // try next multiaddr
})

function connected () {
conn = connTry
next()
}

function getTransportNameForMultiaddr (multiaddr) {
// this works for all those ip + transport + port tripplets
return multiaddr.protoNames()[1]
}

function clone (obj) {
var target = {}
for (var i in obj) {
if (obj.hasOwnProperty(i)) {
target[i] = obj[i]
}
}
return target
}
}, done)

function done () {
// TODO apply upgrades
// apply stream muxer
// if no protocol is selected, save it in the pool
// if protocol is selected, multistream that protocol
if (!conn) {
callback(new Error('Unable to open a connection'))
return
}

if (self.muxers['spdy']) {
var spdy = new self.muxers['spdy'].Muxer(self.muxers['spdy'].options)
spdy.attach(conn, false, function (err, muxer) {
if (err) {
return console.log(err) // TODO Treat error
}

muxer.on('stream', userProtocolMuxer)

self.muxedConns[peerInfo.id.toB58String()] = {
muxer: muxer,
socket: conn
}

if (protocol) {
openMuxedStream(muxer)
} else {
callback()
}
})
} else {
if (protocol) {
multistreamHandshake(conn)
} else {
self.conns[peerInfo.id.toB58String()] = conn
callback()
}
}
}

function openMuxedStream (muxer) {
// 1. create a new stream on this muxedConn and pass that to
// multistreamHanshake
muxer.dialStream(function (err, conn) {
if (err) {
return console.log(err) // TODO Treat error
}
multistreamHandshake(conn)
})
}

function multistreamHandshake (conn) {
var msI = new multistream.Interactive()
msI.handle(conn, function () {
msI.select(protocol, callback)
})
}
}

self.closeListener = function (transportName, callback) {
self.transports[transportName].listener.close(closed)

// only gets called when all the streams on this transport are closed too
function closed () {
delete self.transports[transportName]
callback()
}
}

// Iterates all the listeners closing them
// one by one. It calls back once all are closed.
self.closeAllListeners = function (callback) {
var transportNames = Object.keys(self.transports)

async.each(transportNames, self.closeListener, callback)
}

self.closeConns = function (callback) {
// close warmed up cons so that the listener can gracefully exit
Object.keys(self.conns).forEach(function (conn) {
self.conns[conn].destroy()
})
self.conns = {}

callback()
}

// Closes both transport listeners and
// connections. It calls back once everything
// is closed
self.close = function (callback) {
async.parallel([
self.closeAllListeners,
self.closeConns
], callback)
}

self.enableIdentify = function () {
// set flag to true
// add identify to the list of handled protocols
self.identify = true

// we pass muxedConns so that identify can access the socket of the other
// peer
self.handleProtocol(identify.protoId,
identify.getHandlerFunction(self.peerInfo, self.muxedConns))
}

self.handleProtocol = function (protocol, handlerFunction) {
self.protocols[protocol] = handlerFunction
}

// internals

function listen (conn) {
// TODO apply upgrades
// add StreamMuxer if available (and point streams from muxer to userProtocolMuxer)

if (self.muxers['spdy']) {
var spdy = new self.muxers['spdy'].Muxer(self.muxers['spdy'].options)
spdy.attach(conn, true, function (err, muxer) {
if (err) {
return console.log(err) // TODO treat error
}

// TODO This muxer has to be identified!
// pass to identify a reference of
// our muxedConn list
// ourselves (peerInfo)
// the conn, which is the socket
// and a stream it can send stuff
if (self.identify) {
muxer.dialStream(function (err, stream) {
if (err) {
return console.log(err) // TODO Treat error
}
// conn === socket at this point
identify(self.muxedConns, self.peerInfo, conn, stream, muxer)
})
}

muxer.on('stream', userProtocolMuxer)
})
} else {
// if no stream muxer, then
userProtocolMuxer(conn)
}
}

// Handle user given protocols
function userProtocolMuxer (conn) {
var msS = new multistream.Select()
msS.handle(conn)
Object.keys(self.protocols).forEach(function (protocol) {
msS.addHandler(protocol, self.protocols[protocol])
})
}
}
Loading