Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
fix: stop running queries on shutdown (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc authored and vasco-santos committed Apr 4, 2019
1 parent 440b391 commit e137297
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 32 deletions.
12 changes: 11 additions & 1 deletion src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const privateApi = require('./private')
const Providers = require('./providers')
const Message = require('./message')
const RandomWalk = require('./random-walk')
const QueryManager = require('./query-manager')
const assert = require('assert')
const mergeOptions = require('merge-options')

Expand Down Expand Up @@ -121,7 +122,7 @@ class KadDHT extends EventEmitter {
Object.keys(pa).forEach((name) => { this[name] = pa[name] })

/**
* Provider management
* Random walk management
*
* @type {RandomWalk}
*/
Expand All @@ -134,6 +135,13 @@ class KadDHT extends EventEmitter {
this.randomWalkQueriesPerPeriod = parseInt(options.randomWalk.queriesPerPeriod)
this.randomWalkInterval = parseInt(options.randomWalk.interval)
this.randomWalkTimeout = parseInt(options.randomWalk.timeout)

/**
* Keeps track of running queries
*
* @type {QueryManager}
*/
this._queryManager = new QueryManager()
}

/**
Expand All @@ -153,6 +161,7 @@ class KadDHT extends EventEmitter {
*/
start (callback) {
this._running = true
this._queryManager.start()
this.network.start((err) => {
if (err) {
return callback(err)
Expand All @@ -177,6 +186,7 @@ class KadDHT extends EventEmitter {
this.providers.stop()
this.network.stop(callback)
})
this._queryManager.stop()
}

/**
Expand Down
52 changes: 52 additions & 0 deletions src/query-manager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
'use strict'

/**
* Keeps track of all running queries.
*/
class QueryManager {
/**
* Creates a new QueryManager.
*/
constructor () {
this.queries = new Set()
this.running = false
}

/**
* Called when a query is started.
*
* @param {Query} query
*/
queryStarted (query) {
this.queries.add(query)
}

/**
* Called when a query completes.
*
* @param {Query} query
*/
queryCompleted (query) {
this.queries.delete(query)
}

/**
* Starts the query manager.
*/
start () {
this.running = true
}

/**
* Stops all queries.
*/
stop () {
this.running = false
for (const query of this.queries) {
query.stop()
}
this.queries.clear()
}
}

module.exports = QueryManager
18 changes: 13 additions & 5 deletions src/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,21 @@ class Query {
* @returns {void}
*/
run (peers, callback) {
if (!this.dht._queryManager.running) {
this._log.error('Attempt to run query after shutdown')
return callback(null, { finalSet: new Set(), paths: [] })
}
if (peers.length === 0) {
this._log.error('Running query with no peers')
return callback(null, { finalSet: new Set(), paths: [] })
}

const run = {
peersSeen: new Set(),
errors: [],
paths: null // array of states per disjoint path
}

if (peers.length === 0) {
this._log.error('Running query with no peers')
return callback()
}

// create correct number of paths
const numPaths = Math.min(c.DISJOINT_PATHS, peers.length)
const pathPeers = []
Expand All @@ -85,6 +89,9 @@ class Query {
}
})

// Register this query so we stop it if the DHT stops
this.dht._queryManager.queryStarted(this)

// Create a manager to keep track of the worker queue for each path
this.workerManager = new WorkerManager()
each(run.paths, (path, cb) => {
Expand Down Expand Up @@ -133,6 +140,7 @@ class Query {
*/
stop () {
this.workerManager && this.workerManager.stop()
this.dht._queryManager.queryCompleted(this)
}
}

Expand Down
41 changes: 21 additions & 20 deletions test/kad-dht.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -953,29 +953,30 @@ describe('KadDHT', () => {
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()
const dht = new KadDHT(sw)
dht.start(() => {
const key = Buffer.from('/v/hello')
const value = Buffer.from('world')
const rec = new Record(key, value)

const stubs = [
// Simulate returning a peer id to query
sinon.stub(dht.routingTable, 'closestPeers').returns([peerInfos[1].id]),
// Simulate going out to the network and returning the record
sinon.stub(dht, '_getValueOrPeers').callsFake((peer, k, cb) => {
cb(null, rec)
})
]

const key = Buffer.from('/v/hello')
const value = Buffer.from('world')
const rec = new Record(key, value)
dht.getMany(key, 1, (err, res) => {
expect(err).to.not.exist()
expect(res.length).to.eql(1)
expect(res[0].val).to.eql(value)

const stubs = [
// Simulate returning a peer id to query
sinon.stub(dht.routingTable, 'closestPeers').returns([peerInfos[1].id]),
// Simulate going out to the network and returning the record
sinon.stub(dht, '_getValueOrPeers').callsFake((peer, k, cb) => {
cb(null, rec)
for (const stub of stubs) {
stub.restore()
}
done()
})
]

dht.getMany(key, 1, (err, res) => {
expect(err).to.not.exist()
expect(res.length).to.eql(1)
expect(res[0].val).to.eql(value)

for (const stub of stubs) {
stub.restore()
}
done()
})
})
})
Expand Down
142 changes: 136 additions & 6 deletions test/query.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ const Query = require('../src/query')
const createPeerInfo = require('./utils/create-peer-info')
const createDisjointTracks = require('./utils/create-disjoint-tracks')

const createDHT = (peerInfos, cb) => {
const sw = new Switch(peerInfos[0], new PeerBook())
sw.transport.add('tcp', new TCP())
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()
const d = new DHT(sw)
d.start(() => cb(null, d))
}

describe('Query', () => {
let peerInfos
let dht
Expand All @@ -28,13 +37,14 @@ describe('Query', () => {
}

peerInfos = result
const sw = new Switch(peerInfos[0], new PeerBook())
sw.transport.add('tcp', new TCP())
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()
dht = new DHT(sw)
createDHT(peerInfos, (err, d) => {
if (err) {
return done(err)
}

done()
dht = d
done()
})
})
})

Expand Down Expand Up @@ -117,6 +127,23 @@ describe('Query', () => {
})
})

it('returns empty run if initial peer list is empty', (done) => {
const peer = peerInfos[0]

const query = (p, cb) => {}

const q = new Query(dht, peer.id.id, () => query)
q.run([], (err, res) => {
expect(err).to.not.exist()

// Should not visit any peers
expect(res.paths.length).to.eql(0)
expect(res.finalSet.size).to.eql(0)

done()
})
})

it('only closerPeers', (done) => {
const peer = peerInfos[0]

Expand Down Expand Up @@ -234,6 +261,109 @@ describe('Query', () => {
})
})

it('all queries stop after shutdown', (done) => {
createDHT(peerInfos, (err, dhtA) => {
if (err) {
return done(err)
}

const peer = peerInfos[0]

// mock this so we can dial non existing peers
dhtA.switch.dial = (peer, callback) => callback()

// 1 -> 2 -> 3 -> 4
const topology = {
[peerInfos[1].id.toB58String()]: {
closer: [peerInfos[2]]
},
[peerInfos[2].id.toB58String()]: {
closer: [peerInfos[3]]
},
// Should not reach here because query gets shut down
[peerInfos[3].id.toB58String()]: {
closer: [peerInfos[4]]
}
}

const visited = []
const query = (p, cb) => {
visited.push(p)

const invokeCb = () => {
const res = topology[p.toB58String()] || {}
cb(null, {
closerPeers: res.closer || []
})
}

// Shut down after visiting peerInfos[2]
if (p.toB58String() === peerInfos[2].id.toB58String()) {
dhtA.stop(invokeCb)
setTimeout(checkExpectations, 100)
} else {
invokeCb()
}
}

const q = new Query(dhtA, peer.id.id, () => query)
q.run([peerInfos[1].id], (err, res) => {
expect(err).to.not.exist()
})

function checkExpectations () {
// Should only visit peers up to the point where we shut down
expect(visited).to.eql([peerInfos[1].id, peerInfos[2].id])

done()
}
})
})

it('queries run after shutdown return immediately', (done) => {
createDHT(peerInfos, (err, dhtA) => {
if (err) {
return done(err)
}

const peer = peerInfos[0]

// mock this so we can dial non existing peers
dhtA.switch.dial = (peer, callback) => callback()

// 1 -> 2 -> 3
const topology = {
[peerInfos[1].id.toB58String()]: {
closer: [peerInfos[2]]
},
[peerInfos[2].id.toB58String()]: {
closer: [peerInfos[3]]
}
}

const query = (p, cb) => {
const res = topology[p.toB58String()] || {}
cb(null, {
closerPeers: res.closer || []
})
}

const q = new Query(dhtA, peer.id.id, () => query)

dhtA.stop(() => {
q.run([peerInfos[1].id], (err, res) => {
expect(err).to.not.exist()

// Should not visit any peers
expect(res.paths.length).to.eql(0)
expect(res.finalSet.size).to.eql(0)

done()
})
})
})
})

it('disjoint path values', (done) => {
const peer = peerInfos[0]
const values = ['v0', 'v1'].map(Buffer.from)
Expand Down

0 comments on commit e137297

Please sign in to comment.