Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

feat: add delay support to random walk #101

Merged
merged 3 commits into from
Apr 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@ exports.defaultRandomWalk = {
enabled: true,
queriesPerPeriod: 1,
interval: 5 * minute,
timeout: 10 * second
timeout: 10 * second,
delay: 10 * second
}
17 changes: 4 additions & 13 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ const Message = require('./message')
const RandomWalk = require('./random-walk')
const QueryManager = require('./query-manager')
const assert = require('assert')
const mergeOptions = require('merge-options')

/**
* A DHT implementation modeled after Kademlia with S/Kademlia modifications.
Expand All @@ -40,6 +39,7 @@ class KadDHT extends EventEmitter {
* @property {number} queriesPerPeriod how many queries to run per period (default: 1)
* @property {number} interval how often to run the the random-walk process, in milliseconds (default: 300000)
* @property {number} timeout how long to wait for the the random-walk query to run, in milliseconds (default: 10000)
* @property {number} delay how long to wait before starting the first random walk, in milliseconds (default: 10000)
*/

/**
Expand All @@ -59,7 +59,6 @@ class KadDHT extends EventEmitter {
options = options || {}
options.validators = options.validators || {}
options.selectors = options.selectors || {}
options.randomWalk = mergeOptions(c.defaultRandomWalk, options.randomWalk)

/**
* Local reference to the libp2p-switch instance
Expand Down Expand Up @@ -126,15 +125,7 @@ class KadDHT extends EventEmitter {
*
* @type {RandomWalk}
*/
this.randomWalk = new RandomWalk(this)

/**
* Random walk state, default true
*/
this.randomWalkEnabled = Boolean(options.randomWalk.enabled)
this.randomWalkQueriesPerPeriod = parseInt(options.randomWalk.queriesPerPeriod)
this.randomWalkInterval = parseInt(options.randomWalk.interval)
this.randomWalkTimeout = parseInt(options.randomWalk.timeout)
this.randomWalk = new RandomWalk(this, options.randomWalk)

/**
* Keeps track of running queries
Expand Down Expand Up @@ -167,8 +158,8 @@ class KadDHT extends EventEmitter {
return callback(err)
}

// Start random walk if enabled
this.randomWalkEnabled && this.randomWalk.start(this.randomWalkQueriesPerPeriod, this.randomWalkInterval, this.randomWalkTimeout)
// Start random walk, it will not run if it's disabled
this.randomWalk.start()
callback()
})
}
Expand Down
56 changes: 36 additions & 20 deletions src/random-walk.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,45 +8,56 @@ const multihashing = require('multihashing-async')
const PeerId = require('peer-id')
const assert = require('assert')
const c = require('./constants')
const { logger } = require('./utils')

const errcode = require('err-code')

class RandomWalk {
constructor (kadDHT) {
assert(kadDHT, 'Random Walk needs an instance of the Kademlia DHT')
/**
* @constructor
* @param {DHT} dht
* @param {object} options
* @param {randomWalkOptions.enabled} options.enabled
* @param {randomWalkOptions.queriesPerPeriod} options.queriesPerPeriod
* @param {randomWalkOptions.interval} options.interval
* @param {randomWalkOptions.timeout} options.timeout
* @param {randomWalkOptions.delay} options.delay
* @param {DHT} options.dht
*/
constructor (dht, options) {
this._options = { ...c.defaultRandomWalk, ...options }
assert(dht, 'Random Walk needs an instance of the Kademlia DHT')
this._runningHandle = null
this._kadDHT = kadDHT
this._kadDHT = dht
this.log = logger(dht.peerInfo.id, 'random-walk')
}

