From 31df87d858f40bbdd2e439be0c4bd33f466f0b76 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 23 Aug 2018 13:47:07 +0200 Subject: [PATCH] feat: support multiple peer and content routing modules --- src/config.js | 4 +- src/content-routing.js | 63 ++++++++++++++++++++--- src/index.js | 6 +-- src/peer-routing.js | 43 ++++++++++++++-- test/config.spec.js | 8 +-- test/content-routing.node.js | 99 +++++++++++++++++++++++++++++++++++- test/peer-routing.node.js | 75 ++++++++++++++++++++++++++- 7 files changed, 276 insertions(+), 22 deletions(-) diff --git a/src/config.js b/src/config.js index ec338c068c..9c51062f7c 100644 --- a/src/config.js +++ b/src/config.js @@ -14,10 +14,10 @@ const OptionsSchema = Joi.object({ connProtector: Joi.object().keys({ protect: Joi.func().required() }).unknown(), - contentRouting: Joi.object(), + contentRouting: Joi.array().items(Joi.object()).allow(null), dht: ModuleSchema.allow(null), peerDiscovery: Joi.array().items(ModuleSchema).allow(null), - peerRouting: Joi.object(), + peerRouting: Joi.array().items(Joi.object()).allow(null), streamMuxer: Joi.array().items(ModuleSchema).allow(null), transport: Joi.array().items(ModuleSchema).min(1).required() }).required(), diff --git a/src/content-routing.js b/src/content-routing.js index 559541ed45..a30e31bb50 100644 --- a/src/content-routing.js +++ b/src/content-routing.js @@ -1,20 +1,71 @@ 'use strict' +const tryEach = require('async/tryEach') +const parallel = require('async/parallel') + module.exports = (node) => { + const routers = node._modules.contentRouting || [] + + // If we have the dht, make it first + if (node._dht) { + routers.unshift(node._dht) + } + return { + /** + * Iterates over all content routers in series to find providers of the given key. + * Once a content router succeeds, iteration will stop. + * + * @param {CID} key The CID key of the content to find + * @param {number} timeout How long the query should run + * @param {function(Error, Result)} callback + * @returns {void} + */ findProviders: (key, timeout, callback) => { - if (!node._dht) { - return callback(new Error('DHT is not available')) + if (routers.length === 0) { + return callback(new Error('No content routers available')) } - node._dht.findProviders(key, timeout, callback) + const tasks = routers.map((router) => { + return (cb) => router.findProviders(key, timeout, (err, results) => { + if (err) { + return cb(err) + } + + // If we don't have any results, we need to provide an error to keep trying + if (!results || Object.keys(results).length === 0) { + return cb(true, null) + } + + cb(null, results) + }) + }) + + tryEach(tasks, (err, results) => { + if (err && err !== true) { + return callback(err) + } + results = results || [] + callback(null, results) + }) }, + + /** + * Iterates over all content routers in parallel to notify it is + * a provider of the given key. + * + * @param {CID} key The CID key of the content to find + * @param {function(Error)} callback + * @returns {void} + */ provide: (key, callback) => { - if (!node._dht) { - return callback(new Error('DHT is not available')) + if (routers.length === 0) { + return callback(new Error('No content routers available')) } - node._dht.provide(key, callback) + parallel(routers.map((router) => { + return (cb) => router.provide(key, cb) + }), callback) } } } diff --git a/src/index.js b/src/index.js index e4286672ed..3cfe7084ca 100644 --- a/src/index.js +++ b/src/index.js @@ -99,9 +99,9 @@ class Node extends EventEmitter { } // Attach remaining APIs - // If peer or content routing modules have been provided, use those, otherwise use the dht - this.peerRouting = this._modules.peerRouting || peerRouting(this) - this.contentRouting = this._modules.contentRouting || contentRouting(this) + // peer and content routing will automatically get modules from _modules and _dht + this.peerRouting = peerRouting(this) + this.contentRouting = contentRouting(this) this.dht = dht(this) this._getPeerInfo = getPeerInfo(this) diff --git a/src/peer-routing.js b/src/peer-routing.js index 3a48d075ee..183f9c15ec 100644 --- a/src/peer-routing.js +++ b/src/peer-routing.js @@ -1,13 +1,50 @@ 'use strict' +const tryEach = require('async/tryEach') + module.exports = (node) => { + const routers = node._modules.peerRouting || [] + + // If we have the dht, make it first + if (node._dht) { + routers.unshift(node._dht) + } + return { + /** + * Iterates over all peer routers in series to find the given peer. + * + * @param {String} id The id of the peer to find + * @param {function(Error, Result)} + * @returns {void} + */ findPeer: (id, callback) => { - if (!node._dht) { - return callback(new Error('DHT is not available')) + if (routers.length === 0) { + return callback(new Error('No peer routers available')) } - node._dht.findPeer(id, callback) + const tasks = routers.map((router) => { + return (cb) => router.findPeer(id, (err, result) => { + if (err) { + return cb(err) + } + + // If we don't have a result, we need to provide an error to keep trying + if (!result || Object.keys(result).length === 0) { + return cb(true, null) + } + + cb(null, result) + }) + }) + + tryEach(tasks, (err, results) => { + if (err && err !== true) { + return callback(err) + } + results = results || null + callback(null, results) + }) } } } diff --git a/test/config.spec.js b/test/config.spec.js index ba5e7c6f39..f45fe2f075 100644 --- a/test/config.spec.js +++ b/test/config.spec.js @@ -109,8 +109,8 @@ describe('configuration', () => { modules: { transport: [ WS ], peerDiscovery: [ Bootstrap ], - peerRouting: peerRouter, - contentRouting: contentRouter + peerRouting: [ peerRouter ], + contentRouting: [ contentRouter ] }, config: { peerDiscovery: { @@ -123,8 +123,8 @@ describe('configuration', () => { } expect(validateConfig(options).modules).to.deep.include({ - peerRouting: peerRouter, - contentRouting: contentRouter + peerRouting: [ peerRouter ], + contentRouting: [ contentRouter ] }) }) diff --git a/test/content-routing.node.js b/test/content-routing.node.js index b0146c2497..9ca1243208 100644 --- a/test/content-routing.node.js +++ b/test/content-routing.node.js @@ -140,7 +140,7 @@ describe('.contentRouting', () => { nodeA = new Node({ peerInfo, modules: { - contentRouting: delegate + contentRouting: [ delegate ] }, config: { relay: { @@ -157,7 +157,8 @@ describe('.contentRouting', () => { ], done) }) - afterEach(() => nock.cleanAll) + after((done) => nodeA.stop(done)) + afterEach(() => nock.cleanAll()) describe('provide', () => { it('should use the delegate router to provide', (done) => { @@ -272,4 +273,98 @@ describe('.contentRouting', () => { }) }) }) + + describe('via the dht and a delegate', () => { + let nodeA + let delegate + + before((done) => { + waterfall([ + (cb) => { + createPeerInfo(cb) + }, + // Create the node using the delegate + (peerInfo, cb) => { + delegate = new DelegatedContentRouter(peerInfo.id, { + host: '0.0.0.0', + protocol: 'http', + port: 60197 + }, [ + ma('/ip4/0.0.0.0/tcp/60194') + ]) + nodeA = new Node({ + peerInfo, + modules: { + contentRouting: [ delegate ] + }, + config: { + relay: { + enabled: true, + hop: { + enabled: true, + active: false + } + }, + EXPERIMENTAL: { + dht: true + } + } + }) + nodeA.start(cb) + } + ], done) + }) + + after((done) => nodeA.stop(done)) + + describe('provide', () => { + it('should use both the dht and delegate router to provide', (done) => { + const dhtStub = sinon.stub(nodeA._dht, 'provide').callsFake(() => {}) + const delegateStub = sinon.stub(delegate, 'provide').callsFake(() => { + expect(dhtStub.calledOnce).to.equal(true) + expect(delegateStub.calledOnce).to.equal(true) + delegateStub.restore() + dhtStub.restore() + done() + }) + nodeA.contentRouting.provide() + }) + }) + + describe('findProviders', () => { + it('should only use the dht if it finds providers', (done) => { + const results = [true] + const dhtStub = sinon.stub(nodeA._dht, 'findProviders').callsArgWith(2, null, results) + const delegateStub = sinon.stub(delegate, 'findProviders').throws(() => { + return new Error('the delegate should not have been called') + }) + + nodeA.contentRouting.findProviders('a cid', 5000, (err, results) => { + expect(err).to.not.exist() + expect(results).to.equal(results) + expect(dhtStub.calledOnce).to.equal(true) + expect(delegateStub.notCalled).to.equal(true) + delegateStub.restore() + dhtStub.restore() + done() + }) + }) + + it('should use the delegate if the dht fails to find providers', (done) => { + const results = [true] + const dhtStub = sinon.stub(nodeA._dht, 'findProviders').callsArgWith(2, null, []) + const delegateStub = sinon.stub(delegate, 'findProviders').callsArgWith(2, null, results) + + nodeA.contentRouting.findProviders('a cid', 5000, (err, results) => { + expect(err).to.not.exist() + expect(results).to.deep.equal(results) + expect(dhtStub.calledOnce).to.equal(true) + expect(delegateStub.calledOnce).to.equal(true) + delegateStub.restore() + dhtStub.restore() + done() + }) + }) + }) + }) }) diff --git a/test/peer-routing.node.js b/test/peer-routing.node.js index 068139ec4b..4d8499afa1 100644 --- a/test/peer-routing.node.js +++ b/test/peer-routing.node.js @@ -111,7 +111,7 @@ describe('.peerRouting', () => { }) createNode('/ip4/0.0.0.0/tcp/0', { modules: { - peerRouting: delegate + peerRouting: [ delegate ] } }, (err, node) => { expect(err).to.not.exist() @@ -122,7 +122,8 @@ describe('.peerRouting', () => { ], done) }) - afterEach(nock.cleanAll) + after((done) => nodeA.stop(done)) + afterEach(() => nock.cleanAll()) it('should use the delegate router to find peers', (done) => { const stub = sinon.stub(delegate, 'findPeer').callsFake(() => { @@ -192,4 +193,74 @@ describe('.peerRouting', () => { }) }) }) + + describe('via the dht and a delegate', () => { + let nodeA + let delegate + + before((done) => { + parallel([ + // Create the node using the delegate + (cb) => { + delegate = new DelegatedPeerRouter({ + host: 'ipfs.io', + protocol: 'https', + port: '443' + }) + createNode('/ip4/0.0.0.0/tcp/0', { + modules: { + peerRouting: [ delegate ] + }, + config: { + EXPERIMENTAL: { + dht: true + } + } + }, (err, node) => { + expect(err).to.not.exist() + nodeA = node + nodeA.start(cb) + }) + } + ], done) + }) + + after((done) => nodeA.stop(done)) + + describe('findPeer', () => { + it('should only use the dht if it find the peer', (done) => { + const results = [true] + const dhtStub = sinon.stub(nodeA._dht, 'findPeer').callsArgWith(1, null, results) + const delegateStub = sinon.stub(delegate, 'findPeer').throws(() => { + return new Error('the delegate should not have been called') + }) + + nodeA.peerRouting.findPeer('a peer id', (err, results) => { + expect(err).to.not.exist() + expect(results).to.equal(results) + expect(dhtStub.calledOnce).to.equal(true) + expect(delegateStub.notCalled).to.equal(true) + delegateStub.restore() + dhtStub.restore() + done() + }) + }) + + it('should use the delegate if the dht fails to find the peer', (done) => { + const results = [true] + const dhtStub = sinon.stub(nodeA._dht, 'findPeer').callsArgWith(1, null, undefined) + const delegateStub = sinon.stub(delegate, 'findPeer').callsArgWith(1, null, results) + + nodeA.peerRouting.findPeer('a peer id', (err, results) => { + expect(err).to.not.exist() + expect(results).to.deep.equal(results) + expect(dhtStub.calledOnce).to.equal(true) + expect(delegateStub.calledOnce).to.equal(true) + delegateStub.restore() + dhtStub.restore() + done() + }) + }) + }) + }) })