Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
feat: start random walk and allow configuration for disabling (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos authored Oct 1, 2018
1 parent 0884fe2 commit abe9407
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 25 deletions.
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,14 @@
"datastore-level": "~0.8.0",
"dirty-chai": "^2.0.1",
"interface-connection": "~0.3.2",
"libp2p-mplex": "~0.8.0",
"libp2p-mplex": "~0.8.1",
"libp2p-switch": "~0.40.5",
"libp2p-tcp": "~0.12.0",
"lodash": "^4.17.10",
"lodash.random": "^3.2.0",
"lodash.range": "^3.2.0",
"peer-book": "~0.8.0"
"peer-book": "~0.8.0",
"sinon": "^6.3.4"
},
"contributors": [
"Blake Byrnes <blakebyrnes@gmail.com>",
Expand Down
29 changes: 23 additions & 6 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ class KadDHT {
/**
* Create a new KadDHT.
*
* @param {Switch} sw
* @param {object} options // {kBucketSize=20, datastore=MemoryDatastore}
* @param {Switch} sw libp2p-switch instance
* @param {object} options DHT options
* @param {number} options.kBucketSize k-bucket size (default 20)
* @param {Datastore} options.datastore datastore (default MemoryDatastore)
* @param {boolean} options.enabledDiscovery enable dht discovery (default true)
*/
constructor (sw, options) {
assert(sw, 'libp2p-kad-dht requires a instance of Switch')
Expand Down Expand Up @@ -96,6 +99,11 @@ class KadDHT {
* @type {RandomWalk}
*/
this.randomWalk = new RandomWalk(this)

/**
* Random walk state, default true
*/
this.randomWalkEnabled = !options.hasOwnProperty('enabledDiscovery') ? true : Boolean(options.enabledDiscovery)
}

/**
Expand All @@ -115,7 +123,15 @@ class KadDHT {
*/
start (callback) {
this._running = true
this.network.start(callback)
this.network.start((err) => {
if (err) {
return callback(err)
}

// Start random walk if enabled
this.randomWalkEnabled && this.randomWalk.start()
callback()
})
}

/**
Expand All @@ -127,9 +143,10 @@ class KadDHT {
*/
stop (callback) {
this._running = false
this.randomWalk.stop()
this.providers.stop()
this.network.stop(callback)
this.randomWalk.stop(() => { // guarantee that random walk is stopped if it was started
this.providers.stop()
this.network.stop(callback)
})
}

/**
Expand Down
61 changes: 51 additions & 10 deletions src/random-walk.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const c = require('./constants')
class RandomWalk {
constructor (kadDHT) {
assert(kadDHT, 'Random Walk needs an instance of the Kademlia DHT')
this._running = false
this._runningHandle = null
this._kadDHT = kadDHT
}

Expand All @@ -34,33 +34,70 @@ class RandomWalk {
// Don't run twice
if (this._running) { return }

this._running = setInterval(
() => this._walk(queries, maxTimeout),
period
)
// Create running handle
const runningHandle = {
_onCancel: null,
_timeoutId: null,
runPeriodically: (fn, period) => {
runningHandle._timeoutId = setTimeout(() => {
runningHandle._timeoutId = null

fn((nextPeriod) => {
// Was walk cancelled while fn was being called?
if (runningHandle._onCancel) {
return runningHandle._onCancel()
}
// Schedule next
runningHandle.runPeriodically(fn, nextPeriod)
})
}, period)
},
cancel: (cb) => {
// Not currently running, can callback immediately
if (runningHandle._timeoutId) {
clearTimeout(runningHandle._timeoutId)
return cb()
}
// Wait to finish and then call callback
runningHandle._onCancel = cb
}
}

// Start runner
runningHandle.runPeriodically((done) => {
this._walk(queries, maxTimeout, () => done(period))
}, period)
this._runningHandle = runningHandle
}

/**
* Stop the random-walk process.
* @param {function(Error)} callback
*
* @returns {void}
*/
stop () {
if (this._running) {
clearInterval(this._running)
stop (callback) {
const runningHandle = this._runningHandle

if (!runningHandle) {
return callback()
}

this._runningHandle = null
runningHandle.cancel(callback)
}

/**
* Do the random walk work.
*
* @param {number} queries
* @param {number} maxTimeout
* @param {function(Error)} callback
* @returns {void}
*
* @private
*/
_walk (queries, maxTimeout) {
_walk (queries, maxTimeout, callback) {
this._kadDHT._log('random-walk:start')

times(queries, (i, cb) => {
Expand All @@ -70,9 +107,13 @@ class RandomWalk {
this._query(id, cb)
}, maxTimeout)(cb)
], (err) => {
if (err) { return this._kadDHT._log.error('random-walk:error', err) }
if (err) {
this._kadDHT._log.error('random-walk:error', err)
return callback(err)
}

this._kadDHT._log('random-walk:done')
callback(null)
})
})
}
Expand Down
69 changes: 66 additions & 3 deletions test/kad-dht.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
const chai = require('chai')
chai.use(require('dirty-chai'))
const expect = chai.expect
const sinon = require('sinon')
const series = require('async/series')
const times = require('async/times')
const parallel = require('async/parallel')
Expand Down Expand Up @@ -69,7 +70,7 @@ function connect (a, b, callback) {

function bootstrap (dhts) {
dhts.forEach((dht) => {
dht.randomWalk._walk(3, 10000)
dht.randomWalk._walk(3, 10000, () => {}) // don't need to know when it finishes
})
}

Expand Down Expand Up @@ -135,6 +136,68 @@ describe('KadDHT', () => {
expect(dht).to.have.property('routingTable')
})

it('should be able to start and stop', function (done) {
const sw = new Switch(peerInfos[0], new PeerBook())
sw.transport.add('tcp', new TCP())
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()
const dht = new KadDHT(sw)

sinon.spy(dht.network, 'start')
sinon.spy(dht.randomWalk, 'start')

sinon.spy(dht.network, 'stop')
sinon.spy(dht.randomWalk, 'stop')

series([
(cb) => dht.start(cb),
(cb) => {
expect(dht.network.start.calledOnce).to.equal(true)
expect(dht.randomWalk.start.calledOnce).to.equal(true)

cb()
},
(cb) => dht.stop(cb)
], (err) => {
expect(err).to.not.exist()
expect(dht.network.stop.calledOnce).to.equal(true)
expect(dht.randomWalk.stop.calledOnce).to.equal(true)

done()
})
})

it('should be able to start with random-walk disabled', function (done) {
const sw = new Switch(peerInfos[0], new PeerBook())
sw.transport.add('tcp', new TCP())
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()
const dht = new KadDHT(sw, { enabledDiscovery: false })

sinon.spy(dht.network, 'start')
sinon.spy(dht.randomWalk, 'start')

sinon.spy(dht.network, 'stop')
sinon.spy(dht.randomWalk, 'stop')

series([
(cb) => dht.start(cb),
(cb) => {
expect(dht.network.start.calledOnce).to.equal(true)
expect(dht.randomWalk.start.calledOnce).to.equal(false)

cb()
},
(cb) => dht.stop(cb)
], (err) => {
expect(err).to.not.exist()
expect(dht.network.stop.calledOnce).to.equal(true)
expect(dht.randomWalk.stop.calledOnce).to.equal(true) // Should be always disabled, as it can be started using the instance

done()
})
})

it('put - get', function (done) {
this.timeout(10 * 1000)
const tdht = new TestDHT()
Expand Down Expand Up @@ -206,7 +269,8 @@ describe('KadDHT', () => {
const nDHTs = 20
const tdht = new TestDHT()

tdht.spawn(nDHTs, (err, dhts) => {
// random walk disabled for a manual usage
tdht.spawn(nDHTs, { enabledDiscovery: false }, (err, dhts) => {
expect(err).to.not.exist()

series([
Expand All @@ -217,7 +281,6 @@ describe('KadDHT', () => {
(cb) => {
bootstrap(dhts)
waitForWellFormedTables(dhts, 7, 0, 20 * 1000, cb)
cb()
}
], (err) => {
expect(err).to.not.exist()
Expand Down
18 changes: 14 additions & 4 deletions test/utils/test-dht.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,24 @@ class TestDHT {
this.nodes = []
}

spawn (n, callback) {
times(n, (i, cb) => this._spawnOne(cb), (err, dhts) => {
spawn (n, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}

times(n, (i, cb) => this._spawnOne(options, cb), (err, dhts) => {
if (err) { return callback(err) }
callback(null, dhts)
})
}

_spawnOne (callback) {
_spawnOne (options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}

createPeerInfo(1, (err, peers) => {
if (err) { return callback(err) }

Expand All @@ -37,7 +47,7 @@ class TestDHT {
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()

const dht = new KadDHT(sw)
const dht = new KadDHT(sw, options)

dht.validators.v = {
func (key, publicKey, callback) {
Expand Down

0 comments on commit abe9407

Please sign in to comment.