Skip to content

Commit

Permalink
Run queries on multiple disjoint paths.
Browse files Browse the repository at this point in the history
Fixes libp2p#37.
  • Loading branch information
jhiesey committed Nov 30, 2018
1 parent b731a1d commit 7842259
Show file tree
Hide file tree
Showing 7 changed files with 378 additions and 111 deletions.
3 changes: 3 additions & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,7 @@ exports.K = 20
// Alpha is the concurrency for asynchronous requests
exports.ALPHA = 3

// Number of disjoint query paths to use
exports.DISJOINT_PATHS = 10

exports.maxMessageSize = 2 << 22 // 4MB
131 changes: 80 additions & 51 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ class KadDHT {
}

this._log('getMany %b (%s)', key, nvals)
const vals = []
let vals = []

this._getLocal(key, (err, localRec) => {
if (err && nvals === 0) {
Expand All @@ -271,6 +271,7 @@ class KadDHT {
return callback(null, vals)
}

const paths = []
waterfall([
(cb) => utils.convertBuffer(key, cb),
(id, cb) => {
Expand All @@ -284,38 +285,49 @@ class KadDHT {
return cb(errcode(new Error(errMsg), 'ERR_NO_PEERS_IN_ROUTING_TABLE'))
}

// we have peers, lets do the actualy query to them
const query = new Query(this, key, (peer, cb) => {
this._getValueOrPeers(peer, key, (err, rec, peers) => {
if (err) {
// If we have an invalid record we just want to continue and fetch a new one.
if (!(err.code === 'ERR_INVALID_RECORD')) {
return cb(err)
// we have peers, lets do the actual query to them
const query = new Query(this, key, (pathIndex, numPaths) => {
// This function body runs once per disjoint path
const pathSize = utils.pathSize(nvals - vals.length, pathIndex, numPaths)
const pathVals = []
paths.push(pathVals)

// Here we return the query function to use on this particular disjoint path
return (peer, cb) => {
this._getValueOrPeers(peer, key, (err, rec, peers) => {
if (err) {
// If we have an invalid record we just want to continue and fetch a new one.
if (!(err.code === 'ERR_INVALID_RECORD')) {
return cb(err)
}
}
}

const res = { closerPeers: peers }
const res = { closerPeers: peers }

if ((rec && rec.value) || (err && err.code === 'ERR_INVALID_RECORD')) {
vals.push({
val: rec && rec.value,
from: peer
})
}
if ((rec && rec.value) || (err && err.code === 'ERR_INVALID_RECORD')) {
pathVals.push({
val: rec && rec.value,
from: peer
})
}

// enough is enough
if (vals.length >= nvals) {
res.success = true
}
// enough is enough
if (pathVals.length >= pathSize) {
res.success = true
}

cb(null, res)
})
cb(null, res)
})
}
})

// run our query
timeout((cb) => query.run(rtp, cb), options.maxTimeout)(cb)
}
], (err) => {
// combine vals from each path
vals = [].concat.apply(vals, paths)

if (err && vals.length === 0) {
return callback(err)
}
Expand All @@ -341,15 +353,20 @@ class KadDHT {

const tablePeers = this.routingTable.closestPeers(id, c.ALPHA)

const q = new Query(this, key, (peer, callback) => {
waterfall([
(cb) => this._closerPeersSingle(key, peer, cb),
(closer, cb) => {
cb(null, {
closerPeers: closer
})
}
], callback)
const q = new Query(this, key, () => {
// There is no distinction between the disjoint paths,
// so there are no per-path variables in this scope.
// Just return the actual query function.
return (peer, callback) => {
waterfall([
(cb) => this._closerPeersSingle(key, peer, cb),
(closer, cb) => {
cb(null, {
closerPeers: closer
})
}
], callback)
}
})

q.run(tablePeers, (err, res) => {
Expand Down Expand Up @@ -549,37 +566,49 @@ class KadDHT {
}

// query the network
const query = new Query(this, id.id, (peer, cb) => {
waterfall([
(cb) => this._findPeerSingle(peer, id, cb),
(msg, cb) => {
const match = msg.closerPeers.find((p) => p.id.isEqual(id))

// found it
if (match) {
return cb(null, {
peer: match,
success: true
const query = new Query(this, id.id, () => {
// There is no distinction between the disjoint paths,
// so there are no per-path variables in this scope.
// Just return the actual query function.
return (peer, cb) => {
waterfall([
(cb) => this._findPeerSingle(peer, id, cb),
(msg, cb) => {
const match = msg.closerPeers.find((p) => p.id.isEqual(id))

// found it
if (match) {
return cb(null, {
peer: match,
success: true
})
}

cb(null, {
closerPeers: msg.closerPeers
})
}

cb(null, {
closerPeers: msg.closerPeers
})
}
], cb)
], cb)
}
})

timeout((cb) => {
query.run(peers, cb)
}, options.maxTimeout)(cb)
},
(result, cb) => {
this._log('findPeer %s: %s', id.toB58String(), result.success)
if (!result.peer) {
let success = false
result.paths.forEach((result) => {
if (result.success) {
success = true
this.peerBook.put(result.peer)
}
})
this._log('findPeer %s: %s', id.toB58String(), success)
if (!success) {
return cb(errcode(new Error('No peer found'), 'ERR_NOT_FOUND'))
}
cb(null, result.peer)
cb(null, this.peerBook.get(id))
}
], callback)
})
Expand Down
54 changes: 35 additions & 19 deletions src/private.js
Original file line number Diff line number Diff line change
Expand Up @@ -489,33 +489,49 @@ module.exports = (dht) => ({
}

// need more, query the network
const query = new Query(dht, key.buffer, (peer, cb) => {
waterfall([
(cb) => dht._findProvidersSingle(peer, key, cb),
(msg, cb) => {
const provs = msg.providerPeers
dht._log('(%s) found %s provider entries', dht.peerInfo.id.toB58String(), provs.length)
const paths = []
const query = new Query(dht, key.buffer, (pathIndex, numPaths) => {
// This function body runs once per disjoint path
const pathSize = utils.pathSize(out.length - n, pathIndex, numPaths)
const pathProviders = new LimitedPeerList(pathSize)
paths.push(pathProviders)

// Here we return the query function to use on this particular disjoint path
return (peer, cb) => {
waterfall([
(cb) => dht._findProvidersSingle(peer, key, cb),
(msg, cb) => {
const provs = msg.providerPeers
dht._log('(%s) found %s provider entries', dht.peerInfo.id.toB58String(), provs.length)

provs.forEach((prov) => {
pathProviders.push(dht.peerBook.put(prov))
})

provs.forEach((prov) => {
out.push(dht.peerBook.put(prov))
})
// hooray we have all that we want
if (pathProviders.length >= pathSize) {
return cb(null, {success: true})
}

// hooray we have all that we want
if (out.length >= n) {
return cb(null, {success: true})
// it looks like we want some more
cb(null, {
closerPeers: msg.closerPeers
})
}

// it looks like we want some more
cb(null, {
closerPeers: msg.closerPeers
})
}
], cb)
], cb)
}
})

const peers = dht.routingTable.closestPeers(key.buffer, c.ALPHA)

timeout((cb) => query.run(peers, cb), maxTimeout)((err) => {
// combine peers from each path
paths.forEach((path) => {
path.toArray().forEach((peer) => {
out.push(peer)
})
})

if (err) {
if (err.code === 'ETIMEDOUT' && out.length > 0) {
return callback(null, out.toArray())
Expand Down
Loading

0 comments on commit 7842259

Please sign in to comment.