Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
fix: use utils.mapParallel for parallel processing of peers (#166)
Browse files Browse the repository at this point in the history
  • Loading branch information
kumavis authored Jun 16, 2020
1 parent eacdc6b commit 534a2d9
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 4 deletions.
4 changes: 2 additions & 2 deletions src/content-fetching/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,15 @@ module.exports = (dht) => {
let counterAll = 0
let counterSuccess = 0

for await (const peer of dht.getClosestPeers(key, { shallow: true })) {
await utils.mapParallel(dht.getClosestPeers(key, { shallow: true }), async (peer) => {
try {
counterAll += 1
await dht._putValueToPeer(key, record, peer)
counterSuccess += 1
} catch (err) {
dht._log.error('Failed to put to peer (%b): %s', peer.id, err)
}
}
})

// verify if we were able to put to enough peers
const minPeers = options.minPeers || counterAll // Ensure we have a default `minPeers`
Expand Down
4 changes: 2 additions & 2 deletions src/content-routing/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ module.exports = (dht) => {
}]

// Notify closest peers
for await (const peer of dht.getClosestPeers(key.buffer)) {
await utils.mapParallel(dht.getClosestPeers(key.buffer), async (peer) => {
dht._log('putProvider %s to %s', key.toBaseEncodedString(), peer.toB58String())
try {
await dht.network.sendMessage(peer, msg)
} catch (err) {
errors.push(err)
}
}
})

if (errors.length) {
// TODO:
Expand Down
19 changes: 19 additions & 0 deletions src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,22 @@ exports.withTimeout = (asyncFn, time) => {
])
}
}

/**
* Iterates the given `asyncIterator` and runs each item through the given `asyncFn` in parallel.
* Returns a promise that resolves when all items of the `asyncIterator` have been passed
* through `asyncFn`.
*
* @param {AsyncIterable} [asyncIterator]
* @param {Function} [asyncFn]
* @returns {Array}
*
* @private
*/
exports.mapParallel = async function (asyncIterator, asyncFn) {
const tasks = []
for await (const item of asyncIterator) {
tasks.push(asyncFn(item))
}
return Promise.all(tasks)
}

0 comments on commit 534a2d9

Please sign in to comment.