diff --git a/src/index.js b/src/index.js
index efc46bda..c06ebb83 100644
--- a/src/index.js
+++ b/src/index.js
@@ -359,7 +359,7 @@ class KadDHT extends EventEmitter {
 
           // enough is enough
           if (pathVals.length >= pathSize) {
-            res.success = true
+            res.pathComplete = true
           }
 
           cb(null, res)
@@ -368,7 +368,12 @@ class KadDHT extends EventEmitter {
       })
 
       // run our query
-      timeout((cb) => query.run(rtp, cb), options.timeout)(cb)
+      timeout((_cb) => {
+        query.run(rtp, _cb)
+      }, options.timeout)((err, res) => {
+        query.stop()
+        cb(err, res)
+      })
     }
   ], (err) => {
     // combine vals from each path
@@ -419,7 +424,7 @@ class KadDHT extends EventEmitter {
       (closer, cb) => {
         cb(null, {
           closerPeers: closer,
-          success: options.shallow ? true : undefined
+          pathComplete: options.shallow ? true : undefined
         })
       }
     ], callback)
@@ -636,7 +641,7 @@ class KadDHT extends EventEmitter {
         if (match) {
           return cb(null, {
             peer: match,
-            success: true
+            queryComplete: true
           })
         }
 
@@ -648,9 +653,12 @@ class KadDHT extends EventEmitter {
           }
         })
 
-        timeout((cb) => {
-          query.run(peers, cb)
-        }, options.timeout)(cb)
+        timeout((_cb) => {
+          query.run(peers, _cb)
+        }, options.timeout)((err, res) => {
+          query.stop()
+          cb(err, res)
+        })
       },
       (result, cb) => {
         let success = false
diff --git a/src/private.js b/src/private.js
index 6ee75663..a82c7de6 100644
--- a/src/private.js
+++ b/src/private.js
@@ -510,7 +510,7 @@ module.exports = (dht) => ({
 
       // hooray we have all that we want
       if (pathProviders.length >= pathSize) {
-        return cb(null, { success: true })
+        return cb(null, { pathComplete: true })
      }
 
      // it looks like we want some more
@@ -525,6 +525,8 @@ module.exports = (dht) => ({
    const peers = dht.routingTable.closestPeers(key.buffer, c.ALPHA)
 
    timeout((cb) => query.run(peers, cb), providerTimeout)((err) => {
+      query.stop()
+
      // combine peers from each path
      paths.forEach((path) => {
        path.toArray().forEach((peer) => {
diff --git a/src/query.js b/src/query.js
index a11cab40..97e044b0 100644
--- a/src/query.js
+++ b/src/query.js
@@ -85,6 +85,8 @@ class Query {
       }
     })
 
+    // Create a manager to keep track of the worker queue for each path
+    this.workerManager = new WorkerManager()
     each(run.paths, (path, cb) => {
       waterfall([
         (cb) => PeerQueue.fromKey(this.key, cb),
@@ -92,10 +94,16 @@
           path.peersToQuery = q
           each(path.peers, (p, cb) => addPeerToQuery(p, this.dht, path, cb), cb)
         },
-        (cb) => workerQueue(this, path, cb)
+        (cb) => {
+          this.workerManager.workerQueue(this, path, cb)
+        }
       ], cb)
     }, (err, results) => {
       this._log('query:done')
+
+      // Ensure worker queues for all paths are stopped at the end of the query
+      this.workerManager.stop()
+
       if (err) {
         return callback(err)
       }
@@ -110,7 +118,8 @@
       }
 
       run.paths.forEach((path) => {
-        if (path.res && path.res.success) {
+        if (path.res && (path.res.pathComplete || path.res.queryComplete)) {
+          path.res.success = true
           run.res.paths.push(path.res)
         }
       })
@@ -118,101 +127,165 @@
       callback(null, run.res)
     })
   }
+
+  /**
+   * Stop the query
+   */
+  stop () {
+    this.workerManager && this.workerManager.stop()
+  }
 }
 
 /**
- * Use the queue from async to keep `concurrency` amount items running
- * per path.
- *
- * @param {Query} query
- * @param {Object} path
- * @param {function(Error)} callback
- * @returns {void}
- * @private
+ * Manages the worker queues for each path through the DHT
  */
-function workerQueue (query, path, callback) {
-  let killed = false
-  const q = queue((next, cb) => {
-    query._log('queue:work')
-    execQuery(next, query, path, (err, done) => {
-      // Ignore after kill
-      if (killed) {
-        return cb()
-      }
-      query._log('queue:work:done', err, done)
-      if (err) {
-        return cb(err)
-      }
-      if (done) {
-        q.kill()
-        killed = true
-        return callback()
-      }
-      cb()
-    })
-  }, query.concurrency)
+class WorkerManager {
+  /**
+   * Creates a new WorkerManager
+   */
+  constructor () {
+    this.running = true
+    this.workers = []
+  }
 
-  const fill = () => {
-    query._log('queue:fill')
-    while (q.length() < query.concurrency &&
-           path.peersToQuery.length > 0) {
-      q.push(path.peersToQuery.dequeue())
+  /**
+   * Stop all the workers
+   */
+  stop () {
+    this.running = false
+    for (const worker of this.workers) {
+      worker.stop()
     }
   }
 
-  fill()
+  /**
+   * Use the queue from async to keep `concurrency` amount items running
+   * per path.
+   *
+   * @param {Query} query
+   * @param {Object} path
+   * @param {function(Error)} callback
+   */
+  workerQueue (query, path, callback) {
+    let workerRunning = true
+    const q = queue((next, cb) => {
+      query._log('queue:work')
+      this.execQuery(next, query, path, (err, state) => {
+        // Ignore response after worker killed
+        if (!workerRunning || !this.running) {
+          return cb()
+        }
 
-  // callback handling
-  q.error = (err) => {
-    query._log.error('queue', err)
-    callback(err)
-  }
+        query._log('queue:work:done', err, state)
+        if (err) {
+          return cb(err)
+        }
 
-  q.drain = () => {
-    query._log('queue:drain')
-    callback()
-  }
+        // If query is complete, stop all workers.
+        // Note: this.stop() calls stop() on all the workers, which kills the
+        // queue and calls callback(), so we don't need to call cb() here
+        if (state && state.queryComplete) {
+          query._log('query:complete')
+          return this.stop()
+        }
 
-  q.unsaturated = () => {
-    query._log('queue:unsatured')
-    fill()
-  }
+        // If path is complete, just stop this worker.
+        // Note: worker.stop() kills the queue and calls callback() so we don't
+        // need to call cb() here
+        if (state && state.pathComplete) {
+          return worker.stop()
+        }
 
-  q.buffer = 0
-}
+        // Otherwise, process next peer
+        cb()
+      })
+    }, query.concurrency)
 
-/**
- * Execute a query on the `next` peer.
- *
- * @param {PeerId} next
- * @param {Query} query
- * @param {Object} path
- * @param {function(Error)} callback
- * @returns {void}
- * @private
- */
-function execQuery (next, query, path, callback) {
-  path.query(next, (err, res) => {
-    if (err) {
-      path.run.errors.push(err)
-      callback()
-    } else if (res.success) {
-      path.res = res
-      callback(null, true)
-    } else if (res.closerPeers && res.closerPeers.length > 0) {
-      each(res.closerPeers, (closer, cb) => {
-        // don't add ourselves
-        if (query.dht._isSelf(closer.id)) {
-          return cb()
+    // Keeps track of a running worker
+    const worker = {
+      stop: (err) => {
+        if (workerRunning) {
+          q.kill()
+          workerRunning = false
+          callback(err)
         }
-        closer = query.dht.peerBook.put(closer)
-        query.dht._peerDiscovered(closer)
-        addPeerToQuery(closer.id, query.dht, path, cb)
-      }, callback)
-    } else {
-      callback()
+      }
+    }
+    this.workers.push(worker)
+
+    // Add peers to the queue until there are enough to satisfy the concurrency
+    const fill = () => {
+      query._log('queue:fill')
+      while (q.length() < query.concurrency &&
+             path.peersToQuery.length > 0) {
+        q.push(path.peersToQuery.dequeue())
+      }
+    }
+
+    fill()
+
+    // If there's an error, stop the worker
+    q.error = (err) => {
+      query._log.error('queue', err)
+      worker.stop(err)
     }
-  })
+
+    // When all peers in the queue have been processed, stop the worker
+    q.drain = () => {
+      query._log('queue:drain')
+      worker.stop()
+    }
+
+    // When a space opens up in the queue, add some more peers
+    q.unsaturated = () => {
+      query._log('queue:unsaturated')
+      fill()
+    }
+
+    q.buffer = 0
+  }
+
+  /**
+   * Execute a query on the `next` peer.
+   *
+   * @param {PeerId} next
+   * @param {Query} query
+   * @param {Object} path
+   * @param {function(Error)} callback
+   * @returns {void}
+   * @private
+   */
+  execQuery (next, query, path, callback) {
+    path.query(next, (err, res) => {
+      // If the run has completed, bail out
+      if (!this.running) {
+        return callback()
+      }
+
+      if (err) {
+        path.run.errors.push(err)
+        callback()
+      } else if (res.pathComplete || res.queryComplete) {
+        path.res = res
+        callback(null, {
+          pathComplete: res.pathComplete,
+          queryComplete: res.queryComplete
+        })
+      } else if (res.closerPeers && res.closerPeers.length > 0) {
+        each(res.closerPeers, (closer, cb) => {
+          // don't add ourselves
+          if (query.dht._isSelf(closer.id)) {
+            return cb()
+          }
+          closer = query.dht.peerBook.put(closer)
+          query.dht._peerDiscovered(closer)
+          addPeerToQuery(closer.id, query.dht, path, cb)
+        }, callback)
+      } else {
+        callback()
+      }
+    })
+  }
 }
 
 /**
diff --git a/test/query.spec.js b/test/query.spec.js
index 778ee684..8f96135f 100644
--- a/test/query.spec.js
+++ b/test/query.spec.js
@@ -8,6 +8,7 @@ const PeerBook = require('peer-book')
 const Switch = require('libp2p-switch')
 const TCP = require('libp2p-tcp')
 const Mplex = require('libp2p-mplex')
+const setImmediate = require('async/setImmediate')
 const DHT = require('../src')
 const Query = require('../src/query')
 
@@ -21,7 +22,7 @@ describe('Query', () => {
   before(function (done) {
     this.timeout(5 * 1000)
 
-    createPeerInfo(10, (err, result) => {
+    createPeerInfo(12, (err, result) => {
      if (err) {
        return done(err)
      }
@@ -50,7 +51,7 @@ describe('Query', () => {
 
        return cb(null, {
          value: Buffer.from('cool'),
-          success: true
+          pathComplete: true
        })
      }
      expect(p.id).to.eql(peerInfos[1].id.id)
@@ -69,6 +70,37 @@ describe('Query', () => {
    })
  })
 
+  it('does not return an error if only some queries error', (done) => {
+    const peer = peerInfos[0]
+
+    // mock this so we can dial non existing peers
+    dht.switch.dial = (peer, callback) => callback()
+
+    let i = 0
+    const query = (p, cb) => {
+      if (i++ === 1) {
+        return cb(new Error('fail'))
+      }
+      cb(null, {
+        closerPeers: [peerInfos[2]]
+      })
+    }
+
+    const q = new Query(dht, peer.id.id, () => query)
+    q.run([peerInfos[1].id], (err, res) => {
+      expect(err).not.to.exist()
+
+      // Should have visited
+      // - the initial peer passed to the query: peerInfos[1]
+      // - the peer returned in closerPeers: peerInfos[2]
+      expect(res.finalSet.size).to.eql(2)
+      expect(res.finalSet.has(peerInfos[1].id)).to.equal(true)
+      expect(res.finalSet.has(peerInfos[2].id)).to.equal(true)
+
+      done()
+    })
+  })
+
  it('returns an error if all queries error', (done) => {
    const peer = peerInfos[0]
 
@@ -105,6 +137,309 @@
    })
  })
 
+  it('only closerPeers concurrent', (done) => {
+    const peer = peerInfos[0]
+
+    // mock this so we can dial non existing peers
+    dht.switch.dial = (peer, callback) => callback()
+
+    // 1 -> 8
+    // 2 -> 4 -> 5
+    //      6 -> 7
+    // 3 -> 9 -> 10
+    const topology = {
+      [peerInfos[1].id.toB58String()]: [
+        peerInfos[8]
+      ],
+
+      [peerInfos[2].id.toB58String()]: [
+        peerInfos[4],
+        peerInfos[6]
+      ],
+      [peerInfos[4].id.toB58String()]: [
+        peerInfos[5]
+      ],
+      [peerInfos[6].id.toB58String()]: [
+        peerInfos[7]
+      ],
+
+      [peerInfos[3].id.toB58String()]: [
+        peerInfos[9]
+      ],
+      [peerInfos[9].id.toB58String()]: [
+        peerInfos[10]
+      ]
+    }
+
+    const query = (p, cb) => {
+      const closer = topology[p.toB58String()]
+      cb(null, {
+        closerPeers: closer || []
+      })
+    }
+
+    const q = new Query(dht, peer.id.id, () => query)
+    q.run([peerInfos[1].id, peerInfos[2].id, peerInfos[3].id], (err, res) => {
+      expect(err).to.not.exist()
+
+      // Should visit all peers
+      expect(res.finalSet.size).to.eql(10)
+      done()
+    })
+  })
+
+  it('early success', (done) => {
+    const peer = peerInfos[0]
+
+    // mock this so we can dial non existing peers
+    dht.switch.dial = (peer, callback) => callback()
+
+    // 1 -> 2 -> 3 -> 4
+    const topology = {
+      [peerInfos[1].id.toB58String()]: {
+        closer: [peerInfos[2]]
+      },
+      // Should stop here because pathComplete is true
+      [peerInfos[2].id.toB58String()]: {
+        closer: [peerInfos[3]],
+        pathComplete: true
+      },
+      // Should not reach here because previous query returns pathComplete
+      [peerInfos[3].id.toB58String()]: {
+        closer: [peerInfos[4]]
+      }
+    }
+
+    const query = (p, cb) => {
+      const res = topology[p.toB58String()] || {}
+      cb(null, {
+        closerPeers: res.closer || [],
+        value: res.value,
+        pathComplete: res.pathComplete
+      })
+    }
+
+    const q = new Query(dht, peer.id.id, () => query)
+    q.run([peerInfos[1].id], (err, res) => {
+      expect(err).to.not.exist()
+
+      // Should complete successfully
+      expect(res.paths.length).to.eql(1)
+      expect(res.paths[0].success).to.eql(true)
+
+      // Should only visit peers up to the success peer
+      expect(res.finalSet.size).to.eql(2)
+
+      done()
+    })
+  })
+
+  it('disjoint path values', (done) => {
+    const peer = peerInfos[0]
+    const values = ['v0', 'v1'].map(Buffer.from)
+
+    // mock this so we can dial non existing peers
+    dht.switch.dial = (peer, callback) => callback()
+
+    // 1 -> 2 -> 3 (v0)
+    // 4 -> 5 (v1)
+    const topology = {
+      // Top level node
+      [peerInfos[1].id.toB58String()]: {
+        closer: [peerInfos[2]]
+      },
+      [peerInfos[2].id.toB58String()]: {
+        closer: [peerInfos[3]]
+      },
+      // v0
+      [peerInfos[3].id.toB58String()]: {
+        value: values[0],
+        pathComplete: true
+      },
+
+      // Top level node
+      [peerInfos[4].id.toB58String()]: {
+        closer: [peerInfos[5]]
+      },
+      // v1
+      [peerInfos[5].id.toB58String()]: {
+        value: values[1],
+        pathComplete: true
+      }
+    }
+
+    const query = (p, cb) => {
+      const res = topology[p.toB58String()] || {}
+      setTimeout(() => {
+        cb(null, {
+          closerPeers: res.closer || [],
+          value: res.value,
+          pathComplete: res.pathComplete
+        })
+      }, res.delay)
+    }
+
+    const q = new Query(dht, peer.id.id, () => query)
+    q.run([peerInfos[1].id, peerInfos[4].id], (err, res) => {
+      expect(err).to.not.exist()
+
+      // We should get back the values from both paths
+      expect(res.paths.length).to.eql(2)
+      expect(res.paths[0].value).to.eql(values[0])
+      expect(res.paths[0].success).to.eql(true)
+      expect(res.paths[1].value).to.eql(values[1])
+      expect(res.paths[1].success).to.eql(true)
+
+      done()
+    })
+  })
+
+  it('disjoint path values with early completion', (done) => {
+    const peer = peerInfos[0]
+    const values = ['v0', 'v1'].map(Buffer.from)
+
+    // mock this so we can dial non existing peers
+    dht.switch.dial = (peer, callback) => callback()
+
+    // 1 -> 2 (delay) -> 3
+    // 4 -> 5 [query complete]
+    const topology = {
+      // Top level node
+      [peerInfos[1].id.toB58String()]: {
+        closer: [peerInfos[2]]
+      },
+      // This query has a delay which means it only returns after the other
+      // path has already indicated the query is complete, so its result
+      // should be ignored
+      [peerInfos[2].id.toB58String()]: {
+        delay: 100,
+        closer: [peerInfos[3]]
+      },
+      // Query has stopped by the time we reach here, should be ignored
+      [peerInfos[3].id.toB58String()]: {
+        value: values[0],
+        pathComplete: true
+      },
+
+      // Top level node
+      [peerInfos[4].id.toB58String()]: {
+        closer: [peerInfos[5]]
+      },
+      // This peer indicates that the query is complete
+      [peerInfos[5].id.toB58String()]: {
+        closer: [peerInfos[2]],
+        value: values[1],
+        queryComplete: true
+      }
+    }
+
+    const visited = []
+    const query = (p, cb) => {
+      visited.push(p)
+
+      const res = topology[p.toB58String()] || {}
+      setTimeout(() => {
+        cb(null, {
+          closerPeers: res.closer || [],
+          value: res.value,
+          pathComplete: res.pathComplete,
+          queryComplete: res.queryComplete
+        })
+      }, res.delay)
+    }
+
+    const q = new Query(dht, peer.id.id, () => query)
+    q.run([peerInfos[1].id, peerInfos[4].id], (err, res) => {
+      expect(err).to.not.exist()
+
+      // We should only get back the value from the path 4 -> 5
+      expect(res.paths.length).to.eql(1)
+      expect(res.paths[0].value).to.eql(values[1])
+      expect(res.paths[0].success).to.eql(true)
+
+      // Wait a little bit to make sure we don't continue down another path
+      // after finding a successful path
+      setTimeout(() => {
+        if (visited.indexOf(peerInfos[3].id) !== -1) {
+          expect.fail('Query continued after success was returned')
+        }
+        done()
+      }, 300)
+    })
+  })
+
+  it('disjoint path continue other paths after error on one path', (done) => {
+    const peer = peerInfos[0]
+    const values = ['v0', 'v1'].map(Buffer.from)
+
+    // mock this so we can dial non existing peers
+    dht.switch.dial = (peer, callback) => callback()
+
+    // 1 -> 2 (delay) -> 3 [pathComplete]
+    // 4 -> 5 [error] -> 6
+    const topology = {
+      // Top level node
+      [peerInfos[1].id.toB58String()]: {
+        closer: [peerInfos[2]]
+      },
+      // This query has a delay which means it only returns after the other
+      // path has already returned an error
+      [peerInfos[2].id.toB58String()]: {
+        delay: 100,
+        closer: [peerInfos[3]]
+      },
+      // Success peer, should get this value back at the end
+      [peerInfos[3].id.toB58String()]: {
+        value: values[0],
+        pathComplete: true
+      },
+
+      // Top level node
+      [peerInfos[4].id.toB58String()]: {
+        closer: [peerInfos[5]]
+      },
+      // Return an error at this point
+      [peerInfos[5].id.toB58String()]: {
+        closer: [peerInfos[6]],
+        error: true
+      },
+      // Should never reach here
+      [peerInfos[6].id.toB58String()]: {
+        value: values[1],
+        pathComplete: true
+      }
+    }
+
+    const visited = []
+    const query = (p, cb) => {
+      visited.push(p)
+
+      const res = topology[p.toB58String()] || {}
+      setTimeout(() => {
+        if (res.error) {
+          return cb(new Error('path error'))
+        }
+        cb(null, {
+          closerPeers: res.closer || [],
+          value: res.value,
+          pathComplete: res.pathComplete
+        })
+      }, res.delay)
+    }
+
+    const q = new Query(dht, peer.id.id, () => query)
+    q.run([peerInfos[1].id, peerInfos[4].id], (err, res) => {
+      expect(err).to.not.exist()
+
+      // We should only get back the value from the path 1 -> 2 -> 3
+      expect(res.paths.length).to.eql(1)
+      expect(res.paths[0].value).to.eql(values[0])
+      expect(res.paths[0].success).to.eql(true)
+
+      done()
+    })
+  })
+
  /*
   * This test creates two disjoint tracks of peers, one for
   * each of the query's two paths to follow. The "good"
@@ -130,18 +465,20 @@
      // mock this so we can dial non existing peers
      dht.switch.dial = (peer, callback) => callback()
      let badEndVisited = false
+      let targetVisited = false
 
      const q = new Query(dht, targetId, (trackNum) => {
        return (p, cb) => {
          const response = getResponse(p, trackNum)
          expect(response).to.exist() // or we aren't on the right track
-          if (response.end && !response.success) {
+          if (response.end && !response.pathComplete) {
            badEndVisited = true
          }
-          if (response.success) {
+          if (response.pathComplete) {
+            targetVisited = true
            expect(badEndVisited).to.eql(false)
          }
-          cb(null, response)
+          setImmediate(() => cb(null, response))
        }
      })
      q.concurrency = 1
@@ -149,6 +486,8 @@
      // path is good, second bad
      q.run(starts, (err, res) => {
        expect(err).to.not.exist()
+        // we should reach the target node
+        expect(targetVisited).to.eql(true)
        // we should visit all nodes (except the target)
        expect(res.finalSet.size).to.eql(peerInfos.length - 1)
        // there should be one successful path
diff --git a/test/utils/create-disjoint-tracks.js b/test/utils/create-disjoint-tracks.js
index 6dbdb7ef..8bec7db8 100644
--- a/test/utils/create-disjoint-tracks.js
+++ b/test/utils/create-disjoint-tracks.js
@@ -70,10 +70,10 @@ function createDisjointTracks (peerInfos, goodLength, callback) {
    const nextPos = pos + 1
    // if we're at the end of the track
    if (nextPos === track.length) {
-      if (trackNum === 0) { // good track; success
+      if (trackNum === 0) { // good track; pathComplete
        return {
          end: true,
-          success: true
+          pathComplete: true
        }
      } else { // bad track; dead end
        return {
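Usage sketch of the new completion flags (illustrative only, not part of the patch). It assumes a `dht` instance and `peerInfos` fixtures like the ones in test/query.spec.js; `queryFn` and `targetKey` are placeholder names. A per-peer query function now signals `pathComplete` (finish this disjoint path) or `queryComplete` (stop every path) instead of `success`; completed paths are still reported with `success: true`, and callers that race the query against a timeout call `query.stop()` to release any outstanding workers.

    const Query = require('../src/query')

    // Called once per peer visited on a path
    const queryFn = (peerId, cb) => {
      cb(null, {
        value: Buffer.from('cool'),
        pathComplete: true // or queryComplete: true to stop all paths at once
      })
    }

    const query = new Query(dht, targetKey, () => queryFn)
    query.run([peerInfos[1].id], (err, res) => {
      // paths that completed are surfaced with success: true
      console.log(err, res.paths[0].success, res.paths[0].value, res.finalSet.size)
    })

    // e.g. when a timeout fires, stop the workers on every path
    // query.stop()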