diff --git a/.aegir.js b/.aegir.js new file mode 100644 index 0000000..f8c90e7 --- /dev/null +++ b/.aegir.js @@ -0,0 +1,12 @@ +'use strict' + +const createServer = require('ipfsd-ctl').createServer + +const server = createServer() + +module.exports = { + hooks: { + pre: server.start.bind(server), + post: server.stop.bind(server) + } +} diff --git a/.gitignore b/.gitignore index 00cbbdf..edec2c6 100644 --- a/.gitignore +++ b/.gitignore @@ -57,3 +57,4 @@ typings/ # dotenv environment variables file .env +yarn.lock \ No newline at end of file diff --git a/README.md b/README.md index b13a8e1..0ccb37f 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,32 @@ # js-libp2p-delegated-content-routing + Leverage other peers in the network to perform Content Routing calls. + +## Example + +``` +const DelegatedContentRouting = require('libp2p-delegated-content-routing') + +// default is to use ipfs.io +const routing = new DelegatedContentRouing() + +routing.findProviders(key, (err, peerInfos) => { + if (err) { + return console.error(err) + } + + console.log('found peers', peerInfos) +}) + +routing.provide(key, (err) => { + if (err) { + return console.error(err) + } + + console.log('providing %s', key) +}) +``` + +## License + +MIT diff --git a/ci/Jenkinsfile b/ci/Jenkinsfile new file mode 100644 index 0000000..704e61b --- /dev/null +++ b/ci/Jenkinsfile @@ -0,0 +1 @@ +javascript() \ No newline at end of file diff --git a/package.json b/package.json new file mode 100644 index 0000000..c04ab76 --- /dev/null +++ b/package.json @@ -0,0 +1,32 @@ +{ + "name": "libp2p-delegated-content-routing", + "version": "0.1.0", + "main": "src/index.js", + "repository": "git@github.com:libp2p/js-libp2p-delegated-content-routing.git", + "author": "dignifiedquire ", + "license": "MIT", + "private": false, + "scripts": { + "lint": "aegir lint", + "build": "aegir build", + "test": "aegir test", + "test:node": "aegir test --target node", + "release": "aegir release", + "release-minor": "aegir release --type minor", + "release-major": "aegir release --type major", + "coverage": "aegir coverage" + }, + "devDependencies": { + "aegir": "^13.0.7", + "chai": "^4.1.2", + "cids": "^0.5.3", + "go-ipfs-dep": "^0.4.14", + "ipfsd-ctl": "^0.32.1" + }, + "dependencies": { + "async": "^2.6.0", + "ipfs-api": "^20.0.1", + "multiaddr": "^4.0.0", + "peer-info": "^0.14.1" + } +} diff --git a/src/index.js b/src/index.js new file mode 100644 index 0000000..e4a2e18 --- /dev/null +++ b/src/index.js @@ -0,0 +1,126 @@ +'use strict' + +const PeerInfo = require('peer-info') +const dht = require('ipfs-api/src/dht') +const swarm = require('ipfs-api/src/swarm') +const refs = require('ipfs-api/src/refs') +const defaultConfig = require('ipfs-api/src/utils/default-config') +const series = require('async/series') +const parallel = require('async/parallel') +const reflect = require('async/reflect') +const multiaddr = require('multiaddr') + +const DEFAULT_IPFS_API = { + protocol: 'https', + port: 443, + host: 'ipfs.io' +} + +const DEFAULT_BOOSTRAP_NODES = [ + '/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd', + '/ipfs/QmSoLMeWqB7YGVLJN3pNLQpmmEk35v6wYtsMGLzSr5QBU3', + '/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM', + '/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu', + '/ipfs/QmSoLueR4xBeUbY9WZ9xGUUxunbKWcrNFTDAadQJmocnWm', + '/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64', + '/ipfs/QmZMxNdpMkewiVZLMRxaNxUeZpDUb34pWjZ1kZvsd16Zic', + '/ipfs/Qmbut9Ywz9YEDrz8ySBSgWyJk41Uvm2QJPhwDJzJyGFsD6' +] + +/** + * An implementation of content routing, using a delegated peer. + */ +class DelegatedContentRouting { + /** + * Create a new DelegatedContentRouting instance. + * + * @param {PeerInfo} peerInfo - the node that is using this routing. + * @param {object} [api] - the api endpoint of the delegated node to use. + * @param {Array} [bootstrappers] - list of bootstrapper nodes we are connected to. + */ + constructor (peerInfo, api, bootstrappers) { + if (peerInfo == null) { + throw new Error('missing self peerInfo') + } + + this.api = Object.assign({}, defaultConfig(), api || DEFAULT_IPFS_API) + this.dht = dht(this.api) + this.swarm = swarm(this.api) + this.refs = refs(this.api) + + this.peerInfo = peerInfo + this.bootstrappers = bootstrappers || DEFAULT_BOOSTRAP_NODES.map((addr) => multiaddr(addr)) + } + + /** + * Search the dht for providers of the given CID. + * + * - call `findProviders` on the delegated node. + * - does not support the `timeout` parameter, as this is specific to the delegate node. + * + * @param {CID} key + * @param {function(Error, Array)} callback + * @returns {void} + */ + findProviders (key, callback) { + this.dht.findprovs(key, (err, results) => { + if (err) { + return callback(err) + } + + // cleanup result from ipfs-api + const infos = [] + results + .filter((res) => Boolean(res.Responses)) + .forEach((res) => { + res.Responses.forEach((raw) => { + const info = new PeerInfo(raw.ID) + if (raw.Addrs) { + raw.Addrs.forEach((addr) => info.multiaddrs.add(addr)) + } + infos.push(info) + }) + }) + + callback(null, infos) + }) + } + + /** + * Announce to the network that the delegated node can provide the given key. + * + * Currently this uses the following hack + * - call swarm.connect on the delegated node to us, to ensure we are connected + * - call refs --recursive on the delegated node, so it fetches the content + * + * @param {CID} key + * @param {function(Error)} callback + * @returns {void} + */ + provide (key, callback) { + const addrs = this.bootstrappers.map((addr) => { + return addr.encapsulate(`/p2p-circuit/ipfs/${this.peerInfo.id}`) + }) + series([ + // TODO: do we want to connect through all of them? + (cb) => parallel(addrs.map((addr) => { + return reflect((cb) => this.swarm.connect(addr.toString(), cb)) + }), (err, results) => { + if (err) { + return cb(err) + } + // only some need to succeed + const success = results.filter((res) => res.error == null) + if (success.length === 0) { + return cb(new Error('unable to swarm.connect using p2p-circuit')) + } + cb() + }), + (cb) => { + this.refs(key.toBaseEncodedString(), {recursive: true}, cb) + } + ], (err) => callback(err)) + } +} + +module.exports = DelegatedContentRouting diff --git a/test/index.spec.js b/test/index.spec.js new file mode 100644 index 0000000..52ec6c1 --- /dev/null +++ b/test/index.spec.js @@ -0,0 +1,164 @@ +/* eslint-env mocha */ +'use strict' + +const expect = require('chai').expect +const IPFSFactory = require('ipfsd-ctl') +const async = require('async') +const CID = require('cids') +const IPFSApi = require('ipfs-api') + +const factory = IPFSFactory.create({ type: 'go' }) + +const DelegatedContentRouting = require('../src') + +describe('DelegatedContentRouting', () => { + let selfNode + let selfId + + beforeEach((done) => { + factory.spawn((err, node) => { + if (err != null) { + return done(err) + } + selfNode = node + + selfNode.api.id((err, id) => { + if (err) { + return done(err) + } + selfId = id + done() + }) + }) + }) + + afterEach(() => { + selfNode.stop() + }) + + describe('findProviders', () => { + it('fetches providers on the connected node', function (done) { + this.timeout(100000) + + let ipfsd + + async.waterfall([ + (cb) => factory.spawn(cb), + (_ipfsd, cb) => { + ipfsd = _ipfsd + const opts = ipfsd.apiAddr.toOptions() + const routing = new DelegatedContentRouting(selfId, { + protocol: 'http', + port: opts.port, + host: opts.host + }) + const cid = 'QmS4ustL54uo8FzR9455qaxZwuMiUhyvMcX9Ba8nUH4uVv' + routing.findProviders(cid, cb) + }, + (providers, cb) => { + expect(providers).to.have.lengthOf.above(0) + + ipfsd.stop() + cb() + } + ], done) + }) + + // skipping, as otherwise CI will randomly break + it.skip('fetches providers on the connected node (using ipfs.io)', function (done) { + this.timeout(100000) + + const routing = new DelegatedContentRouting(selfId) + const cid = 'QmS4ustL54uo8FzR9455qaxZwuMiUhyvMcX9Ba8nUH4uVv' + + async.waterfall([ + (cb) => routing.findProviders(cid, cb), + (providers, cb) => { + expect(providers).to.have.lengthOf.above(0) + cb() + } + ], done) + }) + }) + + describe.only('provide', () => { + it('makes content available on the delegated node', function (done) { + this.timeout(100000) + + let routing + let ipfsd + let cid + let delegateId + async.waterfall([ + (cb) => factory.spawn(cb), + (_ipfsd, cb) => { + ipfsd = _ipfsd + const opts = ipfsd.apiAddr.toOptions() + routing = new DelegatedContentRouting(selfId, { + protocol: 'http', + port: opts.port, + host: opts.host + }) + + selfNode.api.files.add(Buffer.from(`hello-${Math.random()}`), cb) + }, + (res, cb) => { + cid = new CID(res[0].hash) + routing.provide(cid, cb) + }, + (cb) => ipfsd.api.id(cb), + (id, cb) => { + delegateId = id + ipfsd.api.dht.findprovs(cid.toBaseEncodedString(), {n: 1}, cb) + }, + (provs, cb) => { + let providers = [] + provs.filter((res) => Boolean(res.Responses)).forEach((res) => { + providers = providers.concat(res.Responses) + }) + + const res = providers.find((prov) => prov.ID === delegateId.id) + expect(res != null).to.be.eql(true) + + ipfsd.stop() + cb() + } + ], done) + }) + + // skipping, as otherwise CI will randomly break + it.skip('makes content available on the delegated node (using ipfs.io)', function (done) { + this.timeout(100000) + + const routing = new DelegatedContentRouting(selfId) + const api = new IPFSApi(routing.api) + + let cid + + async.waterfall([ + (cb) => { + selfNode.api.files.add(Buffer.from(`hello-${Math.random()}`), cb) + }, + (res, cb) => { + cid = new CID(res[0].hash) + routing.provide(cid, cb) + }, + (cb) => { + console.log('findprovs') + // TODO: this does not return, why? + api.dht.findprovs(cid.toBaseEncodedString(), {n: 1}, cb) + }, + (provs, cb) => { + let providers = [] + provs.filter((res) => Boolean(res.Responses)).forEach((res) => { + providers = providers.concat(res.Responses) + }) + console.log('got provs', providers) + expect(providers).to.have.lengthOf.above(0) + + cb() + } + ], done) + }) + }) +})