Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Ensure queries stop after error or success #93

Merged
merged 1 commit into from
Mar 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ class KadDHT extends EventEmitter {

// enough is enough
if (pathVals.length >= pathSize) {
res.success = true
res.pathComplete = true
}

cb(null, res)
Expand All @@ -368,7 +368,12 @@ class KadDHT extends EventEmitter {
})

// run our query
timeout((cb) => query.run(rtp, cb), options.timeout)(cb)
timeout((_cb) => {
query.run(rtp, _cb)
}, options.timeout)((err, res) => {
query.stop()
cb(err, res)
})
}
], (err) => {
// combine vals from each path
Expand Down Expand Up @@ -419,7 +424,7 @@ class KadDHT extends EventEmitter {
(closer, cb) => {
cb(null, {
closerPeers: closer,
success: options.shallow ? true : undefined
pathComplete: options.shallow ? true : undefined
})
}
], callback)
Expand Down Expand Up @@ -636,7 +641,7 @@ class KadDHT extends EventEmitter {
if (match) {
return cb(null, {
peer: match,
success: true
queryComplete: true
})
}

Expand All @@ -648,9 +653,12 @@ class KadDHT extends EventEmitter {
}
})

timeout((cb) => {
query.run(peers, cb)
}, options.timeout)(cb)
timeout((_cb) => {
query.run(peers, _cb)
}, options.timeout)((err, res) => {
query.stop()
cb(err, res)
})
},
(result, cb) => {
let success = false
Expand Down
4 changes: 3 additions & 1 deletion src/private.js
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ module.exports = (dht) => ({

// hooray we have all that we want
if (pathProviders.length >= pathSize) {
return cb(null, { success: true })
return cb(null, { pathComplete: true })
}

// it looks like we want some more
Expand All @@ -525,6 +525,8 @@ module.exports = (dht) => ({
const peers = dht.routingTable.closestPeers(key.buffer, c.ALPHA)

timeout((cb) => query.run(peers, cb), providerTimeout)((err) => {
query.stop()

// combine peers from each path
paths.forEach((path) => {
path.toArray().forEach((peer) => {
Expand Down
237 changes: 155 additions & 82 deletions src/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,25 @@ class Query {
}
})

// Create a manager to keep track of the worker queue for each path
this.workerManager = new WorkerManager()
each(run.paths, (path, cb) => {
waterfall([
(cb) => PeerQueue.fromKey(this.key, cb),
(q, cb) => {
path.peersToQuery = q
each(path.peers, (p, cb) => addPeerToQuery(p, this.dht, path, cb), cb)
},
(cb) => workerQueue(this, path, cb)
(cb) => {
this.workerManager.workerQueue(this, path, cb)
}
], cb)
}, (err, results) => {
this._log('query:done')

// Ensure worker queues for all paths are stopped at the end of the query
this.workerManager.stop()

if (err) {
return callback(err)
}
Expand All @@ -110,109 +118,174 @@ class Query {
}

run.paths.forEach((path) => {
if (path.res && path.res.success) {
if (path.res && (path.res.pathComplete || path.res.queryComplete)) {
path.res.success = true
run.res.paths.push(path.res)
}
})

callback(null, run.res)
})
}

/**
* Stop the query
*/
stop () {
this.workerManager && this.workerManager.stop()
}
}

/**
* Use the queue from async to keep `concurrency` amount items running
* per path.
*
* @param {Query} query
* @param {Object} path
* @param {function(Error)} callback
* @returns {void}
* @private
* Manages the worker queues for each path through the DHT
*/
function workerQueue (query, path, callback) {
let killed = false
const q = queue((next, cb) => {
query._log('queue:work')
execQuery(next, query, path, (err, done) => {
// Ignore after kill
if (killed) {
return cb()
}
query._log('queue:work:done', err, done)
if (err) {
return cb(err)
}
if (done) {
q.kill()
killed = true
return callback()
}
cb()
})
}, query.concurrency)
class WorkerManager {
/**
* Creates a new WorkerManager
*/
constructor () {
this.running = true
this.workers = []
}

const fill = () => {
query._log('queue:fill')
while (q.length() < query.concurrency &&
path.peersToQuery.length > 0) {
q.push(path.peersToQuery.dequeue())
/**
* Stop all the workers
*/
stop () {
this.running = false
for (const worker of this.workers) {
worker.stop()
}
}

fill()
/**
* Use the queue from async to keep `concurrency` amount items running
* per path.
*
* @param {Query} query
* @param {Object} path
* @param {function(Error)} callback
*/
workerQueue (query, path, callback) {
let workerRunning = true
const q = queue((next, cb) => {
query._log('queue:work')
this.execQuery(next, query, path, (err, state) => {
// Ignore response after worker killed
if (!workerRunning || !this.running) {
return cb()
}

// callback handling
q.error = (err) => {
query._log.error('queue', err)
callback(err)
}
query._log('queue:work:done', err, state)
if (err) {
return cb(err)
}

q.drain = () => {
query._log('queue:drain')
callback()
}
// If query is complete, stop all workers.
// Note: this.stop() calls stop() on all the workers, which kills the
// queue and calls callback(), so we don't need to call cb() here
if (state && state.queryComplete) {
query._log('query:complete')
return this.stop()
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
}

q.unsaturated = () => {
query._log('queue:unsatured')
fill()
}
// If path is complete, just stop this worker.
// Note: worker.stop() kills the queue and calls callback() so we don't
// need to call cb() here
if (state && state.pathComplete) {
return worker.stop()
}

q.buffer = 0
}
// Otherwise, process next peer
cb()
})
}, query.concurrency)

/**
* Execute a query on the `next` peer.
*
* @param {PeerId} next
* @param {Query} query
* @param {Object} path
* @param {function(Error)} callback
* @returns {void}
* @private
*/
function execQuery (next, query, path, callback) {
path.query(next, (err, res) => {
if (err) {
path.run.errors.push(err)
callback()
} else if (res.success) {
path.res = res
callback(null, true)
} else if (res.closerPeers && res.closerPeers.length > 0) {
each(res.closerPeers, (closer, cb) => {
// don't add ourselves
if (query.dht._isSelf(closer.id)) {
return cb()
// Keeps track of a running worker
const worker = {
stop: (err) => {
if (workerRunning) {
q.kill()
workerRunning = false
callback(err)
}
closer = query.dht.peerBook.put(closer)
query.dht._peerDiscovered(closer)
addPeerToQuery(closer.id, query.dht, path, cb)
}, callback)
} else {
callback()
}
}
this.workers.push(worker)

// Add peers to the queue until there are enough to satisfy the concurrency
const fill = () => {
query._log('queue:fill')
while (q.length() < query.concurrency &&
path.peersToQuery.length > 0) {
q.push(path.peersToQuery.dequeue())
}
}

fill()

// If there's an error, stop the worker
q.error = (err) => {
query._log.error('queue', err)
worker.stop(err)
}
})

// When all peers in the queue have been processed, stop the worker
q.drain = () => {
query._log('queue:drain')
worker.stop()
}

// When a space opens up in the queue, add some more peers
q.unsaturated = () => {
query._log('queue:unsaturated')
fill()
}

q.buffer = 0
}

/**
* Execute a query on the `next` peer.
*
* @param {PeerId} next
* @param {Query} query
* @param {Object} path
* @param {function(Error)} callback
* @returns {void}
* @private
*/
execQuery (next, query, path, callback) {
path.query(next, (err, res) => {
// If the run has completed, bail out
if (!this.running) {
return callback()
}

if (err) {
path.run.errors.push(err)
callback()
} else if (res.pathComplete || res.queryComplete) {
path.res = res
callback(null, {
pathComplete: res.pathComplete,
queryComplete: res.queryComplete
})
} else if (res.closerPeers && res.closerPeers.length > 0) {
each(res.closerPeers, (closer, cb) => {
// don't add ourselves
if (query.dht._isSelf(closer.id)) {
return cb()
}
closer = query.dht.peerBook.put(closer)
query.dht._peerDiscovered(closer)
jacobheun marked this conversation as resolved.
Show resolved Hide resolved
addPeerToQuery(closer.id, query.dht, path, cb)
}, callback)
} else {
callback()
}
})
}
}

/**
Expand Down
Loading