Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactor to use async/await #7

Merged
merged 1 commit into from
Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ const server = createServer()

module.exports = {
hooks: {
pre: server.start.bind(server),
post: server.stop.bind(server)
browser: {
pre: () => server.start(),
post: () => server.stop()
}
}
}
13 changes: 6 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@
},
"devDependencies": {
"aegir": "^19.0.5",
"async-iterator-all": "^1.0.0",
"chai": "^4.2.0",
"cids": "^0.7.1",
"go-ipfs-dep": "^0.4.21",
"ipfsd-ctl": "^0.43.0"
"go-ipfs-dep": "~0.4.17",
"ipfsd-ctl": "~0.44.1",
"peer-id": "~0.13.1"
},
"dependencies": {
"async": "^2.6.2",
"ipfs-http-client": "^33.0.2",
"multiaddr": "^6.1.0",
"peer-id": "^0.12.2",
"peer-info": "^0.15.1"
"ipfs-http-client": "^33.1.0",
"multiaddr": "^6.1.0"
},
"contributors": [
"David Dias <daviddias.p@gmail.com>",
Expand Down
64 changes: 25 additions & 39 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ const dht = require('ipfs-http-client/src/dht')
const swarm = require('ipfs-http-client/src/swarm')
const refs = require('ipfs-http-client/src/files-regular/refs')
const defaultConfig = require('ipfs-http-client/src/utils/default-config')
const series = require('async/series')
const parallel = require('async/parallel')
const reflect = require('async/reflect')
const multiaddr = require('multiaddr')

const DEFAULT_MAX_TIMEOUT = 30e3 // 30 second default
Expand Down Expand Up @@ -60,26 +57,18 @@ class DelegatedContentRouting {
* @param {CID} key
* @param {object} options
* @param {number} options.maxTimeout How long the query can take. Defaults to 30 seconds
* @param {function(Error, Array<PeerInfo>)} callback
* @returns {void}
* @returns {AsyncIterable<PeerInfo>}
*/
findProviders (key, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
} else if (typeof options === 'number') { // This will be deprecated in a next release
options = {
maxTimeout: options
}
} else {
options = options || {}
}

async * findProviders (key, options = {}) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 it will be nice to have an iterable api all ready to go once the dht supports it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Death to lists. Iterate all of the things.

Seriously, it's going to alleviate so many memory problems..

options.maxTimeout = options.maxTimeout || DEFAULT_MAX_TIMEOUT

this.dht.findProvs(key.toString(), {
const results = await this.dht.findProvs(key, {
timeout: `${options.maxTimeout}ms` // The api requires specification of the time unit (s/ms)
}, callback)
})

for (let i = 0; i < results.length; i++) {
yield results[i]
}
}

/**
Expand All @@ -91,32 +80,29 @@ class DelegatedContentRouting {
*
* @param {CID} key
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
*/
provide (key, callback) {
async provide (key) {
const addrs = this.bootstrappers.map((addr) => {
return addr.encapsulate(`/p2p-circuit/ipfs/${this.peerId.toB58String()}`)
})

series([
(cb) => parallel(addrs.map((addr) => {
return reflect((cb) => this.swarm.connect(addr.toString(), cb))
}), (err, results) => {
if (err) {
return cb(err)
}
const results = await Promise.all(
addrs.map((addr) => {
return this.swarm.connect(addr.toString()).catch(() => {})
})
)

// only some need to succeed
const success = results.filter((res) => res.error == null)
if (success.length === 0) {
return cb(new Error('unable to swarm.connect using p2p-circuit'))
}
cb()
}),
(cb) => {
this.refs(key.toString(), { recursive: true }, cb)
}
], (err) => callback(err))
// only some need to succeed
const success = results.filter((res) => res && res.error == null)

if (success.length === 0) {
throw new Error('unable to swarm.connect using p2p-circuit')
}

this.refs(key.toBaseEncodedString(), {
recursive: true
})
}
}

Expand Down
207 changes: 78 additions & 129 deletions test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,17 @@

const expect = require('chai').expect
const IPFSFactory = require('ipfsd-ctl')
const parallel = require('async/parallel')
const waterfall = require('async/waterfall')
const CID = require('cids')
const PeerId = require('peer-id')

const factory = IPFSFactory.create({ type: 'go' })
const all = require('async-iterator-all')
const factory = IPFSFactory.create({
type: 'go'
})

const DelegatedContentRouting = require('../src')

function spawnNode (bootstrap, callback) {
if (typeof bootstrap === 'function') {
callback = bootstrap
bootstrap = []
}

factory.spawn({
async function spawnNode (bootstrap = []) {
const node = await factory.spawn({
// Lock down the nodes so testing can be deterministic
config: {
Bootstrap: bootstrap,
Expand All @@ -28,59 +23,47 @@ function spawnNode (bootstrap, callback) {
}
}
}
}, (err, node) => {
if (err) return callback(err)
})

node.api.id((err, id) => {
if (err) return callback(err)
const id = await node.api.id()

callback(null, node, id)
})
})
return {
node,
id
}
}

describe('DelegatedContentRouting', function () {
this.timeout(20 * 1000) // we're spawning daemons, give ci some time

let selfNode
let selfId
let delegatedNode
let delegatedId
let delegateNode
let bootstrapNode
let bootstrapId

before((done) => {
waterfall([
// Spawn a "bootstrap" node that doesnt connect to anything
(cb) => spawnNode(cb),
(ipfsd, id, cb) => {
bootstrapNode = ipfsd
bootstrapId = id
cb()
},
// Spawn our local node and bootstrap the bootstrapper node
(cb) => spawnNode(bootstrapId.addresses, cb),
(ipfsd, id, cb) => {
selfNode = ipfsd
selfId = PeerId.createFromB58String(id.id)
cb()
},
// Spawn the delegate node and bootstrap the bootstrapper node
(cb) => spawnNode(bootstrapId.addresses, cb),
(ipfsd, id, cb) => {
delegatedNode = ipfsd
delegatedId = PeerId.createFromB58String(id.id)
cb()
}
], done)
before(async () => {
// Spawn a "Boostrap" node that doesnt connect to anything
const bootstrap = await spawnNode()
bootstrapNode = bootstrap.node
bootstrapId = bootstrap.id

// Spawn our local node and bootstrap the bootstrapper node
const self = await spawnNode(bootstrapId.addresses)
selfNode = self.node
selfId = PeerId.createFromB58String(self.id.id)

// Spawn the delegate node and bootstrap the bootstrapper node
const delegate = await spawnNode(bootstrapId.addresses)
delegateNode = delegate.node
})

after((done) => {
parallel([
(cb) => selfNode.stop(cb),
(cb) => delegatedNode.stop(cb),
(cb) => bootstrapNode.stop(cb)
], done)
after(() => {
return Promise.all([
selfNode.stop(),
delegateNode.stop(),
bootstrapNode.stop()
])
})

describe('create', () => {
Expand Down Expand Up @@ -127,96 +110,62 @@ describe('DelegatedContentRouting', function () {

describe('findProviders', () => {
const cid = new CID('QmS4ustL54uo8FzR9455qaxZwuMiUhyvMcX9Ba8nUH4uVv')
before('register providers', (done) => {
parallel([
(cb) => bootstrapNode.api.dht.provide(cid, cb),
(cb) => selfNode.api.dht.provide(cid, cb)
], done)

before('register providers', async () => {
await bootstrapNode.api.dht.provide(cid)
await selfNode.api.dht.provide(cid)
})

it('should be able to find providers through the delegate node', function (done) {
waterfall([
(cb) => {
const opts = delegatedNode.apiAddr.toOptions()
const routing = new DelegatedContentRouting(selfId, {
protocol: 'http',
port: opts.port,
host: opts.host
})
routing.findProviders(cid, cb)
},
(providers, cb) => {
// We should get our local node and the bootstrap node as providers.
// The delegate node is not included, because it is handling the requests
expect(providers).to.have.length(2)
expect(providers.map((p) => p.id.toB58String())).to.have.members([
bootstrapId.id,
selfId.toB58String()
])
cb()
}
], done)
it('should be able to find providers through the delegate node', async () => {
const opts = delegateNode.apiAddr.toOptions()
const routing = new DelegatedContentRouting(selfId, {
protocol: 'http',
port: opts.port,
host: opts.host
})

const providers = await all(routing.findProviders(cid))

// We should get the bootstrap node as provider
// The delegate node is not included, because it is handling the requests
expect(providers.map((p) => p.id.toB58String())).to.include(bootstrapId.id, 'Did not include bootstrap node')
expect(providers.map((p) => p.id.toB58String())).to.include(selfId.toB58String(), 'Did not include self node')
})

it('should be able to specify a maxTimeout', function (done) {
waterfall([
(cb) => {
const opts = delegatedNode.apiAddr.toOptions()
const routing = new DelegatedContentRouting(selfId, {
protocol: 'http',
port: opts.port,
host: opts.host
})
const cid = new CID('QmS4ustL54uo8FzR9455qaxZwuMiUhyvMcX9Ba8nUH4uVv')
routing.findProviders(cid, { maxTimeout: 5e3 }, cb)
},
(providers, cb) => {
// We should get our local node and the bootstrap node as providers.
// The delegate node is not included, because it is handling the requests
expect(providers).to.have.length(2)
expect(providers.map((p) => p.id.toB58String())).to.have.members([
bootstrapId.id,
selfId.toB58String()
])
cb()
}
], done)
it('should be able to specify a maxTimeout', async () => {
const opts = delegateNode.apiAddr.toOptions()
const routing = new DelegatedContentRouting(selfId, {
protocol: 'http',
port: opts.port,
host: opts.host
})

const cid = new CID('QmS4ustL54uo8FzR9455qaxZwuMiUhyvMcX9Ba8nUH4uVv')
const providers = await all(routing.findProviders(cid, { maxTimeout: 5e3 }))

expect(providers.map((p) => p.id.toB58String())).to.include(bootstrapId.id, 'Did not include bootstrap node')
})
})

describe('provide', () => {
it('should be able to register as a content provider to the delegate node', function (done) {
it('should be able to register as a content provider to the delegate node', async () => {
let contentRouter
let cid

waterfall([
(cb) => {
const opts = delegatedNode.apiAddr.toOptions()
contentRouter = new DelegatedContentRouting(selfId, {
protocol: 'http',
port: opts.port,
host: opts.host
})

selfNode.api.add(Buffer.from(`hello-${Math.random()}`), cb)
},
(res, cb) => {
cid = new CID(res[0].hash)
contentRouter.provide(cid, cb)
},
(cb) => {
delegatedNode.api.dht.findProvs(cid, cb)
},
(providers, cb) => {
const providerIds = providers.map(p => p.id.toB58String())
// The delegate should be a provider
expect(providerIds).to.have.members([
selfId.toB58String(),
delegatedId.toB58String()
])
cb()
}
], done)
const opts = delegateNode.apiAddr.toOptions()
contentRouter = new DelegatedContentRouting(selfId, {
protocol: 'http',
port: opts.port,
host: opts.host
})

const res = await selfNode.api.add(Buffer.from(`hello-${Math.random()}`))
cid = new CID(res[0].hash)
await contentRouter.provide(cid)
const providers = await delegateNode.api.dht.findProvs(cid.toBaseEncodedString())

// We are hosting the file, validate we're the provider
expect(providers.map((p) => p.id.toB58String())).to.include(selfId.toB58String(), 'Did not include self node')
})
})
})