Skip to content

Commit

Permalink
refactor: migrate to pull-streams
Browse files Browse the repository at this point in the history
- pull-streams
- remove highland
- update deps
  • Loading branch information
dignifiedquire committed Sep 9, 2016
1 parent b77cb11 commit f3dfceb
Show file tree
Hide file tree
Showing 16 changed files with 625 additions and 492 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ sudo: false
language: node_js
node_js:
- 4
- 5
- stable

# Make sure we have new NPM.
before_install:
Expand Down
53 changes: 31 additions & 22 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,52 +6,61 @@

- `id: PeerId`, the id of the local instance.
- `libp2p: Libp2p`, instance of the local network stack.
- `datastore: Datastore`, instance of the local database (`IpfsRepo.datastore`)
- `blockstore: Datastore`, instance of the local database (`IpfsRepo.blockstore`)

Create a new instance.

### `getBlock(key, cb)`

- `key: Multihash`
- `cb: Function`
### `getStream(key)`

Fetch a single block.
- `key: Multihash|Array`

> Note: This is safe guarded so that the network is not asked
> for blocks that are in the local `datastore`.
Returns a source `pull-stream`. Values emitted are the received blocks.

### `getBlocks(keys, cb)`

- `keys: []Multihash`
- `cb: Function`
Example:

Fetch multiple blocks. The `cb` is called with a result object of the form
```js
{
[key1]: {error: errorOrUndefined, block: blockOrUndefined},
[key2]: {error: errorOrUndefined, block: blockOrUndefined},
...
}
// Single block
pull(
bitswap.getStream(key),
pull.collect((err, blocks) => {
// blocks === [block]
})
)

// Many blocks
pull(
bitswap.getStream([key1, key2, key3]),
pull.collect((err, blocks) => {
// blocks === [block1, block2, block3]
})
)
```

Where `key<i>` is the multihash of the block.

### `unwantBlocks(keys)`
> Note: This is safe guarded so that the network is not asked
> for blocks that are in the local `datastore`.

- `keys: []Multihash`
### `unwant(keys)`

- `keys: Mutlihash|[]Multihash`

Cancel previously requested keys, forcefully. That means they are removed from the
wantlist independent of how many other resources requested these keys. Callbacks
attached to `getBlock` are errored with `Error('manual unwant: key')`.

### `cancelWants(keys)`

- `keys: []Multihash`
- `keys: Multihash|[]Multihash`

Cancel previously requested keys.

### `putStream()`

Returns a duplex `pull-stream` that emits an object `{key: Multihash}` for every written block when it was stored.

### `hasBlock(block, cb)`
### `put(block, cb)`

- `block: IpfsBlock`
- `cb: Function`
Expand Down
31 changes: 17 additions & 14 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,40 +33,43 @@
},
"homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme",
"devDependencies": {
"abstract-blob-store": "^3.2.0",
"aegir": "^8.0.1",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"fs-blob-store": "^5.2.1",
"idb-plus-blob-store": "^1.1.2",
"ipfs-repo": "^0.8.0",
"libp2p-ipfs": "^0.12.0",
"lodash": "^4.13.1",
"fs-pull-blob-store": "^0.3.0",
"idb-pull-blob-store": "^0.4.0",
"interface-pull-blob-store": "^0.5.0",
"ipfs-repo": "^0.9.0",
"libp2p-ipfs": "^0.13.0",
"lodash": "^4.15.0",
"multiaddr": "^2.0.3",
"ncp": "^2.0.0",
"peer-book": "^0.3.0",
"peer-id": "^0.7.0",
"peer-info": "^0.7.1",
"rimraf": "^2.5.2",
"rimraf": "^2.5.4",
"safe-buffer": "^5.0.1"
},
"dependencies": {
"async": "^2.0.0-rc.5",
"bl": "^1.1.2",
"async": "^2.0.1",
"debug": "^2.2.0",
"heap": "^0.2.6",
"highland": "^3.0.0-beta.1",
"ipfs-block": "^0.3.0",
"length-prefixed-stream": "^1.5.0",
"lodash.isequalwith": "^4.2.0",
"lodash.isequalwith": "^4.4.0",
"lodash.isundefined": "^3.0.1",
"multihashes": "^0.2.2",
"protocol-buffers": "^3.1.6"
"protocol-buffers": "^3.1.6",
"pull-defer": "^0.2.2",
"pull-generate": "^2.2.0",
"pull-length-prefixed": "^1.2.0",
"pull-paramap": "^1.1.6",
"pull-pushable": "^2.0.1",
"pull-stream": "^3.4.5"
},
"contributors": [
"David Dias <daviddias.p@gmail.com>",
"Richard Littauer <richard.littauer@gmail.com>",
"Stephen Whitmore <stephen.whitmore@gmail.com>",
"dignifiedquire <dignifiedquire@gmail.com>"
]
}
}
94 changes: 53 additions & 41 deletions src/decision/engine.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
'use strict'

const debug = require('debug')
const _ = require('highland')
const async = require('async')
const mh = require('multihashes')
const pull = require('pull-stream')
const generate = require('pull-generate')

