From d12086e0447ffb827a165b04c07a6b147c55a5a7 Mon Sep 17 00:00:00 2001
From: Friedel Ziegelmayer
Date: Mon, 12 Sep 2016 11:22:16 +0200
Subject: [PATCH] fix(blockstore): lock getStream to avoid race issues

---
 package.json             |  4 ++--
 src/stores/blockstore.js | 41 +++++++++++++++++++++++++---------------
 test/blockstore-test.js  |  2 +-
 3 files changed, 29 insertions(+), 18 deletions(-)

diff --git a/package.json b/package.json
index 4d49e762..96e4cb4e 100644
--- a/package.json
+++ b/package.json
@@ -47,8 +47,8 @@
     "ipfs-block": "^0.3.0",
     "lock": "^0.1.3",
     "multihashes": "^0.2.2",
+    "pull-defer": "^0.2.2",
     "pull-stream": "^3.4.5",
-    "pull-through": "^1.0.18",
     "pull-write": "^1.1.0",
     "run-parallel": "^1.1.6",
     "run-series": "^1.1.4",
@@ -66,4 +66,4 @@
     "greenkeeperio-bot ",
     "nginnever "
   ]
-}
\ No newline at end of file
+}
diff --git a/src/stores/blockstore.js b/src/stores/blockstore.js
index 1b13af36..21312a9e 100644
--- a/src/stores/blockstore.js
+++ b/src/stores/blockstore.js
@@ -7,7 +7,7 @@ const base32 = require('base32.js')
 const path = require('path')
 const write = require('pull-write')
 const parallel = require('run-parallel')
-const through = require('pull-through')
+const defer = require('pull-defer/source')
 
 const PREFIX_LENGTH = 5
 
@@ -52,19 +52,25 @@ exports.setUp = (basePath, BlobStore, locks) => {
       }
 
       const p = multihashToPath(key, extension)
+      const deferred = defer()
+
+      lock(p, (release) => {
+        const ext = extension === 'data' ? 'protobuf' : extension
+        pull(
+          store.read(p),
+          pull.collect(release((err, data) => {
+            if (err) {
+              return deferred.abort(err)
+            }
+
+            deferred.resolve(pull.values([
+              new Block(Buffer.concat(data), ext)
+            ]))
+          }))
+        )
+      })
 
-      const ext = extension === 'data' ? 'protobuf' : extension
-      let data = []
-
-      return pull(
-        store.read(p),
-        through(function (values) {
-          data = data.concat(values)
-        }, function () {
-          this.queue(new Block(Buffer.concat(data), ext))
-          this.queue(null)
-        })
-      )
+      return deferred
     },
 
     putStream () {
@@ -75,7 +81,10 @@ exports.setUp = (basePath, BlobStore, locks) => {
       const sink = write((blocks, cb) => {
        parallel(blocks.map((block) => (cb) => {
           writeBlock(block, (err, meta) => {
-            if (err) return cb(err)
+            if (err) {
+              return cb(err)
+            }
+
             if (push) {
               const read = push
               push = null
@@ -94,7 +103,9 @@ exports.setUp = (basePath, BlobStore, locks) => {
 
       const source = (end, cb) => {
         if (end) ended = end
-        if (ended) return cb(ended)
+        if (ended) {
+          return cb(ended)
+        }
 
         if (written.length) {
           return cb(null, written.shift())
diff --git a/test/blockstore-test.js b/test/blockstore-test.js
index eb801369..2814610b 100644
--- a/test/blockstore-test.js
+++ b/test/blockstore-test.js
@@ -109,7 +109,7 @@ module.exports = (repo) => {
 
     it('massive read', (done) => {
       parallel(_.range(20 * 100).map((i) => (cb) => {
-        const j = i % 20
+        const j = i % blockCollection.length
         pull(
           repo.blockstore.getStream(blockCollection[j].key),
           pull.collect((err, meta) => {
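
Note (not part of the patch): a minimal standalone sketch of the locked
deferred-source pattern this change introduces. It assumes the lock@0.1
callback API pinned in package.json (release(cb) returns a callback that
frees the lock and then invokes cb) and pull-defer's deferred source;
lockedRead and readRaw are hypothetical names, with readRaw standing in
for store.read.

const pull = require('pull-stream')
const defer = require('pull-defer/source')
const lock = require('lock')() // assumed lock@0.1 factory export

// Returns a pull-stream source synchronously; the underlying read only
// starts once the lock for `key` is acquired, so it cannot interleave
// with a concurrent writer holding the same lock.
function lockedRead (key, readRaw) {
  const deferred = defer()

  lock(key, (release) => {
    pull(
      readRaw(key),
      // release(cb) wraps the collect callback: the lock is freed first,
      // then the result is forwarded, so the lock is held only for the
      // duration of the read itself
      pull.collect(release((err, chunks) => {
        if (err) {
          return deferred.abort(err)
        }
        deferred.resolve(pull.values([Buffer.concat(chunks)]))
      }))
    )
  })

  // Callers may start reading from this immediately; pull-defer buffers
  // their requests until resolve() or abort() is called.
  return deferred
}

Returning the deferred source is what makes the fix work: the old code
returned a stream that began reading at once, while the new code keeps
the same synchronous-return API but delays the actual read until the
per-path lock is acquired.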