Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

fix: race condition causing Database is not open error #1834

Merged
merged 1 commit into from
Jan 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/cli/bin.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ if (args[0] === 'daemon' || args[0] === 'init') {
.completion()
.command(require('./commands/daemon'))
.command(require('./commands/init'))
.parse(args)

new YargsPromise(cli).parse(args)
.then(({ data }) => {
if (data) print(data)
})
} else {
// here we have to make a separate yargs instance with
// only the `api` option because we need this before doing
Expand Down
129 changes: 66 additions & 63 deletions src/cli/commands/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,56 @@

const sortBy = require('lodash/sortBy')
const pull = require('pull-stream')
const getFolderSize = require('get-folder-size')
const promisify = require('promisify-es6')
const getFolderSize = promisify(require('get-folder-size'))
const byteman = require('byteman')
const reduce = require('async/reduce')
const mh = require('multihashes')
const multibase = require('multibase')
const toPull = require('stream-to-pull-stream')
const { print, isDaemonOn, createProgressBar } = require('../utils')
const { cidToString } = require('../../utils/cid')
const globSource = require('../../utils/files/glob-source')

function getTotalBytes (paths, cb) {
reduce(paths, 0, (total, path, cb) => {
getFolderSize(path, (err, size) => {
if (err) return cb(err)
cb(null, total + size)
})
}, cb)
async function getTotalBytes (paths, cb) {
const sizes = await Promise.all(paths.map(p => getFolderSize(p)))
return sizes.reduce((total, size) => total + size, 0)
}

function addPipeline (source, addStream, options) {
pull(
source,
addStream,
pull.collect((err, added) => {
if (err) {
// Tweak the error message and add more relevant infor for the CLI
if (err.code === 'ERR_DIR_NON_RECURSIVE') {
err.message = `'${err.path}' is a directory, use the '-r' flag to specify directories`
return new Promise((resolve, reject) => {
pull(
source,
addStream,
pull.collect((err, added) => {
if (err) {
// Tweak the error message and add more relevant infor for the CLI
if (err.code === 'ERR_DIR_NON_RECURSIVE') {
err.message = `'${err.path}' is a directory, use the '-r' flag to specify directories`
}
return reject(err)
}

if (options.silent) return resolve()

if (options.quieter) {
print(added.pop().hash)
return resolve()
}
throw err
}

if (options.silent) return
if (options.quieter) return print(added.pop().hash)

sortBy(added, 'path')
.reverse()
.map((file) => {
const log = options.quiet ? [] : ['added']
log.push(cidToString(file.hash, { base: options.cidBase }))
if (!options.quiet && file.path.length > 0) log.push(file.path)
return log.join(' ')
})
.forEach((msg) => print(msg))
})
)
sortBy(added, 'path')
.reverse()
.map((file) => {
const log = options.quiet ? [] : ['added']
log.push(cidToString(file.hash, { base: options.cidBase }))
if (!options.quiet && file.path.length > 0) log.push(file.path)
return log.join(' ')
})
.forEach((msg) => print(msg))

resolve()
})
)
})
}

module.exports = {
Expand Down Expand Up @@ -140,46 +144,45 @@ module.exports = {
},

handler (argv) {
const { ipfs } = argv
const options = {
strategy: argv.trickle ? 'trickle' : 'balanced',
shardSplitThreshold: argv.enableShardingExperiment
? argv.shardSplitThreshold
: Infinity,
cidVersion: argv.cidVersion,
rawLeaves: argv.rawLeaves,
onlyHash: argv.onlyHash,
hashAlg: argv.hash,
wrapWithDirectory: argv.wrapWithDirectory,
pin: argv.pin,
chunker: argv.chunker
}

if (options.enableShardingExperiment && isDaemonOn()) {
throw new Error('Error: Enabling the sharding experiment should be done on the daemon')
}
argv.resolve((async () => {
const { ipfs } = argv
const options = {
strategy: argv.trickle ? 'trickle' : 'balanced',
shardSplitThreshold: argv.enableShardingExperiment
? argv.shardSplitThreshold
: Infinity,
cidVersion: argv.cidVersion,
rawLeaves: argv.rawLeaves,
onlyHash: argv.onlyHash,
hashAlg: argv.hash,
wrapWithDirectory: argv.wrapWithDirectory,
pin: argv.pin,
chunker: argv.chunker
}

const source = argv.file
? globSource(...argv.file, { recursive: argv.recursive })
: toPull.source(process.stdin) // Pipe directly to ipfs.add
if (options.enableShardingExperiment && isDaemonOn()) {
throw new Error('Error: Enabling the sharding experiment should be done on the daemon')
}

const adder = ipfs.addPullStream(options)
const source = argv.file
? globSource(...argv.file, { recursive: argv.recursive })
: toPull.source(process.stdin) // Pipe directly to ipfs.add

// No progress or piping directly to ipfs.add: no need to getTotalBytes
if (!argv.progress || !argv.file) {
return addPipeline(source, adder, argv)
}
const adder = ipfs.addPullStream(options)

getTotalBytes(argv.file, (err, totalBytes) => {
if (err) throw err
// No progress or piping directly to ipfs.add: no need to getTotalBytes
if (!argv.progress || !argv.file) {
return addPipeline(source, adder, argv)
}

const totalBytes = await getTotalBytes(argv.file)
const bar = createProgressBar(totalBytes)

options.progress = byteLength => {
bar.update(byteLength / totalBytes, { progress: byteman(byteLength, 2, 'MB') })
}

addPipeline(source, adder, argv)
})
return addPipeline(source, adder, argv)
})())
}
}
11 changes: 4 additions & 7 deletions src/cli/commands/bitswap/stat.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@ module.exports = {
}
},

handler ({ ipfs, cidBase }) {
ipfs.bitswap.stat((err, stats) => {
if (err) {
throw err
}

handler ({ ipfs, cidBase, resolve }) {
resolve((async () => {
const stats = await ipfs.bitswap.stat()
stats.wantlist = stats.wantlist.map(k => cidToString(k['/'], { base: cidBase, upgrade: false }))
stats.peers = stats.peers || []

Expand All @@ -34,6 +31,6 @@ module.exports = {
${stats.wantlist.join('\n ')}
partners [${stats.peers.length}]
${stats.peers.join('\n ')}`)
})
})())
}
}
10 changes: 4 additions & 6 deletions src/cli/commands/bitswap/unwant.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ module.exports = {
choices: multibase.names
}
},
handler ({ ipfs, key, cidBase }) {
ipfs.bitswap.unwant(key, (err) => {
if (err) {
throw err
}
handler ({ ipfs, key, cidBase, resolve }) {
resolve((async () => {
await ipfs.bitswap.unwant(key)
print(`Key ${cidToString(key, { base: cidBase, upgrade: false })} removed from wantlist`)
})
})())
}
}
10 changes: 4 additions & 6 deletions src/cli/commands/bitswap/wantlist.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ module.exports = {
}
},

handler ({ ipfs, peer, cidBase }) {
ipfs.bitswap.wantlist(peer, (err, list) => {
if (err) {
throw err
}
handler ({ ipfs, peer, cidBase, resolve }) {
resolve((async () => {
const list = await ipfs.bitswap.wantlist(peer)
list.Keys.forEach(k => print(cidToString(k['/'], { base: cidBase, upgrade: false })))
})
})())
}
}
11 changes: 4 additions & 7 deletions src/cli/commands/block/get.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,14 @@ module.exports = {

builder: {},

handler ({ ipfs, key }) {
ipfs.block.get(key, (err, block) => {
if (err) {
throw err
}

handler ({ ipfs, key, resolve }) {
resolve((async () => {
const block = await ipfs.block.get(key)
if (block) {
print(block.data, false)
} else {
print('Block was unwanted before it could be remotely retrieved')
}
})
})())
}
}
37 changes: 16 additions & 21 deletions src/cli/commands/block/put.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,10 @@
const bl = require('bl')
const fs = require('fs')
const multibase = require('multibase')
const promisify = require('promisify-es6')
const { print } = require('../../utils')
const { cidToString } = require('../../../utils/cid')

function addBlock (data, opts) {
const ipfs = opts.ipfs

ipfs.block.put(data, opts, (err, block) => {
if (err) {
throw err
}
print(cidToString(block.cid, { base: opts.cidBase }))
})
}

module.exports = {
command: 'put [block]',

Expand Down Expand Up @@ -48,17 +38,22 @@ module.exports = {
},

handler (argv) {
if (argv.block) {
const buf = fs.readFileSync(argv.block)
return addBlock(buf, argv)
}

process.stdin.pipe(bl((err, input) => {
if (err) {
throw err
argv.resolve((async () => {
let data

if (argv.block) {
data = await promisify(fs.readFile)(argv.block)
} else {
data = await new Promise((resolve, reject) => {
process.stdin.pipe(bl((err, input) => {
if (err) return reject(err)
resolve(input)
}))
})
}

addBlock(input, argv)
}))
const { cid } = await argv.ipfs.block.put(data, argv)
print(cidToString(cid, { base: argv.cidBase }))
})())
}
}
17 changes: 7 additions & 10 deletions src/cli/commands/block/rm.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,15 @@ module.exports = {

builder: {},

handler ({ ipfs, key }) {
if (isDaemonOn()) {
// TODO implement this once `js-ipfs-http-client` supports it
throw new Error('rm block with daemon running is not yet implemented')
}

ipfs.block.rm(key, (err) => {
if (err) {
throw err
handler ({ ipfs, key, resolve }) {
resolve((async () => {
if (isDaemonOn()) {
// TODO implement this once `js-ipfs-http-client` supports it
throw new Error('rm block with daemon running is not yet implemented')
}

await ipfs.block.rm(key)
print('removed ' + key)
})
})())
}
}
11 changes: 4 additions & 7 deletions src/cli/commands/block/stat.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@ module.exports = {
}
},

handler ({ ipfs, key, cidBase }) {
ipfs.block.stat(key, (err, stats) => {
if (err) {
throw err
}

handler ({ ipfs, key, cidBase, resolve }) {
resolve((async () => {
const stats = await ipfs.block.stat(key)
print('Key: ' + cidToString(stats.key, { base: cidBase }))
print('Size: ' + stats.size)
})
})())
}
}
11 changes: 3 additions & 8 deletions src/cli/commands/bootstrap/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,9 @@ module.exports = {
},

handler (argv) {
argv.ipfs.bootstrap.add(argv.peer, {
default: argv.default
}, (err, list) => {
if (err) {
throw err
}

argv.resolve((async () => {
const list = await argv.ipfs.bootstrap.add(argv.peer, { default: argv.default })
list.Peers.forEach((peer) => print(peer))
})
})())
}
}
9 changes: 3 additions & 6 deletions src/cli/commands/bootstrap/list.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@ module.exports = {
builder: {},

handler (argv) {
argv.ipfs.bootstrap.list((err, list) => {
if (err) {
throw err
}

argv.resolve((async () => {
const list = await argv.ipfs.bootstrap.list()
list.Peers.forEach((node) => print(node))
})
})())
}
}
Loading