const log = debug('bitswap:engine')
log.error = debug('bitswap:engine:error')
Expand All @@ -14,8 +14,8 @@ const PeerRequestQueue = require('./peer-request-queue')
const Ledger = require('./ledger')

module.exports = class Engine {
constructor (datastore, network) {
this.datastore = datastore
constructor (blockstore, network) {
this.blockstore = blockstore
this.network = network

// A list of of ledgers by their partner id
Expand Down Expand Up @@ -45,34 +45,43 @@ module.exports = class Engine {
_outbox () {
if (!this._running) return

const doIt = (cb) => {
_((push, next) => {
if (!this._running) return push(null, _.nil)
const nextTask = this.peerRequestQueue.pop()
const doIt = (cb) => pull(
generate(null, (state, cb) => {
log('generating', this._running)
if (!this._running) {
return cb(true)
}

if (!nextTask) return push(null, _.nil)
const nextTask = this.peerRequestQueue.pop()
log('got task', nextTask)
if (!nextTask) {
return cb(true)
}

this.datastore.get(nextTask.entry.key, (err, block) => {
if (err || !block) {
nextTask.done()
} else {
push(null, {
pull(
this.blockstore.getStream(nextTask.entry.key),
pull.collect((err, blocks) => {
log('generated', blocks)
const block = blocks[0]
if (err || !block) {
nextTask.done()
return cb(null, false)
}

cb(null, {
peer: nextTask.target,
block: block,
sent: () => {
nextTask.done()
}
})
}

next()
})
})
.flatMap((envelope) => {
return _.wrapCallback(this._sendBlock.bind(this))(envelope)
})
.done(cb)
}
})
)
}),
pull.filter(Boolean),
pull.asyncMap(this._sendBlock.bind(this)),
pull.onEnd(cb)
)

if (!this._timer) {
this._timer = setTimeout(() => {
Expand All @@ -97,40 +106,43 @@ module.exports = class Engine {

// Handle incoming messages
messageReceived (peerId, msg, cb) {
const ledger = this._findOrCreate(peerId)

if (msg.empty) {
log('received empty message from %s', peerId.toB58String())
return cb()
}

const ledger = this._findOrCreate(peerId)

// If the message was a full wantlist clear the current one
if (msg.full) {
ledger.wantlist = new Wantlist()
}

this._processBlocks(msg.blocks, ledger)
log('wantlist', Array.from(msg.wantlist.values()).map((e) => e.toString()))
async.eachSeries(
msg.wantlist.values(),
this._processWantlist.bind(this, ledger, peerId),
(err) => {
const done = (err) => async.setImmediate(() => cb(err))
if (err) return done(err)

pull(
pull.values(Array.from(msg.wantlist.values())),
pull.asyncMap((entry, cb) => {
this._processWantlist(ledger, peerId, entry, cb)
}),
pull.onEnd((err) => {
if (err) return cb(err)
this._outbox()
done()
}
cb()
})
)
}

receivedBlock (block) {
this._processBlock(block)
receivedBlock (key) {
this._processBlock(key)
this._outbox()
}

_processBlock (block) {
_processBlock (key) {
// Check all connected peers if they want the block we received
for (let l of this.ledgerMap.values()) {
const entry = l.wantlistContains(block.key)
const entry = l.wantlistContains(key)

if (entry) {
this.peerRequestQueue.push(entry, l.partner)
Expand All @@ -143,13 +155,13 @@ module.exports = class Engine {
log('cancel %s', mh.toB58String(entry.key))
ledger.cancelWant(entry.key)
this.peerRequestQueue.remove(entry.key, peerId)
async.setImmediate(() => cb())
setImmediate(() => cb())
} else {
log('wants %s - %s', mh.toB58String(entry.key), entry.priority)
ledger.wants(entry.key, entry.priority)

// If we already have the block, serve it
this.datastore.has(entry.key, (err, exists) => {
this.blockstore.has(entry.key, (err, exists) => {
if (err) {
log('failed existence check %s', mh.toB58String(entry.key))
} else if (exists) {
Expand All @@ -166,7 +178,7 @@ module.exports = class Engine {
log('got block %s (%s bytes)', mh.toB58String(block.key), block.data.length)
ledger.receivedBytes(block.data.length)

this.receivedBlock(block)
this.receivedBlock(block.key)
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/decision/peer-request-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,14 @@ function taskKey (peerId, key) {

function partnerCompare (a, b) {
// having no blocks in their wantlist means lowest priority
// having both of these checks ensures stability of the sort
// having both of these checks ensures stability of the sort
if (a.requests === 0) return false
if (b.requests === 0) return true

if (a.active === b.active) {
// sorting by taskQueue.size() aids in cleaning out trash entries faster
// if we sorted instead by requests, one peer could potentially build up
// a huge number of cancelled entries in the queue resulting in a memory leak
// if we sorted instead by requests, one peer could potentially build up
// a huge number of cancelled entries in the queue resulting in a memory leak
return a.taskQueue.size() > b.taskQueue.size()
}

Expand Down
Loading

0 comments on commit f3dfceb

Please sign in to comment.