diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 00000000..d1c4d6fd --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,31 @@ +version: 2 +jobs: + build: + working_directory: ~/js-ipfs-bitswap + docker: + - image: circleci/node:6-browsers + environment: + CHROME_BIN: "/usr/bin/google-chrome" + steps: + - checkout + - restore_cache: + key: dependency-cache-{{ checksum "package.json" }} + - run: + name: install-deps + command: npm install + - save_cache: + key: dependency-cache-{{ checksum "package.json" }} + paths: + - ./node_modules + - run: + name: lint + command: npm run lint + - run: + name: test:node + command: npm run test:node + - run: + name: test:browser + command: npm run test:browser + - run: + name: coverage + command: npm run coverage diff --git a/.gitignore b/.gitignore index f39de95a..9547fedb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ # While testing new npm package-lock.json +yarn.lock # Logs logs diff --git a/README.md b/README.md index 4add771a..53dc3449 100644 --- a/README.md +++ b/README.md @@ -34,13 +34,7 @@ > npm install ipfs-bitswap ``` -### Use in Node.js - -```js -const Bitswap = require('ipfs-bitswap') -``` - -### Use in a browser with browserify, webpack or any other bundler +### Use in Node.js or in the browser with browserify, webpack or any other bundler ```js const Bitswap = require('ipfs-bitswap') @@ -56,10 +50,6 @@ Loading this module through a script tag will make the `IpfsBitswap` object avai ``` -## Usage - -See https://ipfs.github.io/js-ipfs-bitswap - ## API See https://ipfs.github.io/js-ipfs-bitswap @@ -73,24 +63,22 @@ See https://ipfs.github.io/js-ipfs-bitswap ```sh » tree src src -├── components -│   ├── decision -│   │   ├── engine.js -│   │   ├── index.js -│   │   └── ledger.js -│   ├── network # Handles peerSet and open new conns -│   │   └── index.js -│   └── want-manager # Keeps track of all blocks the peer wants (not the others which it is connected) -│   ├── index.js -│   └── msg-queue.js # Messages to send queue, one per peer ├── constants.js +├── decision-engine +│   ├── index.js +│   └── ledger.js ├── index.js -└── types - ├── message # (Type) message that is put in the wire +├── network.js # Handles peerSet and open new conns +├── notifications.js # Handles tracking of incomning blocks and wants/unwants. +├─── want-manager # Keeps track of all blocks the peer (self) wants +│   ├── index.js +│   └── msg-queue.js # Messages to send queue, one per peer +└─── types + ├── message # (Type) message that is put in the wire │   ├── entry.js │   ├── index.js │   └── message.proto.js - └── wantlist # (Type) track wanted blocks + └── wantlist # (Type) track wanted blocks ├── entry.js └── index.js ``` diff --git a/circle.yml b/circle.yml deleted file mode 100644 index 56f7efbe..00000000 --- a/circle.yml +++ /dev/null @@ -1,14 +0,0 @@ -machine: - node: - version: stable - -dependencies: - pre: - - google-chrome --version - - curl -L -o google-chrome.deb https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb - - sudo dpkg -i google-chrome.deb || true - - sudo apt-get update - - sudo apt-get install -f - - sudo apt-get install --only-upgrade lsb-base - - sudo dpkg -i google-chrome.deb - - google-chrome --version diff --git a/package.json b/package.json index 30fc9f3a..791695b8 100644 --- a/package.json +++ b/package.json @@ -4,21 +4,21 @@ "description": "Node.js implementation of the Bitswap data exchange protocol used by IPFS", "main": "src/index.js", "browser": { - "./test/libp2p-bundle": false + "./test/utils/create-libp2p-node": false, + "./test/utils/create-temp-repo-nodejs.js": "./test/utils/create-temp-repo-browser.js" }, "scripts": { - "test": "aegir-test", - "test:browser": "aegir-test browser", - "test:node": "aegir-test node", - "lint": "aegir-lint", - "release": "aegir-release --docs", - "release-minor": "aegir-release --type minor --docs", - "release-major": "aegir-release --type major --docs", + "test": "aegir test --target node --target browser", + "test:browser": "aegir test --target browser", + "test:node": "aegir test --target node", + "lint": "aegir lint", + "release": "aegir release", + "release-minor": "aegir release --type minor", + "release-major": "aegir release --type major", "bench": "node benchmarks/index", - "build": "aegir-build", - "coverage": "aegir-coverage", - "coverage-publish": "aegir-coverage publish", - "docs": "aegir-docs" + "build": "aegir build", + "coverage": "aegir coverage -u", + "docs": "aegir docs" }, "repository": { "type": "git", @@ -37,12 +37,13 @@ }, "homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme", "devDependencies": { - "aegir": "^11.0.2", + "aegir": "ipfs/aegir", "benchmark": "^2.1.4", - "chai": "^4.1.0", + "chai": "^4.1.1", "dirty-chai": "^2.0.1", "ipfs-repo": "~0.17.0", "libp2p": "^0.11.0", + "libp2p-kad-dht": "^0.4.1", "libp2p-multiplex": "^0.4.4", "libp2p-secio": "^0.7.1", "libp2p-tcp": "^0.10.2", @@ -58,7 +59,7 @@ "dependencies": { "async": "^2.5.0", "cids": "~0.5.1", - "debug": "^2.6.8", + "debug": "^3.0.0", "ipfs-block": "~0.6.0", "lodash.debounce": "^4.0.8", "lodash.find": "^4.6.0", @@ -75,6 +76,7 @@ "pull-length-prefixed": "^1.3.0", "pull-pushable": "^2.1.1", "pull-stream": "^3.6.0", + "safe-buffer": "^5.1.1", "varint-decoder": "^0.1.1" }, "contributors": [ diff --git a/src/components/network/index.js b/src/components/network/index.js deleted file mode 100644 index 64520131..00000000 --- a/src/components/network/index.js +++ /dev/null @@ -1,194 +0,0 @@ -'use strict' - -const debug = require('debug') -const lp = require('pull-length-prefixed') -const pull = require('pull-stream') -const setImmediate = require('async/setImmediate') - -const Message = require('../../types/message') -const CONSTANTS = require('../../constants') -const log = debug('bitswap:network') -log.error = debug('bitswap:network:error') - -const BITSWAP100 = '/ipfs/bitswap/1.0.0' -const BITSWAP110 = '/ipfs/bitswap/1.1.0' - -class Network { - constructor (libp2p, peerBook, bitswap, b100Only) { - this.libp2p = libp2p - this.peerBook = peerBook - this.bitswap = bitswap - this.b100Only = b100Only || false - - // increase event listener max - this._running = false - this.libp2p.swarm.setMaxListeners(CONSTANTS.maxListeners) - } - - start () { - this._running = true - // bind event listeners - this._onPeerConnect = this._onPeerConnect.bind(this) - this._onPeerDisconnect = this._onPeerDisconnect.bind(this) - - this._onConnection = this._onConnection.bind(this) - this.libp2p.handle(BITSWAP100, this._onConnection) - if (!this.b100Only) { - this.libp2p.handle(BITSWAP110, this._onConnection) - } - - this.libp2p.on('peer:connect', this._onPeerConnect) - this.libp2p.on('peer:disconnect', this._onPeerDisconnect) - - // All existing connections are like new ones for us - const pKeys = Object.keys(this.peerBook.getAll()) - pKeys.forEach((k) => this._onPeerConnect(this.peerBook.get(k))) - } - - stop () { - this._running = false - - this.libp2p.unhandle(BITSWAP100) - if (!this.b100Only) { - this.libp2p.unhandle(BITSWAP110) - } - - this.libp2p.removeListener('peer:connect', this._onPeerConnect) - this.libp2p.removeListener('peer:disconnect', this._onPeerDisconnect) - } - - // Handles both types of bitswap messgages - _onConnection (protocol, conn) { - if (!this._running) { - return - } - log('incomming new bitswap connection: %s', protocol) - pull( - conn, - lp.decode(), - pull.asyncMap((data, cb) => Message.deserialize(data, cb)), - pull.asyncMap((msg, cb) => { - conn.getPeerInfo((err, peerInfo) => { - if (err) { - return cb(err) - } - // log('data from', peerInfo.id.toB58String()) - this.bitswap._receiveMessage(peerInfo.id, msg, cb) - }) - }), - pull.onEnd((err) => { - log('ending connection') - if (err) { - return this.bitswap._receiveError(err) - } - }) - ) - } - - _onPeerConnect (peerInfo) { - if (!this._running) { - return - } - this.bitswap._onPeerConnected(peerInfo.id) - } - - _onPeerDisconnect (peerInfo) { - if (!this._running) { - return - } - this.bitswap._onPeerDisconnected(peerInfo.id) - } - - // Connect to the given peer - connectTo (peerId, callback) { - const done = (err) => setImmediate(() => callback(err)) - - if (!this._running) { - return done(new Error('No running network')) - } - - // NOTE: For now, all this does is ensure that we are - // connected. Once we have Peer Routing, we will be able - // to find the Peer - if (this.libp2p.swarm.muxedConns[peerId.toB58String()]) { - done() - } else { - done(new Error('Could not connect to peer with peerId:', peerId.toB58String())) - } - } - - // Send the given msg (instance of Message) to the given peer - sendMessage (peerId, msg, callback) { - if (!this._running) { - return callback(new Error('No running network')) - } - - const stringId = peerId.toB58String() - log('sendMessage to %s', stringId, msg) - let peerInfo - try { - peerInfo = this.peerBook.get(stringId) - } catch (err) { - return callback(err) - } - - this._dialPeer(peerInfo, (err, conn, protocol) => { - if (err) { - return callback(err) - } - - let serialized - switch (protocol) { - case BITSWAP100: - serialized = msg.serializeToBitswap100() - break - case BITSWAP110: - serialized = msg.serializeToBitswap110() - break - default: - return callback(new Error('Unkown protocol: ' + protocol)) - } - writeMessage(conn, serialized, (err) => { - if (err) { - log(err) - } - }) - callback() - }) - } - - _dialPeer (peerInfo, callback) { - // dialByPeerInfo throws if no network is there - try { - // Attempt Bitswap 1.1.0 - this.libp2p.dial(peerInfo, BITSWAP110, (err, conn) => { - if (err) { - // Attempt Bitswap 1.0.0 - this.libp2p.dial(peerInfo, BITSWAP100, (err, conn) => { - if (err) { - return callback(err) - } - - callback(null, conn, BITSWAP100) - }) - return - } - - callback(null, conn, BITSWAP110) - }) - } catch (err) { - return callback(err) - } - } -} - -function writeMessage (conn, msg, callback) { - pull( - pull.values([msg]), - lp.encode(), - conn, - pull.onEnd(callback) - ) -} - -module.exports = Network diff --git a/src/components/decision-engine/index.js b/src/decision-engine/index.js similarity index 79% rename from src/components/decision-engine/index.js rename to src/decision-engine/index.js index de3370d1..ebd60346 100644 --- a/src/components/decision-engine/index.js +++ b/src/decision-engine/index.js @@ -1,9 +1,10 @@ 'use strict' -const debug = require('debug') const each = require('async/each') const eachSeries = require('async/eachSeries') const waterfall = require('async/waterfall') +const setImmediate = require('async/setImmediate') + const map = require('async/map') const debounce = require('lodash.debounce') const uniqWith = require('lodash.uniqwith') @@ -12,17 +13,16 @@ const values = require('lodash.values') const groupBy = require('lodash.groupby') const pullAllWith = require('lodash.pullallwith') -const log = debug('bitswap:engine') -log.error = debug('bitswap:engine:error') - -const Message = require('../../types/message') -const Wantlist = require('../../types/wantlist') +const Message = require('../types/message') +const Wantlist = require('../types/wantlist') const Ledger = require('./ledger') +const logger = require('../utils').logger const MAX_MESSAGE_SIZE = 512 * 1024 class DecisionEngine { - constructor (blockstore, network) { + constructor (peerId, blockstore, network) { + this._log = logger(peerId, 'engine') this.blockstore = blockstore this.network = network @@ -36,28 +36,38 @@ class DecisionEngine { this._outbox = debounce(this._processTasks.bind(this), 100) } - _sendBlocks (env, cb) { + _sendBlocks (peer, blocks, cb) { // split into messges of max 512 * 1024 bytes - const blocks = env.blocks const total = blocks.reduce((acc, b) => { return acc + b.data.byteLength }, 0) if (total < MAX_MESSAGE_SIZE) { - return this._sendSafeBlocks(env.peer, blocks, cb) + return this._sendSafeBlocks(peer, blocks, cb) } let size = 0 let batch = [] + let outstanding = blocks.length eachSeries(blocks, (b, cb) => { + outstanding-- batch.push(b) size += b.data.byteLength - if (size >= MAX_MESSAGE_SIZE) { + if (size >= MAX_MESSAGE_SIZE || + // need to ensure the last remaining items get sent + outstanding === 0) { const nextBatch = batch.slice() batch = [] - this._sendSafeBlocks(env.peer, nextBatch, cb) + this._sendSafeBlocks(peer, nextBatch, (err) => { + if (err) { + this._log('sendblock error: %s', err.message) + } + // not returning the error, so we send as much as we can + // as otherwise `eachSeries` would cancel + cb() + }) } else { cb() } @@ -66,18 +76,9 @@ class DecisionEngine { _sendSafeBlocks (peer, blocks, cb) { const msg = new Message(false) + blocks.forEach((b) => msg.addBlock(b)) - blocks.forEach((b) => { - msg.addBlock(b) - }) - - // console.log('sending %s blocks', msg.blocks.size) - this.network.sendMessage(peer, msg, (err) => { - if (err) { - log('sendblock error: %s', err.message) - } - cb() - }) + this.network.sendMessage(peer, msg, cb) } _processTasks () { @@ -103,23 +104,22 @@ class DecisionEngine { return find(blocks, (b) => b.cid.equals(cid)) }) - this._sendBlocks({ - peer: peer, - blocks: blockList - }, (err) => { + this._sendBlocks(peer, blockList, (err) => { if (err) { - log.error('failed to send', err) + // `_sendBlocks` actually doesn't return any errors + this._log.error('should never happen: ', err) + } else { + blockList.forEach((block) => this.messageSent(peer, block)) } - blockList.forEach((block) => { - this.messageSent(peer, block) - }) + cb() }) }) ], (err) => { this._tasks = [] + if (err) { - log.error(err) + this._log.error(err) } }) } @@ -206,7 +206,7 @@ class DecisionEngine { // If we already have the block, serve it this.blockstore.has(entry.cid, (err, exists) => { if (err) { - log.error('failed existence check') + this._log.error('failed existence check') } else if (exists) { this._tasks.push({ entry: entry.entry, @@ -224,7 +224,7 @@ class DecisionEngine { _processBlocks (blocks, ledger, callback) { const cids = [] blocks.forEach((b, cidStr) => { - log('got block (%s bytes)', b.data.length) + this._log('got block (%s bytes)', b.data.length) ledger.receivedBytes(b.data.length) cids.push(b.cid) }) @@ -271,12 +271,14 @@ class DecisionEngine { return l } - start () { + start (callback) { this._running = true + setImmediate(() => callback()) } - stop () { + stop (callback) { this._running = false + setImmediate(() => callback()) } } diff --git a/src/components/decision-engine/ledger.js b/src/decision-engine/ledger.js similarity index 93% rename from src/components/decision-engine/ledger.js rename to src/decision-engine/ledger.js index a4619078..9a97660c 100644 --- a/src/components/decision-engine/ledger.js +++ b/src/decision-engine/ledger.js @@ -1,6 +1,6 @@ 'use strict' -const Wantlist = require('../../types/wantlist') +const Wantlist = require('../types/wantlist') class Ledger { constructor (peerId) { diff --git a/src/index.js b/src/index.js index de5a5684..2abe7004 100644 --- a/src/index.js +++ b/src/index.js @@ -3,55 +3,58 @@ const waterfall = require('async/waterfall') const reject = require('async/reject') const each = require('async/each') -const EventEmitter = require('events').EventEmitter -const debug = require('debug') +const series = require('async/series') +const map = require('async/map') +const once = require('once') -const CONSTANTS = require('./constants') -const WantManager = require('./components/want-manager') -const Network = require('./components/network') -const DecisionEngine = require('./components/decision-engine') +const WantManager = require('./want-manager') +const Network = require('./network') +const DecisionEngine = require('./decision-engine') +const Notifications = require('./notifications') +const logger = require('./utils').logger -const log = debug('bitswap') -log.error = debug('bitswap:error') - -/** - * - */ class Bitswap { /** * Create a new bitswap instance. * * @param {Libp2p} libp2p * @param {Blockstore} blockstore - * @param {PeerBook} peerBook * @returns {Bitswap} */ - constructor (libp2p, blockstore, peerBook) { - this.libp2p = libp2p + constructor (libp2p, blockstore) { + this._libp2p = libp2p + this._log = logger(this.peerInfo.id) + // the network delivers messages - this.network = new Network(libp2p, peerBook, this) + this.network = new Network(libp2p, this) // local database this.blockstore = blockstore - this.engine = new DecisionEngine(blockstore, this.network) + this.engine = new DecisionEngine(this.peerInfo.id, blockstore, this.network) // handle message sending - this.wm = new WantManager(this.network) + this.wm = new WantManager(this.peerInfo.id, this.network) this.blocksRecvd = 0 this.dupBlocksRecvd = 0 this.dupDataRecvd = 0 - this.notifications = new EventEmitter() - this.notifications.setMaxListeners(CONSTANTS.maxListeners) + this.notifications = new Notifications(this.peerInfo.id) + } + + get peerInfo () { + return this._libp2p.peerInfo } // handle messages received through the network _receiveMessage (peerId, incoming, callback) { this.engine.messageReceived(peerId, incoming, (err) => { if (err) { - log('failed to receive message', incoming) + // Only logging the issue to process as much as possible + // of the message. Currently `messageReceived` does not + // return any errors, but this could change in the future. + this._log('failed to receive message', incoming) } if (incoming.blocks.size === 0) { @@ -76,7 +79,7 @@ class Bitswap { } _handleReceivedBlock (peerId, block, callback) { - log('received block') + this._log('received block') waterfall([ (cb) => this.blockstore.has(block.cid, cb), @@ -95,14 +98,14 @@ class Bitswap { this.blocksRecvd++ if (exists) { - this.dupBlocksRecvd ++ + this.dupBlocksRecvd++ this.dupDataRecvd += block.data.length } } // handle errors on the receiving channel _receiveError (err) { - log.error('ReceiveError: %s', err.message) + this._log.error('ReceiveError: %s', err.message) } // handle new peers @@ -122,10 +125,13 @@ class Bitswap { return callback(err) } - this.notifications.emit( - `block:${block.cid.buffer.toString()}`, - block - ) + this.notifications.hasBlock(block) + this.network.provide(block.cid, (err) => { + if (err) { + this._log.error('Failed to provide: %s', err.message) + } + }) + this.engine.receivedBlocks([block.cid]) callback() }) @@ -150,55 +156,100 @@ class Bitswap { * @returns {void} */ get (cid, callback) { - const unwantListeners = {} - const blockListeners = {} - const cidStr = cid.buffer.toString() - const unwantEvent = `unwant:${cidStr}` - const blockEvent = `block:${cidStr}` - - log('get: %s', cidStr) - const cleanupListener = () => { - if (unwantListeners[cidStr]) { - this.notifications.removeListener(unwantEvent, unwantListeners[cidStr]) - delete unwantListeners[cidStr] - } - - if (blockListeners[cidStr]) { - this.notifications.removeListener(blockEvent, blockListeners[cidStr]) - delete blockListeners[cidStr] + this.getMany([cid], (err, blocks) => { + if (err) { + return callback(err) } - } - const addListener = () => { - unwantListeners[cidStr] = () => { - log(`manual unwant: ${cidStr}`) - cleanupListener() - this.wm.cancelWants([cid]) + if (blocks && blocks.length > 0) { + callback(null, blocks[0]) + } else { + // when a unwant happens callback() } + }) + } - blockListeners[cidStr] = (block) => { - this.wm.cancelWants([cid]) - cleanupListener(cid) - callback(null, block) - } + /** + * Fetch a a list of blocks by cid. If the blocks are in the local + * blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us. + * + * @param {Array} cids + * @param {function(Error, Blocks)} callback + * @returns {void} + */ + getMany (cids, callback) { + const retrieved = [] + const locals = [] + const missing = [] + const canceled = [] + + const finish = once(() => { + map(locals, (cid, cb) => { + this.blockstore.get(cid, cb) + }, (err, localBlocks) => { + if (err) { + return callback(err) + } + + callback(null, localBlocks.concat(retrieved)) + }) + }) - this.notifications.once(unwantEvent, unwantListeners[cidStr]) - this.notifications.once(blockEvent, blockListeners[cidStr]) + this._log('getMany', cids.length) + + const addListeners = (cids) => { + cids.forEach((cid) => { + this.notifications.wantBlock( + cid, + // called on block receive + (block) => { + this.wm.cancelWants([cid]) + retrieved.push(block) + + if (retrieved.length === missing.length) { + finish() + } + }, + // called on unwant + () => { + this.wm.cancelWants([cid]) + canceled.push(cid) + if (canceled.length + retrieved.length === missing.length) { + finish() + } + } + ) + }) } - this.blockstore.has(cid, (err, has) => { - if (err) { - return callback(err) - } + each(cids, (cid, cb) => { + this.blockstore.has(cid, (err, has) => { + if (err) { + return cb(err) + } - if (has) { - log('already have block: %s', cidStr) - return this.blockstore.get(cid, callback) + if (has) { + locals.push(cid) + } else { + missing.push(cid) + } + cb() + }) + }, () => { + if (missing.length === 0) { + // already finished + finish() } - addListener() - this.wm.wantBlocks([cid]) + addListeners(missing) + this.wm.wantBlocks(missing) + + this.network.findAndConnect(cids[0], (err) => { + if (err) { + this._log.error(err) + } + }) }) } @@ -209,9 +260,7 @@ class Bitswap { } this.wm.unwantBlocks(cids) - cids.forEach((cid) => { - this.notifications.emit(`unwant:${cid.buffer.toString()}`) - }) + cids.forEach((cid) => this.notifications.unwantBlock(cid)) } // removes the given keys from the want list @@ -231,7 +280,7 @@ class Bitswap { * @returns {void} */ put (block, callback) { - log('putting block') + this._log('putting block') waterfall([ (cb) => this.blockstore.has(block.cid, cb), @@ -264,11 +313,13 @@ class Bitswap { } newBlocks.forEach((block) => { - this.notifications.emit( - `block:${block.cid.buffer.toString()}`, - block - ) + this.notifications.hasBlock(block) this.engine.receivedBlocks([block.cid]) + this.network.provide(block.cid, (err) => { + if (err) { + this._log.error('Failed to provide: %s', err.message) + } + }) }) cb() }) @@ -302,23 +353,31 @@ class Bitswap { /** * Start the bitswap node. * + * @param {function(Error)} callback + * * @returns {void} */ - start () { - this.wm.run() - this.network.start() - this.engine.start() + start (callback) { + series([ + (cb) => this.wm.start(cb), + (cb) => this.network.start(cb), + (cb) => this.engine.start(cb) + ], callback) } /** - * Stooop the bitswap node. + * Stop the bitswap node. + * + * @param {function(Error)} callback * * @returns {void} */ - stop () { - this.wm.stop(this.libp2p.peerInfo.id) - this.network.stop() - this.engine.stop() + stop (callback) { + series([ + (cb) => this.wm.stop(cb), + (cb) => this.network.stop(cb), + (cb) => this.engine.stop(cb) + ], callback) } } diff --git a/src/network.js b/src/network.js new file mode 100644 index 00000000..9ac85590 --- /dev/null +++ b/src/network.js @@ -0,0 +1,190 @@ +'use strict' + +const lp = require('pull-length-prefixed') +const pull = require('pull-stream') +const waterfall = require('async/waterfall') +const each = require('async/each') +const setImmediate = require('async/setImmediate') + +const Message = require('./types/message') +const CONSTANTS = require('./constants') +const logger = require('./utils').logger + +const BITSWAP100 = '/ipfs/bitswap/1.0.0' +const BITSWAP110 = '/ipfs/bitswap/1.1.0' + +class Network { + constructor (libp2p, bitswap, options) { + this._log = logger(libp2p.peerInfo.id, 'network') + options = options || {} + this.libp2p = libp2p + this.bitswap = bitswap + this.b100Only = options.b100Only || false + + this._running = false + } + + start (callback) { + this._running = true + // bind event listeners + this._onPeerConnect = this._onPeerConnect.bind(this) + this._onPeerDisconnect = this._onPeerDisconnect.bind(this) + + this._onConnection = this._onConnection.bind(this) + this.libp2p.handle(BITSWAP100, this._onConnection) + if (!this.b100Only) { this.libp2p.handle(BITSWAP110, this._onConnection) } + + this.libp2p.on('peer:connect', this._onPeerConnect) + this.libp2p.on('peer:disconnect', this._onPeerDisconnect) + + // All existing connections are like new ones for us + this.libp2p.peerBook + .getAllArray() + .filter((peer) => peer.isConnected()) + .forEach((peer) => this._onPeerConnect((peer))) + + setImmediate(() => callback()) + } + + stop (callback) { + this._running = false + + this.libp2p.unhandle(BITSWAP100) + if (!this.b100Only) { this.libp2p.unhandle(BITSWAP110) } + + this.libp2p.removeListener('peer:connect', this._onPeerConnect) + this.libp2p.removeListener('peer:disconnect', this._onPeerDisconnect) + + setImmediate(() => callback()) + } + + // Handles both types of bitswap messgages + _onConnection (protocol, conn) { + if (!this._running) { return } + this._log('incomming new bitswap connection: %s', protocol) + + pull( + conn, + lp.decode(), + pull.asyncMap((data, cb) => Message.deserialize(data, cb)), + pull.asyncMap((msg, cb) => { + conn.getPeerInfo((err, peerInfo) => { + if (err) { return cb(err) } + + // this._log('data from', peerInfo.id.toB58String()) + this.bitswap._receiveMessage(peerInfo.id, msg, cb) + }) + }), + pull.onEnd((err) => { + this._log('ending connection') + if (err) { + this.bitswap._receiveError(err) + } + }) + ) + } + + _onPeerConnect (peerInfo) { + if (!this._running) { return } + + this.bitswap._onPeerConnected(peerInfo.id) + } + + _onPeerDisconnect (peerInfo) { + if (!this._running) { return } + + this.bitswap._onPeerDisconnected(peerInfo.id) + } + + findProviders (cid, maxProviders, callback) { + // TODO + // consider if we want to trickleDown maxProviders, currently this is + // not an exposed option: + // https://github.com/libp2p/js-libp2p-kad-dht/blob/master/src/index.js#L416 + this.libp2p.contentRouting.findProviders(cid, CONSTANTS.providerRequestTimeout, callback) + } + + findAndConnect (cid, callback) { + waterfall([ + (cb) => this.findProviders(cid, CONSTANTS.maxProvidersPerRequest, cb), + (provs, cb) => { + this._log('connecting to providers', provs.map((p) => p.id.toB58String())) + each(provs, (p, cb) => this.connectTo(p, cb)) + } + ], callback) + } + + provide (cid, callback) { + this.libp2p.contentRouting.provide(cid, callback) + } + + // Connect to the given peer + // Send the given msg (instance of Message) to the given peer + sendMessage (peer, msg, callback) { + if (!this._running) { return callback(new Error(`network isn't running`)) } + + const stringId = peer.toB58String() ? peer.toB58String() : peer.id.toB58String() + this._log('sendMessage to %s', stringId, msg) + + this._dialPeer(peer, (err, conn, protocol) => { + if (err) { + return callback(err) + } + + let serialized + switch (protocol) { + case BITSWAP100: + serialized = msg.serializeToBitswap100() + break + case BITSWAP110: + serialized = msg.serializeToBitswap110() + break + default: + return callback(new Error('Unkown protocol: ' + protocol)) + } + // TODO: why doesn't the error get propageted back?? + writeMessage(conn, serialized, (err) => { + if (err) { + this._log.error(err) + } + }) + callback() + }) + } + + connectTo (peer, callback) { + if (!this._running) { return callback(new Error(`network isn't running`)) } + + this.libp2p.dial(peer, callback) + } + + // Dial to the peer and try to use the most recent Bitswap + _dialPeer (peer, callback) { + // Attempt Bitswap 1.1.0 + this.libp2p.dial(peer, BITSWAP110, (err, conn) => { + if (err) { + // Attempt Bitswap 1.0.0 + this.libp2p.dial(peer, BITSWAP100, (err, conn) => { + if (err) { return callback(err) } + + callback(null, conn, BITSWAP100) + }) + + return + } + + callback(null, conn, BITSWAP110) + }) + } +} + +function writeMessage (conn, msg, callback) { + pull( + pull.values([msg]), + lp.encode(), + conn, + pull.onEnd(callback) + ) +} + +module.exports = Network diff --git a/src/notifications.js b/src/notifications.js new file mode 100644 index 00000000..2e8f515a --- /dev/null +++ b/src/notifications.js @@ -0,0 +1,111 @@ +'use strict' + +const EventEmitter = require('events').EventEmitter + +const CONSTANTS = require('./constants') +const logger = require('./utils').logger + +const unwantEvent = (c) => `unwant:${c}` +const blockEvent = (c) => `block:${c}` + +/** + * Internal module used to track events about incoming blocks, + * wants and unwants. + */ +class Notifications extends EventEmitter { + constructor (peerId) { + super() + + this.setMaxListeners(CONSTANTS.maxListeners) + + this._log = logger(peerId, 'notif') + + this._unwantListeners = {} + this._blockListeners = {} + } + + /** + * Signal the system that we received `block`. + * + * @param {Block} block + * @return {void} + */ + hasBlock (block) { + const str = `block:${block.cid.buffer.toString()}` + this._log(str) + this.emit(str, block) + } + + /** + * Signal the system that we are waiting to receive the + * block associated with the given `cid`. + * + * @param {CID} cid + * @param {function(Block)} onBlock - called when the block is received + * @param {function()} onUnwant - called when the block is unwanted + * @returns {void} + */ + wantBlock (cid, onBlock, onUnwant) { + const cidStr = cid.buffer.toString() + this._log(`wantBlock:${cidStr}`) + + this._unwantListeners[cidStr] = () => { + this._log(`manual unwant: ${cidStr}`) + this._cleanup(cidStr) + onUnwant() + } + + this._blockListeners[cidStr] = (block) => { + this._cleanup(cidStr) + onBlock(block) + } + + this.once( + unwantEvent(cidStr), + this._unwantListeners[cidStr] + ) + this.once( + blockEvent(cidStr), + this._blockListeners[cidStr] + ) + } + + /** + * Signal that the block is not wanted anymore. + * + * @param {CID} cid - the CID of the block that is not wanted anymore. + * @returns {void} + */ + unwantBlock (cid) { + const str = `unwant:${cid.buffer.toString()}` + this._log(str) + this.emit(str) + } + + /** + * Internal method to clean up once a block was received or unwanted. + * + * @private + * @param {string} cidStr + * @returns {void} + */ + _cleanup (cidStr) { + if (this._unwantListeners[cidStr]) { + this.removeListener( + unwantEvent(cidStr), + this._unwantListeners[cidStr] + ) + delete this._unwantListeners[cidStr] + } + + if (this._blockListeners[cidStr]) { + this.removeListener( + blockEvent(cidStr), + this._blockListeners[cidStr] + ) + delete this._blockListeners[cidStr] + } + } +} + +module.exports = Notifications diff --git a/src/types/message/index.js b/src/types/message/index.js index d7b6bd5c..88e118ff 100644 --- a/src/types/message/index.js +++ b/src/types/message/index.js @@ -120,7 +120,7 @@ class BitswapMessage { if (this.full !== other.full || !isEqualWith(this.wantlist, other.wantlist, cmp) || !isEqualWith(this.blocks, other.blocks, cmp) - ) { + ) { return false } diff --git a/src/types/wantlist/index.js b/src/types/wantlist/index.js index edb6d436..a269375d 100644 --- a/src/types/wantlist/index.js +++ b/src/types/wantlist/index.js @@ -48,6 +48,10 @@ class Wantlist { } } + forEach (fn) { + return this.set.forEach(fn) + } + entries () { return this.set.entries() } diff --git a/src/utils.js b/src/utils.js new file mode 100644 index 00000000..017f5c77 --- /dev/null +++ b/src/utils.js @@ -0,0 +1,26 @@ +'use strict' + +const debug = require('debug') + +/** + * Creates a logger for the given subsystem + * + * @param {PeerId} [id] + * @param {string} [subsystem] + * @returns {debug} + * + * @private + */ +exports.logger = (id, subsystem) => { + const name = ['bitswap'] + if (subsystem) { + name.push(subsystem) + } + if (id) { + name.push(`${id.toB58String().slice(0, 8)}`) + } + const logger = debug(name.join(':')) + logger.error = debug(name.concat(['error']).join(':')) + + return logger +} diff --git a/src/components/want-manager/index.js b/src/want-manager/index.js similarity index 68% rename from src/components/want-manager/index.js rename to src/want-manager/index.js index c963a819..db4c2c12 100644 --- a/src/components/want-manager/index.js +++ b/src/want-manager/index.js @@ -1,21 +1,22 @@ 'use strict' -const debug = require('debug') +const setImmediate = require('async/setImmediate') -const Message = require('../../types/message') -const Wantlist = require('../../types/wantlist') -const CONSTANTS = require('../../constants') +const Message = require('../types/message') +const Wantlist = require('../types/wantlist') +const CONSTANTS = require('../constants') const MsgQueue = require('./msg-queue') - -const log = debug('bitswap:wantmanager') -log.error = debug('bitswap:wantmanager:error') +const logger = require('../utils').logger module.exports = class WantManager { - constructor (network) { + constructor (peerId, network) { this.peers = new Map() this.wantlist = new Wantlist() this.network = network + + this._peerId = peerId + this._log = logger(peerId, 'want') } _addEntries (cids, cancel, force) { @@ -32,7 +33,7 @@ module.exports = class WantManager { this.wantlist.remove(e.cid) } } else { - log('adding to wl') + this._log('adding to wl') this.wantlist.add(e.cid, e.priority) } }) @@ -47,11 +48,11 @@ module.exports = class WantManager { let mq = this.peers.get(peerId.toB58String()) if (mq) { - mq.refcnt ++ + mq.refcnt++ return } - mq = new MsgQueue(peerId, this.network) + mq = new MsgQueue(this._peerId, peerId, this.network) // new peer, give them the full wantlist const fullwantlist = new Message(true) @@ -73,7 +74,7 @@ module.exports = class WantManager { return } - mq.refcnt -- + mq.refcnt-- if (mq.refcnt > 0) { return } @@ -88,13 +89,13 @@ module.exports = class WantManager { // remove blocks of all the given keys without respecting refcounts unwantBlocks (cids) { - log('unwant blocks: %s', cids.length) + this._log('unwant blocks: %s', cids.length) this._addEntries(cids, true, true) } // cancel wanting all of the given keys cancelWants (cids) { - log('cancel wants: %s', cids.length) + this._log('cancel wants: %s', cids.length) this._addEntries(cids, true) } @@ -111,24 +112,25 @@ module.exports = class WantManager { this._stopPeerHandler(peerId) } - run () { + start (callback) { + // resend entire wantlist every so often this.timer = setInterval(() => { - // resend entirew wantlist every so often + this._log('resend full-wantlist') const fullwantlist = new Message(true) - for (let entry of this.wantlist.entries()) { - fullwantlist.addEntry(entry[1].cid, entry[1].priority) - } - - this.peers.forEach((p) => { - p.addMessage(fullwantlist) + this.wantlist.forEach((entry) => { + fullwantlist.addEntry(entry.cid, entry.priority) }) - }, 10 * 1000) + + this.peers.forEach((p) => p.addMessage(fullwantlist)) + }, 60 * 1000) + + setImmediate(() => callback()) } - stop () { - for (let mq of this.peers.values()) { - this.disconnected(mq.peerId) - } + stop (callback) { + this.peers.forEach((mq) => this.disconnected(mq.peerId)) + clearInterval(this.timer) + setImmediate(() => callback()) } } diff --git a/src/components/want-manager/msg-queue.js b/src/want-manager/msg-queue.js similarity index 68% rename from src/components/want-manager/msg-queue.js rename to src/want-manager/msg-queue.js index f9f25652..9158741b 100644 --- a/src/components/want-manager/msg-queue.js +++ b/src/want-manager/msg-queue.js @@ -1,19 +1,18 @@ 'use strict' -const debug = require('debug') const debounce = require('lodash.debounce') -const Message = require('../../types/message') -const log = debug('bitswap:wantmanager:queue') -log.error = debug('bitswap:wantmanager:queue:error') +const Message = require('../types/message') +const logger = require('../utils').logger module.exports = class MsgQueue { - constructor (peerId, network) { - this.peerId = peerId + constructor (selfPeerId, otherPeerId, network) { + this.peerId = otherPeerId this.network = network this.refcnt = 1 this._entries = [] + this._log = logger(selfPeerId, 'msgqueue', otherPeerId.toB58String().slice(0, 8)) this.sendEntries = debounce(this._sendEntries.bind(this), 200) } @@ -50,13 +49,14 @@ module.exports = class MsgQueue { send (msg) { this.network.connectTo(this.peerId, (err) => { if (err) { - log.error('cant connect to peer %s: %s', this.peerId.toB58String(), err.message) + this._log.error('cant connect to peer %s: %s', this.peerId.toB58String(), err.message) return } - log('sending message') + + this._log('sending message') this.network.sendMessage(this.peerId, msg, (err) => { if (err) { - log.error('send error: %s', err.message) + this._log.error('send error: %s', err.message) } }) }) diff --git a/test/bitswap-mock-internals.js b/test/bitswap-mock-internals.js new file mode 100644 index 00000000..47e03fa6 --- /dev/null +++ b/test/bitswap-mock-internals.js @@ -0,0 +1,366 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 8] */ +'use strict' + +const eachSeries = require('async/eachSeries') +const waterfall = require('async/waterfall') +const map = require('async/map') +const parallel = require('async/parallel') +const setImmediate = require('async/setImmediate') +const _ = require('lodash') +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const PeerId = require('peer-id') + +const Message = require('../src/types/message') +const Bitswap = require('../src') + +const createTempRepo = require('./utils/create-temp-repo-nodejs') +const mockNetwork = require('./utils/mocks').mockNetwork +const applyNetwork = require('./utils/mocks').applyNetwork +const mockLibp2pNode = require('./utils/mocks').mockLibp2pNode +const storeHasBlocks = require('./utils/store-has-blocks') +const makeBlock = require('./utils/make-block') +const orderedFinish = require('./utils/helpers').orderedFinish + +describe('bitswap with mocks', () => { + let repo + let blocks + let ids + + before((done) => { + parallel([ + (cb) => createTempRepo(cb), + (cb) => map(_.range(15), (i, cb) => makeBlock(cb), cb), + (cb) => map(_.range(2), (i, cb) => PeerId.create({bits: 1024}, cb), cb) + ], (err, results) => { + if (err) { + return done(err) + } + + repo = results[0] + blocks = results[1] + ids = results[2] + + done() + }) + }) + + after((done) => repo.teardown(done)) + + describe('receive message', () => { + it('simple block message', (done) => { + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + bs.start((err) => { + expect(err).to.not.exist() + + const other = ids[1] + + const b1 = blocks[0] + const b2 = blocks[1] + + const msg = new Message(false) + msg.addBlock(b1) + msg.addBlock(b2) + + bs._receiveMessage(other, msg, (err) => { + expect(err).to.not.exist() + expect(bs.blocksRecvd).to.equal(2) + expect(bs.dupBlocksRecvd).to.equal(0) + + map([b1.cid, b2.cid], (cid, cb) => repo.blocks.get(cid, cb), (err, blocks) => { + expect(err).to.not.exist() + + expect(blocks[0].data).to.eql(b1.data) + expect(blocks[1].data).to.eql(b2.data) + done() + }) + }) + }) + }) + + it('simple want message', (done) => { + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + bs.start((err) => { + expect(err).to.not.exist() + const other = ids[1] + const b1 = blocks[0] + const b2 = blocks[1] + + const msg = new Message(false) + + msg.addEntry(b1.cid, 1, false) + msg.addEntry(b2.cid, 1, false) + + bs._receiveMessage(other, msg, (err) => { + expect(err).to.not.exist() + + expect(bs.blocksRecvd).to.be.eql(0) + expect(bs.dupBlocksRecvd).to.be.eql(0) + + const wl = bs.wantlistForPeer(other) + + expect(wl.has(b1.cid.buffer.toString())).to.eql(true) + expect(wl.has(b2.cid.buffer.toString())).to.eql(true) + + done() + }) + }) + }) + + it('multi peer', function (done) { + this.timeout(40 * 1000) + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + + let others + let blocks + + bs.start((err) => { + expect(err).to.not.exist() + + parallel([ + (cb) => map(_.range(5), (i, cb) => PeerId.create(cb), cb), + (cb) => map(_.range(10), (i, cb) => makeBlock(cb), cb) + ], (err, results) => { + expect(err).to.not.exist() + + others = results[0] + blocks = results[1] + test() + }) + + function test () { + map(_.range(5), (i, cb) => { + const msg = new Message(false) + msg.addBlock(blocks[i]) + msg.addBlock(blocks[5 + 1]) + cb(null, msg) + }, (err, messages) => { + expect(err).to.not.exist() + let i = 0 + eachSeries(others, (other, cb) => { + const msg = messages[i] + i++ + bs._receiveMessage(other, msg, (err) => { + expect(err).to.not.exist() + storeHasBlocks(msg, repo.blocks, cb) + }) + }, done) + }) + } + }) + }) + }) + + describe('get', () => { + it('block exists locally', (done) => { + const block = blocks[4] + + repo.blocks.put(block, (err) => { + expect(err).to.not.exist() + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + + bs.get(block.cid, (err, res) => { + expect(err).to.not.exist() + expect(res).to.eql(block) + done() + }) + }) + }) + + it('blocks exist locally', (done) => { + const b1 = blocks[3] + const b2 = blocks[14] + const b3 = blocks[13] + + repo.blocks.putMany([b1, b2, b3], (err) => { + expect(err).to.not.exist() + + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + + bs.getMany([b1.cid, b2.cid, b3.cid], (err, res) => { + expect(err).to.not.exist() + expect(res).to.be.eql([b1, b2, b3]) + done() + }) + }) + }) + + it('getMany', (done) => { + const b1 = blocks[5] + const b2 = blocks[6] + const b3 = blocks[7] + + repo.blocks.putMany([b1, b2, b3], (err) => { + expect(err).to.not.exist() + + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + + map([b1.cid, b2.cid, b3.cid], (cid, cb) => bs.get(cid, cb), (err, res) => { + expect(err).to.not.exist() + expect(res).to.eql([b1, b2, b3]) + done() + }) + }) + }) + + it('block is added locally afterwards', (done) => { + const finish = orderedFinish(2, done) + const block = blocks[9] + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + const net = mockNetwork() + + bs.network = net + bs.wm.network = net + bs.engine.network = net + bs.start((err) => { + expect(err).to.not.exist() + bs.get(block.cid, (err, res) => { + expect(err).to.not.exist() + expect(res).to.eql(block) + finish(2) + }) + + setTimeout(() => { + finish(1) + bs.put(block, () => {}) + }, 200) + }) + }) + + it('block is sent after local add', (done) => { + const me = ids[0] + const other = ids[1] + const block = blocks[10] + let bs1 + let bs2 + + const n1 = { + connectTo (id, cb) { + let err + if (id.toHexString() !== other.toHexString()) { + err = new Error('unknown peer') + } + setImmediate(() => cb(err)) + }, + sendMessage (id, msg, cb) { + if (id.toHexString() === other.toHexString()) { + bs2._receiveMessage(me, msg, cb) + } else { + setImmediate(() => cb(new Error('unkown peer'))) + } + }, + start (callback) { + setImmediate(() => callback()) + }, + stop (callback) { + setImmediate(() => callback()) + }, + findAndConnect (cid, callback) { + setImmediate(() => callback()) + }, + provide (cid, callback) { + setImmediate(() => callback()) + } + } + const n2 = { + connectTo (id, cb) { + let err + if (id.toHexString() !== me.toHexString()) { + err = new Error('unkown peer') + } + setImmediate(() => cb(err)) + }, + sendMessage (id, msg, cb) { + if (id.toHexString() === me.toHexString()) { + bs1._receiveMessage(other, msg, cb) + } else { + setImmediate(() => cb(new Error('unkown peer'))) + } + }, + start (callback) { + setImmediate(() => callback()) + }, + stop (callback) { + setImmediate(() => callback()) + }, + findAndConnect (cid, callback) { + setImmediate(() => callback()) + }, + provide (cid, callback) { + setImmediate(() => callback()) + } + } + bs1 = new Bitswap(mockLibp2pNode(), repo.blocks) + applyNetwork(bs1, n1) + + bs1.start((err) => { + expect(err).to.not.exist() + + let repo2 + + waterfall([ + (cb) => createTempRepo(cb), + (repo, cb) => { + repo2 = repo + bs2 = new Bitswap(mockLibp2pNode(), repo2.blocks) + applyNetwork(bs2, n2) + bs2.start((err) => { + expect(err).to.not.exist() + + bs1._onPeerConnected(other) + bs2._onPeerConnected(me) + + bs1.get(block.cid, (err, res) => { + expect(err).to.not.exist() + cb(null, res) + }) + setTimeout(() => bs2.put(block, () => {}), 1000) + }) + }, + (res, cb) => { + expect(res).to.eql(block) + cb() + } + ], done) + }) + }) + }) + + describe('stat', () => { + it('has initial stats', () => { + const bs = new Bitswap(mockLibp2pNode(), {}) + + const stats = bs.stat() + expect(stats).to.have.property('wantlist') + expect(stats).to.have.property('blocksReceived', 0) + expect(stats).to.have.property('dupBlksReceived', 0) + expect(stats).to.have.property('dupDataReceived', 0) + expect(stats).to.have.property('peers') + }) + }) + + describe('unwant', () => { + it('removes blocks that are wanted multiple times', (done) => { + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + bs.start((err) => { + expect(err).to.not.exist() + const b = blocks[11] + + let counter = 0 + const check = (err, res) => { + expect(err).to.not.exist() + expect(res).to.not.exist() + + if (++counter === 2) { done() } + } + + bs.get(b.cid, check) + bs.get(b.cid, check) + + setTimeout(() => bs.unwant(b.cid), 10) + }) + }) + }) +}) diff --git a/test/bitswap.js b/test/bitswap.js new file mode 100644 index 00000000..0ea43d7b --- /dev/null +++ b/test/bitswap.js @@ -0,0 +1,148 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 8] */ +'use strict' + +const waterfall = require('async/waterfall') +const series = require('async/series') +const each = require('async/each') +const parallel = require('async/parallel') + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect + +const Bitswap = require('../src') + +const createTempRepo = require('./utils/create-temp-repo-nodejs') +const createLibp2pNode = require('./utils/create-libp2p-node') +const makeBlock = require('./utils/make-block') +const orderedFinish = require('./utils/helpers').orderedFinish + +// Creates a repo + libp2pNode + Bitswap with or without DHT +function createThing (dht, callback) { + waterfall([ + (cb) => createTempRepo(cb), + (repo, cb) => { + createLibp2pNode({ + DHT: dht ? repo.datastore : undefined + }, (err, node) => cb(err, repo, node)) + }, + (repo, libp2pNode, cb) => { + const bitswap = new Bitswap(libp2pNode, repo.blocks) + bitswap.start((err) => cb(err, repo, libp2pNode, bitswap)) + } + ], (err, repo, libp2pNode, bitswap) => { + expect(err).to.not.exist() + + callback(null, { + repo: repo, + libp2pNode: libp2pNode, + bitswap: bitswap + }) + }) +} + +describe('bitswap without DHT', function () { + this.timeout(20 * 1000) + + let nodes + + before((done) => { + parallel([ + (cb) => createThing(false, cb), + (cb) => createThing(false, cb), + (cb) => createThing(false, cb) + ], (err, results) => { + expect(err).to.not.exist() + expect(results).to.have.length(3) + nodes = results + done() + }) + }) + + after((done) => { + each(nodes, (node, cb) => { + series([ + (cb) => node.bitswap.stop(cb), + (cb) => node.libp2pNode.stop(cb), + (cb) => node.repo.teardown(cb) + ], cb) + }, done) + }) + + it('connect 0 -> 1 && 1 -> 2', (done) => { + parallel([ + (cb) => nodes[0].libp2pNode.dial(nodes[1].libp2pNode.peerInfo, cb), + (cb) => nodes[1].libp2pNode.dial(nodes[2].libp2pNode.peerInfo, cb) + ], done) + }) + + it('put a block in 2, fail to get it in 0', (done) => { + const finish = orderedFinish(2, done) + + waterfall([ + (cb) => makeBlock(cb), + (block, cb) => nodes[2].bitswap.put(block, () => cb(null, block)) + ], (err, block) => { + expect(err).to.not.exist() + nodes[0].bitswap.get(block.cid, (err, block) => { + expect(err).to.not.exist() + expect(block).to.not.exist() + finish(2) + }) + + setTimeout(() => { + finish(1) + nodes[0].bitswap.unwant(block.cid) + }, 200) + }) + }) +}) + +describe('bitswap with DHT', () => { + let nodes + + before((done) => { + parallel([ + (cb) => createThing(true, cb), + (cb) => createThing(true, cb), + (cb) => createThing(true, cb) + ], (err, results) => { + expect(err).to.not.exist() + expect(results).to.have.length(3) + nodes = results + done() + }) + }) + + after((done) => { + each(nodes, (node, cb) => { + series([ + (cb) => node.bitswap.stop(cb), + (cb) => node.libp2pNode.stop(cb), + (cb) => node.repo.teardown(cb) + ], cb) + }, done) + }) + + it('connect 0 -> 1 && 1 -> 2', (done) => { + parallel([ + (cb) => nodes[0].libp2pNode.dial(nodes[1].libp2pNode.peerInfo, cb), + (cb) => nodes[1].libp2pNode.dial(nodes[2].libp2pNode.peerInfo, cb) + ], done) + }) + + it('put a block in 2, get it in 0', (done) => { + waterfall([ + (cb) => makeBlock(cb), + (block, cb) => nodes[2].bitswap.put(block, () => cb(null, block)), + (block, cb) => setTimeout(() => cb(null, block), 400), + (block, cb) => nodes[0].bitswap.get(block.cid, (err, blockRetrieved) => { + expect(err).to.not.exist() + expect(block.data).to.eql(blockRetrieved.data) + expect(block.cid).to.eql(blockRetrieved.cid) + cb() + }) + ], done) + }) +}) diff --git a/test/browser.js b/test/browser.js index 67b47206..37f0d49c 100644 --- a/test/browser.js +++ b/test/browser.js @@ -1,45 +1,4 @@ -/* global self */ 'use strict' -const IPFSRepo = require('ipfs-repo') -const series = require('async/series') - -const idb = self.indexedDB || - self.mozIndexedDB || - self.webkitIndexedDB || - self.msIndexedDB - -// book keeping -let dbs = [] - -function createRepo (id, done) { - dbs.push(id) - - const repo = new IPFSRepo(id) - series([ - (cb) => repo.init({}, cb), - (cb) => repo.open(cb) - ], (err) => { - if (err) { - return done(err) - } - done(null, repo) - }) -} - -function removeRepos (done) { - dbs.forEach((db) => { - idb.deleteDatabase(db) - idb.deleteDatabase(`${db}/blocks`) - }) - dbs = [] - done() -} - -const repo = { - create: createRepo, - remove: removeRepos -} - -require('./index-test')(repo) -require('./components/decision-engine/index-test')(repo) +require('./bitswap-mock-internals.js') +require('./decision-engine/decision-engine') diff --git a/test/components/decision-engine/index-test.js b/test/components/decision-engine/index-test.js deleted file mode 100644 index 0e4ef68d..00000000 --- a/test/components/decision-engine/index-test.js +++ /dev/null @@ -1,252 +0,0 @@ -/* eslint max-nested-callbacks: ["error", 8] */ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -chai.use(require('dirty-chai')) -const expect = chai.expect -const PeerId = require('peer-id') -const _ = require('lodash') -const Block = require('ipfs-block') -const parallel = require('async/parallel') -const series = require('async/series') -const map = require('async/map') -const each = require('async/each') -const waterfall = require('async/waterfall') -const eachSeries = require('async/eachSeries') -const CID = require('cids') -const multihashing = require('multihashing-async') - -const Message = require('../../../src/types/message') -const DecisionEngine = require('../../../src/components/decision-engine') - -const mockNetwork = require('../../utils').mockNetwork - -function messageToString (m) { - return Array.from(m[1].blocks.values()) - .map((b) => b.data.toString()) -} - -function stringifyMessages (messages) { - return _.flatten(messages.map(messageToString)) -} - -module.exports = (repo) => { - function newEngine (path, done, net) { - parallel([ - (cb) => repo.create(path, cb), - (cb) => PeerId.create(cb) - ], (err, results) => { - if (err) { - return done(err) - } - const blockstore = results[0].blocks - const engine = new DecisionEngine(blockstore, net || mockNetwork()) - engine.start() - - done(null, { peer: results[1], engine }) - }) - } - - describe('Engine', () => { - afterEach((done) => { - repo.remove(done) - }) - - it('consistent accounting', (done) => { - parallel([ - (cb) => newEngine('Ernie', cb), - (cb) => newEngine('Bert', cb) - ], (err, res) => { - expect(err).to.not.exist() - - const sender = res[0] - const receiver = res[1] - - map(_.range(1000), (i, cb) => { - const data = new Buffer(`this is message ${i}`) - multihashing(data, 'sha2-256', (err, hash) => { - expect(err).to.not.exist() - - const m = new Message(false) - const block = new Block(data, new CID(hash)) - m.addBlock(block) - sender.engine.messageSent(receiver.peer, block) - receiver.engine.messageReceived(sender.peer, m, cb) - }) - }, (err) => { - expect(err).to.not.exist() - expect(sender.engine.numBytesSentTo(receiver.peer)) - .to.be.above(0) - - expect(sender.engine.numBytesSentTo(receiver.peer)) - .to.eql(receiver.engine.numBytesReceivedFrom(sender.peer)) - - expect(receiver.engine.numBytesSentTo(sender.peer)) - .to.eql(0) - - expect(sender.engine.numBytesReceivedFrom(receiver.peer)) - .to.eql(0) - - done() - }) - }) - }) - - it('peer is added to peers when message receiver or sent', (done) => { - parallel([ - (cb) => newEngine('sf', cb), - (cb) => newEngine('sea', cb) - ], (err, res) => { - expect(err).to.not.exist() - - const sanfrancisco = res[0] - const seattle = res[1] - - const m = new Message(true) - sanfrancisco.engine.messageSent(seattle.peer) - seattle.engine.messageReceived(sanfrancisco.peer, m, (err) => { - expect(err).to.not.exist() - - expect(seattle.peer.toHexString()) - .to.not.eql(sanfrancisco.peer.toHexString()) - - expect(sanfrancisco.engine.peers()).to.include(seattle.peer) - - expect(seattle.engine.peers()) - .to.include(sanfrancisco.peer) - done() - }) - }) - }) - - it('partner wants then cancels', (done) => { - const numRounds = 10 - const alphabet = 'abcdefghijklmnopqrstuvwxyz'.split('') - const vowels = 'aeiou'.split('') - const testCases = [ - [alphabet, vowels], - [alphabet, _.difference(alphabet, vowels)] - ] - - function partnerWants (dEngine, values, partner, cb) { - const message = new Message(false) - - map(values, (v, cb) => multihashing(new Buffer(v), 'sha2-256', cb), (err, hashes) => { - expect(err).to.not.exist() - hashes.forEach((hash, i) => { - message.addEntry(new CID(hash), Math.pow(2, 32) - 1 - i) - }) - - dEngine.messageReceived(partner, message, cb) - }) - } - - function partnerCancels (dEngine, values, partner, cb) { - const message = new Message(false) - - map(values, (v, cb) => multihashing(new Buffer(v), 'sha2-256', cb), (err, hashes) => { - expect(err).to.not.exist() - hashes.forEach((hash) => { - message.cancel(new CID(hash)) - }) - dEngine.messageReceived(partner, message, cb) - }) - } - - repo.create('p', (err, repo) => { - expect(err).to.not.exist() - - waterfall([ - (cb) => map( - alphabet, - (v, cb) => multihashing(new Buffer(v), 'sha2-256', cb), - cb - ), - (hashes, cb) => each( - hashes.map((h, i) => { - return new Block(new Buffer(alphabet[i]), new CID(h)) - }), - (b, cb) => repo.blocks.put(b, cb), - cb - ), - (cb) => eachSeries(_.range(numRounds), (i, cb) => { - // 2 test cases - // a) want alphabet - cancel vowels - // b) want alphabet - cancels everything except vowels - - eachSeries(testCases, (testcase, innerCb) => { - const set = testcase[0] - const cancels = testcase[1] - const keeps = _.difference(set, cancels) - - const network = mockNetwork(1, (res) => { - const msgs = stringifyMessages(res.messages) - expect(msgs.sort()).to.eql(keeps.sort()) - innerCb() - }) - - const dEngine = new DecisionEngine(repo.blocks, network) - dEngine.start() - - let partner - series([ - (cb) => PeerId.create((err, id) => { - if (err) { - return cb(err) - } - partner = id - cb() - }), - (cb) => partnerWants(dEngine, set, partner, cb), - (cb) => partnerCancels(dEngine, cancels, partner, cb) - ], (err) => { - expect(err).to.not.exist() - }) - }, cb) - }, cb) - ], done) - }) - }) - - it('splits large block messages', (done) => { - const data = _.range(10).map((i) => { - const b = new Buffer(1024 * 256) - b.fill(i) - return b - }) - - const net = mockNetwork(5, (res) => { - expect(res.messages).to.have.length(5) - done() - }) - - parallel([ - (cb) => newEngine('sf', cb, net), - (cb) => map(data, (d, cb) => multihashing(d, 'sha2-256', (err, hash) => { - expect(err).to.not.exist() - cb(null, new Block(d, new CID(hash))) - }), cb) - ], (err, res) => { - expect(err).to.not.exist() - const sf = res[0].engine - const id = res[0].peer - - const blocks = res[1] - const cids = blocks.map((b) => b.cid) - - each(blocks, (b, cb) => sf.blockstore.put(b, cb), (err) => { - expect(err).to.not.exist() - const msg = new Message(false) - cids.forEach((c, i) => { - msg.addEntry(c, Math.pow(2, 32) - 1 - i) - }) - - sf.messageReceived(id, msg, (err) => { - expect(err).to.not.exist() - }) - }) - }) - }) - }) -} diff --git a/test/components/wantmanager/index.spec.js b/test/components/wantmanager/index.spec.js deleted file mode 100644 index 0cf00c20..00000000 --- a/test/components/wantmanager/index.spec.js +++ /dev/null @@ -1,96 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -chai.use(require('dirty-chai')) -const expect = chai.expect -const PeerId = require('peer-id') -const parallel = require('async/parallel') -const series = require('async/series') -const map = require('async/map') -const _ = require('lodash') - -const cs = require('../../../src/constants') -const Message = require('../../../src/types/message') -const WantManager = require('../../../src/components/want-manager') - -const utils = require('../../utils') -const mockNetwork = utils.mockNetwork - -describe('WantManager', () => { - it('sends wantlist to all connected peers', (done) => { - let cids - let blocks - - parallel([ - (cb) => PeerId.create(cb), - (cb) => PeerId.create(cb), - (cb) => { - map(_.range(3), (i, cb) => utils.makeBlock(cb), (err, res) => { - expect(err).to.not.exist() - blocks = res - cids = blocks.map((b) => b.cid) - cb() - }) - } - ], (err, peerIds) => { - if (err) { - return done(err) - } - - const peer1 = peerIds[0] - const peer2 = peerIds[1] - const cid1 = cids[0] - const cid2 = cids[1] - const cid3 = cids[2] - - let wantManager - - const network = mockNetwork(6, (calls) => { - expect(calls.connects).to.have.length(6) - const m1 = new Message(true) - - m1.addEntry(cid1, cs.kMaxPriority) - m1.addEntry(cid2, cs.kMaxPriority - 1) - - const m2 = new Message(false) - - m2.cancel(cid2) - - const m3 = new Message(false) - - m3.addEntry(cid3, cs.kMaxPriority) - - const msgs = [m1, m1, m2, m2, m3, m3] - - calls.messages.forEach((m, i) => { - expect(m[0]).to.be.eql(calls.connects[i]) - expect(m[1].equals(msgs[i])).to.be.eql(true) - }) - - wantManager = null - done() - }) - - wantManager = new WantManager(network) - - wantManager.run() - wantManager.wantBlocks([cid1, cid2]) - - wantManager.connected(peer1) - wantManager.connected(peer2) - - series([ - (cb) => setTimeout(cb, 200), - (cb) => { - wantManager.cancelWants([cid2]) - cb() - }, - (cb) => setTimeout(cb, 200) - ], (err) => { - expect(err).to.not.exist() - wantManager.wantBlocks([cid3]) - }) - }) - }) -}) diff --git a/test/decision-engine/decision-engine.js b/test/decision-engine/decision-engine.js new file mode 100644 index 00000000..a205745f --- /dev/null +++ b/test/decision-engine/decision-engine.js @@ -0,0 +1,248 @@ +/* eslint max-nested-callbacks: ["error", 8] */ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const PeerId = require('peer-id') +const _ = require('lodash') +const Block = require('ipfs-block') +const parallel = require('async/parallel') +const series = require('async/series') +const map = require('async/map') +const each = require('async/each') +const waterfall = require('async/waterfall') +const eachSeries = require('async/eachSeries') +const CID = require('cids') +const multihashing = require('multihashing-async') +const Buffer = require('safe-buffer').Buffer + +const Message = require('../../src/types/message') +const DecisionEngine = require('../../src/decision-engine') +const createTempRepo = require('../utils/create-temp-repo-nodejs.js') + +const mockNetwork = require('../utils/mocks').mockNetwork + +function messageToString (m) { + return Array.from(m[1].blocks.values()) + .map((b) => b.data.toString()) +} + +function stringifyMessages (messages) { + return _.flatten(messages.map(messageToString)) +} + +function newEngine (network, callback) { + parallel([ + (cb) => createTempRepo(cb), + (cb) => PeerId.create({bits: 1024}, cb) + ], (err, results) => { + if (err) { + return callback(err) + } + const blockstore = results[0].blocks + const peerId = results[1] + const engine = new DecisionEngine(peerId, blockstore, network || mockNetwork()) + engine.start((err) => callback(err, { peer: peerId, engine: engine })) + }) +} + +describe('Engine', () => { + it('consistent accounting', (done) => { + parallel([ + (cb) => newEngine(false, cb), + (cb) => newEngine(false, cb) + ], (err, res) => { + expect(err).to.not.exist() + + const sender = res[0] + const receiver = res[1] + + map(_.range(1000), (i, cb) => { + const data = Buffer.from(`this is message ${i}`) + multihashing(data, 'sha2-256', (err, hash) => { + expect(err).to.not.exist() + + const m = new Message(false) + const block = new Block(data, new CID(hash)) + m.addBlock(block) + sender.engine.messageSent(receiver.peer, block) + receiver.engine.messageReceived(sender.peer, m, cb) + }) + }, (err) => { + expect(err).to.not.exist() + expect(sender.engine.numBytesSentTo(receiver.peer)) + .to.be.above(0) + + expect(sender.engine.numBytesSentTo(receiver.peer)) + .to.eql(receiver.engine.numBytesReceivedFrom(sender.peer)) + + expect(receiver.engine.numBytesSentTo(sender.peer)) + .to.eql(0) + + expect(sender.engine.numBytesReceivedFrom(receiver.peer)) + .to.eql(0) + + done() + }) + }) + }) + + it('peer is added to peers when message receiver or sent', (done) => { + parallel([ + (cb) => newEngine(false, cb), + (cb) => newEngine(false, cb) + ], (err, res) => { + expect(err).to.not.exist() + + const sanfrancisco = res[0] + const seattle = res[1] + + const m = new Message(true) + sanfrancisco.engine.messageSent(seattle.peer) + + seattle.engine.messageReceived(sanfrancisco.peer, m, (err) => { + expect(err).to.not.exist() + + expect(seattle.peer.toHexString()) + .to.not.eql(sanfrancisco.peer.toHexString()) + expect(sanfrancisco.engine.peers()).to.include(seattle.peer) + expect(seattle.engine.peers()).to.include(sanfrancisco.peer) + + done() + }) + }) + }) + + it('partner wants then cancels', function (done) { + this.timeout(40 * 1000) + + const numRounds = 10 + const alphabet = 'abcdefghijklmnopqrstuvwxyz'.split('') + const vowels = 'aeiou'.split('') + const testCases = [ + [alphabet, vowels], + [alphabet, _.difference(alphabet, vowels)] + ] + + function partnerWants (dEngine, values, partner, cb) { + const message = new Message(false) + + map(values, (v, cb) => multihashing(Buffer.from(v), 'sha2-256', cb), (err, hashes) => { + expect(err).to.not.exist() + hashes.forEach((hash, i) => { + message.addEntry(new CID(hash), Math.pow(2, 32) - 1 - i) + }) + + dEngine.messageReceived(partner, message, cb) + }) + } + + function partnerCancels (dEngine, values, partner, cb) { + const message = new Message(false) + + map(values, (v, cb) => multihashing(Buffer.from(v), 'sha2-256', cb), (err, hashes) => { + expect(err).to.not.exist() + hashes.forEach((hash) => { + message.cancel(new CID(hash)) + }) + dEngine.messageReceived(partner, message, cb) + }) + } + + createTempRepo((err, repo) => { + expect(err).to.not.exist() + + waterfall([ + (cb) => map(alphabet, + (v, cb) => multihashing(Buffer.from(v), 'sha2-256', cb), + cb + ), + (hashes, cb) => each( + hashes.map((h, i) => { + return new Block(Buffer.from(alphabet[i]), new CID(h)) + }), + (b, cb) => repo.blocks.put(b, cb), + cb + ), + (cb) => eachSeries(_.range(numRounds), (i, cb) => { + // 2 test cases + // a) want alphabet - cancel vowels + // b) want alphabet - cancels everything except vowels + + eachSeries(testCases, (testcase, innerCb) => { + const set = testcase[0] + const cancels = testcase[1] + const keeps = _.difference(set, cancels) + + const network = mockNetwork(1, (res) => { + const msgs = stringifyMessages(res.messages) + expect(msgs.sort()).to.eql(keeps.sort()) + innerCb() + }) + + PeerId.create({bits: 1024}, (err, id) => { + expect(err).to.not.exist() + const dEngine = new DecisionEngine(id, repo.blocks, network) + dEngine.start((err) => { + expect(err).to.not.exist() + + let partner + series([ + (cb) => PeerId.create({bits: 1024}, (err, id) => { + if (err) { return cb(err) } + partner = id + cb() + }), + (cb) => partnerWants(dEngine, set, partner, cb), + (cb) => partnerCancels(dEngine, cancels, partner, cb) + ], (err) => { + expect(err).to.not.exist() + }) + }) + }) + }, cb) + }, cb) + ], done) + }) + }) + + it('splits large block messages', (done) => { + const data = _.range(10).map((i) => { + const b = Buffer.alloc(1024 * 256) + b.fill(i) + return b + }) + + const net = mockNetwork(5, (res) => { + expect(res.messages).to.have.length(5) + done() + }) + + parallel([ + (cb) => newEngine(net, cb), + (cb) => map(data, (d, cb) => multihashing(d, 'sha2-256', (err, hash) => { + expect(err).to.not.exist() + cb(null, new Block(d, new CID(hash))) + }), cb) + ], (err, res) => { + expect(err).to.not.exist() + const sf = res[0].engine + const id = res[0].peer + + const blocks = res[1] + const cids = blocks.map((b) => b.cid) + + each(blocks, (b, cb) => sf.blockstore.put(b, cb), (err) => { + expect(err).to.not.exist() + const msg = new Message(false) + cids.forEach((c, i) => msg.addEntry(c, Math.pow(2, 32) - 1 - i)) + + sf.messageReceived(id, msg, (err) => { + expect(err).to.not.exist() + }) + }) + }) + }) +}) diff --git a/test/components/decision-engine/ledger.spec.js b/test/decision-engine/ledger.spec.js similarity index 85% rename from test/components/decision-engine/ledger.spec.js rename to test/decision-engine/ledger.spec.js index 8bba6e93..a3bc81b9 100644 --- a/test/components/decision-engine/ledger.spec.js +++ b/test/decision-engine/ledger.spec.js @@ -6,14 +6,14 @@ chai.use(require('dirty-chai')) const expect = chai.expect const PeerId = require('peer-id') -const Ledger = require('../../../src/components/decision-engine/ledger') +const Ledger = require('../../src/decision-engine/ledger') describe('Ledger', () => { let peerId let ledger before((done) => { - PeerId.create((err, _peerId) => { + PeerId.create({bits: 1024}, (err, _peerId) => { if (err) { return done(err) } diff --git a/test/test-repo/config b/test/fixtures/repo/config similarity index 100% rename from test/test-repo/config rename to test/fixtures/repo/config diff --git a/test/test-repo/version b/test/fixtures/repo/version similarity index 100% rename from test/test-repo/version rename to test/fixtures/repo/version diff --git a/test/test-data/serialized-from-go/bitswap110-message-full-wantlist b/test/fixtures/serialized-from-go/bitswap110-message-full-wantlist similarity index 100% rename from test/test-data/serialized-from-go/bitswap110-message-full-wantlist rename to test/fixtures/serialized-from-go/bitswap110-message-full-wantlist diff --git a/test/test-data/serialized-from-go/bitswap110-message-one-block b/test/fixtures/serialized-from-go/bitswap110-message-one-block similarity index 100% rename from test/test-data/serialized-from-go/bitswap110-message-one-block rename to test/fixtures/serialized-from-go/bitswap110-message-one-block diff --git a/test/index-test.js b/test/index-test.js deleted file mode 100644 index 9f2bc810..00000000 --- a/test/index-test.js +++ /dev/null @@ -1,353 +0,0 @@ -/* eslint-env mocha */ -/* eslint max-nested-callbacks: ["error", 8] */ -'use strict' - -const eachSeries = require('async/eachSeries') -const waterfall = require('async/waterfall') -const each = require('async/each') -const map = require('async/map') -const parallel = require('async/parallel') -const setImmediate = require('async/setImmediate') -const _ = require('lodash') -const chai = require('chai') -chai.use(require('dirty-chai')) -const expect = chai.expect -const PeerId = require('peer-id') -const PeerBook = require('peer-book') - -const Message = require('../src/types/message') -const Bitswap = require('../src') - -const utils = require('./utils') -const makeBlock = utils.makeBlock - -const hasBlocks = (msg, store, cb) => { - each(msg.blocks.values(), (b, cb) => { - store.has(b.cid, (err, has) => { - if (err) { - return cb(err) - } - if (!has) { - return cb(new Error('missing block')) - } - cb() - }) - }, cb) -} - -module.exports = (repo) => { - const libp2pMock = { - handle: function () {}, - on () {}, - swarm: { - muxedConns: {}, - setMaxListeners () {} - } - } - - describe('bitswap', () => { - let store - let blocks - let ids - - before((done) => { - parallel([ - (cb) => repo.create('hello', cb), - (cb) => map(_.range(12), (i, cb) => makeBlock(cb), cb), - (cb) => map(_.range(2), (i, cb) => PeerId.create(cb), cb) - ], (err, results) => { - if (err) { - return done(err) - } - - store = results[0].blocks - blocks = results[1] - ids = results[2] - - done() - }) - }) - - after((done) => { - repo.remove(done) - }) - - describe('receive message', () => { - it('simple block message', (done) => { - const book = new PeerBook() - const bs = new Bitswap(libp2pMock, store, book) - bs.start() - - const other = ids[1] - - const b1 = blocks[0] - const b2 = blocks[1] - - const msg = new Message(false) - msg.addBlock(b1) - msg.addBlock(b2) - - bs._receiveMessage(other, msg, (err) => { - expect(err).to.not.exist() - expect(bs.blocksRecvd).to.equal(2) - expect(bs.dupBlocksRecvd).to.equal(0) - - map([b1.cid, b2.cid], (cid, cb) => store.get(cid, cb), (err, blocks) => { - expect(err).to.not.exist() - - expect(blocks[0].data).to.eql(b1.data) - expect(blocks[1].data).to.eql(b2.data) - done() - }) - }) - }) - - it('simple want message', (done) => { - const book = new PeerBook() - const bs = new Bitswap(libp2pMock, store, book) - bs.start() - - const other = ids[1] - const b1 = blocks[0] - const b2 = blocks[1] - - const msg = new Message(false) - - msg.addEntry(b1.cid, 1, false) - msg.addEntry(b2.cid, 1, false) - - bs._receiveMessage(other, msg, (err) => { - expect(err).to.not.exist() - - expect(bs.blocksRecvd).to.be.eql(0) - expect(bs.dupBlocksRecvd).to.be.eql(0) - - const wl = bs.wantlistForPeer(other) - - expect(wl.has(b1.cid.buffer.toString())).to.eql(true) - expect(wl.has(b2.cid.buffer.toString())).to.eql(true) - - done() - }) - }) - - it('multi peer', (done) => { - const book = new PeerBook() - const bs = new Bitswap(libp2pMock, store, book) - - let others - let blocks - - bs.start() - - parallel([ - (cb) => map(_.range(5), (i, cb) => PeerId.create(cb), cb), - (cb) => map(_.range(10), (i, cb) => makeBlock(cb), cb) - ], (err, results) => { - if (err) { - return done(err) - } - - others = results[0] - blocks = results[1] - test() - }) - - function test () { - map(_.range(5), (i, cb) => { - const msg = new Message(false) - msg.addBlock(blocks[i]) - msg.addBlock(blocks[5 + 1]) - cb(null, msg) - }, (err, messages) => { - expect(err).to.not.exist() - let i = 0 - eachSeries(others, (other, cb) => { - const msg = messages[i] - i++ - bs._receiveMessage(other, msg, (err) => { - expect(err).to.not.exist() - hasBlocks(msg, store, cb) - }) - }, done) - }) - } - }) - }) - - describe('get', () => { - it('block exists locally', (done) => { - const block = blocks[4] - - store.put(block, (err) => { - expect(err).to.not.exist() - const book = new PeerBook() - const bs = new Bitswap(libp2pMock, store, book) - - bs.get(block.cid, (err, res) => { - expect(err).to.not.exist() - expect(res).to.eql(block) - done() - }) - }) - }) - - it('blocks exist locally', (done) => { - const b1 = blocks[5] - const b2 = blocks[6] - const b3 = blocks[7] - - store.putMany([b1, b2, b3], (err) => { - expect(err).to.not.exist() - - const book = new PeerBook() - const bs = new Bitswap(libp2pMock, store, book) - - map([b1.cid, b2.cid, b3.cid], (cid, cb) => bs.get(cid, cb), (err, res) => { - expect(err).to.not.exist() - expect(res).to.be.eql([b1, b2, b3]) - done() - }) - }) - }) - - it('block is added locally afterwards', (done) => { - const block = blocks[9] - const book = new PeerBook() - const bs = new Bitswap(libp2pMock, store, book) - const net = utils.mockNetwork() - - bs.network = net - bs.wm.network = net - bs.engine.network = net - bs.start() - - bs.get(block.cid, (err, res) => { - expect(err).to.not.exist() - expect(res).to.be.eql(block) - done() - }) - - setTimeout(() => { - bs.put(block, () => {}) - }, 200) - }) - - it('block is sent after local add', (done) => { - const me = ids[0] - const other = ids[1] - const block = blocks[10] - let bs1 - let bs2 - - const n1 = { - connectTo (id, cb) { - let err - if (id.toHexString() !== other.toHexString()) { - err = new Error('unkown peer') - } - setImmediate(() => cb(err)) - }, - sendMessage (id, msg, cb) { - if (id.toHexString() === other.toHexString()) { - bs2._receiveMessage(me, msg, cb) - } else { - setImmediate(() => cb(new Error('unkown peer'))) - } - }, - start () {}, - stop () {} - } - const n2 = { - connectTo (id, cb) { - let err - if (id.toHexString() !== me.toHexString()) { - err = new Error('unkown peer') - } - setImmediate(() => cb(err)) - }, - sendMessage (id, msg, cb) { - if (id.toHexString() === me.toHexString()) { - bs1._receiveMessage(other, msg, cb) - } else { - setImmediate(() => cb(new Error('unkown peer'))) - } - }, - start () {}, - stop () {} - } - bs1 = new Bitswap(libp2pMock, store, new PeerBook()) - utils.applyNetwork(bs1, n1) - bs1.start() - - let store2 - - waterfall([ - (cb) => repo.create('world', cb), - (repo, cb) => { - store2 = repo.blocks - bs2 = new Bitswap(libp2pMock, store2, new PeerBook()) - utils.applyNetwork(bs2, n2) - bs2.start() - bs1._onPeerConnected(other) - bs2._onPeerConnected(me) - - bs1.get(block.cid, (err, res) => { - expect(err).to.not.exist() - cb(null, res) - }) - setTimeout(() => { - bs2.put(block, () => {}) - }, 1000) - }, - (res, cb) => { - expect(res).to.eql(block) - cb() - } - ], done) - }) - }) - - describe('stat', () => { - it('has initial stats', () => { - const bs = new Bitswap(libp2pMock, {}, new PeerBook()) - - const stats = bs.stat() - expect(stats).to.have.property('wantlist') - expect(stats).to.have.property('blocksReceived', 0) - expect(stats).to.have.property('dupBlksReceived', 0) - expect(stats).to.have.property('dupDataReceived', 0) - expect(stats).to.have.property('peers') - }) - }) - - describe('unwant', () => { - it('removes blocks that are wanted multiple times', (done) => { - const bs = new Bitswap(libp2pMock, store, new PeerBook()) - bs.start() - const b = blocks[11] - - let i = 0 - const finish = () => { - i++ - if (i === 2) { - done() - } - } - - bs.get(b.cid, (err, res) => { - expect(err).to.not.exist() - expect(res).to.not.exist() - finish() - }) - bs.get(b.cid, (err, res) => { - expect(err).to.not.exist() - expect(res).to.not.exist() - finish() - }) - - setTimeout(() => bs.unwant(b.cid), 10) - }) - }) - }) -} diff --git a/test/libp2p-bundle.js b/test/libp2p-bundle.js deleted file mode 100644 index 9cb99512..00000000 --- a/test/libp2p-bundle.js +++ /dev/null @@ -1,25 +0,0 @@ -'use strict' - -const TCP = require('libp2p-tcp') -const multiplex = require('libp2p-multiplex') -const secio = require('libp2p-secio') -const libp2p = require('libp2p') - -class Node extends libp2p { - constructor (peerInfo, peerBook, options) { - options = options || {} - - const modules = { - transport: [ new TCP() ], - connection: { - muxer: multiplex, - crypto: [ secio ] - }, - discovery: [] - } - - super(modules, peerInfo, peerBook, options) - } -} - -module.exports = Node diff --git a/test/components/network/gen-bitswap-network.node.js b/test/network/gen-bitswap-network.node.js similarity index 95% rename from test/components/network/gen-bitswap-network.node.js rename to test/network/gen-bitswap-network.node.js index 67f49602..357dc5f3 100644 --- a/test/components/network/gen-bitswap-network.node.js +++ b/test/network/gen-bitswap-network.node.js @@ -16,14 +16,14 @@ const crypto = require('crypto') const CID = require('cids') const multihashing = require('multihashing-async') -const utils = require('../../utils') +const genBitswapNetwork = require('../utils/mocks').genBitswapNetwork describe('gen Bitswap network', function () { // CI is very slow this.timeout(300 * 1000) it('retrieves local blocks', (done) => { - utils.genBitswapNetwork(1, (err, nodes) => { + genBitswapNetwork(1, (err, nodes) => { expect(err).to.not.exist() const node = nodes[0] @@ -66,7 +66,7 @@ describe('gen Bitswap network', function () { describe('distributed blocks', () => { it('with 2 nodes', (done) => { const n = 2 - utils.genBitswapNetwork(n, (err, nodeArr) => { + genBitswapNetwork(n, (err, nodeArr) => { expect(err).to.not.exist() nodeArr.forEach((node) => { expect( diff --git a/test/components/network/network.node.js b/test/network/network.node.js similarity index 62% rename from test/components/network/network.node.js rename to test/network/network.node.js index 5ffd80e2..be6a9f6c 100644 --- a/test/components/network/network.node.js +++ b/test/network/network.node.js @@ -1,105 +1,89 @@ /* eslint-env mocha */ 'use strict' -const Node = require('../../libp2p-bundle') -const PeerInfo = require('peer-info') const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const PeerBook = require('peer-book') +const PeerInfo = require('peer-info') +const PeerId = require('peer-id') const lp = require('pull-length-prefixed') const pull = require('pull-stream') const parallel = require('async/parallel') +const waterfall = require('async/waterfall') const map = require('async/map') const _ = require('lodash') -const utils = require('../../utils') -const Network = require('../../../src/components/network') -const Message = require('../../../src/types/message') +const Node = require('../utils/create-libp2p-node').bundle +const makeBlock = require('../utils/make-block') +const Network = require('../../src/network') +const Message = require('../../src/types/message') + +// TODO send this to utils +function createP2PNode (multiaddrs, options, callback) { + if (typeof options === 'function') { + callback = options + options = {} + } + + if (!Array.isArray(multiaddrs)) { + multiaddrs = [multiaddrs] + } + + waterfall([ + (cb) => PeerId.create({ bits: 1024 }, cb), + (peerId, cb) => PeerInfo.create(peerId, cb), + (peerInfo, cb) => { + multiaddrs.map((ma) => peerInfo.multiaddrs.add(ma)) + cb(null, peerInfo) + }, + (peerInfo, cb) => { + const node = new Node(peerInfo, undefined, options) + cb(null, node) + } + ], callback) +} describe('network', () => { - let libp2pNodeA - let peerInfoA - let peerBookA + let p2pA let networkA - let libp2pNodeB - let peerInfoB - let peerBookB + let p2pB let networkB - let libp2pNodeC - let peerInfoC - let peerBookC + let p2pC let networkC let blocks before((done) => { - let counter = 0 parallel([ - (cb) => PeerInfo.create(cb), - (cb) => PeerInfo.create(cb), - (cb) => PeerInfo.create(cb), - (cb) => map(_.range(2), (i, cb) => utils.makeBlock(cb), cb) + (cb) => createP2PNode('/ip4/127.0.0.1/tcp/0', { bits: 1024 }, cb), + (cb) => createP2PNode('/ip4/127.0.0.1/tcp/0', { bits: 1024 }, cb), + (cb) => createP2PNode('/ip4/127.0.0.1/tcp/0', { bits: 1024 }, cb), + (cb) => map(_.range(2), (i, cb) => makeBlock(cb), cb) ], (err, results) => { - if (err) { - return done(err) - } + expect(err).to.not.exist() - peerInfoA = results[0] - peerInfoB = results[1] - peerInfoC = results[2] + p2pA = results[0] + p2pB = results[1] + p2pC = results[2] blocks = results[3] - const maA = '/ip4/127.0.0.1/tcp/10100/ipfs/' + peerInfoA.id.toB58String() - const maB = '/ip4/127.0.0.1/tcp/10300/ipfs/' + peerInfoB.id.toB58String() - const maC = '/ip4/127.0.0.1/tcp/10500/ipfs/' + peerInfoC.id.toB58String() - - peerInfoA.multiaddrs.add(maA) - peerInfoB.multiaddrs.add(maB) - peerInfoC.multiaddrs.add(maC) - - peerBookA = new PeerBook() - peerBookB = new PeerBook() - peerBookC = new PeerBook() - - peerBookA.put(peerInfoB) - peerBookA.put(peerInfoC) - - peerBookB.put(peerInfoA) - peerBookB.put(peerInfoC) - - peerBookC.put(peerInfoA) - peerBookC.put(peerInfoB) - - libp2pNodeA = new Node(peerInfoA, peerBookA) - libp2pNodeA.start(started) - libp2pNodeB = new Node(peerInfoB, peerBookB) - libp2pNodeB.start(started) - libp2pNodeC = new Node(peerInfoC, peerBookC) - libp2pNodeC.start(started) - - function started () { - if (++counter === 3) { - done() - } - } + parallel([ + (cb) => p2pA.start(cb), + (cb) => p2pB.start(cb), + (cb) => p2pC.start(cb) + ], done) }) }) after((done) => { - let counter = 0 - libp2pNodeA.stop(stopped) - libp2pNodeB.stop(stopped) - libp2pNodeC.stop(stopped) - - function stopped () { - if (++counter === 3) { - done() - } - } + parallel([ + (cb) => p2pA.stop(cb), + (cb) => p2pB.stop(cb), + (cb) => p2pC.stop(cb) + ], done) }) let bitswapMockA = { @@ -124,24 +108,24 @@ describe('network', () => { } it('instantiate the network obj', (done) => { - networkA = new Network(libp2pNodeA, peerBookA, bitswapMockA) - networkB = new Network(libp2pNodeB, peerBookB, bitswapMockB) + networkA = new Network(p2pA, bitswapMockA) + networkB = new Network(p2pB, bitswapMockB) // only bitswap100 - networkC = new Network(libp2pNodeC, peerBookC, bitswapMockC, true) + networkC = new Network(p2pC, bitswapMockC, { b100Only: true }) expect(networkA).to.exist() expect(networkB).to.exist() expect(networkC).to.exist() - networkA.start() - networkB.start() - networkC.start() - - done() + parallel([ + (cb) => networkA.start(cb), + (cb) => networkB.start(cb), + (cb) => networkC.start(cb) + ], done) }) it('connectTo fail', (done) => { - networkA.connectTo(peerInfoB.id, (err) => { + networkA.connectTo(p2pB.peerInfo.id, (err) => { expect(err).to.exist() done() }) @@ -151,20 +135,22 @@ describe('network', () => { var counter = 0 bitswapMockA._onPeerConnected = (peerId) => { - expect(peerId.toB58String()).to.equal(peerInfoB.id.toB58String()) + expect(peerId.toB58String()).to.equal(p2pB.peerInfo.id.toB58String()) + if (++counter === 2) { finish() } } bitswapMockB._onPeerConnected = (peerId) => { - expect(peerId.toB58String()).to.equal(peerInfoA.id.toB58String()) + expect(peerId.toB58String()).to.equal(p2pA.peerInfo.id.toB58String()) + if (++counter === 2) { finish() } } - libp2pNodeA.dial(peerInfoB, (err) => { + p2pA.dial(p2pB.peerInfo, (err) => { expect(err).to.not.exist() }) @@ -176,10 +162,7 @@ describe('network', () => { }) it('connectTo success', (done) => { - networkA.connectTo(peerInfoB.id, (err) => { - expect(err).to.not.exist() - done() - }) + networkA.connectTo(p2pB.peerInfo, done) }) it('._receiveMessage success from Bitswap 1.0.0', (done) => { @@ -193,6 +176,7 @@ describe('network', () => { bitswapMockB._receiveMessage = (peerId, msgReceived) => { expect(msg).to.eql(msgReceived) + bitswapMockB._receiveMessage = () => {} bitswapMockB._receiveError = () => {} done() @@ -202,7 +186,7 @@ describe('network', () => { expect(err).to.not.exist() } - libp2pNodeA.dial(peerInfoB, '/ipfs/bitswap/1.0.0', (err, conn) => { + p2pA.dial(p2pB.peerInfo, '/ipfs/bitswap/1.0.0', (err, conn) => { expect(err).to.not.exist() pull( @@ -235,7 +219,7 @@ describe('network', () => { expect(err).to.not.exist() } - libp2pNodeA.dial(peerInfoB, '/ipfs/bitswap/1.1.0', (err, conn) => { + p2pA.dial(p2pB.peerInfo, '/ipfs/bitswap/1.1.0', (err, conn) => { expect(err).to.not.exist() pull( @@ -268,7 +252,7 @@ describe('network', () => { expect(err).to.not.exist() } - networkA.sendMessage(peerInfoB.id, msg, (err) => { + networkA.sendMessage(p2pB.peerInfo.id, msg, (err) => { expect(err).to.not.exist() }) }) @@ -277,27 +261,27 @@ describe('network', () => { let counter = 0 bitswapMockA._onPeerConnected = (peerId) => { - expect(peerId.toB58String()).to.equal(peerInfoC.id.toB58String()) + expect(peerId.toB58String()).to.equal(p2pC.peerInfo.id.toB58String()) if (++counter === 2) { finish() } } bitswapMockC._onPeerConnected = (peerId) => { - expect(peerId.toB58String()).to.equal(peerInfoA.id.toB58String()) + expect(peerId.toB58String()).to.equal(p2pA.peerInfo.id.toB58String()) if (++counter === 2) { finish() } } - libp2pNodeA.dial(peerInfoC, (err) => { + p2pA.dial(p2pC.peerInfo, (err) => { expect(err).to.not.exist() }) function finish () { bitswapMockA._onPeerConnected = () => {} bitswapMockC._onPeerConnected = () => {} - networkA.connectTo(peerInfoC.id, done) + networkA.connectTo(p2pC.peerInfo.id, done) } }) @@ -321,7 +305,7 @@ describe('network', () => { expect(err).to.not.exist() } - networkA.sendMessage(peerInfoC.id, msg, (err) => { + networkA.sendMessage(p2pC.peerInfo.id, msg, (err) => { expect(err).to.not.exist() }) }) diff --git a/test/node.js b/test/node.js index eac5f25b..02daf18b 100644 --- a/test/node.js +++ b/test/node.js @@ -1,47 +1,7 @@ 'use strict' -const IPFSRepo = require('ipfs-repo') -const path = require('path') -const ncp = require('ncp') -const rimraf = require('rimraf') -const testRepoPath = path.join(__dirname, 'test-repo') -const each = require('async/each') - -// book keeping -let repos = [] - -function createRepo (id, done) { - const date = Date.now().toString() - const repoPath = `${testRepoPath}-for-${date}-${id}` - ncp(testRepoPath, repoPath, (err) => { - if (err) return done(err) - - const repo = new IPFSRepo(repoPath) - repo.open((err) => { - if (err) { - return done(err) - } - repos.push(repoPath) - done(null, repo) - }) - }) -} - -function removeRepos (done) { - each(repos, (repo, cb) => { - rimraf(repo, cb) - }, (err) => { - repos = [] - done(err) - }) -} - -const repo = { - create: createRepo, - remove: removeRepos -} - -require('./index-test')(repo) -require('./components/decision-engine/index-test')(repo) -require('./components/network/network.node.js') -require('./components/network/gen-bitswap-network.node.js') +require('./bitswap.js') +require('./bitswap-mock-internals.js') +require('./decision-engine/decision-engine') +require('./network/network.node.js') +require('./network/gen-bitswap-network.node.js') diff --git a/test/notifications.spec.js b/test/notifications.spec.js new file mode 100644 index 00000000..10086230 --- /dev/null +++ b/test/notifications.spec.js @@ -0,0 +1,74 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const map = require('async/map') +const parallel = require('async/parallel') +const PeerId = require('peer-id') + +const Notifications = require('../src/notifications') + +const makeBlock = require('./utils/make-block') + +describe('Notifications', () => { + let blocks + let peerId + + before((done) => { + parallel([ + (cb) => map([0, 1, 2], (i, cb) => makeBlock(cb), (err, res) => { + expect(err).to.not.exist() + blocks = res + cb() + }), + (cb) => PeerId.create({bits: 1024}, (err, id) => { + expect(err).to.not.exist() + peerId = id + cb() + }) + ], done) + }) + + it('hasBlock', (done) => { + const n = new Notifications(peerId) + const b = blocks[0] + n.once(`block:${b.cid.buffer.toString()}`, (block) => { + expect(b).to.eql(block) + done() + }) + n.hasBlock(b) + }) + + describe('wantBlock', () => { + it('receive block', (done) => { + const n = new Notifications(peerId) + const b = blocks[0] + + n.wantBlock(b.cid, (block) => { + expect(b).to.eql(block) + + // check that internal cleanup works as expected + expect(Object.keys(n._blockListeners)).to.have.length(0) + expect(Object.keys(n._unwantListeners)).to.have.length(0) + done() + }, () => { + done(new Error('should never happen')) + }) + + n.hasBlock(b) + }) + + it('unwant block', (done) => { + const n = new Notifications() + const b = blocks[0] + + n.wantBlock(b.cid, () => { + done(new Error('should never happen')) + }, done) + + n.unwantBlock(b.cid) + }) + }) +}) diff --git a/test/types/message.spec.js b/test/types/message.spec.js index fc5add5f..f3b10890 100644 --- a/test/types/message.spec.js +++ b/test/types/message.spec.js @@ -10,23 +10,23 @@ const map = require('async/map') const CID = require('cids') const isNode = require('detect-node') const _ = require('lodash') - +const Buffer = require('safe-buffer').Buffer const loadFixture = require('aegir/fixtures') -const testDataPath = (isNode ? '../' : '') + 'test-data/serialized-from-go' +const testDataPath = (isNode ? '../' : '') + 'fixtures/serialized-from-go' const rawMessageFullWantlist = loadFixture(__dirname, testDataPath + '/bitswap110-message-full-wantlist') const rawMessageOneBlock = loadFixture(__dirname, testDataPath + '/bitswap110-message-one-block') const pbm = protobuf(require('../../src/types/message/message.proto')) const BitswapMessage = require('../../src/types/message') -const utils = require('../utils') +const makeBlock = require('../utils/make-block') describe('BitswapMessage', () => { let blocks let cids before((done) => { - map(_.range(3), (i, cb) => utils.makeBlock(cb), (err, res) => { + map(_.range(3), (i, cb) => makeBlock(cb), (err, res) => { expect(err).to.not.exist() blocks = res cids = blocks.map((b) => b.cid) @@ -243,7 +243,7 @@ describe('BitswapMessage', () => { describe('go interop', () => { it('bitswap 1.0.0 message', (done) => { - const goEncoded = new Buffer('CioKKAoiEiAs8k26X7CjDiboOyrFueKeGxYeXB+nQl5zBDNik4uYJBAKGAA=', 'base64') + const goEncoded = Buffer.from('CioKKAoiEiAs8k26X7CjDiboOyrFueKeGxYeXB+nQl5zBDNik4uYJBAKGAA=', 'base64') const msg = new BitswapMessage(false) const cid = new CID('QmRN6wdp1S2A5EtjW9A3M1vKSBuQQGcgvuhoMUoEz4iiT5') diff --git a/test/types/wantlist.spec.js b/test/types/wantlist.spec.js index 4050adb3..d98539f8 100644 --- a/test/types/wantlist.spec.js +++ b/test/types/wantlist.spec.js @@ -10,14 +10,14 @@ const _ = require('lodash') const multihashing = require('multihashing-async') const Wantlist = require('../../src/types/wantlist') -const utils = require('../utils') +const makeBlock = require('../utils/make-block') describe('Wantlist', () => { let wm let blocks before((done) => { - map(_.range(2), (i, cb) => utils.makeBlock(cb), (err, res) => { + map(_.range(2), (i, cb) => makeBlock(cb), (err, res) => { expect(err).to.not.exist() blocks = res done() diff --git a/test/utils/create-libp2p-node.js b/test/utils/create-libp2p-node.js new file mode 100644 index 00000000..5116b629 --- /dev/null +++ b/test/utils/create-libp2p-node.js @@ -0,0 +1,44 @@ +'use strict' + +const TCP = require('libp2p-tcp') +const Multiplex = require('libp2p-multiplex') +const SECIO = require('libp2p-secio') +const libp2p = require('libp2p') +const KadDHT = require('libp2p-kad-dht') +const waterfall = require('async/waterfall') +const PeerInfo = require('peer-info') +const PeerId = require('peer-id') + +class Node extends libp2p { + constructor (peerInfo, options) { + options = options || {} + + const modules = { + transport: [new TCP()], + connection: { + muxer: Multiplex, + crypto: SECIO + }, + DHT: options.DHT ? KadDHT : undefined + } + + super(modules, peerInfo, null, options.DHT || {}) + } +} + +function createLibp2pNode (options, callback) { + let node + + waterfall([ + (cb) => PeerId.create({bits: 1024}, cb), + (id, cb) => PeerInfo.create(id, cb), + (peerInfo, cb) => { + peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/0') + node = new Node(peerInfo, options) + node.start(cb) + } + ], (err) => callback(err, node)) +} + +exports = module.exports = createLibp2pNode +exports.bundle = Node diff --git a/test/utils/create-temp-repo-browser.js b/test/utils/create-temp-repo-browser.js new file mode 100644 index 00000000..1a0c9338 --- /dev/null +++ b/test/utils/create-temp-repo-browser.js @@ -0,0 +1,35 @@ +/* global self */ +'use strict' + +const IPFSRepo = require('ipfs-repo') +const series = require('async/series') + +const idb = self.indexedDB || + self.mozIndexedDB || + self.webkitIndexedDB || + self.msIndexedDB + +function createTempRepo (callback) { + const date = Date.now().toString() + const path = `/bitswap-tests-${date}-${Math.random()}` + + const repo = new IPFSRepo(path) + + series([ + (cb) => repo.init({}, cb), + (cb) => repo.open(cb) + ], (err) => { + if (err) { + return callback(err) + } + repo.teardown = (callback) => { + idb.deleteDatabase(path) + idb.deleteDatabase(`${path}/blocks`) + callback() + } + + callback(null, repo) + }) +} + +module.exports = createTempRepo diff --git a/test/utils/create-temp-repo-nodejs.js b/test/utils/create-temp-repo-nodejs.js new file mode 100644 index 00000000..de34c326 --- /dev/null +++ b/test/utils/create-temp-repo-nodejs.js @@ -0,0 +1,28 @@ +'use strict' + +const IPFSRepo = require('ipfs-repo') +const path = require('path') +const ncp = require('ncp') +const rimraf = require('rimraf') + +const baseRepo = path.join(__dirname, '../fixtures/repo') + +function createTempRepo (callback) { + const date = Date.now().toString() + const path = `/tmp/bitswap-tests-${date}-${Math.random()}` + + ncp(baseRepo, path, (err) => { + if (err) { return callback(err) } + + const repo = new IPFSRepo(path) + + repo.teardown = (callback) => rimraf(path, callback) + + repo.open((err) => { + if (err) { return callback(err) } + callback(null, repo) + }) + }) +} + +module.exports = createTempRepo diff --git a/test/utils/helpers.js b/test/utils/helpers.js new file mode 100644 index 00000000..9e3d897e --- /dev/null +++ b/test/utils/helpers.js @@ -0,0 +1,18 @@ +'use strict' + +const _ = require('lodash') + +exports.orderedFinish = (n, callback) => { + const r = _.range(1, n + 1) + const finishs = [] + + return (i) => { + finishs.push(i) + if (finishs.length === n) { + if (!_.isEqual(r, finishs)) { + return callback(new Error('Invalid finish order: ' + finishs)) + } + callback() + } + } +} diff --git a/test/utils/make-block.js b/test/utils/make-block.js new file mode 100644 index 00000000..d125f880 --- /dev/null +++ b/test/utils/make-block.js @@ -0,0 +1,15 @@ +'use strict' + +const multihashing = require('multihashing-async') +const CID = require('cids') +const Block = require('ipfs-block') +const Buffer = require('safe-buffer').Buffer + +module.exports = (callback) => { + const data = Buffer.from(`hello world ${Math.random()}`) + + multihashing(data, 'sha2-256', (err, hash) => { + if (err) { return callback(err) } + callback(null, new Block(data, new CID(hash))) + }) +} diff --git a/test/utils.js b/test/utils/mocks.js similarity index 77% rename from test/utils.js rename to test/utils/mocks.js index 00b60d3d..eea8c36e 100644 --- a/test/utils.js +++ b/test/utils/mocks.js @@ -4,29 +4,56 @@ const each = require('async/each') const eachSeries = require('async/eachSeries') const map = require('async/map') const parallel = require('async/parallel') +const setImmediate = require('async/setImmediate') const series = require('async/series') const _ = require('lodash') const PeerId = require('peer-id') const PeerInfo = require('peer-info') -const Node = require('./libp2p-bundle') +const PeerBook = require('peer-book') +const Node = require('./create-libp2p-node').bundle const os = require('os') const Repo = require('ipfs-repo') -const multihashing = require('multihashing-async') -const CID = require('cids') -const Block = require('ipfs-block') -const Bitswap = require('../src') +const Bitswap = require('../../src') +/* + * Create a mock libp2p node + */ +exports.mockLibp2pNode = () => { + const peerInfo = new PeerInfo(PeerId.createFromHexString('122019318b6e5e0cf93a2314bf01269a2cc23cd3dcd452d742cdb9379d8646f6e4a9')) + + return { + peerInfo: peerInfo, + handle () {}, + contentRouting: { + provide: (cid, callback) => callback(), + findProviders: (cid, timeout, callback) => callback(null, []) + }, + on () {}, + dial (peer, protocol, callback) { + setImmediate(() => callback()) + }, + swarm: { + muxedConns: {}, + setMaxListeners () {} + }, + peerBook: new PeerBook() + } +} + +/* + * Create a mock network instance + */ exports.mockNetwork = (calls, done) => { done = done || (() => {}) + const connects = [] const messages = [] let i = 0 const finish = () => { - i++ - if (i === calls) { - done({connects, messages}) + if (++i === calls) { + done({ connects: connects, messages: messages }) } } @@ -44,12 +71,22 @@ exports.mockNetwork = (calls, done) => { finish() }) }, - start () { + start (callback) { + setImmediate(() => callback()) + }, + findAndConnect (cid, callback) { + setImmediate(() => callback()) + }, + provide (cid, callback) { + setImmediate(() => callback()) } } } -exports.createMockNet = (repo, count, cb) => { +/* + * Create a mock test network + */ +exports.createMockTestNet = (repo, count, cb) => { parallel([ (cb) => map(_.range(count), (i, cb) => repo.create(`repo-${i}`), cb), (cb) => map(_.range(count), (i, cb) => PeerId.create(cb), cb) @@ -185,20 +222,8 @@ exports.genBitswapNetwork = (n, callback) => { // callback with netArray function finish (err) { - if (err) { - throw err - } + if (err) { throw err } callback(null, netArray) } }) } - -exports.makeBlock = (cb) => { - const data = new Buffer(`hello world ${Math.random()}`) - multihashing(data, 'sha2-256', (err, hash) => { - if (err) { - return cb(err) - } - cb(null, new Block(data, new CID(hash))) - }) -} diff --git a/test/utils/store-has-blocks.js b/test/utils/store-has-blocks.js new file mode 100644 index 00000000..3533dc1c --- /dev/null +++ b/test/utils/store-has-blocks.js @@ -0,0 +1,19 @@ +'use strict' + +const each = require('async/each') + +function storeHasBlocks (message, store, callback) { + each(message.blocks.values(), (b, callback) => { + store.has(b.cid, (err, has) => { + if (err) { + return callback(err) + } + if (!has) { + return callback(new Error('missing block')) + } + callback() + }) + }, callback) +} + +module.exports = storeHasBlocks diff --git a/test/wantmanager/index.spec.js b/test/wantmanager/index.spec.js new file mode 100644 index 00000000..c40590f9 --- /dev/null +++ b/test/wantmanager/index.spec.js @@ -0,0 +1,101 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const PeerId = require('peer-id') +const parallel = require('async/parallel') +const series = require('async/series') +const map = require('async/map') +const _ = require('lodash') + +const cs = require('../../src/constants') +const Message = require('../../src/types/message') +const WantManager = require('../../src/want-manager') + +const mockNetwork = require('../utils/mocks').mockNetwork +const makeBlock = require('../utils/make-block') + +describe('WantManager', () => { + it('sends wantlist to all connected peers', function (done) { + this.timeout(80 * 1000) + + let cids + let blocks + + parallel([ + (cb) => PeerId.create(cb), + (cb) => PeerId.create(cb), + (cb) => PeerId.create(cb), + (cb) => { + map(_.range(3), (i, cb) => makeBlock(cb), (err, res) => { + expect(err).to.not.exist() + blocks = res + cids = blocks.map((b) => b.cid) + cb() + }) + } + ], (err, peerIds) => { + if (err) { + return done(err) + } + + const peer1 = peerIds[0] + const peer2 = peerIds[1] + const cid1 = cids[0] + const cid2 = cids[1] + const cid3 = cids[2] + + const m1 = new Message(true) + m1.addEntry(cid1, cs.kMaxPriority) + m1.addEntry(cid2, cs.kMaxPriority - 1) + + const m2 = new Message(false) + m2.cancel(cid2) + + const m3 = new Message(false) + m3.addEntry(cid3, cs.kMaxPriority) + + const msgs = [m1, m1, m2, m2, m3, m3] + + const network = mockNetwork(6, (calls) => { + expect(calls.connects).to.have.length(6) + expect(calls.messages).to.have.length(6) + + calls.messages.forEach((m, i) => { + expect(m[0]).to.be.eql(calls.connects[i]) + if (!m[1].equals(msgs[i])) { + return done( + new Error(`expected ${m[1].toString()} to equal ${msgs[1].toString()}`) + ) + } + }) + + done() + }) + + const wantManager = new WantManager(peerIds[2], network) + + wantManager.start((err) => { + expect(err).to.not.exist() + wantManager.wantBlocks([cid1, cid2]) + + wantManager.connected(peer1) + wantManager.connected(peer2) + + series([ + (cb) => setTimeout(cb, 200), + (cb) => { + wantManager.cancelWants([cid2]) + cb() + }, + (cb) => setTimeout(cb, 200) + ], (err) => { + expect(err).to.not.exist() + wantManager.wantBlocks([cid3]) + }) + }) + }) + }) +}) diff --git a/test/components/wantmanager/msg-queue.spec.js b/test/wantmanager/msg-queue.spec.js similarity index 75% rename from test/components/wantmanager/msg-queue.spec.js rename to test/wantmanager/msg-queue.spec.js index 05ff6499..a4186a3b 100644 --- a/test/components/wantmanager/msg-queue.spec.js +++ b/test/wantmanager/msg-queue.spec.js @@ -9,25 +9,24 @@ const map = require('async/map') const parallel = require('async/parallel') const CID = require('cids') const multihashing = require('multihashing-async') +const Buffer = require('safe-buffer').Buffer -const Message = require('../../../src/types/message') -const MsgQueue = require('../../../src/components/want-manager/msg-queue') +const Message = require('../../src/types/message') +const MsgQueue = require('../../src/want-manager/msg-queue') describe('MessageQueue', () => { - let peerId + let peerIds let cids before((done) => { parallel([ + (cb) => map([0, 1], (i, cb) => PeerId.create({bits: 1024}, cb), (err, res) => { + expect(err).to.not.exist() + peerIds = res + cb() + }), (cb) => { - PeerId.create((err, _peerId) => { - expect(err).to.not.exist() - peerId = _peerId - cb() - }) - }, - (cb) => { - const data = ['1', '2', '3', '4', '5', '6'].map((d) => new Buffer(d)) + const data = ['1', '2', '3', '4', '5', '6'].map((d) => Buffer.from(d)) map(data, (d, cb) => multihashing(d, 'sha2-256', cb), (err, hashes) => { expect(err).to.not.exist() cids = hashes.map((h) => new CID(h)) @@ -56,7 +55,7 @@ describe('MessageQueue', () => { const finish = () => { i++ if (i === 2) { - expect(connects).to.be.eql([peerId, peerId]) + expect(connects).to.be.eql([peerIds[1], peerIds[1]]) const m1 = new Message(false) m1.addEntry(cid3, 1) @@ -67,8 +66,8 @@ describe('MessageQueue', () => { expect( messages ).to.be.eql([ - [peerId, msg], - [peerId, m1] + [peerIds[1], msg], + [peerIds[1], m1] ]) done() @@ -87,7 +86,7 @@ describe('MessageQueue', () => { } } - const mq = new MsgQueue(peerId, network) + const mq = new MsgQueue(peerIds[0], peerIds[1], network) expect(mq.refcnt).to.equal(1)