From 7f0a0db677faccf3f3a6a2bd176ba042b25259d9 Mon Sep 17 00:00:00 2001 From: David Dias Date: Sat, 28 May 2016 21:45:43 +0100 Subject: [PATCH] level up libp2p interface and error handling --- gulpfile.js | 4 +- package.json | 13 ++- src/index.js | 167 ++++++++++++++++++++++++++-- test/peer.json | 4 +- test/webrtc-star-only.js | 51 ++++++--- test/websockets-only.js | 232 +++++++++++++++++++++++++++------------ 6 files changed, 368 insertions(+), 103 deletions(-) diff --git a/gulpfile.js b/gulpfile.js index a9f512c..7f13afe 100644 --- a/gulpfile.js +++ b/gulpfile.js @@ -20,7 +20,7 @@ gulp.task('libnode:start', (done) => { node = new Node(peer) node.start(() => { - node.swarm.handle('/echo/1.0.0', (conn) => { + node.handle('/echo/1.0.0', (conn) => { conn.pipe(conn) }) ready() @@ -34,7 +34,7 @@ gulp.task('libnode:start', (done) => { gulp.task('libnode:stop', (done) => { setTimeout(() => { - node.swarm.close((err) => { + node.stop((err) => { if (err) { throw err } diff --git a/package.json b/package.json index 1023cff..1140023 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,7 @@ "bl": "^1.1.2", "chai": "^3.5.0", "gulp": "^3.9.1", - "libp2p-ipfs": "^0.9.0", + "libp2p-ipfs": "^0.10.0", "peer-id": "^0.7.0", "pre-commit": "^1.1.3", "run-parallel": "^1.1.6", @@ -44,10 +44,13 @@ "dependencies": { "babel-runtime": "^6.9.0", "libp2p-spdy": "^0.6.1", - "libp2p-swarm": "^0.19.0", - "libp2p-webrtc-star": "^0.2.0", - "libp2p-websockets": "^0.6.0", + "libp2p-swarm": "^0.19.4", + "libp2p-webrtc-star": "^0.2.1", + "libp2p-websockets": "^0.6.1", + "mafmt": "^2.1.1", "multiaddr": "^2.0.2", + "peer-book": "^0.3.0", + "peer-id": "^0.7.0", "peer-info": "^0.7.0" }, "contributors": [ @@ -55,4 +58,4 @@ "dignifiedquire ", "greenkeeperio-bot " ] -} \ No newline at end of file +} diff --git a/src/index.js b/src/index.js index 3922bd4..53d7a25 100644 --- a/src/index.js +++ b/src/index.js @@ -2,33 +2,57 @@ const Swarm = require('libp2p-swarm') const PeerInfo = require('peer-info') +const PeerId = require('peer-id') const WS = require('libp2p-websockets') const WebRTCStar = require('libp2p-webrtc-star') const spdy = require('libp2p-spdy') const EE = require('events').EventEmitter +const multiaddr = require('multiaddr') +const PeerBook = require('peer-book') +const mafmt = require('mafmt') exports = module.exports -exports.Node = function Node (peerInfo) { +const OFFLINE_ERROR_MESSAGE = 'The libp2p node is not started yet' +const IPFS_CODE = 421 + +exports.Node = function Node (pInfo, pBook) { if (!(this instanceof Node)) { - return new Node(peerInfo) + return new Node(pInfo, pBook) + } + + if (!pInfo) { + pInfo = new PeerInfo() + pInfo.multiaddr.add(multiaddr('/ip4/0.0.0.0/tcp/0')) } - if (!peerInfo) { - peerInfo = new PeerInfo() + + if (!pBook) { + pBook = new PeerBook() } - this.peerInfo = peerInfo + this.peerInfo = pInfo + this.peerBook = pBook // Swarm - this.swarm = new Swarm(peerInfo) + this.swarm = new Swarm(pInfo) this.swarm.connection.addStreamMuxer(spdy) this.swarm.connection.reuse() + this.swarm.on('peer-mux-established', (peerInfo) => { + this.peerBook.put(peerInfo) + }) + + this.swarm.on('peer-mux-closed', (peerInfo) => { + this.peerBook.removeByB58String(peerInfo.id.toB58String()) + }) + + let isOnline = false + this.start = (callback) => { // if we have `webrtc-star` addrs, then add // the WebRTCStar transport const wstar = new WebRTCStar() - if (wstar.filter(peerInfo.multiaddrs).length > 0) { + if (wstar.filter(this.peerInfo.multiaddrs).length > 0) { this.swarm.transport.add('wstar', wstar) wstar.discovery.on('peer', (peerInfo) => { this.discovery.emit('peer', peerInfo) @@ -40,20 +64,141 @@ exports.Node = function Node (peerInfo) { // WebSockets needs to be added after because // it can't have a listener on the browser this.swarm.transport.add('ws', new WS()) + isOnline = true callback() }) } else { // if just WebSockets, no thing to listen this.swarm.transport.add('ws', new WS()) + isOnline = true callback() } } + this.stop = (callback) => { + isOnline = false + this.swarm.close(callback) + } + + this.dialById = (id, protocol, callback) => { + if (typeof protocol === 'function') { + callback = protocol + protocol = undefined + } + + if (!isOnline) { + return callback(new Error(OFFLINE_ERROR_MESSAGE)) + } + // NOTE, these dialById only works if a previous dial + // was made until we have PeerRouting + // TODO support PeerRouting when it is Ready + callback(new Error('not implemented yet')) + } + + this.dialByMultiaddr = (maddr, protocol, callback) => { + if (typeof protocol === 'function') { + callback = protocol + protocol = undefined + } + + if (!isOnline) { + return callback(new Error(OFFLINE_ERROR_MESSAGE)) + } + + if (typeof maddr === 'string') { + maddr = multiaddr(maddr) + } + + if (!mafmt.IPFS.matches(maddr.toString())) { + return callback(new Error('multiaddr not valid')) + } + + const ipfsIdB58String = maddr.stringTuples().filter((tuple) => { + if (tuple[0] === IPFS_CODE) { + return true + } + })[0][1] + + let peer + try { + peer = this.peerBook.getByB58String(ipfsIdB58String) + } catch (err) { + peer = new PeerInfo(PeerId.createFromB58String(ipfsIdB58String)) + } + + peer.multiaddr.add(maddr) + this.dialByPeerInfo(peer, protocol, callback) + } + + this.dialByPeerInfo = (peer, protocol, callback) => { + if (typeof protocol === 'function') { + callback = protocol + protocol = undefined + } + if (!isOnline) { + return callback(new Error(OFFLINE_ERROR_MESSAGE)) + } + + this.swarm.dial(peer, protocol, (err, conn) => { + if (err) { + return callback(err) + } + this.peerBook.put(peer) + callback(null, conn) + }) + } + + this.hangUpById = (id, callback) => { + callback(new Error('not implemented yet')) + // TODO + } + + this.hangUpByMultiaddr = (maddr, callback) => { + if (!isOnline) { + return callback(new Error(OFFLINE_ERROR_MESSAGE)) + } + + if (typeof maddr === 'string') { + maddr = multiaddr(maddr) + } + + if (!mafmt.IPFS.matches(maddr.toString())) { + return callback(new Error('multiaddr not valid')) + } + + const ipfsIdB58String = maddr.stringTuples().filter((tuple) => { + if (tuple[0] === IPFS_CODE) { + return true + } + })[0][1] + + try { + const pi = this.peerBook.getByB58String(ipfsIdB58String) + this.hangUpByPeerInfo(pi, callback) + } catch (err) { + // already disconnected + callback() + } + } + + this.hangUpByPeerInfo = (peer, callback) => { + if (!isOnline) { + return callback(new Error(OFFLINE_ERROR_MESSAGE)) + } + + this.peerBook.removeByB58String(peer.id.toB58String()) + this.swarm.hangUp(peer, callback) + } + + this.handle = (protocol, handler) => { + return this.swarm.handle(protocol, handler) + } + + this.unhandle = (protocol) => { + return this.swarm.unhandle(protocol) + } + this.discovery = new EE() this.routing = null this.records = null - - this.dial = () => { - throw new Error('THIS WILL BE EQUIVALENT TO THE ROUTED HOST FEATURE, IT WILL FIGURE OUT EVERYTHING :D') - } } diff --git a/test/peer.json b/test/peer.json index df4ff35..ba789ca 100644 --- a/test/peer.json +++ b/test/peer.json @@ -1,3 +1,5 @@ { - "privKey": "CAASQDA+AgEAAgkAiZMWorfx3pkCAwEAAQIIbl57fDLjyaUCBQDi+1JLAgUAmymkKwIEKc9xkQIFAIoy+KkCBHBfslE=" + "id": "Qmex1SSsueWFsUfjdkugJ5zhcnjddAt8TxcnDLUXKD9Sx7", + "privKey": "CAASqAkwggSkAgEAAoIBAQCXzV127CvVHOGMzvsn/U+/32JM58KA6k0FSCCeNFzNowiDS/vV5eezGN5AFoxsF6icWLoaczz7l9RdVD+I/t6PEt9X7XUdrDCtSS8WmAcCgvZWSSf7yAd3jT4GSZDUIgIEeRZsERDt/yVqTLwsZ1G9dMIeh8sbf2zwjTXZIWaRM6o4lq3DYFfzLvJUXlJodxPogU7l7nLkITPUv+yQAMcVHizbNwJvwiETKYeUj73/m/wEPAlnFESexDstxNiIwE/FH8Ao50QPZRO6E6Jb0hhYSI/4CLRdrzDFm/Vzplei3Wr2DokSROaNyeG37VAueyA+pDqn84um+L9uXLwbv5FbAgMBAAECggEAdBUzV/GaQ0nmoQrWvOnUxmFIho7kCjkh1NwnNVPNc+Msa1r7pcI9wJNPwap8j1w4L/cZuYhOJgcg+o2mWFiuULKZ4F9Ro/M89gZ038457g2/2pPu43c/Xoi/2YcAHXg0Gr+OCe2zCIyITBWKAFqyAzL6DubAxrJW2Ezj1LrZ+EZgMyzbh/go/eEGSJaaGkINeAkY144DqDWWWvzyhKhryipsGkZGEkVy9xJgMEI3ipVvuPez2XAvoyyeuinBBLe+Z2vY5G50XXzbIMhIQGLncHf9MwTv6wt1ilyOSLOXK0BoQbB76J3R3is5dSULXXP9r8VocjLBEkmBuf4FXAKzoQKBgQDNNS4F1XE1gxD8LPkL+aB/hi6eVHVPhr+w0I/9ATikcLGeUfBM2Gd6cZRPFtNVrv1p6ZF1D1UyGDknGbDBSQd9wLUgb0fDoo3jKYMGWq6G+VvaP5rzWQeBV8YV2EhSmUk1i6kiYe2ZE8WyrPie7iwpQIY60e2A8Ly0GKZiBZUcHQKBgQC9YDAVsGnEHFVFkTDpvw5HwEzCgTb2A3NgkGY3rTYZ7L6AFjqCYmUwFB8Fmbyc4kdFWNh8wfmq5Qrvl49NtaeukiqWKUUlB8uPdztB1P0IahA2ks0owStZlRifmwfgYyMd4xE17lhaOgQQJZZPxmP0F6mdOvb3YJafNURCdMS51wKBgEvvIM+h0tmFXXSjQ6kNvzlRMtD92ccKysYn9xAdMpOO6/r0wSH+dhQWEVZO0PcE4NsfReb2PIVj90ojtIdhebcr5xpQc1LORQjJJKXmSmzBux6AqNrhl+hhzXfp56FA/Zkly/lgGWaqrV5XqUxOP+Mn8EO1yNgMvRc7g94DyNB1AoGBAKLBuXHalXwDsdHBUB2Eo3xNLGt6bEcRfia+0+sEBdxQGQWylQScFkU09dh1YaIf44sZKa5HdBFJGpYCVxo9hmjFnK5Dt/Z0daHOonIY4INLzLVqg8KECoLKXkhGEIXsDjFQhukn+G1LMVTDSSU055DQiWjlVX4UWD9qo0jOXIkvAoGBAMP50p2X6PsWWZUuuR7i1JOJHRyQZPWdHh9p8SSLnCtEpHYZfJr4INXNmhnSiB/3TUnHix2vVKjosjMTCk/CjfzXV2H41WPOLZ2/Pi3SxCicWIRj4kCcWhkEuIF2jGkg1+jmNiCl/zNMaBOAIP3QbDPtqOWbYlPd2YIzdj6WQ6R4", + "pubKey": "CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCXzV127CvVHOGMzvsn/U+/32JM58KA6k0FSCCeNFzNowiDS/vV5eezGN5AFoxsF6icWLoaczz7l9RdVD+I/t6PEt9X7XUdrDCtSS8WmAcCgvZWSSf7yAd3jT4GSZDUIgIEeRZsERDt/yVqTLwsZ1G9dMIeh8sbf2zwjTXZIWaRM6o4lq3DYFfzLvJUXlJodxPogU7l7nLkITPUv+yQAMcVHizbNwJvwiETKYeUj73/m/wEPAlnFESexDstxNiIwE/FH8Ao50QPZRO6E6Jb0hhYSI/4CLRdrzDFm/Vzplei3Wr2DokSROaNyeG37VAueyA+pDqn84um+L9uXLwbv5FbAgMBAAE=" } diff --git a/test/webrtc-star-only.js b/test/webrtc-star-only.js index 6fcda97..a8bcc69 100644 --- a/test/webrtc-star-only.js +++ b/test/webrtc-star-only.js @@ -11,7 +11,7 @@ const bl = require('bl') const libp2p = require('../src') describe('libp2p-ipfs-browser (webrtc only)', function () { - this.timeout(60 * 1000) + this.timeout(15 * 1000) let peer1 let peer2 @@ -46,23 +46,48 @@ describe('libp2p-ipfs-browser (webrtc only)', function () { }) it('handle a protocol on the first node', (done) => { - node2.swarm.handle('/echo/1.0.0', (conn) => { + node2.handle('/echo/1.0.0', (conn) => { conn.pipe(conn) }) done() }) it('dial from the second node to the first node', (done) => { - node1.swarm.dial(peer2, '/echo/1.0.0', (err, conn) => { - const text = 'hello' + node1.dialByPeerInfo(peer2, '/echo/1.0.0', (err, conn) => { expect(err).to.not.exist - conn.pipe(bl((err, data) => { + setTimeout(check, 500) + + function check () { + const text = 'hello' + const peers1 = node1.peerBook.getAll() + expect(err).to.not.exist + expect(Object.keys(peers1)).to.have.length(1) + const peers2 = node2.peerBook.getAll() + expect(err).to.not.exist + expect(Object.keys(peers2)).to.have.length(1) + conn.pipe(bl((err, data) => { + expect(err).to.not.exist + expect(data.toString()).to.equal(text) + done() + })) + conn.write(text) + conn.end() + } + }) + }) + + it('node1 hangUp node2', (done) => { + node1.hangUpByPeerInfo(peer2, (err) => { + expect(err).to.not.exist + setTimeout(check, 500) + + function check () { + const peers = node1.peerBook.getAll() expect(err).to.not.exist - expect(data.toString()).to.equal(text) + expect(Object.keys(peers)).to.have.length(0) + expect(Object.keys(node1.swarm.muxedConns)).to.have.length(0) done() - })) - conn.write(text) - conn.end() + } }) }) @@ -73,17 +98,17 @@ describe('libp2p-ipfs-browser (webrtc only)', function () { peer3.multiaddr.add(mh3) node1.discovery.on('peer', (peerInfo) => { - node1.swarm.dial(peerInfo) + node1.dialByPeerInfo(peerInfo, () => {}) }) node2.discovery.on('peer', (peerInfo) => { - node2.swarm.dial(peerInfo) + node2.dialByPeerInfo(peerInfo, () => {}) }) const node3 = new libp2p.Node(peer3) node3.start(() => { setTimeout(() => { - expect(Object.keys(node1.swarm.muxedConns).length).to.equal(2) - expect(Object.keys(node2.swarm.muxedConns).length).to.equal(2) + expect(Object.keys(node1.swarm.muxedConns).length).to.equal(1) + expect(Object.keys(node2.swarm.muxedConns).length).to.equal(1) done() }, 2000) }) diff --git a/test/websockets-only.js b/test/websockets-only.js index 382c25f..aa28504 100644 --- a/test/websockets-only.js +++ b/test/websockets-only.js @@ -3,111 +3,201 @@ const expect = require('chai').expect const multiaddr = require('multiaddr') -const Peer = require('peer-info') -const Id = require('peer-id') +const PeerInfo = require('peer-info') +const PeerId = require('peer-id') -const Node = require('../src').Node +const libp2p = require('../src') const rawPeer = require('./peer.json') -const id = Id.createFromPrivKey(rawPeer.privKey) +const id = PeerId.createFromPrivKey(rawPeer.privKey) +const bl = require('bl') describe('libp2p-ipfs-browser (websockets only)', function () { - this.timeout(60 * 1000) - let node - let peer + this.timeout(20 * 1000) + + let peerB + let nodeA before((done) => { - const mh = multiaddr('/ip4/127.0.0.1/tcp/9200/ws') - peer = new Peer(id) - peer.multiaddr.add(mh) + const mh = multiaddr('/ip4/127.0.0.1/tcp/9200/ws/ipfs/' + rawPeer.id) + peerB = new PeerInfo(id) + peerB.multiaddr.add(mh) done() }) - it('start', (done) => { - node = new Node() - node.start(done) + it('create libp2pNode', () => { + nodeA = new libp2p.Node() + }) + + it('start libp2pNode', (done) => { + nodeA.start(done) }) - it('echo', (done) => { - const message = 'Hello World!' - node.swarm.dial(peer, '/echo/1.0.0', (err, conn) => { + // General connectivity tests + + it('libp2p.dialByMultiaddr nodeA to nodeB', (done) => { + nodeA.dialByMultiaddr(peerB.multiaddrs[0], (err) => { expect(err).to.not.exist + // Some time for Identify to finish + setTimeout(check, 500) - conn.write(message) - conn.on('data', (data) => { - expect(data.toString()).to.equal(message) + function check () { + const peers = nodeA.peerBook.getAll() + expect(err).to.not.exist + expect(Object.keys(peers)).to.have.length(1) + done() + } + // TODO confirm that we got the pubkey through identify + }) + }) - conn.end() + it('libp2p.dialByMultiaddr on Protocol nodeA to nodeB', (done) => { + nodeA.dialByMultiaddr(peerB.multiaddrs[0], '/echo/1.0.0', (err, conn) => { + expect(err).to.not.exist + const peers = nodeA.peerBook.getAll() + expect(err).to.not.exist + expect(Object.keys(peers)).to.have.length(1) + conn.pipe(bl((err, data) => { + expect(err).to.not.exist + expect(data.toString()).to.equal('hey') done() - }) + })) + conn.write('hey') + conn.end() + }) + }) + + it('libp2p.hangupByMultiaddr nodeA to nodeB', (done) => { + nodeA.hangUpByMultiaddr(peerB.multiaddrs[0], (err) => { + expect(err).to.not.exist + setTimeout(check, 500) + + function check () { + const peers = nodeA.peerBook.getAll() + expect(err).to.not.exist + expect(Object.keys(peers)).to.have.length(0) + expect(Object.keys(nodeA.swarm.muxedConns)).to.have.length(0) + done() + } }) }) - describe('stress', () => { - it('one big write', (done) => { - const message = new Buffer(1000000).fill('a').toString('hex') + it('libp2p.dialByPeerInfo nodeA to nodeB', (done) => { + nodeA.dialByPeerInfo(peerB, (err) => { + expect(err).to.not.exist + // Some time for Identify to finish + setTimeout(check, 500) - node.swarm.dial(peer, '/echo/1.0.0', (err, conn) => { + function check () { + const peers = nodeA.peerBook.getAll() expect(err).to.not.exist + expect(Object.keys(peers)).to.have.length(1) + done() + } + // TODO confirm that we got the pubkey through identify + }) + }) - conn.write(message) - conn.write('STOP') + it('libp2p.dialByPeerInfo on Protocol nodeA to nodeB', (done) => { + nodeA.dialByPeerInfo(peerB, '/echo/1.0.0', (err, conn) => { + expect(err).to.not.exist + const peers = nodeA.peerBook.getAll() + expect(err).to.not.exist + expect(Object.keys(peers)).to.have.length(1) + conn.pipe(bl((err, data) => { + expect(err).to.not.exist + expect(data.toString()).to.equal('hey') + done() + })) + conn.write('hey') + conn.end() + }) + }) + + it('libp2p.hangupByPeerInfo nodeA to nodeB', (done) => { + nodeA.hangUpByPeerInfo(peerB, (err) => { + expect(err).to.not.exist + setTimeout(check, 500) + + function check () { + const peers = nodeA.peerBook.getAll() + expect(err).to.not.exist + expect(Object.keys(peers)).to.have.length(0) + expect(Object.keys(nodeA.swarm.muxedConns)).to.have.length(0) + done() + } + }) + }) + + // NOTE, these dialById only works if a previous dial was made + // until we have PeerRouting + it.skip('libp2p.dialById nodeA to nodeB', (done) => {}) + it.skip('libp2p.dialById on Protocol nodeA to nodeB', (done) => {}) + it.skip('libp2p.hangupById nodeA to nodeB', (done) => {}) + + it('stress test: one big write', (done) => { + const message = new Buffer(1000000).fill('a').toString('hex') + + nodeA.dialByPeerInfo(peerB, '/echo/1.0.0', (err, conn) => { + expect(err).to.not.exist + + conn.write(message) + conn.write('STOP') - let result = '' + let result = '' - conn.on('data', (data) => { - if (data.toString() === 'STOP') { - conn.end() - return - } - result += data.toString() - }) + conn.on('data', (data) => { + if (data.toString() === 'STOP') { + conn.end() + return + } + result += data.toString() + }) - conn.on('end', () => { - expect(result).to.equal(message) - done() - }) + conn.on('end', () => { + expect(result).to.equal(message) + done() }) }) + }) - it('many writes in 2 batches', (done) => { - const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') - const peer = new Peer(id) - peer.multiaddr.add(mh) + it('stress test: many writes in 2 batches', (done) => { + let expected = '' + let counter = 0 - let expected = '' - let counter = 0 + nodeA.dialByPeerInfo(peerB, '/echo/1.0.0', (err, conn) => { + expect(err).to.not.exist - node.swarm.dial(peer, '/echo/1.0.0', (err, conn) => { - expect(err).to.not.exist + while (++counter < 10000) { + conn.write(`${counter} `) + expected += `${counter} ` + } - while (++counter < 10000) { - conn.write(`${counter} `) - expected += `${counter} ` - } + while (++counter < 20000) { + conn.write(`${counter} `) + expected += `${counter} ` + } - while (++counter < 20000) { - conn.write(`${counter} `) - expected += `${counter} ` + setTimeout(() => { + conn.write('STOP') + }, 2000) + + let result = '' + conn.on('data', (data) => { + if (data.toString() === 'STOP') { + conn.end() + return } + result += data.toString() + }) - setTimeout(() => { - conn.write('STOP') - }, 2000) - - let result = '' - conn.on('data', (data) => { - if (data.toString() === 'STOP') { - conn.end() - return - } - result += data.toString() - }) - - conn.on('end', () => { - expect(result).to.equal(expected) - done() - }) + conn.on('end', () => { + expect(result).to.equal(expected) + done() }) }) }) + + it('stop the libp2pnode', (done) => { + nodeA.stop(done) + }) })