Merge pull request #89 from ipfs/awesome-ipld
Awesome IPLD Endeavour
daviddias authored Oct 26, 2016
2 parents 6e39eae + f7e4047 commit 32e8fca
Showing 4 changed files with 159 additions and 150 deletions.
4 changes: 2 additions & 2 deletions package.json
@@ -44,7 +44,7 @@
"dependencies": {
"babel-runtime": "^6.11.6",
"base32.js": "^0.1.0",
"ipfs-block": "^0.3.0",
"ipfs-block": "^0.4.0",
"lock": "^0.1.3",
"multihashes": "^0.2.2",
"pull-defer": "^0.2.2",
@@ -67,4 +67,4 @@
"nginnever <ginneversource@gmail.com>",
"npmcdn-to-unpkg-bot <npmcdn-to-unpkg-bot@users.noreply.github.com>"
]
}
}
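
The ipfs-block bump from ^0.3.0 to ^0.4.0 matches the constructor change visible in src/stores/blockstore.js below, where the extension argument is dropped. A minimal before/after sketch, mirroring how the blockstore constructs blocks in this diff:

const Block = require('ipfs-block')

// with ipfs-block ^0.3.0 the old blockstore passed an extension hint:
//   new Block(Buffer.concat(data), ext)
// with ipfs-block ^0.4.0 a Block is built from the raw bytes only:
const block = new Block(new Buffer('hello world'))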
186 changes: 113 additions & 73 deletions src/stores/blockstore.js
@@ -1,23 +1,23 @@
'use strict'

const Block = require('ipfs-block')
const pull = require('pull-stream')
const Lock = require('lock')
const base32 = require('base32.js')
const path = require('path')
const write = require('pull-write')
const parallel = require('run-parallel')
const defer = require('pull-defer/source')
const pull = require('pull-stream')
const pullWrite = require('pull-write')
const pullDefer = require('pull-defer/source')

const PREFIX_LENGTH = 5
const EXTENSION = 'data'

exports = module.exports

function multihashToPath (multihash, extension) {
extension = extension || 'data'
function multihashToPath (multihash) {
const encoder = new base32.Encoder()
const hash = encoder.write(multihash).finalize()
const filename = `${hash}.${extension}`
const filename = `${hash}.${EXTENSION}`
const folder = filename.slice(0, PREFIX_LENGTH)

return path.join(folder, filename)
@@ -27,82 +27,103 @@ exports.setUp = (basePath, BlobStore, locks) => {
const store = new BlobStore(basePath + '/blocks')
const lock = new Lock()

function writeBlock (block, cb) {
if (!block || !block.data) {
return cb(new Error('Invalid block'))
// blockBlob is an object with:
// { data: <>, key: <> }
function writeBlock (blockBlob, callback) {
if (!blockBlob || !blockBlob.data) {
return callback(new Error('Invalid block'))
}

const key = multihashToPath(block.key, block.extension)

lock(key, (release) => pull(
pull.values([block.data]),
store.write(key, release((err) => {
if (err) {
return cb(err)
}
cb(null, {key})
}))
))
const key = multihashToPath(blockBlob.key)

lock(key, (release) => {
pull(
pull.values([
blockBlob.data
]),
store.write(key, release(released))
)
})

// called once the lock is released
function released (err) {
if (err) {
return callback(err)
}
callback(null, { key: key })
}
}

return {
getStream (key, extension) {
// returns a pull-stream of one block being read
getStream (key) {
if (!key) {
return pull.error(new Error('Invalid key'))
}

const p = multihashToPath(key, extension)
const deferred = defer()
const blockPath = multihashToPath(key)
const deferred = pullDefer()

lock(p, (release) => {
const ext = extension === 'data' ? 'protobuf' : extension
lock(blockPath, (release) => {
pull(
store.read(p),
pull.collect(release((err, data) => {
if (err) {
return deferred.abort(err)
}

deferred.resolve(pull.values([
new Block(Buffer.concat(data), ext)
]))
}))
store.read(blockPath),
pull.collect(release(released))
)
})

function released (err, data) {
if (err) {
return deferred.abort(err)
}

deferred.resolve(
pull.values([
new Block(Buffer.concat(data))
])
)
}

return deferred
},

/*
* putStream - write multiple blocks
*
* returns a pull-stream that expects blockBlobs
*
* NOTE: blockBlob is a { data: <>, key: <> } and not an
* ipfs-block instance. This is because Block instances support
* several types of hashing and it is up to the BlockService
* to understand the right one to use (given the CID)
*/
// TODO
// consider using a more explicit name, this can cause some confusion
// since the natural association is
// getStream - createReadStream - read one
// putStream - createWriteStream - write one
// where in fact it is:
// getStream - createReadStream - read one (the same)
// putStream - createFilesWriteStream = write several
//
putStream () {
let ended = false
let written = []
let push = null

const sink = write((blocks, cb) => {
parallel(blocks.map((block) => (cb) => {
writeBlock(block, (err, meta) => {
if (err) {
return cb(err)
}

if (push) {
const read = push
push = null
read(null, meta)
return cb()
}

written.push(meta)
cb()
})
}), cb)
const sink = pullWrite((blockBlobs, cb) => {
const tasks = writeTasks(blockBlobs)
parallel(tasks, cb)
}, null, 100, (err) => {
ended = err || true
if (push) push(ended)
if (push) {
push(ended)
}
})

const source = (end, cb) => {
if (end) ended = end
if (end) {
ended = end
}
if (ended) {
return cb(ended)
}
@@ -114,35 +135,54 @@ exports.setUp = (basePath, BlobStore, locks) => {
push = cb
}

return {source, sink}
},
/*
* Creates individual tasks to write each block blob that can be
* executed in parallel
*/
function writeTasks (blockBlobs) {
return blockBlobs.map((blockBlob) => {
return (cb) => {
writeBlock(blockBlob, (err, meta) => {
if (err) {
return cb(err)
}

if (push) {
const read = push
push = null
read(null, meta)
return cb()
}

written.push(meta)
cb()
})
}
})
}

has (key, extension, cb) {
if (typeof extension === 'function') {
cb = extension
extension = undefined
return {
source: source,
sink: sink
}
},

has (key, callback) {
if (!key) {
return cb(new Error('Invalid key'))
return callback(new Error('Invalid key'))
}

const p = multihashToPath(key, extension)
store.exists(p, cb)
const blockPath = multihashToPath(key)
store.exists(blockPath, callback)
},

delete (key, extension, cb) {
if (typeof extension === 'function') {
cb = extension
extension = undefined
}

delete (key, callback) {
if (!key) {
return cb(new Error('Invalid key'))
return callback(new Error('Invalid key'))
}

const p = multihashToPath(key, extension)
store.remove(p, cb)
const blockPath = multihashToPath(key)
store.remove(blockPath, callback)
}
}
}
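
To make the reworked surface concrete, here is a minimal usage sketch of the blockstore set up above; SomeBlobStore (a pull-stream compatible blob store constructor) and someMultihash (a precomputed multihash of the written bytes) are hypothetical stand-ins, and Block#data is assumed to expose the stored bytes:

// Usage sketch only; SomeBlobStore and someMultihash are hypothetical
// placeholders, not part of this repository.
const pull = require('pull-stream')
const blockstore = require('./stores/blockstore')
const locks = require('./stores/locks').setUp('/tmp/example-repo', SomeBlobStore)

const blocks = blockstore.setUp('/tmp/example-repo', SomeBlobStore, locks)

// write a batch of block blobs ({ data: <Buffer>, key: <multihash> })
pull(
  pull.values([
    { data: new Buffer('hello'), key: someMultihash }
  ]),
  blocks.putStream(),
  pull.collect((err, written) => {
    if (err) { throw err }

    // read one of the blocks back as an ipfs-block Block instance
    pull(
      blocks.getStream(someMultihash),
      pull.collect((err, result) => {
        if (err) { throw err }
        console.log(result[0].data.toString()) // assumed Block#data accessor
      })
    )
  })
)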
34 changes: 22 additions & 12 deletions src/stores/locks.js
@@ -13,13 +13,17 @@ exports.setUp = (basePath, BlobStore) => {
lock (callback) {
function createLock () {
pull(
pull.values([new Buffer('LOCK')]),
pull.values([
new Buffer('LOCK')
]),
store.write(lockFile, callback)
)
}

function doesExist (err, exists) {
if (err) return callback(err)
if (err) {
return callback(err)
}

if (exists) {
// default 100ms
@@ -37,16 +41,22 @@

unlock (callback) {
series([
(cb) => store.remove(lockFile, cb),
(cb) => store.exists(lockFile, (err, exists) => {
if (err) return cb(err)

if (exists) {
return cb(new Error('failed to remove lock'))
}

cb()
})
(cb) => {
store.remove(lockFile, cb)
},
(cb) => {
store.exists(lockFile, (err, exists) => {
if (err) {
return cb(err)
}

if (exists) {
return cb(new Error('failed to remove lock'))
}

cb()
})
}
], callback)
}
}
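
And a corresponding sketch of taking and releasing the repo lock with this module (SomeBlobStore is again a hypothetical blob store constructor):

// Usage sketch only; SomeBlobStore is a hypothetical placeholder.
const locks = require('./stores/locks').setUp('/tmp/example-repo', SomeBlobStore)

locks.lock((err) => {
  if (err) { throw err }

  // exclusive section: safe to touch the repo here

  locks.unlock((err) => {
    if (err) { throw err }
  })
})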
