Skip to content

Commit

Permalink
refactor: use modular async
Browse files Browse the repository at this point in the history
Fixes #14
  • Loading branch information
dignifiedquire authored and daviddias committed Sep 9, 2016
1 parent 4cf7cad commit 5f6c002
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 55 deletions.
8 changes: 4 additions & 4 deletions src/decision/engine.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const debug = require('debug')
const async = require('async')
const eachSeries = require('async/eachSeries')
const mh = require('multihashes')
const pull = require('pull-stream')
const generate = require('pull-generate')
Expand Down Expand Up @@ -115,11 +115,11 @@ module.exports = class Engine {

this._processBlocks(msg.blocks, ledger)
log('wantlist', Array.from(msg.wantlist.values()).map((e) => e.toString()))
async.eachSeries(
eachSeries(
msg.wantlist.values(),
this._processWantlist.bind(this, ledger, peerId),
(err) => {
const done = (err) => async.setImmediate(() => cb(err))
const done = (err) => setImmediate(() => cb(err))
if (err) return done(err)
this._outbox()
done()
Expand Down Expand Up @@ -148,7 +148,7 @@ module.exports = class Engine {
log('cancel %s', mh.toB58String(entry.key))
ledger.cancelWant(entry.key)
this.peerRequestQueue.remove(entry.key, peerId)
async.setImmediate(() => cb())
setImmediate(() => cb())
} else {
log('wants %s - %s', mh.toB58String(entry.key), entry.priority)
ledger.wants(entry.key, entry.priority)
Expand Down
13 changes: 8 additions & 5 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
'use strict'

const async = require('async')
const eachLimit = require('async/eachLimit')
const series = require('async/series')
const retry = require('async/retry')
const parallel = require('async/parallel')
const debug = require('debug')
const log = debug('bitswap')
log.error = debug('bitswap:error')
Expand Down Expand Up @@ -64,8 +67,8 @@ module.exports = class Bitwap {

this.wm.cancelWants(keys)

async.eachLimit(iblocks.values(), 10, (block, next) => {
async.series([
eachLimit(iblocks.values(), 10, (block, next) => {
series([
(innerCb) => this._updateReceiveCounters(block, (err) => {
if (err) {
// ignore, as these have been handled in _updateReceiveCounters
Expand Down Expand Up @@ -106,7 +109,7 @@ module.exports = class Bitwap {

_tryPutBlock (block, times, cb) {
log('trying to put block %s', block.data.toString())
async.retry({times, interval: 400}, (done) => {
retry({times, interval: 400}, (done) => {
this.datastore.put(block, done)
}, cb)
}
Expand Down Expand Up @@ -210,7 +213,7 @@ module.exports = class Bitwap {
addListeners()
this.wm.wantBlocks(keys)

async.parallel(keys.map((key) => (cb) => {
parallel(keys.map((key) => (cb) => {
// We don't want to announce looking for blocks
// when we might have them ourselves.
this.datastore.has(key, (err, exists) => {
Expand Down
5 changes: 2 additions & 3 deletions src/network/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict'

const bl = require('bl')
const async = require('async')
const debug = require('debug')
const lps = require('length-prefixed-stream')

Expand Down Expand Up @@ -87,7 +86,7 @@ module.exports = class Network {
// Connect to the given peer
connectTo (peerId, cb) {
log('connecting to %s', peerId.toB58String())
const done = (err) => async.setImmediate(() => cb(err))
const done = (err) => setImmediate(() => cb(err))
// NOTE: For now, all this does is ensure that we are
// connected. Once we have Peer Routing, we will be able
// to find the Peer
Expand All @@ -102,7 +101,7 @@ module.exports = class Network {
sendMessage (peerId, msg, cb) {
log('sendMessage to %s', peerId.toB58String())
log('msg %s', msg.full, msg.wantlist, msg.blocks)
const done = (err) => async.setImmediate(() => cb(err))
const done = (err) => setImmediate(() => cb(err))
let peerInfo
try {
peerInfo = this.peerBook.getByMultihash(peerId.toBytes())
Expand Down
4 changes: 2 additions & 2 deletions src/wantmanager/msg-queue.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const debug = require('debug')
const async = require('async')
const queue = require('async/queue')

const Message = require('../message')

Expand All @@ -14,7 +14,7 @@ module.exports = class MsgQueue {
this.network = network
this.refcnt = 1

this.queue = async.queue(this.doWork.bind(this), 1)
this.queue = queue(this.doWork.bind(this), 1)
// only start when `run` is called
this.queue.pause()
}
Expand Down
4 changes: 2 additions & 2 deletions test/browser.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const async = require('async')
const eachSeries = require('async/eachSeries')
const store = require('idb-plus-blob-store')
const _ = require('lodash')
const IPFSRepo = require('ipfs-repo')
Expand Down Expand Up @@ -29,7 +29,7 @@ function createRepo (id, done) {

dbs.push(id)

async.eachSeries(repoData, (file, cb) => {
eachSeries(repoData, (file, cb) => {
if (_.startsWith(file.key, 'datastore/')) {
return cb()
}
Expand Down
20 changes: 12 additions & 8 deletions test/decision/engine-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ const expect = require('chai').expect
const PeerId = require('peer-id')
const _ = require('lodash')
const Block = require('ipfs-block')
const async = require('async')
const parallel = require('async/parallel')
const eachLimit = require('async/eachLimit')
const each = require('async/each')
const series = require('async/series')
const eachSeries = require('async/eachSeries')

const Message = require('../../src/message')
const Engine = require('../../src/decision/engine')
Expand All @@ -32,7 +36,7 @@ module.exports = (repo) => {
})

it('consistent accounting', (done) => {
async.parallel([
parallel([
(cb) => newEngine('Ernie', cb),
(cb) => newEngine('Bert', cb)
], (err, res) => {
Expand All @@ -41,7 +45,7 @@ module.exports = (repo) => {
const sender = res[0]
const receiver = res[1]

async.eachLimit(_.range(1000), 100, (i, cb) => {
eachLimit(_.range(1000), 100, (i, cb) => {
const m = new Message(false)
const content = `this is message ${i}`
m.addBlock(new Block(content))
Expand Down Expand Up @@ -80,7 +84,7 @@ module.exports = (repo) => {
})

it('peer is added to peers when message receiver or sent', (done) => {
async.parallel([
parallel([
(cb) => newEngine('sf', cb),
(cb) => newEngine('sea', cb)
], (err, res) => {
Expand Down Expand Up @@ -129,7 +133,7 @@ module.exports = (repo) => {
repo.create('p', (err, repo) => {
expect(err).to.not.exist

async.each(alphabet, (letter, cb) => {
each(alphabet, (letter, cb) => {
const block = new Block(letter)
repo.datastore.put(block, cb)
}, (err) => {
Expand All @@ -155,8 +159,8 @@ module.exports = (repo) => {
e.messageReceived(p, cancels, cb)
}

async.eachSeries(_.range(numRounds), (i, cb) => {
async.eachSeries(testCases, (testcase, innerCb) => {
eachSeries(_.range(numRounds), (i, cb) => {
eachSeries(testCases, (testcase, innerCb) => {
const set = testcase[0]
const cancels = testcase[1]
const keeps = _.difference(set, cancels)
Expand All @@ -178,7 +182,7 @@ module.exports = (repo) => {
const e = new Engine(repo.datastore, network)
e.start()
const partner = PeerId.create({bits: 64})
async.series([
series([
(c) => partnerWants(e, set, partner, c),
(c) => partnerCancels(e, cancels, partner, c)
], (err) => {
Expand Down
23 changes: 13 additions & 10 deletions test/index-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
/* eslint max-nested-callbacks: ["error", 8]*/
'use strict'

const async = require('async')
const map = require('async/map')
const eachSeries = require('async/eachSeries')
const waterfall = require('async/waterfall')
const each = require('async/each')
const _ = require('lodash')
const expect = require('chai').expect
const PeerId = require('peer-id')
Expand Down Expand Up @@ -62,7 +65,7 @@ module.exports = (repo) => {
expect(bs.blocksRecvd).to.be.eql(2)
expect(bs.dupBlocksRecvd).to.be.eql(0)

async.map([b1, b1],
map([b1, b1],
(val, cb) => store.get(val.key, cb),
(err, res) => {
if (err) return done(err)
Expand Down Expand Up @@ -116,7 +119,7 @@ module.exports = (repo) => {
return m
})
let i = 0
async.eachSeries(others, (other, cb) => {
eachSeries(others, (other, cb) => {
const msg = messages[i]
i++
bs._receiveMessage(other, msg, (err) => {
Expand Down Expand Up @@ -166,7 +169,7 @@ module.exports = (repo) => {
const block = makeBlock()

let mockNet
async.waterfall([
waterfall([
(cb) => utils.createMockNet(repo, 2, cb),
(net, cb) => {
mockNet = net
Expand Down Expand Up @@ -218,13 +221,13 @@ module.exports = (repo) => {
if (id.toHexString() !== other.toHexString()) {
err = new Error('unkown peer')
}
async.setImmediate(() => cb(err))
setImmediate(() => cb(err))
},
sendMessage (id, msg, cb) {
if (id.toHexString() === other.toHexString()) {
bs2._receiveMessage(me, msg, cb)
} else {
async.setImmediate(() => cb(new Error('unkown peer')))
setImmediate(() => cb(new Error('unkown peer')))
}
},
start () {},
Expand All @@ -236,13 +239,13 @@ module.exports = (repo) => {
if (id.toHexString() !== me.toHexString()) {
err = new Error('unkown peer')
}
async.setImmediate(() => cb(err))
setImmediate(() => cb(err))
},
sendMessage (id, msg, cb) {
if (id.toHexString() === me.toHexString()) {
bs1._receiveMessage(other, msg, cb)
} else {
async.setImmediate(() => cb(new Error('unkown peer')))
setImmediate(() => cb(new Error('unkown peer')))
}
},
start () {},
Expand All @@ -254,7 +257,7 @@ module.exports = (repo) => {

let store2

async.waterfall([
waterfall([
(cb) => repo.create('world', cb),
(repo, cb) => {
store2 = repo.datastore
Expand Down Expand Up @@ -335,7 +338,7 @@ module.exports = (repo) => {
}

function hasBlocks (msg, store, cb) {
async.each(Array.from(msg.blocks.values()), (b, next) => {
each(Array.from(msg.blocks.values()), (b, next) => {
if (!b.cancel) {
store.has(b.key, next)
} else {
Expand Down
24 changes: 13 additions & 11 deletions test/network/gen-bitswap-network.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

const expect = require('chai').expect
const utils = require('../utils')
const async = require('async')
const series = require('async/series')
const parallel = require('async/parallel')
const each = require('async/each')
const _ = require('lodash')
const Block = require('ipfs-block')
const Buffer = require('safe-buffer').Buffer
Expand All @@ -23,15 +25,15 @@ describe('gen Bitswap network', function () {
return new Block(b)
})

async.series([
series([
(cb) => {
async.parallel(blocks.map((b) => (cb) => {
parallel(blocks.map((b) => (cb) => {
node.bitswap.hasBlock(b, cb)
}), cb)
},
(cb) => {
async.each(_.range(100), (i, cb) => {
async.parallel(blocks.map((b) => (cb) => {
each(_.range(100), (i, cb) => {
parallel(blocks.map((b) => (cb) => {
node.bitswap.getBlock(b.key, (err, res) => {
expect(err).to.not.exist
expect(res).to.be.eql(b)
Expand Down Expand Up @@ -80,17 +82,17 @@ describe('gen Bitswap network', function () {

const d = (new Date()).getTime()

async.parallel(_.map(nodeArr, (node, i) => (callback) => {
parallel(_.map(nodeArr, (node, i) => (callback) => {
node.bitswap.start()
async.parallel([
parallel([
(finish) => {
async.parallel(_.range(blockFactor).map((j) => (cb) => {
parallel(_.range(blockFactor).map((j) => (cb) => {
// console.log('has node:%s block %s', i, i * blockFactor + j)
node.bitswap.hasBlock(blocks[i * blockFactor + j], cb)
}), finish)
},
(finish) => {
async.parallel(_.map(blocks, (b, j) => (cb) => {
parallel(_.map(blocks, (b, j) => (cb) => {
node.bitswap.getBlock(b.key, (err, res) => {
// console.log('node:%s got block: %s', i, j)
expect(err).to.not.exist
Expand All @@ -107,13 +109,13 @@ describe('gen Bitswap network', function () {
})
}

async.series(
series(
_.range(2).map((i) => (cb) => round(i, cb)),
(err) => {
// setTimeout is used to avoid closing the TCP socket while spdy is
// still sending a ton of signalling data
setTimeout(() => {
async.parallel(nodeArr.map((node) => (cb) => {
parallel(nodeArr.map((node) => (cb) => {
node.bitswap.stop()
node.libp2p.stop(cb)
}), (err2) => {
Expand Down
4 changes: 2 additions & 2 deletions test/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const ncp = require('ncp')
const rimraf = require('rimraf')
const fs = require('fs-blob-store')
const testRepoPath = path.join(__dirname, 'test-repo')
const async = require('async')
const each = require('async/each')

// book keeping
const repos = []
Expand All @@ -24,7 +24,7 @@ function createRepo (id, done) {
}

function removeRepos (done) {
async.each(repos, (repo, cb) => {
each(repos, (repo, cb) => {
rimraf(repo, cb)
}, done)
}
Expand Down
Loading

0 comments on commit 5f6c002

Please sign in to comment.