Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

feat: start random walk and allow configuration for disabling #42

Merged
merged 4 commits into from
Oct 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,14 @@
"datastore-level": "~0.8.0",
"dirty-chai": "^2.0.1",
"interface-connection": "~0.3.2",
"libp2p-mplex": "~0.8.0",
"libp2p-mplex": "~0.8.1",
"libp2p-switch": "~0.40.5",
"libp2p-tcp": "~0.12.0",
"lodash": "^4.17.10",
"lodash.random": "^3.2.0",
"lodash.range": "^3.2.0",
"peer-book": "~0.8.0"
"peer-book": "~0.8.0",
"sinon": "^6.3.4"
},
"contributors": [
"Blake Byrnes <blakebyrnes@gmail.com>",
Expand Down
29 changes: 23 additions & 6 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ class KadDHT {
/**
* Create a new KadDHT.
*
* @param {Switch} sw
* @param {object} options // {kBucketSize=20, datastore=MemoryDatastore}
* @param {Switch} sw libp2p-switch instance
* @param {object} options DHT options
* @param {number} options.kBucketSize k-bucket size (default 20)
* @param {Datastore} options.datastore datastore (default MemoryDatastore)
* @param {boolean} options.enabledDiscovery enable dht discovery (default true)
*/
constructor (sw, options) {
assert(sw, 'libp2p-kad-dht requires a instance of Switch')
Expand Down Expand Up @@ -96,6 +99,11 @@ class KadDHT {
* @type {RandomWalk}
*/
this.randomWalk = new RandomWalk(this)

/**
* Random walk state, default true
*/
this.randomWalkEnabled = !options.hasOwnProperty('enabledDiscovery') ? true : Boolean(options.enabledDiscovery)
}

/**
Expand All @@ -115,7 +123,15 @@ class KadDHT {
*/
start (callback) {
this._running = true
this.network.start(callback)
this.network.start((err) => {
if (err) {
return callback(err)
}

// Start random walk if enabled
this.randomWalkEnabled && this.randomWalk.start()
callback()
})
}

/**
Expand All @@ -127,9 +143,10 @@ class KadDHT {
*/
stop (callback) {
this._running = false
this.randomWalk.stop()
this.providers.stop()
this.network.stop(callback)
this.randomWalk.stop(() => { // guarantee that random walk is stopped if it was started
this.providers.stop()
this.network.stop(callback)
})
}

/**
Expand Down
61 changes: 51 additions & 10 deletions src/random-walk.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const c = require('./constants')
class RandomWalk {
constructor (kadDHT) {
assert(kadDHT, 'Random Walk needs an instance of the Kademlia DHT')
this._running = false
this._runningHandle = null
this._kadDHT = kadDHT
}

Expand All @@ -34,33 +34,70 @@ class RandomWalk {
// Don't run twice
if (this._running) { return }

this._running = setInterval(
() => this._walk(queries, maxTimeout),
period
)
// Create running handle
const runningHandle = {
_onCancel: null,
_timeoutId: null,
runPeriodically: (fn, period) => {
runningHandle._timeoutId = setTimeout(() => {
runningHandle._timeoutId = null

fn((nextPeriod) => {
// Was walk cancelled while fn was being called?
if (runningHandle._onCancel) {
return runningHandle._onCancel()
}
// Schedule next
runningHandle.runPeriodically(fn, nextPeriod)
})
}, period)
},
cancel: (cb) => {
// Not currently running, can callback immediately
if (runningHandle._timeoutId) {
clearTimeout(runningHandle._timeoutId)
return cb()
}
// Wait to finish and then call callback
runningHandle._onCancel = cb
}
}

// Start runner
runningHandle.runPeriodically((done) => {
this._walk(queries, maxTimeout, () => done(period))
}, period)
this._runningHandle = runningHandle
}

/**
* Stop the random-walk process.
* @param {function(Error)} callback
*
* @returns {void}
*/
stop () {
if (this._running) {
clearInterval(this._running)
stop (callback) {
const runningHandle = this._runningHandle

if (!runningHandle) {
return callback()
}

this._runningHandle = null
runningHandle.cancel(callback)
}

/**
* Do the random walk work.
*
* @param {number} queries
* @param {number} maxTimeout
* @param {function(Error)} callback
* @returns {void}
*
* @private
*/
_walk (queries, maxTimeout) {
_walk (queries, maxTimeout, callback) {
this._kadDHT._log('random-walk:start')

times(queries, (i, cb) => {
Expand All @@ -70,9 +107,13 @@ class RandomWalk {
this._query(id, cb)
}, maxTimeout)(cb)
], (err) => {
if (err) { return this._kadDHT._log.error('random-walk:error', err) }
if (err) {
this._kadDHT._log.error('random-walk:error', err)
return callback(err)
}

this._kadDHT._log('random-walk:done')
callback(null)
})
})
}
Expand Down
69 changes: 66 additions & 3 deletions test/kad-dht.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
const chai = require('chai')
chai.use(require('dirty-chai'))
const expect = chai.expect
const sinon = require('sinon')
const series = require('async/series')
const times = require('async/times')
const parallel = require('async/parallel')
Expand Down Expand Up @@ -69,7 +70,7 @@ function connect (a, b, callback) {

function bootstrap (dhts) {
dhts.forEach((dht) => {
dht.randomWalk._walk(3, 10000)
dht.randomWalk._walk(3, 10000, () => {}) // don't need to know when it finishes
})
}

Expand Down Expand Up @@ -135,6 +136,68 @@ describe('KadDHT', () => {
expect(dht).to.have.property('routingTable')
})

it('should be able to start and stop', function (done) {
const sw = new Switch(peerInfos[0], new PeerBook())
sw.transport.add('tcp', new TCP())
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()
const dht = new KadDHT(sw)

sinon.spy(dht.network, 'start')
sinon.spy(dht.randomWalk, 'start')

sinon.spy(dht.network, 'stop')
sinon.spy(dht.randomWalk, 'stop')

series([
(cb) => dht.start(cb),
(cb) => {
expect(dht.network.start.calledOnce).to.equal(true)
expect(dht.randomWalk.start.calledOnce).to.equal(true)

cb()
},
(cb) => dht.stop(cb)
], (err) => {
expect(err).to.not.exist()
expect(dht.network.stop.calledOnce).to.equal(true)
expect(dht.randomWalk.stop.calledOnce).to.equal(true)

done()
})
})

it('should be able to start with random-walk disabled', function (done) {
const sw = new Switch(peerInfos[0], new PeerBook())
sw.transport.add('tcp', new TCP())
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()
const dht = new KadDHT(sw, { enabledDiscovery: false })

sinon.spy(dht.network, 'start')
sinon.spy(dht.randomWalk, 'start')

sinon.spy(dht.network, 'stop')
sinon.spy(dht.randomWalk, 'stop')

series([
(cb) => dht.start(cb),
(cb) => {
expect(dht.network.start.calledOnce).to.equal(true)
expect(dht.randomWalk.start.calledOnce).to.equal(false)

cb()
},
(cb) => dht.stop(cb)
], (err) => {
expect(err).to.not.exist()
expect(dht.network.stop.calledOnce).to.equal(true)
expect(dht.randomWalk.stop.calledOnce).to.equal(true) // Should be always disabled, as it can be started using the instance

done()
})
})

it('put - get', function (done) {
this.timeout(10 * 1000)
const tdht = new TestDHT()
Expand Down Expand Up @@ -206,7 +269,8 @@ describe('KadDHT', () => {
const nDHTs = 20
const tdht = new TestDHT()

tdht.spawn(nDHTs, (err, dhts) => {
// random walk disabled for a manual usage
tdht.spawn(nDHTs, { enabledDiscovery: false }, (err, dhts) => {
expect(err).to.not.exist()

series([
Expand All @@ -217,7 +281,6 @@ describe('KadDHT', () => {
(cb) => {
bootstrap(dhts)
waitForWellFormedTables(dhts, 7, 0, 20 * 1000, cb)
cb()
}
], (err) => {
expect(err).to.not.exist()
Expand Down
18 changes: 14 additions & 4 deletions test/utils/test-dht.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,24 @@ class TestDHT {
this.nodes = []
}

spawn (n, callback) {
times(n, (i, cb) => this._spawnOne(cb), (err, dhts) => {
spawn (n, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}

times(n, (i, cb) => this._spawnOne(options, cb), (err, dhts) => {
if (err) { return callback(err) }
callback(null, dhts)
})
}

_spawnOne (callback) {
_spawnOne (options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}

createPeerInfo(1, (err, peers) => {
if (err) { return callback(err) }

Expand All @@ -37,7 +47,7 @@ class TestDHT {
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()

const dht = new KadDHT(sw)
const dht = new KadDHT(sw, options)

dht.validators.v = {
func (key, publicKey, callback) {
Expand Down