This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

fix: ensure queries stop after error or success (#93)
dirkmc authored and vasco-santos committed Mar 28, 2019
1 parent 478ee88 commit 0e55b20
Showing 5 changed files with 519 additions and 97 deletions.
22 changes: 15 additions & 7 deletions src/index.js
@@ -359,7 +359,7 @@ class KadDHT extends EventEmitter {

// enough is enough
if (pathVals.length >= pathSize) {
res.success = true
res.pathComplete = true
}

cb(null, res)
@@ -368,7 +368,12 @@
})

// run our query
timeout((cb) => query.run(rtp, cb), options.timeout)(cb)
timeout((_cb) => {
query.run(rtp, _cb)
}, options.timeout)((err, res) => {
query.stop()
cb(err, res)
})
}
], (err) => {
// combine vals from each path
@@ -419,7 +424,7 @@ class KadDHT extends EventEmitter {
(closer, cb) => {
cb(null, {
closerPeers: closer,
success: options.shallow ? true : undefined
pathComplete: options.shallow ? true : undefined
})
}
], callback)
@@ -636,7 +641,7 @@ class KadDHT extends EventEmitter {
if (match) {
return cb(null, {
peer: match,
success: true
queryComplete: true
})
}

@@ -648,9 +653,12 @@
}
})

timeout((cb) => {
query.run(peers, cb)
}, options.timeout)(cb)
timeout((_cb) => {
query.run(peers, _cb)
}, options.timeout)((err, res) => {
query.stop()
cb(err, res)
})
},
(result, cb) => {
let success = false
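
The changes above in src/index.js apply one pattern: wrap `query.run()` in async's `timeout()` and call `query.stop()` in the completion callback, so the query's workers are torn down whether the run succeeds, fails, or times out. A minimal sketch of that pattern, using illustrative names (`runQueryWithTimeout`, `query`, `peers`, `ms`) that are not part of this module:

const timeout = require('async/timeout')

// Whatever the outcome of the timed run, stop the query so its per-path
// workers do not keep working in the background.
function runQueryWithTimeout (query, peers, ms, callback) {
  timeout((_cb) => {
    query.run(peers, _cb)
  }, ms)((err, res) => {
    query.stop()
    callback(err, res)
  })
}

The same shape appears in src/private.js below, where `query.stop()` is called before the provider results from each path are combined.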
4 changes: 3 additions & 1 deletion src/private.js
@@ -510,7 +510,7 @@ module.exports = (dht) => ({

// hooray we have all that we want
if (pathProviders.length >= pathSize) {
return cb(null, { success: true })
return cb(null, { pathComplete: true })
}

// it looks like we want some more
@@ -525,6 +525,8 @@
const peers = dht.routingTable.closestPeers(key.buffer, c.ALPHA)

timeout((cb) => query.run(peers, cb), providerTimeout)((err) => {
query.stop()

// combine peers from each path
paths.forEach((path) => {
path.toArray().forEach((peer) => {
237 changes: 155 additions & 82 deletions src/query.js
@@ -85,17 +85,25 @@
}
})

// Create a manager to keep track of the worker queue for each path
this.workerManager = new WorkerManager()
each(run.paths, (path, cb) => {
waterfall([
(cb) => PeerQueue.fromKey(this.key, cb),
(q, cb) => {
path.peersToQuery = q
each(path.peers, (p, cb) => addPeerToQuery(p, this.dht, path, cb), cb)
},
(cb) => workerQueue(this, path, cb)
(cb) => {
this.workerManager.workerQueue(this, path, cb)
}
], cb)
}, (err, results) => {
this._log('query:done')

// Ensure worker queues for all paths are stopped at the end of the query
this.workerManager.stop()

if (err) {
return callback(err)
}
@@ -110,109 +118,174 @@
}

run.paths.forEach((path) => {
if (path.res && path.res.success) {
if (path.res && (path.res.pathComplete || path.res.queryComplete)) {
path.res.success = true
run.res.paths.push(path.res)
}
})

callback(null, run.res)
})
}

/**
* Stop the query
*/
stop () {
this.workerManager && this.workerManager.stop()
}
}

/**
* Use the queue from async to keep `concurrency` amount items running
* per path.
*
* @param {Query} query
* @param {Object} path
* @param {function(Error)} callback
* @returns {void}
* @private
* Manages the worker queues for each path through the DHT
*/
function workerQueue (query, path, callback) {
let killed = false
const q = queue((next, cb) => {
query._log('queue:work')
execQuery(next, query, path, (err, done) => {
// Ignore after kill
if (killed) {
return cb()
}
query._log('queue:work:done', err, done)
if (err) {
return cb(err)
}
if (done) {
q.kill()
killed = true
return callback()
}
cb()
})
}, query.concurrency)
class WorkerManager {
/**
* Creates a new WorkerManager
*/
constructor () {
this.running = true
this.workers = []
}

const fill = () => {
query._log('queue:fill')
while (q.length() < query.concurrency &&
path.peersToQuery.length > 0) {
q.push(path.peersToQuery.dequeue())
/**
* Stop all the workers
*/
stop () {
this.running = false
for (const worker of this.workers) {
worker.stop()
}
}

fill()
/**
* Use the queue from async to keep `concurrency` amount items running
* per path.
*
* @param {Query} query
* @param {Object} path
* @param {function(Error)} callback
*/
workerQueue (query, path, callback) {
let workerRunning = true
const q = queue((next, cb) => {
query._log('queue:work')
this.execQuery(next, query, path, (err, state) => {
// Ignore response after worker killed
if (!workerRunning || !this.running) {
return cb()
}

// callback handling
q.error = (err) => {
query._log.error('queue', err)
callback(err)
}
query._log('queue:work:done', err, state)
if (err) {
return cb(err)
}

q.drain = () => {
query._log('queue:drain')
callback()
}
// If query is complete, stop all workers.
// Note: this.stop() calls stop() on all the workers, which kills the
// queue and calls callback(), so we don't need to call cb() here
if (state && state.queryComplete) {
query._log('query:complete')
return this.stop()
}

q.unsaturated = () => {
query._log('queue:unsatured')
fill()
}
// If path is complete, just stop this worker.
// Note: worker.stop() kills the queue and calls callback() so we don't
// need to call cb() here
if (state && state.pathComplete) {
return worker.stop()
}

q.buffer = 0
}
// Otherwise, process next peer
cb()
})
}, query.concurrency)

/**
* Execute a query on the `next` peer.
*
* @param {PeerId} next
* @param {Query} query
* @param {Object} path
* @param {function(Error)} callback
* @returns {void}
* @private
*/
function execQuery (next, query, path, callback) {
path.query(next, (err, res) => {
if (err) {
path.run.errors.push(err)
callback()
} else if (res.success) {
path.res = res
callback(null, true)
} else if (res.closerPeers && res.closerPeers.length > 0) {
each(res.closerPeers, (closer, cb) => {
// don't add ourselves
if (query.dht._isSelf(closer.id)) {
return cb()
// Keeps track of a running worker
const worker = {
stop: (err) => {
if (workerRunning) {
q.kill()
workerRunning = false
callback(err)
}
closer = query.dht.peerBook.put(closer)
query.dht._peerDiscovered(closer)
addPeerToQuery(closer.id, query.dht, path, cb)
}, callback)
} else {
callback()
}
}
this.workers.push(worker)

// Add peers to the queue until there are enough to satisfy the concurrency
const fill = () => {
query._log('queue:fill')
while (q.length() < query.concurrency &&
path.peersToQuery.length > 0) {
q.push(path.peersToQuery.dequeue())
}
}

fill()

// If there's an error, stop the worker
q.error = (err) => {
query._log.error('queue', err)
worker.stop(err)
}
})

// When all peers in the queue have been processed, stop the worker
q.drain = () => {
query._log('queue:drain')
worker.stop()
}

// When a space opens up in the queue, add some more peers
q.unsaturated = () => {
query._log('queue:unsaturated')
fill()
}

q.buffer = 0
}

/**
* Execute a query on the `next` peer.
*
* @param {PeerId} next
* @param {Query} query
* @param {Object} path
* @param {function(Error)} callback
* @returns {void}
* @private
*/
execQuery (next, query, path, callback) {
path.query(next, (err, res) => {
// If the run has completed, bail out
if (!this.running) {
return callback()
}

if (err) {
path.run.errors.push(err)
callback()
} else if (res.pathComplete || res.queryComplete) {
path.res = res
callback(null, {
pathComplete: res.pathComplete,
queryComplete: res.queryComplete
})
} else if (res.closerPeers && res.closerPeers.length > 0) {
each(res.closerPeers, (closer, cb) => {
// don't add ourselves
if (query.dht._isSelf(closer.id)) {
return cb()
}
closer = query.dht.peerBook.put(closer)
query.dht._peerDiscovered(closer)
addPeerToQuery(closer.id, query.dht, path, cb)
}, callback)
} else {
callback()
}
})
}
}

/**
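
To summarise the new worker lifecycle in src/query.js, here is a rough standalone sketch rather than the module's actual code: `processPeer` and `peersToQuery` are placeholders for the real per-path query function and peer queue. Each path gets an async queue wrapped in a small `worker` object whose `stop()` kills the queue and fires the path callback at most once; the manager keeps every worker so that a `queryComplete` result, an error, or an external `query.stop()` can halt them all.

const queue = require('async/queue')

class WorkerManagerSketch {
  constructor () {
    this.running = true
    this.workers = []
  }

  // Stop every worker; used when the whole query completes or is aborted
  stop () {
    this.running = false
    this.workers.forEach((worker) => worker.stop())
  }

  // One worker per path. `processPeer(peer, cb)` and `peersToQuery` are
  // placeholders for the real query logic and peer queue.
  workerQueue (concurrency, peersToQuery, processPeer, callback) {
    let workerRunning = true
    const q = queue((peer, cb) => {
      processPeer(peer, (err, state) => {
        if (!workerRunning || !this.running) return cb() // ignore after stop
        if (err) return cb(err)
        if (state && state.queryComplete) return this.stop()  // halt all paths
        if (state && state.pathComplete) return worker.stop() // halt this path
        cb() // otherwise move on to the next peer
      })
    }, concurrency)

    const worker = {
      stop: (err) => {
        if (!workerRunning) return
        workerRunning = false
        q.kill()
        callback(err)
      }
    }
    this.workers.push(worker)

    const fill = () => {
      while (q.length() < concurrency && peersToQuery.length > 0) {
        q.push(peersToQuery.shift())
      }
    }
    fill()

    q.error = (err) => worker.stop(err) // a task error stops this worker
    q.drain = () => worker.stop()       // queue exhausted: path is done
    q.unsaturated = fill                // refill as slots free up
    q.buffer = 0
  }
}

In the real module, `Query.stop()` simply delegates to this manager, which is what allows the timeout callbacks in src/index.js and src/private.js to shut every path down.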
