Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Awesome IPLD Endeavour #89

Merged
merged 2 commits into from
Oct 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
"dependencies": {
"babel-runtime": "^6.11.6",
"base32.js": "^0.1.0",
"ipfs-block": "^0.3.0",
"ipfs-block": "^0.4.0",
"lock": "^0.1.3",
"multihashes": "^0.2.2",
"pull-defer": "^0.2.2",
Expand All @@ -67,4 +67,4 @@
"nginnever <ginneversource@gmail.com>",
"npmcdn-to-unpkg-bot <npmcdn-to-unpkg-bot@users.noreply.github.com>"
]
}
}
186 changes: 113 additions & 73 deletions src/stores/blockstore.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
'use strict'

const Block = require('ipfs-block')
const pull = require('pull-stream')
const Lock = require('lock')
const base32 = require('base32.js')
const path = require('path')
const write = require('pull-write')
const parallel = require('run-parallel')
const defer = require('pull-defer/source')
const pull = require('pull-stream')
const pullWrite = require('pull-write')
const pullDefer = require('pull-defer/source')

const PREFIX_LENGTH = 5
const EXTENSION = 'data'

exports = module.exports

function multihashToPath (multihash, extension) {
extension = extension || 'data'
function multihashToPath (multihash) {
const encoder = new base32.Encoder()
const hash = encoder.write(multihash).finalize()
const filename = `${hash}.${extension}`
const filename = `${hash}.${EXTENSION}`
const folder = filename.slice(0, PREFIX_LENGTH)

return path.join(folder, filename)
Expand All @@ -27,82 +27,103 @@ exports.setUp = (basePath, BlobStore, locks) => {
const store = new BlobStore(basePath + '/blocks')
const lock = new Lock()

function writeBlock (block, cb) {
if (!block || !block.data) {
return cb(new Error('Invalid block'))
// blockBlob is an object with:
// { data: <>, key: <> }
function writeBlock (blockBlob, callback) {
if (!blockBlob || !blockBlob.data) {
return callback(new Error('Invalid block'))
}

const key = multihashToPath(block.key, block.extension)

lock(key, (release) => pull(
pull.values([block.data]),
store.write(key, release((err) => {
if (err) {
return cb(err)
}
cb(null, {key})
}))
))
const key = multihashToPath(blockBlob.key)

lock(key, (release) => {
pull(
pull.values([
blockBlob.data
]),
store.write(key, release(released))
)
})

// called once the lock is released
function released (err) {
if (err) {
return callback(err)
}
callback(null, { key: key })
}
}

return {
getStream (key, extension) {
// returns a pull-stream of one block being read
getStream (key) {
if (!key) {
return pull.error(new Error('Invalid key'))
}

const p = multihashToPath(key, extension)
const deferred = defer()
const blockPath = multihashToPath(key)
const deferred = pullDefer()

lock(p, (release) => {
const ext = extension === 'data' ? 'protobuf' : extension
lock(blockPath, (release) => {
pull(
store.read(p),
pull.collect(release((err, data) => {
if (err) {
return deferred.abort(err)
}

deferred.resolve(pull.values([
new Block(Buffer.concat(data), ext)
]))
}))
store.read(blockPath),
pull.collect(release(released))
)
})

function released (err, data) {
if (err) {
return deferred.abort(err)
}

deferred.resolve(
pull.values([
new Block(Buffer.concat(data))
])
)
}

return deferred
},

/*
* putStream - write multiple blocks
*
* returns a pull-stream that expects blockBlobs
*
* NOTE: blockBlob is a { data: <>, key: <> } and not a
* ipfs-block instance. This is because Block instances support
* several types of hashing and it is up to the BlockService
* to understand the right one to use (given the CID)
*/
// TODO
// consider using a more explicit name, this can cause some confusion
// since the natural association is
// getStream - createReadStream - read one
// putStream - createWriteStream - write one
// where in fact it is:
// getStream - createReadStream - read one (the same)
// putStream - createFilesWriteStream = write several
//
putStream () {
let ended = false
let written = []
let push = null

const sink = write((blocks, cb) => {
parallel(blocks.map((block) => (cb) => {
writeBlock(block, (err, meta) => {
if (err) {
return cb(err)
}

if (push) {
const read = push
push = null
read(null, meta)
return cb()
}

written.push(meta)
cb()
})
}), cb)
const sink = pullWrite((blockBlobs, cb) => {
const tasks = writeTasks(blockBlobs)
parallel(tasks, cb)
}, null, 100, (err) => {
ended = err || true
if (push) push(ended)
if (push) {
push(ended)
}
})

const source = (end, cb) => {
if (end) ended = end
if (end) {
ended = end
}
if (ended) {
return cb(ended)
}
Expand All @@ -114,35 +135,54 @@ exports.setUp = (basePath, BlobStore, locks) => {
push = cb
}

return {source, sink}
},
/*
* Creates individual tasks to write each block blob that can be
* exectured in parallel
*/
function writeTasks (blockBlobs) {
return blockBlobs.map((blockBlob) => {
return (cb) => {
writeBlock(blockBlob, (err, meta) => {
if (err) {
return cb(err)
}

if (push) {
const read = push
push = null
read(null, meta)
return cb()
}

written.push(meta)
cb()
})
}
})
}

has (key, extension, cb) {
if (typeof extension === 'function') {
cb = extension
extension = undefined
return {
source: source,
sink: sink
}
},

has (key, callback) {
if (!key) {
return cb(new Error('Invalid key'))
return callback(new Error('Invalid key'))
}

const p = multihashToPath(key, extension)
store.exists(p, cb)
const blockPath = multihashToPath(key)
store.exists(blockPath, callback)
},

delete (key, extension, cb) {
if (typeof extension === 'function') {
cb = extension
extension = undefined
}

delete (key, callback) {
if (!key) {
return cb(new Error('Invalid key'))
return callback(new Error('Invalid key'))
}

const p = multihashToPath(key, extension)
store.remove(p, cb)
const blockPath = multihashToPath(key)
store.remove(blockPath, callback)
}
}
}
34 changes: 22 additions & 12 deletions src/stores/locks.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ exports.setUp = (basePath, BlobStore) => {
lock (callback) {
function createLock () {
pull(
pull.values([new Buffer('LOCK')]),
pull.values([
new Buffer('LOCK')
]),
store.write(lockFile, callback)
)
}

function doesExist (err, exists) {
if (err) return callback(err)
if (err) {
return callback(err)
}

if (exists) {
// default 100ms
Expand All @@ -37,16 +41,22 @@ exports.setUp = (basePath, BlobStore) => {

unlock (callback) {
series([
(cb) => store.remove(lockFile, cb),
(cb) => store.exists(lockFile, (err, exists) => {
if (err) return cb(err)

if (exists) {
return cb(new Error('failed to remove lock'))
}

cb()
})
(cb) => {
store.remove(lockFile, cb)
},
(cb) => {
store.exists(lockFile, (err, exists) => {
if (err) {
return cb(err)
}

if (exists) {
return cb(new Error('failed to remove lock'))
}

cb()
})
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👎 this change makes it harder to read, and is a matter of personal preference

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can agree on that for line 44, however, for line 44, the left (previous) is way harder to read because the way the 3 functions are nested is not entirely clear.

], callback)
}
}
Expand Down
Loading