Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
feat: add delay support to random walk (#101)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobheun authored and vasco-santos committed Apr 17, 2019
1 parent c1517a0 commit 7b70fa7
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 41 deletions.
3 changes: 2 additions & 1 deletion src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@ exports.defaultRandomWalk = {
enabled: true,
queriesPerPeriod: 1,
interval: 5 * minute,
timeout: 10 * second
timeout: 10 * second,
delay: 10 * second
}
17 changes: 4 additions & 13 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ const Message = require('./message')
const RandomWalk = require('./random-walk')
const QueryManager = require('./query-manager')
const assert = require('assert')
const mergeOptions = require('merge-options')

/**
* A DHT implementation modeled after Kademlia with S/Kademlia modifications.
Expand All @@ -40,6 +39,7 @@ class KadDHT extends EventEmitter {
* @property {number} queriesPerPeriod how many queries to run per period (default: 1)
* @property {number} interval how often to run the the random-walk process, in milliseconds (default: 300000)
* @property {number} timeout how long to wait for the the random-walk query to run, in milliseconds (default: 10000)
* @property {number} delay how long to wait before starting the first random walk, in milliseconds (default: 10000)
*/

/**
Expand All @@ -59,7 +59,6 @@ class KadDHT extends EventEmitter {
options = options || {}
options.validators = options.validators || {}
options.selectors = options.selectors || {}
options.randomWalk = mergeOptions(c.defaultRandomWalk, options.randomWalk)

/**
* Local reference to the libp2p-switch instance
Expand Down Expand Up @@ -126,15 +125,7 @@ class KadDHT extends EventEmitter {
*
* @type {RandomWalk}
*/
this.randomWalk = new RandomWalk(this)

/**
* Random walk state, default true
*/
this.randomWalkEnabled = Boolean(options.randomWalk.enabled)
this.randomWalkQueriesPerPeriod = parseInt(options.randomWalk.queriesPerPeriod)
this.randomWalkInterval = parseInt(options.randomWalk.interval)
this.randomWalkTimeout = parseInt(options.randomWalk.timeout)
this.randomWalk = new RandomWalk(this, options.randomWalk)

/**
* Keeps track of running queries
Expand Down Expand Up @@ -167,8 +158,8 @@ class KadDHT extends EventEmitter {
return callback(err)
}

// Start random walk if enabled
this.randomWalkEnabled && this.randomWalk.start(this.randomWalkQueriesPerPeriod, this.randomWalkInterval, this.randomWalkTimeout)
// Start random walk, it will not run if it's disabled
this.randomWalk.start()
callback()
})
}
Expand Down
56 changes: 36 additions & 20 deletions src/random-walk.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,45 +8,56 @@ const multihashing = require('multihashing-async')
const PeerId = require('peer-id')
const assert = require('assert')
const c = require('./constants')
const { logger } = require('./utils')

const errcode = require('err-code')

class RandomWalk {
constructor (kadDHT) {
assert(kadDHT, 'Random Walk needs an instance of the Kademlia DHT')
/**
* @constructor
* @param {DHT} dht
* @param {object} options
* @param {randomWalkOptions.enabled} options.enabled
* @param {randomWalkOptions.queriesPerPeriod} options.queriesPerPeriod
* @param {randomWalkOptions.interval} options.interval
* @param {randomWalkOptions.timeout} options.timeout
* @param {randomWalkOptions.delay} options.delay
* @param {DHT} options.dht
*/
constructor (dht, options) {
this._options = { ...c.defaultRandomWalk, ...options }
assert(dht, 'Random Walk needs an instance of the Kademlia DHT')
this._runningHandle = null
this._kadDHT = kadDHT
this._kadDHT = dht
this.log = logger(dht.peerInfo.id, 'random-walk')
}

/**
* Start the Random Walk process. This means running a number of queries
* every interval requesting random data. This is done to keep the dht
* healthy over time.
*
* @param {number} [queries=1] - how many queries to run per period
* @param {number} [period=300000] - how often to run the the random-walk process, in milliseconds (5min)
* @param {number} [timeout=10000] - how long to wait for the the random-walk query to run, in milliseconds (10s)
* @returns {void}
*/
start (queries = c.defaultRandomWalk.queriesPerPeriod, period = c.defaultRandomWalk.interval, timeout = c.defaultRandomWalk.timeout) {
start () {
// Don't run twice
if (this._running) { return }
if (this._running || !this._options.enabled) { return }

// Create running handle
const runningHandle = {
_onCancel: null,
_timeoutId: null,
runPeriodically: (fn, period) => {
runPeriodically: (walk, period) => {
runningHandle._timeoutId = setTimeout(() => {
runningHandle._timeoutId = null

fn((nextPeriod) => {
walk((nextPeriod) => {
// Was walk cancelled while fn was being called?
if (runningHandle._onCancel) {
return runningHandle._onCancel()
}
// Schedule next
runningHandle.runPeriodically(fn, nextPeriod)
runningHandle.runPeriodically(walk, nextPeriod)
})
}, period)
},
Expand All @@ -61,10 +72,15 @@ class RandomWalk {
}
}

// Start runner
runningHandle.runPeriodically((done) => {
this._walk(queries, timeout, () => done(period))
}, period)
// Start doing random walks after `this._options.delay`
runningHandle._timeoutId = setTimeout(() => {
// Start runner immediately
runningHandle.runPeriodically((done) => {
// Each subsequent walk should run on a `this._options.interval` interval
this._walk(this._options.queriesPerPeriod, this._options.timeout, () => done(this._options.interval))
}, 0)
}, this._options.delay)

this._runningHandle = runningHandle
}

Expand Down Expand Up @@ -96,7 +112,7 @@ class RandomWalk {
* @private
*/
_walk (queries, walkTimeout, callback) {
this._kadDHT._log('random-walk:start')
this.log('start')

times(queries, (i, cb) => {
waterfall([
Expand All @@ -106,11 +122,11 @@ class RandomWalk {
}, walkTimeout)(cb)
], (err) => {
if (err) {
this._kadDHT._log.error('random-walk:error', err)
this.log.error('query finished with error', err)
return callback(err)
}

this._kadDHT._log('random-walk:done')
this.log('done')
callback(null)
})
})
Expand All @@ -126,7 +142,7 @@ class RandomWalk {
* @private
*/
_query (id, callback) {
this._kadDHT._log('random-walk:query:%s', id.toB58String())
this.log('query:%s', id.toB58String())

this._kadDHT.findPeer(id, (err, peer) => {
if (err.code === 'ERR_NOT_FOUND') {
Expand All @@ -136,7 +152,7 @@ class RandomWalk {
if (err) {
return callback(err)
}
this._kadDHT._log('random-walk:query:found', err, peer)
this.log('query:found', peer)

// wait what, there was something found? Lucky day!
callback(errcode(new Error(`random-walk: ACTUALLY FOUND PEER: ${peer}, ${id.toB58String()}`), 'ERR_FOUND_RANDOM_PEER'))
Expand Down
25 changes: 18 additions & 7 deletions test/kad-dht.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ function connect (a, b, callback) {

function bootstrap (dhts) {
dhts.forEach((dht) => {
// dht.randomWalk._walk(3, 10000, () => {}) // don't need to know when it finishes
dht.randomWalk.start(1, 1000) // don't need to know when it finishes
dht.randomWalk._walk(1, 1000, () => {})
})
}

Expand Down Expand Up @@ -212,7 +211,7 @@ describe('KadDHT', () => {
(cb) => dht.start(cb),
(cb) => {
expect(dht.network.start.calledOnce).to.equal(true)
expect(dht.randomWalk.start.calledOnce).to.equal(false)
expect(dht.randomWalk._runningHandle).to.not.exist()

cb()
},
Expand All @@ -231,7 +230,11 @@ describe('KadDHT', () => {
sw.transport.add('tcp', new TCP())
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()
const dht = new KadDHT(sw, { enabledDiscovery: false })
const dht = new KadDHT(sw, {
randomWalk: {
enabled: false
}
})

series([
(cb) => dht.start(cb),
Expand All @@ -247,7 +250,11 @@ describe('KadDHT', () => {
sw.transport.add('tcp', new TCP())
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()
const dht = new KadDHT(sw, { enabledDiscovery: false })
const dht = new KadDHT(sw, {
randomWalk: {
enabled: false
}
})

series([
(cb) => dht.stop(cb)
Expand Down Expand Up @@ -567,13 +574,17 @@ describe('KadDHT', () => {
})

it('random-walk', function (done) {
this.timeout(40 * 1000)
this.timeout(10 * 1000)

const nDHTs = 20
const tdht = new TestDHT()

// random walk disabled for a manual usage
tdht.spawn(nDHTs, { enabledDiscovery: false }, (err, dhts) => {
tdht.spawn(nDHTs, {
randomWalk: {
enabled: false
}
}, (err, dhts) => {
expect(err).to.not.exist()

series([
Expand Down

0 comments on commit 7b70fa7

Please sign in to comment.