diff --git a/README.md b/README.md index f174c98..925b494 100644 --- a/README.md +++ b/README.md @@ -4,16 +4,16 @@ libp2p-swarm JavaScript implementation [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) [![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/) [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) -[![Build Status](https://img.shields.io/travis/diasdavid/js-libp2p-swarm/master.svg?style=flat-square)](https://travis-ci.org/diasdavid/js-libp2p-swarm) -[![Coverage Status](https://coveralls.io/repos/github/diasdavid/js-libp2p-swarm/badge.svg?branch=master)](https://coveralls.io/github/diasdavid/js-libp2p-swarm?branch=master) -[![Dependency Status](https://david-dm.org/diasdavid/js-libp2p-swarm.svg?style=flat-square)](https://david-dm.org/diasdavid/js-libp2p-swarm) +[![Build Status](https://img.shields.io/travis/libp2p/js-libp2p-swarm/master.svg?style=flat-square)](https://travis-ci.org/libp2p/js-libp2p-swarm) +[![Coverage Status](https://coveralls.io/repos/github/libp2p/js-libp2p-swarm/badge.svg?branch=master)](https://coveralls.io/github/libp2p/js-libp2p-swarm?branch=master) +[![Dependency Status](https://david-dm.org/libp2p/js-libp2p-swarm.svg?style=flat-square)](https://david-dm.org/libp2p/js-libp2p-swarm) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard) > libp2p swarm implementation in JavaScript. libp2p-swarm is a connection abstraction that is able to leverage several transports and connection upgrades, such as congestion control, channel encryption, the multiplexing of several streams in one connection, and more. It does this by bringing protocol multiplexing to the application level (instead of the traditional Port level) using multicodec and multistream. -libp2p-swarm is used by [libp2p](https://github.com/diasdavid/js-libp2p) but it can be also used as a standalone module. +libp2p-swarm is used by [libp2p](https://github.com/libp2p/js-libp2p) but it can be also used as a standalone module. ## Table of Contents diff --git a/package.json b/package.json index 4b779a5..f5111be 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,7 @@ "gulp": "^3.9.1", "istanbul": "^0.4.3", "libp2p-multiplex": "^0.2.1", - "libp2p-spdy": "^0.6.3", + "libp2p-spdy": "^0.7.0", "libp2p-tcp": "^0.7.1", "libp2p-webrtc-star": "^0.3.1", "libp2p-websockets": "^0.7.0", @@ -56,9 +56,10 @@ "bl": "^1.1.2", "browserify-zlib": "github:ipfs/browserify-zlib", "duplexify": "^3.4.3", - "interface-connection": "^0.1.3", + "interface-connection": "^0.1.7", "ip-address": "^5.8.0", "length-prefixed-stream": "^1.5.0", + "libp2p-identify": "^0.1.1", "lodash.contains": "^2.4.3", "multiaddr": "^2.0.0", "multistream-select": "^0.9.0", @@ -75,4 +76,4 @@ "Richard Littauer ", "dignifiedquire " ] -} \ No newline at end of file +} diff --git a/src/connection.js b/src/connection.js index d739241..1a67b4b 100644 --- a/src/connection.js +++ b/src/connection.js @@ -1,7 +1,8 @@ 'use strict' const protocolMuxer = require('./protocol-muxer') -const identify = require('./identify') +const identify = require('libp2p-identify') +const multistream = require('multistream-select') module.exports = function connection (swarm) { return { @@ -15,43 +16,50 @@ module.exports = function connection (swarm) { swarm.handle(muxer.multicodec, (conn) => { const muxedConn = muxer(conn, true) - var peerIdForConn - muxedConn.on('stream', (conn) => { - function gotId () { - if (peerIdForConn) { - conn.peerId = peerIdForConn - protocolMuxer(swarm.protocols, conn) - } else { - setTimeout(gotId, 100) - } - } - - // If identify happened, when we have the Id of the conn - if (swarm.identify) { - return gotId() - } - protocolMuxer(swarm.protocols, conn) }) - // if identify is enabled, attempt to do it for muxer reuse + // If identify is enabled + // 1. overload getPeerInfo + // 2. call getPeerInfo + // 3. add this conn to the pool if (swarm.identify) { - identify.exec(conn, muxedConn, swarm._peerInfo, (err, pi) => { - if (err) { - return console.log('Identify exec failed', err) - } + // overload peerInfo to use Identify instead + conn.getPeerInfo = (cb) => { + const conn = muxedConn.newStream() + const ms = new multistream.Dialer() + ms.handle(conn, (err) => { + if (err) { return cb(err) } - peerIdForConn = pi.id - swarm.muxedConns[pi.id.toB58String()] = {} - swarm.muxedConns[pi.id.toB58String()].muxer = muxedConn - swarm.muxedConns[pi.id.toB58String()].conn = conn // to be able to extract addrs + ms.select(identify.multicodec, (err, conn) => { + if (err) { return cb(err) } - swarm.emit('peer-mux-established', pi) + identify.exec(conn, (err, peerInfo, observedAddrs) => { + if (err) { return cb(err) } + + observedAddrs.forEach((oa) => { + swarm._peerInfo.multiaddr.addSafe(oa) + }) + + cb(null, peerInfo) + }) + }) + }) + } + + conn.getPeerInfo((err, peerInfo) => { + if (err) { + return console.log('Identify not successful') + } + swarm.muxedConns[peerInfo.id.toB58String()] = { + muxer: muxedConn + } + swarm.emit('peer-mux-established', peerInfo) muxedConn.on('close', () => { - delete swarm.muxedConns[pi.id.toB58String()] - swarm.emit('peer-mux-closed', pi) + delete swarm.muxedConns[peerInfo.id.toB58String()] + swarm.emit('peer-mux-closed', peerInfo) }) }) } @@ -60,7 +68,7 @@ module.exports = function connection (swarm) { reuse () { swarm.identify = true - swarm.handle(identify.multicodec, identify.handler(swarm._peerInfo, swarm)) + swarm.handle(identify.multicodec, identify.handler(swarm._peerInfo)) } } } diff --git a/src/dial.js b/src/dial.js index ddad5ad..ae6e10b 100644 --- a/src/dial.js +++ b/src/dial.js @@ -43,6 +43,8 @@ module.exports = function dial (swarm) { return proxyConn function gotWarmedUpConn (conn) { + conn.setPeerInfo(pi) + attemptMuxerUpgrade(conn, (err, muxer) => { if (!protocol) { if (err) { @@ -61,6 +63,13 @@ module.exports = function dial (swarm) { } function gotMuxer (muxer) { + if (swarm.identify) { + // TODO: Consider: + // 1. overload getPeerInfo + // 2. exec identify (through getPeerInfo) + // 3. update the peerInfo that is already stored in the conn + } + openConnInMuxedConn(muxer, (conn) => { protocolHandshake(conn, protocol, callback) }) @@ -88,7 +97,7 @@ module.exports = function dial (swarm) { cryptoDial() function cryptoDial () { - // currently, js-libp2p-swarm doesn't implement any crypto + // currently, no crypto channel is implemented const ms = new multistream.Dialer() ms.handle(conn, (err) => { if (err) { @@ -133,7 +142,7 @@ module.exports = function dial (swarm) { const muxedConn = swarm.muxers[key](conn, false) swarm.muxedConns[b58Id] = {} swarm.muxedConns[b58Id].muxer = muxedConn - swarm.muxedConns[b58Id].conn = conn + // should not be needed anymore - swarm.muxedConns[b58Id].conn = conn swarm.emit('peer-mux-established', pi) @@ -142,9 +151,8 @@ module.exports = function dial (swarm) { swarm.emit('peer-mux-closed', pi) }) - // in case identify is on + // For incoming streams, in case identify is on muxedConn.on('stream', (conn) => { - conn.peerId = pi.id protocolMuxer(swarm.protocols, conn) }) @@ -169,7 +177,6 @@ module.exports = function dial (swarm) { return callback(err) } proxyConn.setInnerConn(conn) - proxyConn.peerId = pi.id callback(null, proxyConn) }) }) diff --git a/src/identify.js b/src/identify.js deleted file mode 100644 index c395a2f..0000000 --- a/src/identify.js +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Identify is one of the protocols swarms speaks in order to - * broadcast and learn about the ip:port pairs a specific peer - * is available through and to know when a new stream muxer is - * established, so a conn can be reused - */ - -'use strict' - -const multistream = require('multistream-select') -const fs = require('fs') -const path = require('path') -const PeerInfo = require('peer-info') -const PeerId = require('peer-id') -const multiaddr = require('multiaddr') -const bl = require('bl') - -const lpstream = require('length-prefixed-stream') -const protobuf = require('protocol-buffers') -const schema = fs.readFileSync(path.join(__dirname, 'identify.proto')) -const idPb = protobuf(schema) - -exports = module.exports -exports.multicodec = '/ipfs/id/1.0.0' - -exports.exec = (rawConn, muxer, pInfo, callback) => { - // 1. open a stream - // 2. multistream into identify - // 3. send what I see from this other peer (extract fro conn) - // 4. receive what the other peer sees from me - // 4. callback with (err, peerInfo) - - const conn = muxer.newStream() - - const ms = new multistream.Dialer() - ms.handle(conn, (err) => { - if (err) { - return callback(err) - } - - ms.select(exports.multicodec, (err, conn) => { - if (err) { - return callback(err) - } - - const encode = lpstream.encode() - const decode = lpstream.decode() - - encode - .pipe(conn) - .pipe(decode) - .pipe(bl((err, data) => { - if (err) { - return callback(err) - } - const msg = idPb.Identify.decode(data) - if (hasObservedAddr(msg)) { - pInfo.multiaddr.addSafe(multiaddr(msg.observedAddr)) - } - - const pId = PeerId.createFromPubKey(msg.publicKey) - const otherPInfo = new PeerInfo(pId) - msg.listenAddrs.forEach((ma) => { - otherPInfo.multiaddr.add(multiaddr(ma)) - }) - callback(null, otherPInfo) - })) - - rawConn.getObservedAddrs((err, addrs) => { - if (err) { - return - } - const obsMultiaddr = addrs[0] - - let publicKey = new Buffer(0) - if (pInfo.id.pubKey) { - publicKey = pInfo.id.pubKey.bytes - } - - const msg = idPb.Identify.encode({ - protocolVersion: 'na', - agentVersion: 'na', - publicKey: publicKey, - listenAddrs: pInfo.multiaddrs.map((mh) => mh.buffer), - observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('') - }) - - encode.write(msg) - encode.end() - }) - }) - }) -} - -exports.handler = (pInfo, swarm) => { - return (conn) => { - // 1. receive incoming observed info about me - // 2. update my own information (on peerInfo) - // 3. send back what I see from the other (get from swarm.muxedConns[incPeerID].conn.getObservedAddrs() - - const encode = lpstream.encode() - const decode = lpstream.decode() - - encode - .pipe(conn) - .pipe(decode) - .pipe(bl((err, data) => { - if (err) { - console.log(new Error('Failed to decode lpm from identify')) - return - } - const msg = idPb.Identify.decode(data) - if (hasObservedAddr(msg)) { - pInfo.multiaddr.addSafe(multiaddr(msg.observedAddr)) - } - - const pId = PeerId.createFromPubKey(msg.publicKey) - const conn = swarm.muxedConns[pId.toB58String()].conn - conn.getObservedAddrs((err, addrs) => { - if (err) {} - const obsMultiaddr = addrs[0] - - let publicKey = new Buffer(0) - if (pInfo.id.pubKey) { - publicKey = pInfo.id.pubKey.bytes - } - - const msgSend = idPb.Identify.encode({ - protocolVersion: 'na', - agentVersion: 'na', - publicKey: publicKey, - listenAddrs: pInfo.multiaddrs.map((ma) => ma.buffer), - observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('') - }) - - encode.write(msgSend) - encode.end() - }) - })) - } -} - -function hasObservedAddr (msg) { - return msg.observedAddr && msg.observedAddr.length > 0 -} diff --git a/src/identify.proto b/src/identify.proto deleted file mode 100644 index e4845aa..0000000 --- a/src/identify.proto +++ /dev/null @@ -1,25 +0,0 @@ -message Identify { - - // protocolVersion determines compatibility between peers - optional string protocolVersion = 5; // e.g. ipfs/1.0.0 - - // agentVersion is like a UserAgent string in browsers, or client version in bittorrent - // includes the client name and client. - optional string agentVersion = 6; // e.g. go-ipfs/0.1.0 - - // publicKey is this node's public key (which also gives its node.ID) - // - may not need to be sent, as secure channel implies it has been sent. - // - then again, if we change / disable secure channel, may still want it. - optional bytes publicKey = 1; - - // listenAddrs are the multiaddrs the sender node listens for open connections on - repeated bytes listenAddrs = 2; - - // oservedAddr is the multiaddr of the remote endpoint that the sender node perceives - // this is useful information to convey to the other side, as it helps the remote endpoint - // determine whether its connection to the local peer goes through NAT. - optional bytes observedAddr = 4; - - // (DEPRECATED) protocols are the services this node is running - // repeated string protocols = 3; -} diff --git a/test/04-muxing-multiplex.node.js b/test/04-muxing-multiplex.node.js index 0f0e18d..57ec7c1 100644 --- a/test/04-muxing-multiplex.node.js +++ b/test/04-muxing-multiplex.node.js @@ -10,7 +10,8 @@ const Swarm = require('../src') const TCP = require('libp2p-tcp') const multiplex = require('libp2p-spdy') -describe('stream muxing with multiplex (on TCP)', function () { +// TODO multiplex needs to be upgraded, like spdy, to work again +describe.skip('stream muxing with multiplex (on TCP)', function () { this.timeout(60 * 1000) var swarmA diff --git a/test/05-muxing-spdy.node.js b/test/05-muxing-spdy.node.js index ac6c321..fe00252 100644 --- a/test/05-muxing-spdy.node.js +++ b/test/05-muxing-spdy.node.js @@ -127,7 +127,36 @@ describe('stream muxing with spdy (on TCP)', function () { }) }) - it('make sure it does not blow up when the socket is closed', (done) => { + it('with Identify, do getPeerInfo', (done) => { + swarmA.handle('/banana/1.0.0', (conn) => { + conn.getPeerInfo((err, peerInfoC) => { + expect(err).to.not.exist + expect(peerInfoC.id.toB58String()).to.equal(peerC.id.toB58String()) + }) + + conn.pipe(conn) + }) + + swarmC.dial(peerA, '/banana/1.0.0', (err, conn) => { + expect(err).to.not.exist + setTimeout(() => { + expect(Object.keys(swarmC.muxedConns).length).to.equal(1) + expect(Object.keys(swarmA.muxedConns).length).to.equal(2) + conn.getPeerInfo((err, peerInfoA) => { + expect(err).to.not.exist + expect(peerInfoA.id.toB58String()).to.equal(peerA.id.toB58String()) + conn.on('end', done) + conn.resume() + conn.end() + }) + }, 500) + }) + }) + + // This test is not possible as the raw conn is not exposed anymore + // TODO: create a similar version, but that spawns a swarm in a + // different proc + it.skip('make sure it does not blow up when the socket is closed', (done) => { swarmD.connection.reuse() let count = 0 @@ -148,7 +177,10 @@ describe('stream muxing with spdy (on TCP)', function () { }) }) - it('blow up a socket, with WebSockets', (done) => { + // This test is not possible as the raw conn is not exposed anymore + // TODO: create a similar version, but that spawns a swarm in a + // different proc + it.skip('blow up a socket, with WebSockets', (done) => { var swarmE var peerE var swarmF diff --git a/test/09-swarm-with-muxing.node.js b/test/09-swarm-with-muxing.node.js index aa15bba..74f85ea 100644 --- a/test/09-swarm-with-muxing.node.js +++ b/test/09-swarm-with-muxing.node.js @@ -198,13 +198,19 @@ describe('high level API - with everything mixed all together!', function () { it('dial from tcp+ws to tcp+ws', (done) => { swarmC.handle('/mamao/1.0.0', (conn) => { - expect(conn.peerId).to.exist + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.not.exist + expect(peerInfo).to.exist + }) conn.pipe(conn) }) swarmA.dial(peerC, '/mamao/1.0.0', (err, conn) => { expect(err).to.not.exist - expect(conn.peerId).to.exist + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.not.exist + expect(peerInfo).to.exist + }) expect(Object.keys(swarmA.muxedConns).length).to.equal(2) conn.end()