diff --git a/src/content-fetching/index.js b/src/content-fetching/index.js index 3ca14dd3..ec460c51 100644 --- a/src/content-fetching/index.js +++ b/src/content-fetching/index.js @@ -109,7 +109,7 @@ module.exports = (dht) => { let counterAll = 0 let counterSuccess = 0 - for await (const peer of dht.getClosestPeers(key, { shallow: true })) { + await utils.mapParallel(dht.getClosestPeers(key, { shallow: true }), async (peer) => { try { counterAll += 1 await dht._putValueToPeer(key, record, peer) @@ -117,7 +117,7 @@ module.exports = (dht) => { } catch (err) { dht._log.error('Failed to put to peer (%b): %s', peer.id, err) } - } + }) // verify if we were able to put to enough peers const minPeers = options.minPeers || counterAll // Ensure we have a default `minPeers` diff --git a/src/content-routing/index.js b/src/content-routing/index.js index fe7c239d..0c8642ee 100644 --- a/src/content-routing/index.js +++ b/src/content-routing/index.js @@ -44,14 +44,14 @@ module.exports = (dht) => { }] // Notify closest peers - for await (const peer of dht.getClosestPeers(key.buffer)) { + await utils.mapParallel(dht.getClosestPeers(key.buffer), async (peer) => { dht._log('putProvider %s to %s', key.toBaseEncodedString(), peer.toB58String()) try { await dht.network.sendMessage(peer, msg) } catch (err) { errors.push(err) } - } + }) if (errors.length) { // TODO: diff --git a/src/utils.js b/src/utils.js index 44c9deb4..cde0e5ae 100644 --- a/src/utils.js +++ b/src/utils.js @@ -206,3 +206,22 @@ exports.withTimeout = (asyncFn, time) => { ]) } } + +/** + * Iterates the given `asyncIterator` and runs each item through the given `asyncFn` in parallel. + * Returns a promise that resolves when all items of the `asyncIterator` have been passed + * through `asyncFn`. + * + * @param {AsyncIterable} [asyncIterator] + * @param {Function} [asyncFn] + * @returns {Array} + * + * @private + */ +exports.mapParallel = async function (asyncIterator, asyncFn) { + const tasks = [] + for await (const item of asyncIterator) { + tasks.push(asyncFn(item)) + } + return Promise.all(tasks) +}