
Merge pull request #47 from noffle/end-exporter-stream
End export stream on completion.
daviddias authored Jun 28, 2016
2 parents 138990f + a5b9816 commit 04e7483
Showing 3 changed files with 90 additions and 82 deletions.
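
What this means for consumers: the exporter is an object-mode Readable, and with this change it pushes null once the DAG traversal finishes (and emits 'error' on failure) instead of staying open forever. A minimal sketch of that stream contract in isolation — hypothetical path name, not the exporter itself:

```js
'use strict'

const Readable = require('readable-stream').Readable

// Bare-bones object-mode readable mimicking the exporter's new contract:
// push result objects as they are produced, push(null) when traversal is
// done, and emit 'error' if anything fails.
const out = new Readable({ objectMode: true })
out._read = () => {}

out.push({ path: 'QmFoo/200Bytes.txt', content: null }) // one exported entry
out.push(null) // <-- the "end export stream on completion" part

out.on('data', (file) => console.log('exported', file.path))
out.on('end', () => console.log('export stream ended'))
out.on('error', (err) => console.error('export failed', err))
```
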
6 changes: 5 additions & 1 deletion package.json
@@ -35,10 +35,12 @@
"homepage": "https://github.com/diasdavid/js-ipfs-data-importing#readme",
"devDependencies": {
"aegir": "^3.0.1",
"async": "^1.5.2",
"block-stream2": "^1.1.0",
"bs58": "^3.0.0",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"concat-stream": "^1.5.1",
"fs-blob-store": "^5.2.1",
"idb-plus-blob-store": "^1.1.2",
"ipfs-repo": "^0.7.5",
@@ -51,11 +53,13 @@
"string-to-stream": "^1.0.1"
},
"dependencies": {
"async": "^1.5.2",
"block-stream2": "^1.1.0",
"bs58": "^3.0.0",
"debug": "^2.2.0",
"field-trip": "0.0.3",
"ipfs-merkle-dag": "^0.5.0",
"ipfs-unixfs": "^0.1.0",
"is-ipfs": "^0.2.0",
"isstream": "^0.1.2",
"readable-stream": "^1.1.13",
"run-series": "^1.1.4",
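
field-trip shows up in the runtime dependencies and concat-stream in the devDependencies because the rewritten exporter below walks the DAG with field-trip and the updated tests collect the exporter's output with concat-stream. Judging only from how src/exporter.js calls it, field-trip is seeded with initial items and repeatedly invokes visit(item, add, done), where add() enqueues further items and done() finishes the current one. A hypothetical, self-contained sketch of that shape over a plain in-memory tree (no DAGService involved):

```js
'use strict'

const fieldtrip = require('field-trip')

// Hypothetical in-memory tree standing in for the DAG in this sketch.
const nodes = {
  root: { children: ['200Bytes.txt', 'level-1'] },
  'level-1': { children: ['level-2'] },
  '200Bytes.txt': { children: [] },
  'level-2': { children: [] }
}

function visit (item, add, done) {
  console.log('visiting', item.path)
  // Enqueue children using the same item shape exporter.js uses: { path, hash }.
  nodes[item.hash].children.forEach((child) => {
    add({ path: item.path + '/' + child, hash: child })
  })
  done()
}

fieldtrip([{ path: 'root', hash: 'root' }], visit, (err) => {
  if (err) throw err
  console.log('traversal complete')
})
```
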
118 changes: 55 additions & 63 deletions src/exporter.js
@@ -3,12 +3,14 @@
const debug = require('debug')
const log = debug('exporter')
log.err = debug('exporter:error')
const isIPFS = require('is-ipfs')
const bs58 = require('bs58')
const UnixFS = require('ipfs-unixfs')
const series = require('run-series')
const async = require('async')
const Readable = require('readable-stream').Readable
const pathj = require('path')
const util = require('util')
const fieldtrip = require('field-trip')

exports = module.exports = Exporter

@@ -19,21 +21,29 @@ function Exporter (hash, dagService, options) {
return new Exporter(hash, dagService, options)
}

// Sanitize hash.
if (!isIPFS.multihash(hash)) {
throw new Error('not valid multihash')
}
if (Buffer.isBuffer(hash)) {
hash = bs58.encode(hash)
}

Readable.call(this, { objectMode: true })

this.options = options || {}

this._read = (n) => {}

let fileExporter = (node, name, callback) => {
let init
let fileExporter = (node, name, done) => {
let init = false

if (!callback) { callback = function noop () {} }
if (!done) throw new Error('done must be set')

// Logic to export a single (possibly chunked) unixfs file.
var rs = new Readable()
if (node.links.length === 0) {
const unmarshaledData = UnixFS.unmarshal(node.data)
init = false
rs._read = () => {
if (init) {
return
@@ -43,10 +53,8 @@
rs.push(null)
}
this.push({ content: rs, path: name })
callback()
return
done()
} else {
init = false
rs._read = () => {
if (init) {
return
@@ -57,7 +65,7 @@
return (cb) => {
dagService.get(link.hash, (err, res) => {
if (err) {
cb(err)
return cb(err)
}
var unmarshaledData = UnixFS.unmarshal(res.data)
rs.push(unmarshaledData.data)
@@ -67,80 +75,64 @@
})
series(array, (err, res) => {
if (err) {
callback()
rs.emit('error', err)
return
}
rs.push(null)
callback()
return
})
}
this.push({ content: rs, path: name })
callback()
return
done()
}
}

let dirExporter = (node, name, callback) => {
let init
// Logic to export a unixfs directory.
let dirExporter = (node, name, add, done) => {
if (!add) throw new Error('add must be set')
if (!done) throw new Error('done must be set')

if (!callback) { callback = function noop () {} }
this.push({content: null, path: name})

var rs = new Readable()
if (node.links.length === 0) {
init = false
rs._read = () => {
if (init) {
return
}
init = true
rs.push(node.data)
rs.push(null)
}
this.push({content: null, path: name})
callback()
return
} else {
async.forEachSeries(node.links, (link, callback) => {
dagService.get(link.hash, (err, res) => {
if (err) {
callback(err)
}
var unmarshaledData = UnixFS.unmarshal(res.data)
if (unmarshaledData.type === 'file') {
return (fileExporter(res, pathj.join(name, link.name), callback))
}
if (unmarshaledData.type === 'directory') {
return (dirExporter(res, pathj.join(name, link.name), callback))
}
callback()
})
}, (err) => {
if (err) {
callback()
return
}
callback()
return
// Directory has links
if (node.links.length > 0) {
node.links.forEach((link) => {
add({ path: pathj.join(name, link.name), hash: link.hash })
})
}
done()
}

dagService.get(hash, (err, fetchedNode) => {
// Traverse the DAG asynchronously
var self = this
fieldtrip([{ path: hash, hash: hash }], visit, (err) => {
if (err) {
this.emit('error', err)
self.emit('error', err)
return
}
const data = UnixFS.unmarshal(fetchedNode.data)
const type = data.type

if (type === 'directory') {
dirExporter(fetchedNode, hash)
}
if (type === 'file') {
fileExporter(fetchedNode, hash)
}
self.push(null)
})

// Visit function: called once per node in the exported graph
function visit (item, add, done) {
dagService.get(item.hash, (err, fetchedNode) => {
if (err) {
self.emit('error', err)
return
}

const data = UnixFS.unmarshal(fetchedNode.data)
const type = data.type

if (type === 'directory') {
dirExporter(fetchedNode, item.path, add, done)
}

if (type === 'file') {
fileExporter(fetchedNode, item.path, done)
}
})
}

return this
}
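
With the stream actually ending, a consumer no longer needs timeouts or manual bookkeeping: it can pipe the exporter straight into concat-stream and handle 'error' separately, which is exactly what the rewritten tests below do. A sketch under the same assumptions as those tests (the hash is the one used in the tests; `repo` and the wrapper function are stand-ins, not part of this PR):

```js
'use strict'

const BlockService = require('ipfs-block-service')
const DAGService = require('ipfs-merkle-dag').DAGService
const concat = require('concat-stream')
const exporter = require('./src/exporter')

// Assumption: `repo` is an ipfs-repo instance that already holds the
// directory used by the tests below.
function exportAllPaths (repo, callback) {
  const bs = new BlockService(repo)
  const ds = new DAGService(bs)
  const testExport = exporter('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN', ds)

  testExport.on('error', callback)

  // concat-stream's callback fires only once the source ends (the point of
  // this PR); directory entries come through with content === null.
  testExport.pipe(concat((files) => {
    callback(null, files.map((file) => file.path))
  }))
}
```
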
48 changes: 30 additions & 18 deletions test/test-exporter.js
@@ -7,7 +7,7 @@ const expect = require('chai').expect
const BlockService = require('ipfs-block-service')
const DAGService = require('ipfs-merkle-dag').DAGService
const UnixFS = require('ipfs-unixfs')
const bl = require('bl')
const concat = require('concat-stream')
const fs = require('fs')
const path = require('path')

@@ -32,13 +32,16 @@ module.exports = function (repo) {
const unmarsh = UnixFS.unmarshal(fetchedNode.data)
expect(err).to.not.exist
const testExport = exporter(hash, ds)
testExport.on('data', (file) => {
file.content.pipe(bl((err, bldata) => {
expect(err).to.not.exist
testExport.on('error', (err) => {
expect(err).to.not.exist
})
testExport.pipe(concat((files) => {
expect(files).to.be.length(1)
files[0].content.pipe(concat((bldata) => {
expect(bldata).to.deep.equal(unmarsh.data)
done()
}))
})
}))
})
})

@@ -47,10 +50,12 @@
const bs = new BlockService(repo)
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('error', (err) => {
expect(err).to.not.exist
})
testExport.on('data', (file) => {
file.content.pipe(bl((err, bldata) => {
file.content.pipe(concat((bldata) => {
expect(bldata).to.deep.equal(bigFile)
expect(err).to.not.exist
done()
}))
})
@@ -61,10 +66,13 @@
const bs = new BlockService(repo)
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('error', (err) => {
expect(err).to.not.exist
})
testExport.on('data', (file) => {
expect(file.path).to.equal('QmRQgufjp9vLE8XK2LGKZSsPCFCF6e4iynCQtNB5X2HBKE')
file.content.pipe(bl((err, bldata) => {
expect(err).to.not.exist
file.content.pipe(concat((bldata) => {
expect(bldata).to.exist
done()
}))
})
@@ -75,24 +83,28 @@
const bs = new BlockService(repo)
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
var fsa = []
testExport.on('data', (files) => {
fsa.push(files)
testExport.on('error', (err) => {
expect(err).to.not.exist
})
setTimeout(() => {
expect(fsa[0].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/200Bytes.txt')
expect(fsa[1].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/dir-another')
expect(fsa[2].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/200Bytes.txt')
expect(fsa[3].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/level-2')
testExport.pipe(concat((files) => {
expect(files[0].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN')
expect(files[1].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/200Bytes.txt')
expect(files[2].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/dir-another')
expect(files[3].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1')
expect(files[4].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/200Bytes.txt')
expect(files[5].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/level-2')
done()
}, 1000)
}))
})

it('returns a null stream for dir', (done) => {
const hash = 'QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn' // This hash doesn't exist in the repo
const bs = new BlockService(repo)
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('error', (err) => {
expect(err).to.not.exist
})
testExport.on('data', (dir) => {
expect(dir.content).to.equal(null)
done()
