Skip to content

Commit

Permalink
Merge pull request #85 from ipfs/lock
Browse files Browse the repository at this point in the history
fix(blockstore): lock getStream to avoid race issues
  • Loading branch information
daviddias authored Sep 12, 2016
2 parents 477d6e5 + d12086e commit 96b1f95
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 18 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
"ipfs-block": "^0.3.0",
"lock": "^0.1.3",
"multihashes": "^0.2.2",
"pull-defer": "^0.2.2",
"pull-stream": "^3.4.5",
"pull-through": "^1.0.18",
"pull-write": "^1.1.0",
"run-parallel": "^1.1.6",
"run-series": "^1.1.4",
Expand All @@ -66,4 +66,4 @@
"greenkeeperio-bot <support@greenkeeper.io>",
"nginnever <ginneversource@gmail.com>"
]
}
}
41 changes: 26 additions & 15 deletions src/stores/blockstore.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const base32 = require('base32.js')
const path = require('path')
const write = require('pull-write')
const parallel = require('run-parallel')
const through = require('pull-through')
const defer = require('pull-defer/source')

const PREFIX_LENGTH = 5

Expand Down Expand Up @@ -52,19 +52,25 @@ exports.setUp = (basePath, BlobStore, locks) => {
}

const p = multihashToPath(key, extension)
const deferred = defer()

lock(p, (release) => {
const ext = extension === 'data' ? 'protobuf' : extension
pull(
store.read(p),
pull.collect(release((err, data) => {
if (err) {
return deferred.abort(err)
}

deferred.resolve(pull.values([
new Block(Buffer.concat(data), ext)
]))
}))
)
})

const ext = extension === 'data' ? 'protobuf' : extension
let data = []

return pull(
store.read(p),
through(function (values) {
data = data.concat(values)
}, function () {
this.queue(new Block(Buffer.concat(data), ext))
this.queue(null)
})
)
return deferred
},

putStream () {
Expand All @@ -75,7 +81,10 @@ exports.setUp = (basePath, BlobStore, locks) => {
const sink = write((blocks, cb) => {
parallel(blocks.map((block) => (cb) => {
writeBlock(block, (err, meta) => {
if (err) return cb(err)
if (err) {
return cb(err)
}

if (push) {
const read = push
push = null
Expand All @@ -94,7 +103,9 @@ exports.setUp = (basePath, BlobStore, locks) => {

const source = (end, cb) => {
if (end) ended = end
if (ended) return cb(ended)
if (ended) {
return cb(ended)
}

if (written.length) {
return cb(null, written.shift())
Expand Down
2 changes: 1 addition & 1 deletion test/blockstore-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ module.exports = (repo) => {

it('massive read', (done) => {
parallel(_.range(20 * 100).map((i) => (cb) => {
const j = i % 20
const j = i % blockCollection.length
pull(
repo.blockstore.getStream(blockCollection[j].key),
pull.collect((err, meta) => {
Expand Down

0 comments on commit 96b1f95

Please sign in to comment.