Skip to content

Commit

Permalink
feat: initial implementation (#1)
Browse files Browse the repository at this point in the history
* feat: implement findProviders

* feat: implement provide

* cleanup

* chore(ci): switch to jenkins
  • Loading branch information
dignifiedquire authored and daviddias committed Jun 26, 2018
1 parent 8e293ee commit 011adc9
Show file tree
Hide file tree
Showing 7 changed files with 366 additions and 0 deletions.
12 changes: 12 additions & 0 deletions .aegir.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
'use strict'

const createServer = require('ipfsd-ctl').createServer

const server = createServer()

module.exports = {
hooks: {
pre: server.start.bind(server),
post: server.stop.bind(server)
}
}
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,4 @@ typings/
# dotenv environment variables file
.env

yarn.lock
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,32 @@
# js-libp2p-delegated-content-routing

Leverage other peers in the network to perform Content Routing calls.

## Example

```
const DelegatedContentRouting = require('libp2p-delegated-content-routing')
// default is to use ipfs.io
const routing = new DelegatedContentRouing()
routing.findProviders(key, (err, peerInfos) => {
if (err) {
return console.error(err)
}
console.log('found peers', peerInfos)
})
routing.provide(key, (err) => {
if (err) {
return console.error(err)
}
console.log('providing %s', key)
})
```

## License

MIT
1 change: 1 addition & 0 deletions ci/Jenkinsfile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
javascript()
32 changes: 32 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"name": "libp2p-delegated-content-routing",
"version": "0.1.0",
"main": "src/index.js",
"repository": "git@github.com:libp2p/js-libp2p-delegated-content-routing.git",
"author": "dignifiedquire <dignifiedquire@gmail.com>",
"license": "MIT",
"private": false,
"scripts": {
"lint": "aegir lint",
"build": "aegir build",
"test": "aegir test",
"test:node": "aegir test --target node",
"release": "aegir release",
"release-minor": "aegir release --type minor",
"release-major": "aegir release --type major",
"coverage": "aegir coverage"
},
"devDependencies": {
"aegir": "^13.0.7",
"chai": "^4.1.2",
"cids": "^0.5.3",
"go-ipfs-dep": "^0.4.14",
"ipfsd-ctl": "^0.32.1"
},
"dependencies": {
"async": "^2.6.0",
"ipfs-api": "^20.0.1",
"multiaddr": "^4.0.0",
"peer-info": "^0.14.1"
}
}
126 changes: 126 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
'use strict'

const PeerInfo = require('peer-info')
const dht = require('ipfs-api/src/dht')
const swarm = require('ipfs-api/src/swarm')
const refs = require('ipfs-api/src/refs')
const defaultConfig = require('ipfs-api/src/utils/default-config')
const series = require('async/series')
const parallel = require('async/parallel')
const reflect = require('async/reflect')
const multiaddr = require('multiaddr')

const DEFAULT_IPFS_API = {
protocol: 'https',
port: 443,
host: 'ipfs.io'
}

const DEFAULT_BOOSTRAP_NODES = [
'/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd',
'/ipfs/QmSoLMeWqB7YGVLJN3pNLQpmmEk35v6wYtsMGLzSr5QBU3',
'/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM',
'/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu',
'/ipfs/QmSoLueR4xBeUbY9WZ9xGUUxunbKWcrNFTDAadQJmocnWm',
'/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64',
'/ipfs/QmZMxNdpMkewiVZLMRxaNxUeZpDUb34pWjZ1kZvsd16Zic',
'/ipfs/Qmbut9Ywz9YEDrz8ySBSgWyJk41Uvm2QJPhwDJzJyGFsD6'
]

/**
* An implementation of content routing, using a delegated peer.
*/
class DelegatedContentRouting {
/**
* Create a new DelegatedContentRouting instance.
*
* @param {PeerInfo} peerInfo - the node that is using this routing.
* @param {object} [api] - the api endpoint of the delegated node to use.
* @param {Array<Multiaddr>} [bootstrappers] - list of bootstrapper nodes we are connected to.
*/
constructor (peerInfo, api, bootstrappers) {
if (peerInfo == null) {
throw new Error('missing self peerInfo')
}

this.api = Object.assign({}, defaultConfig(), api || DEFAULT_IPFS_API)
this.dht = dht(this.api)
this.swarm = swarm(this.api)
this.refs = refs(this.api)

this.peerInfo = peerInfo
this.bootstrappers = bootstrappers || DEFAULT_BOOSTRAP_NODES.map((addr) => multiaddr(addr))
}

/**
* Search the dht for providers of the given CID.
*
* - call `findProviders` on the delegated node.
* - does not support the `timeout` parameter, as this is specific to the delegate node.
*
* @param {CID} key
* @param {function(Error, Array<PeerInfo>)} callback
* @returns {void}
*/
findProviders (key, callback) {
this.dht.findprovs(key, (err, results) => {
if (err) {
return callback(err)
}

// cleanup result from ipfs-api
const infos = []
results
.filter((res) => Boolean(res.Responses))
.forEach((res) => {
res.Responses.forEach((raw) => {
const info = new PeerInfo(raw.ID)
if (raw.Addrs) {
raw.Addrs.forEach((addr) => info.multiaddrs.add(addr))
}
infos.push(info)
})
})

callback(null, infos)
})
}

/**
* Announce to the network that the delegated node can provide the given key.
*
* Currently this uses the following hack
* - call swarm.connect on the delegated node to us, to ensure we are connected
* - call refs --recursive on the delegated node, so it fetches the content
*
* @param {CID} key
* @param {function(Error)} callback
* @returns {void}
*/
provide (key, callback) {
const addrs = this.bootstrappers.map((addr) => {
return addr.encapsulate(`/p2p-circuit/ipfs/${this.peerInfo.id}`)
})
series([
// TODO: do we want to connect through all of them?
(cb) => parallel(addrs.map((addr) => {
return reflect((cb) => this.swarm.connect(addr.toString(), cb))
}), (err, results) => {
if (err) {
return cb(err)
}
// only some need to succeed
const success = results.filter((res) => res.error == null)
if (success.length === 0) {
return cb(new Error('unable to swarm.connect using p2p-circuit'))
}
cb()
}),
(cb) => {
this.refs(key.toBaseEncodedString(), {recursive: true}, cb)
}
], (err) => callback(err))
}
}