/**
* Start the Random Walk process. This means running a number of queries
* every interval requesting random data. This is done to keep the dht
* healthy over time.
*
* @param {number} [queries=1] - how many queries to run per period
* @param {number} [period=300000] - how often to run the the random-walk process, in milliseconds (5min)
* @param {number} [timeout=10000] - how long to wait for the the random-walk query to run, in milliseconds (10s)
* @returns {void}
*/
start (queries = c.defaultRandomWalk.queriesPerPeriod, period = c.defaultRandomWalk.interval, timeout = c.defaultRandomWalk.timeout) {
start () {
// Don't run twice
if (this._running) { return }
if (this._running || !this._options.enabled) { return }

// Create running handle
const runningHandle = {
_onCancel: null,
_timeoutId: null,
runPeriodically: (fn, period) => {
runPeriodically: (walk, period) => {
runningHandle._timeoutId = setTimeout(() => {
runningHandle._timeoutId = null

fn((nextPeriod) => {
walk((nextPeriod) => {
// Was walk cancelled while fn was being called?
if (runningHandle._onCancel) {
return runningHandle._onCancel()
}
// Schedule next
runningHandle.runPeriodically(fn, nextPeriod)
runningHandle.runPeriodically(walk, nextPeriod)
})
}, period)
},
Expand All @@ -61,10 +72,15 @@ class RandomWalk {
}
}

// Start runner
runningHandle.runPeriodically((done) => {
this._walk(queries, timeout, () => done(period))
}, period)
// Start doing random walks after `this._options.delay`
runningHandle._timeoutId = setTimeout(() => {
// Start runner immediately
runningHandle.runPeriodically((done) => {
// Each subsequent walk should run on a `this._options.interval` interval
this._walk(this._options.queriesPerPeriod, this._options.timeout, () => done(this._options.interval))
}, 0)
}, this._options.delay)

this._runningHandle = runningHandle
}

Expand Down Expand Up @@ -96,7 +112,7 @@ class RandomWalk {
* @private
*/
_walk (queries, walkTimeout, callback) {
this._kadDHT._log('random-walk:start')
this.log('start')

times(queries, (i, cb) => {
waterfall([
Expand All @@ -106,11 +122,11 @@ class RandomWalk {
}, walkTimeout)(cb)
], (err) => {
if (err) {
this._kadDHT._log.error('random-walk:error', err)
this.log.error('query finished with error', err)
return callback(err)
}

this._kadDHT._log('random-walk:done')
this.log('done')
callback(null)
})
})
Expand All @@ -126,7 +142,7 @@ class RandomWalk {
* @private
*/
_query (id, callback) {
this._kadDHT._log('random-walk:query:%s', id.toB58String())
this.log('query:%s', id.toB58String())

this._kadDHT.findPeer(id, (err, peer) => {
if (err.code === 'ERR_NOT_FOUND') {
Expand All @@ -136,7 +152,7 @@ class RandomWalk {
if (err) {
return callback(err)
}
this._kadDHT._log('random-walk:query:found', err, peer)
this.log('query:found', peer)

// wait what, there was something found? Lucky day!
callback(errcode(new Error(`random-walk: ACTUALLY FOUND PEER: ${peer}, ${id.toB58String()}`), 'ERR_FOUND_RANDOM_PEER'))
Expand Down
25 changes: 18 additions & 7 deletions test/kad-dht.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ function connect (a, b, callback) {

function bootstrap (dhts) {
dhts.forEach((dht) => {
// dht.randomWalk._walk(3, 10000, () => {}) // don't need to know when it finishes
dht.randomWalk.start(1, 1000) // don't need to know when it finishes
dht.randomWalk._walk(1, 1000, () => {})
})
}

Expand Down Expand Up @@ -212,7 +211,7 @@ describe('KadDHT', () => {
(cb) => dht.start(cb),
(cb) => {
expect(dht.network.start.calledOnce).to.equal(true)
expect(dht.randomWalk.start.calledOnce).to.equal(false)
expect(dht.randomWalk._runningHandle).to.not.exist()

cb()
},
Expand All @@ -231,7 +230,11 @@ describe('KadDHT', () => {
sw.transport.add('tcp', new TCP())
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()
const dht = new KadDHT(sw, { enabledDiscovery: false })
const dht = new KadDHT(sw, {
randomWalk: {
enabled: false
}
})

series([
(cb) => dht.start(cb),
Expand All @@ -247,7 +250,11 @@ describe('KadDHT', () => {
sw.transport.add('tcp', new TCP())
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()
const dht = new KadDHT(sw, { enabledDiscovery: false })
const dht = new KadDHT(sw, {
randomWalk: {
enabled: false
}
})

series([
(cb) => dht.stop(cb)
Expand Down Expand Up @@ -567,13 +574,17 @@ describe('KadDHT', () => {
})

it('random-walk', function (done) {
this.timeout(40 * 1000)
this.timeout(10 * 1000)

const nDHTs = 20
const tdht = new TestDHT()

// random walk disabled for a manual usage
tdht.spawn(nDHTs, { enabledDiscovery: false }, (err, dhts) => {
tdht.spawn(nDHTs, {
randomWalk: {
enabled: false
}
}, (err, dhts) => {
expect(err).to.not.exist()

series([
Expand Down