From 5f6c0023961d9de2bcab3d400309126b13ca4676 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Thu, 18 Aug 2016 11:13:55 +0200 Subject: [PATCH] refactor: use modular async Fixes #14 --- src/decision/engine.js | 8 ++++---- src/index.js | 13 ++++++++----- src/network/index.js | 5 ++--- src/wantmanager/msg-queue.js | 4 ++-- test/browser.js | 4 ++-- test/decision/engine-test.js | 20 ++++++++++++-------- test/index-test.js | 23 +++++++++++++---------- test/network/gen-bitswap-network.node.js | 24 +++++++++++++----------- test/node.js | 4 ++-- test/utils.js | 18 ++++++++++-------- 10 files changed, 68 insertions(+), 55 deletions(-) diff --git a/src/decision/engine.js b/src/decision/engine.js index 0a6f5f0e..5e3bc892 100644 --- a/src/decision/engine.js +++ b/src/decision/engine.js @@ -1,7 +1,7 @@ 'use strict' const debug = require('debug') -const async = require('async') +const eachSeries = require('async/eachSeries') const mh = require('multihashes') const pull = require('pull-stream') const generate = require('pull-generate') @@ -115,11 +115,11 @@ module.exports = class Engine { this._processBlocks(msg.blocks, ledger) log('wantlist', Array.from(msg.wantlist.values()).map((e) => e.toString())) - async.eachSeries( + eachSeries( msg.wantlist.values(), this._processWantlist.bind(this, ledger, peerId), (err) => { - const done = (err) => async.setImmediate(() => cb(err)) + const done = (err) => setImmediate(() => cb(err)) if (err) return done(err) this._outbox() done() @@ -148,7 +148,7 @@ module.exports = class Engine { log('cancel %s', mh.toB58String(entry.key)) ledger.cancelWant(entry.key) this.peerRequestQueue.remove(entry.key, peerId) - async.setImmediate(() => cb()) + setImmediate(() => cb()) } else { log('wants %s - %s', mh.toB58String(entry.key), entry.priority) ledger.wants(entry.key, entry.priority) diff --git a/src/index.js b/src/index.js index 94bf16ec..524b3e78 100644 --- a/src/index.js +++ b/src/index.js @@ -1,6 +1,9 @@ 'use strict' -const async = require('async') +const eachLimit = require('async/eachLimit') +const series = require('async/series') +const retry = require('async/retry') +const parallel = require('async/parallel') const debug = require('debug') const log = debug('bitswap') log.error = debug('bitswap:error') @@ -64,8 +67,8 @@ module.exports = class Bitwap { this.wm.cancelWants(keys) - async.eachLimit(iblocks.values(), 10, (block, next) => { - async.series([ + eachLimit(iblocks.values(), 10, (block, next) => { + series([ (innerCb) => this._updateReceiveCounters(block, (err) => { if (err) { // ignore, as these have been handled in _updateReceiveCounters @@ -106,7 +109,7 @@ module.exports = class Bitwap { _tryPutBlock (block, times, cb) { log('trying to put block %s', block.data.toString()) - async.retry({times, interval: 400}, (done) => { + retry({times, interval: 400}, (done) => { this.datastore.put(block, done) }, cb) } @@ -210,7 +213,7 @@ module.exports = class Bitwap { addListeners() this.wm.wantBlocks(keys) - async.parallel(keys.map((key) => (cb) => { + parallel(keys.map((key) => (cb) => { // We don't want to announce looking for blocks // when we might have them ourselves. this.datastore.has(key, (err, exists) => { diff --git a/src/network/index.js b/src/network/index.js index 3300da33..ee37af95 100644 --- a/src/network/index.js +++ b/src/network/index.js @@ -1,7 +1,6 @@ 'use strict' const bl = require('bl') -const async = require('async') const debug = require('debug') const lps = require('length-prefixed-stream') @@ -87,7 +86,7 @@ module.exports = class Network { // Connect to the given peer connectTo (peerId, cb) { log('connecting to %s', peerId.toB58String()) - const done = (err) => async.setImmediate(() => cb(err)) + const done = (err) => setImmediate(() => cb(err)) // NOTE: For now, all this does is ensure that we are // connected. Once we have Peer Routing, we will be able // to find the Peer @@ -102,7 +101,7 @@ module.exports = class Network { sendMessage (peerId, msg, cb) { log('sendMessage to %s', peerId.toB58String()) log('msg %s', msg.full, msg.wantlist, msg.blocks) - const done = (err) => async.setImmediate(() => cb(err)) + const done = (err) => setImmediate(() => cb(err)) let peerInfo try { peerInfo = this.peerBook.getByMultihash(peerId.toBytes()) diff --git a/src/wantmanager/msg-queue.js b/src/wantmanager/msg-queue.js index d8cbae5c..f20d0f8e 100644 --- a/src/wantmanager/msg-queue.js +++ b/src/wantmanager/msg-queue.js @@ -1,7 +1,7 @@ 'use strict' const debug = require('debug') -const async = require('async') +const queue = require('async/queue') const Message = require('../message') @@ -14,7 +14,7 @@ module.exports = class MsgQueue { this.network = network this.refcnt = 1 - this.queue = async.queue(this.doWork.bind(this), 1) + this.queue = queue(this.doWork.bind(this), 1) // only start when `run` is called this.queue.pause() } diff --git a/test/browser.js b/test/browser.js index b42a8e57..e3596df8 100644 --- a/test/browser.js +++ b/test/browser.js @@ -1,6 +1,6 @@ 'use strict' -const async = require('async') +const eachSeries = require('async/eachSeries') const store = require('idb-plus-blob-store') const _ = require('lodash') const IPFSRepo = require('ipfs-repo') @@ -29,7 +29,7 @@ function createRepo (id, done) { dbs.push(id) - async.eachSeries(repoData, (file, cb) => { + eachSeries(repoData, (file, cb) => { if (_.startsWith(file.key, 'datastore/')) { return cb() } diff --git a/test/decision/engine-test.js b/test/decision/engine-test.js index af3943f0..f4a1a732 100644 --- a/test/decision/engine-test.js +++ b/test/decision/engine-test.js @@ -5,7 +5,11 @@ const expect = require('chai').expect const PeerId = require('peer-id') const _ = require('lodash') const Block = require('ipfs-block') -const async = require('async') +const parallel = require('async/parallel') +const eachLimit = require('async/eachLimit') +const each = require('async/each') +const series = require('async/series') +const eachSeries = require('async/eachSeries') const Message = require('../../src/message') const Engine = require('../../src/decision/engine') @@ -32,7 +36,7 @@ module.exports = (repo) => { }) it('consistent accounting', (done) => { - async.parallel([ + parallel([ (cb) => newEngine('Ernie', cb), (cb) => newEngine('Bert', cb) ], (err, res) => { @@ -41,7 +45,7 @@ module.exports = (repo) => { const sender = res[0] const receiver = res[1] - async.eachLimit(_.range(1000), 100, (i, cb) => { + eachLimit(_.range(1000), 100, (i, cb) => { const m = new Message(false) const content = `this is message ${i}` m.addBlock(new Block(content)) @@ -80,7 +84,7 @@ module.exports = (repo) => { }) it('peer is added to peers when message receiver or sent', (done) => { - async.parallel([ + parallel([ (cb) => newEngine('sf', cb), (cb) => newEngine('sea', cb) ], (err, res) => { @@ -129,7 +133,7 @@ module.exports = (repo) => { repo.create('p', (err, repo) => { expect(err).to.not.exist - async.each(alphabet, (letter, cb) => { + each(alphabet, (letter, cb) => { const block = new Block(letter) repo.datastore.put(block, cb) }, (err) => { @@ -155,8 +159,8 @@ module.exports = (repo) => { e.messageReceived(p, cancels, cb) } - async.eachSeries(_.range(numRounds), (i, cb) => { - async.eachSeries(testCases, (testcase, innerCb) => { + eachSeries(_.range(numRounds), (i, cb) => { + eachSeries(testCases, (testcase, innerCb) => { const set = testcase[0] const cancels = testcase[1] const keeps = _.difference(set, cancels) @@ -178,7 +182,7 @@ module.exports = (repo) => { const e = new Engine(repo.datastore, network) e.start() const partner = PeerId.create({bits: 64}) - async.series([ + series([ (c) => partnerWants(e, set, partner, c), (c) => partnerCancels(e, cancels, partner, c) ], (err) => { diff --git a/test/index-test.js b/test/index-test.js index 6a6fc1b0..1d315935 100644 --- a/test/index-test.js +++ b/test/index-test.js @@ -2,7 +2,10 @@ /* eslint max-nested-callbacks: ["error", 8]*/ 'use strict' -const async = require('async') +const map = require('async/map') +const eachSeries = require('async/eachSeries') +const waterfall = require('async/waterfall') +const each = require('async/each') const _ = require('lodash') const expect = require('chai').expect const PeerId = require('peer-id') @@ -62,7 +65,7 @@ module.exports = (repo) => { expect(bs.blocksRecvd).to.be.eql(2) expect(bs.dupBlocksRecvd).to.be.eql(0) - async.map([b1, b1], + map([b1, b1], (val, cb) => store.get(val.key, cb), (err, res) => { if (err) return done(err) @@ -116,7 +119,7 @@ module.exports = (repo) => { return m }) let i = 0 - async.eachSeries(others, (other, cb) => { + eachSeries(others, (other, cb) => { const msg = messages[i] i++ bs._receiveMessage(other, msg, (err) => { @@ -166,7 +169,7 @@ module.exports = (repo) => { const block = makeBlock() let mockNet - async.waterfall([ + waterfall([ (cb) => utils.createMockNet(repo, 2, cb), (net, cb) => { mockNet = net @@ -218,13 +221,13 @@ module.exports = (repo) => { if (id.toHexString() !== other.toHexString()) { err = new Error('unkown peer') } - async.setImmediate(() => cb(err)) + setImmediate(() => cb(err)) }, sendMessage (id, msg, cb) { if (id.toHexString() === other.toHexString()) { bs2._receiveMessage(me, msg, cb) } else { - async.setImmediate(() => cb(new Error('unkown peer'))) + setImmediate(() => cb(new Error('unkown peer'))) } }, start () {}, @@ -236,13 +239,13 @@ module.exports = (repo) => { if (id.toHexString() !== me.toHexString()) { err = new Error('unkown peer') } - async.setImmediate(() => cb(err)) + setImmediate(() => cb(err)) }, sendMessage (id, msg, cb) { if (id.toHexString() === me.toHexString()) { bs1._receiveMessage(other, msg, cb) } else { - async.setImmediate(() => cb(new Error('unkown peer'))) + setImmediate(() => cb(new Error('unkown peer'))) } }, start () {}, @@ -254,7 +257,7 @@ module.exports = (repo) => { let store2 - async.waterfall([ + waterfall([ (cb) => repo.create('world', cb), (repo, cb) => { store2 = repo.datastore @@ -335,7 +338,7 @@ module.exports = (repo) => { } function hasBlocks (msg, store, cb) { - async.each(Array.from(msg.blocks.values()), (b, next) => { + each(Array.from(msg.blocks.values()), (b, next) => { if (!b.cancel) { store.has(b.key, next) } else { diff --git a/test/network/gen-bitswap-network.node.js b/test/network/gen-bitswap-network.node.js index faf10c7c..2057029f 100644 --- a/test/network/gen-bitswap-network.node.js +++ b/test/network/gen-bitswap-network.node.js @@ -4,7 +4,9 @@ const expect = require('chai').expect const utils = require('../utils') -const async = require('async') +const series = require('async/series') +const parallel = require('async/parallel') +const each = require('async/each') const _ = require('lodash') const Block = require('ipfs-block') const Buffer = require('safe-buffer').Buffer @@ -23,15 +25,15 @@ describe('gen Bitswap network', function () { return new Block(b) }) - async.series([ + series([ (cb) => { - async.parallel(blocks.map((b) => (cb) => { + parallel(blocks.map((b) => (cb) => { node.bitswap.hasBlock(b, cb) }), cb) }, (cb) => { - async.each(_.range(100), (i, cb) => { - async.parallel(blocks.map((b) => (cb) => { + each(_.range(100), (i, cb) => { + parallel(blocks.map((b) => (cb) => { node.bitswap.getBlock(b.key, (err, res) => { expect(err).to.not.exist expect(res).to.be.eql(b) @@ -80,17 +82,17 @@ describe('gen Bitswap network', function () { const d = (new Date()).getTime() - async.parallel(_.map(nodeArr, (node, i) => (callback) => { + parallel(_.map(nodeArr, (node, i) => (callback) => { node.bitswap.start() - async.parallel([ + parallel([ (finish) => { - async.parallel(_.range(blockFactor).map((j) => (cb) => { + parallel(_.range(blockFactor).map((j) => (cb) => { // console.log('has node:%s block %s', i, i * blockFactor + j) node.bitswap.hasBlock(blocks[i * blockFactor + j], cb) }), finish) }, (finish) => { - async.parallel(_.map(blocks, (b, j) => (cb) => { + parallel(_.map(blocks, (b, j) => (cb) => { node.bitswap.getBlock(b.key, (err, res) => { // console.log('node:%s got block: %s', i, j) expect(err).to.not.exist @@ -107,13 +109,13 @@ describe('gen Bitswap network', function () { }) } - async.series( + series( _.range(2).map((i) => (cb) => round(i, cb)), (err) => { // setTimeout is used to avoid closing the TCP socket while spdy is // still sending a ton of signalling data setTimeout(() => { - async.parallel(nodeArr.map((node) => (cb) => { + parallel(nodeArr.map((node) => (cb) => { node.bitswap.stop() node.libp2p.stop(cb) }), (err2) => { diff --git a/test/node.js b/test/node.js index 2842a8f6..4f59cd8d 100644 --- a/test/node.js +++ b/test/node.js @@ -6,7 +6,7 @@ const ncp = require('ncp') const rimraf = require('rimraf') const fs = require('fs-blob-store') const testRepoPath = path.join(__dirname, 'test-repo') -const async = require('async') +const each = require('async/each') // book keeping const repos = [] @@ -24,7 +24,7 @@ function createRepo (id, done) { } function removeRepos (done) { - async.each(repos, (repo, cb) => { + each(repos, (repo, cb) => { rimraf(repo, cb) }, done) } diff --git a/test/utils.js b/test/utils.js index cbe24956..753f9efd 100644 --- a/test/utils.js +++ b/test/utils.js @@ -1,6 +1,8 @@ 'use strict' -const async = require('async') +const each = require('async/each') +const eachSeries = require('async/eachSeries') +const map = require('async/map') const _ = require('lodash') const PeerId = require('peer-id') const PeerInfo = require('peer-info') @@ -28,13 +30,13 @@ exports.mockNetwork = (calls, done) => { return { connectTo (p, cb) { - async.setImmediate(() => { + setImmediate(() => { connects.push(p) cb() }) }, sendMessage (p, msg, cb) { - async.setImmediate(() => { + setImmediate(() => { messages.push([p, msg]) cb() finish() @@ -46,7 +48,7 @@ exports.mockNetwork = (calls, done) => { } exports.createMockNet = (repo, count, cb) => { - async.map(_.range(count), (i, cb) => repo.create(`repo-${i}`, (err, res) => { + map(_.range(count), (i, cb) => repo.create(`repo-${i}`, (err, res) => { if (err) return cb(err) cb(null, res.datastore) }), (err, stores) => { @@ -58,7 +60,7 @@ exports.createMockNet = (repo, count, cb) => { const networks = _.range(count).map((i) => { return { connectTo (id, cb) { - const done = (err) => async.setImmediate(() => cb(err)) + const done = (err) => setImmediate(() => cb(err)) if (!_.includes(hexIds, id.toHexString())) { return done(new Error('unkown peer')) } @@ -132,7 +134,7 @@ exports.genBitswapNetwork = (n, callback) => { }) // start every libp2pNode - async.each(netArray, (net, cb) => { + each(netArray, (net, cb) => { net.libp2p.start(cb) }, (err) => { if (err) { @@ -151,8 +153,8 @@ exports.genBitswapNetwork = (n, callback) => { // connect all the nodes between each other function establishLinks () { - async.eachSeries(netArray, (from, cbI) => { - async.eachSeries(netArray, (to, cbJ) => { + eachSeries(netArray, (from, cbI) => { + eachSeries(netArray, (to, cbJ) => { if (from.peerInfo.id.toB58String() === to.peerInfo.id.toB58String()) { return cbJ()