module.exports = DelegatedContentRouting
164 changes: 164 additions & 0 deletions test/index.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/* eslint-env mocha */
'use strict'

const expect = require('chai').expect
const IPFSFactory = require('ipfsd-ctl')
const async = require('async')
const CID = require('cids')
const IPFSApi = require('ipfs-api')

const factory = IPFSFactory.create({ type: 'go' })

const DelegatedContentRouting = require('../src')

describe('DelegatedContentRouting', () => {
let selfNode
let selfId

beforeEach((done) => {
factory.spawn((err, node) => {
if (err != null) {
return done(err)
}
selfNode = node

selfNode.api.id((err, id) => {
if (err) {
return done(err)
}
selfId = id
done()
})
})
})

afterEach(() => {
selfNode.stop()
})

describe('findProviders', () => {
it('fetches providers on the connected node', function (done) {
this.timeout(100000)

let ipfsd

async.waterfall([
(cb) => factory.spawn(cb),
(_ipfsd, cb) => {
ipfsd = _ipfsd
const opts = ipfsd.apiAddr.toOptions()
const routing = new DelegatedContentRouting(selfId, {
protocol: 'http',
port: opts.port,
host: opts.host
})
const cid = 'QmS4ustL54uo8FzR9455qaxZwuMiUhyvMcX9Ba8nUH4uVv'
routing.findProviders(cid, cb)
},
(providers, cb) => {
expect(providers).to.have.lengthOf.above(0)

ipfsd.stop()
cb()
}
], done)
})

// skipping, as otherwise CI will randomly break
it.skip('fetches providers on the connected node (using ipfs.io)', function (done) {
this.timeout(100000)

const routing = new DelegatedContentRouting(selfId)
const cid = 'QmS4ustL54uo8FzR9455qaxZwuMiUhyvMcX9Ba8nUH4uVv'

async.waterfall([
(cb) => routing.findProviders(cid, cb),
(providers, cb) => {
expect(providers).to.have.lengthOf.above(0)
cb()
}
], done)
})
})

describe.only('provide', () => {
it('makes content available on the delegated node', function (done) {
this.timeout(100000)

let routing
let ipfsd
let cid
let delegateId
async.waterfall([
(cb) => factory.spawn(cb),
(_ipfsd, cb) => {
ipfsd = _ipfsd
const opts = ipfsd.apiAddr.toOptions()
routing = new DelegatedContentRouting(selfId, {
protocol: 'http',
port: opts.port,
host: opts.host
})

selfNode.api.files.add(Buffer.from(`hello-${Math.random()}`), cb)
},
(res, cb) => {
cid = new CID(res[0].hash)
routing.provide(cid, cb)
},
(cb) => ipfsd.api.id(cb),
(id, cb) => {
delegateId = id
ipfsd.api.dht.findprovs(cid.toBaseEncodedString(), {n: 1}, cb)
},
(provs, cb) => {
let providers = []
provs.filter((res) => Boolean(res.Responses)).forEach((res) => {
providers = providers.concat(res.Responses)
})

const res = providers.find((prov) => prov.ID === delegateId.id)
expect(res != null).to.be.eql(true)

ipfsd.stop()
cb()
}
], done)
})

// skipping, as otherwise CI will randomly break
it.skip('makes content available on the delegated node (using ipfs.io)', function (done) {
this.timeout(100000)

const routing = new DelegatedContentRouting(selfId)
const api = new IPFSApi(routing.api)

let cid

async.waterfall([
(cb) => {
selfNode.api.files.add(Buffer.from(`hello-${Math.random()}`), cb)
},
(res, cb) => {
cid = new CID(res[0].hash)
routing.provide(cid, cb)
},
(cb) => {
console.log('findprovs')
// TODO: this does not return, why?
api.dht.findprovs(cid.toBaseEncodedString(), {n: 1}, cb)
},
(provs, cb) => {
let providers = []
provs.filter((res) => Boolean(res.Responses)).forEach((res) => {
providers = providers.concat(res.Responses)
})
console.log('got provs', providers)
expect(providers).to.have.lengthOf.above(0)

cb()
}
], done)
})
})
})

0 comments on commit 011adc9

Please sign in to comment.