This repository has been archived by the owner on Aug 12, 2020. It is now read-only.

export is now a readable object stream #42

Merged · merged 1 commit on May 21, 2016
8 changes: 4 additions & 4 deletions README.md
@@ -66,6 +66,7 @@ add.on('end', () => {
// Calling write on the importer to add the file/object tuples

add.write(input)
add.write(input2)
add.end()
```

@@ -121,13 +122,13 @@ const repo = new ipfsRepo('', { stores: memStore })
const blocks = new ipfsBlockService(repo)
const dag = new ipfsMerkleDag.DAGService(blocks)
// Create an export event with the hash you want to export and a dag service
// Create an export readable object stream with the hash you want to export and a dag service
const exportEvent = Exporter(hash, dag)
// Pipe the return stream to console
exportEvent.on('file', (result) => {
exportEvent.on('data', (result) => {
result.stream.pipe(process.stdout)
}
```
@@ -137,8 +138,7 @@ exportEvent.on('file', (result) => {
const Exporter = require('ipfs-unixfs-engine').exporter
```

The exporter is an event emitter that returns a stream of the file found
by the multihash of the file from the dag service.
The exporter is a readable stream in object mode that emits an object `{ stream: stream, path: 'path' }` for each file found under the given multihash in the dag service.


## install
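For context, a minimal sketch of consuming the new object-mode export stream described above (reusing the `hash` and `dag` placeholders from the README example; illustrative, not part of the diff):

```js
const Exporter = require('ipfs-unixfs-engine').exporter

const exportStream = Exporter(hash, dag)

// each 'data' event delivers one { stream, path } object
exportStream.on('data', (file) => {
  console.log('exporting', file.path)
  if (file.stream) {
    // file.stream is a Readable of the file's bytes (null for directories)
    file.stream.pipe(process.stdout)
  }
})

// dag service failures are emitted as 'error' on the stream itself
exportStream.on('error', (err) => {
  console.error(err)
})
```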
3 changes: 2 additions & 1 deletion package.json
@@ -57,6 +57,7 @@
"ipfs-merkle-dag": "^0.5.0",
"ipfs-unixfs": "^0.1.0",
"readable-stream": "^1.1.13",
"run-series": "^1.1.4",
"through2": "^2.0.0"
},
"contributors": [
@@ -67,4 +68,4 @@
"greenkeeperio-bot <support@greenkeeper.io>",
"nginnever <ginneversource@gmail.com>"
]
}
}
120 changes: 62 additions & 58 deletions src/exporter.js
@@ -4,42 +4,32 @@ const debug = require('debug')
const log = debug('exporter')
log.err = debug('exporter:error')
const UnixFS = require('ipfs-unixfs')
const series = require('run-series')
const async = require('async')
const events = require('events')
const Readable = require('readable-stream').Readable
const pathj = require('path')
const util = require('util')

exports = module.exports = exporter
exports = module.exports = Exporter

function exporter (hash, dagService, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
util.inherits(Exporter, Readable)

function Exporter (hash, dagService, options) {
if (!(this instanceof Exporter)) {
Contributor: You might be leaking memory in a weird way if this check doesn't come before the Readable.call(...) line.
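The concern, sketched (illustrative only — the diff here already has the guard first):

```js
function Exporter (hash, dagService, options) {
  // wrong order: a call without `new` would initialize Readable state on
  // the wrong object, then throw that work away when the guard re-runs
  // the constructor via `new Exporter(...)`
  Readable.call(this, { objectMode: true })
  if (!(this instanceof Exporter)) {
    return new Exporter(hash, dagService, options)
  }
}
```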

return new Exporter(hash, dagService, options)
}

const ee = new events.EventEmitter()
dagService.get(hash, (err, fetchedNode) => {
if (err) {
if (callback) {
return callback(err)
}
return
}
const data = UnixFS.unmarshal(fetchedNode.data)
const type = data.type
if (type === 'directory') {
dirExporter(fetchedNode, hash, callback)
}
if (type === 'file') {
fileExporter(fetchedNode, hash, false, callback)
}
})
return ee
Readable.call(this, { objectMode: true })

function fileExporter (node, name, dir, callback) {
this.options = options || {}

this._read = (n) => {}

let fileExporter = (node, name, callback) => {
let init

if (typeof dir === 'function') { callback = dir; dir = {} }
if (!callback) { callback = function noop () {} }

var rs = new Readable()
if (node.links.length === 0) {
const unmarshaledData = UnixFS.unmarshal(node.data)
@@ -52,10 +42,8 @@ function exporter (hash, dagService, options, callback) {
rs.push(unmarshaledData.data)
rs.push(null)
}
ee.emit('file', { stream: rs, path: name, dir: dir })
if (callback) {
callback()
}
this.push({ stream: rs, path: name })
callback()
return
} else {
init = false
@@ -64,36 +52,40 @@
return
}
init = true
async.forEachSeries(node.links, (link, callback) => {
dagService.get(link.hash, (err, res) => {
if (err) {
callback(err)
}
var unmarshaledData = UnixFS.unmarshal(res.data)
rs.push(unmarshaledData.data)
callback()
})
}, (err) => {

const array = node.links.map((link) => {
return (cb) => {
dagService.get(link.hash, (err, res) => {
if (err) {
cb(err)
}
var unmarshaledData = UnixFS.unmarshal(res.data)
rs.push(unmarshaledData.data)
cb()
})
}
})
series(array, (err, res) => {
if (err) {
if (callback) {
return callback(err)
}
callback()
return
}
rs.push(null)
if (callback) {
callback()
}
callback()
return
})
}
ee.emit('file', { stream: rs, path: name, dir: dir })
this.push({ stream: rs, path: name })
callback()
return
}
}

function dirExporter (node, name, callback) {
let dirExporter = (node, name, callback) => {
let init

if (!callback) { callback = function noop () {} }

var rs = new Readable()
if (node.links.length === 0) {
init = false
@@ -105,10 +97,8 @@
rs.push(node.data)
rs.push(null)
}
ee.emit('file', {stream: rs, path: name})
if (callback) {
callback()
}
this.push({stream: null, path: name})
callback()
return
} else {
async.forEachSeries(node.links, (link, callback) => {
@@ -127,16 +117,30 @@
})
}, (err) => {
if (err) {
if (callback) {
return callback(err)
}
return
}
if (callback) {
callback()
return
}
callback()
return
})
Contributor: all of these (L123 to L134) can be replaced by just passing the callback as the done function for your async.forEachSeries.
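A minimal sketch of that simplification (illustrative; `eachLink` stands in for the existing per-link iterator):

```js
// async.forEachSeries already calls its done function with the first
// error, or with no arguments once every link has been visited, so the
// wrapper can collapse to passing `callback` straight through:
async.forEachSeries(node.links, eachLink, callback)
```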

}
}

dagService.get(hash, (err, fetchedNode) => {
if (err) {
this.emit('error', err)
return
Contributor: no log, no emit about the error?

Contributor Author: what would the proper way to log an error over a stream be? something like `this.emit('error', new Error('dag service Error'))`?

Contributor: Yes

}
const data = UnixFS.unmarshal(fetchedNode.data)
const type = data.type

if (type === 'directory') {
dirExporter(fetchedNode, hash)
}
if (type === 'file') {
fileExporter(fetchedNode, hash)
}
})

return this
}
74 changes: 60 additions & 14 deletions test/test-exporter.js
@@ -8,11 +8,14 @@ const BlockService = require('ipfs-block-service')
const DAGService = require('ipfs-merkle-dag').DAGService
const UnixFS = require('ipfs-unixfs')
const bl = require('bl')
const fs = require('fs')
const path = require('path')

let ds

module.exports = function (repo) {
describe('exporter', function () {
const bigFile = fs.readFileSync(path.join(__dirname, '/test-data/1.2MiB.txt'))
before((done) => {
const bs = new BlockService(repo)
expect(bs).to.exist
@@ -25,12 +28,12 @@ module.exports = function (repo) {
const hash = 'QmQmZQxSKQppbsWfVzBvg59Cn3DKtsNVQ94bjAxg2h3Lb8'
const bs = new BlockService(repo)
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('file', (data) => {
ds.get(hash, (err, fetchedNode) => {
const unmarsh = UnixFS.unmarshal(fetchedNode.data)
expect(err).to.not.exist
data.stream.pipe(bl((err, bldata) => {
ds.get(hash, (err, fetchedNode) => {
const unmarsh = UnixFS.unmarshal(fetchedNode.data)
expect(err).to.not.exist
const testExport = exporter(hash, ds)
testExport.on('data', (file) => {
file.stream.pipe(bl((err, bldata) => {
expect(err).to.not.exist
expect(bldata).to.deep.equal(unmarsh.data)
done()
@@ -44,9 +47,12 @@
const bs = new BlockService(repo)
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('file', (data) => {
expect(data.stream).to.exist
done()
testExport.on('data', (file) => {
file.stream.pipe(bl((err, bldata) => {
expect(bldata).to.deep.equal(bigFile)
expect(err).to.not.exist
done()
Contributor: What data do you expect here?

Contributor Author: it's expecting the raw data from 1.2MiB.txt, just made a test case for this

Contributor: Where?

Contributor: Could we assert that the data and links match here?
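A sketch of the extra assertion being suggested (hypothetical — the test itself only compares the raw bytes):

```js
ds.get(hash, (err, node) => {
  expect(err).to.not.exist
  // a 1.2MiB file gets chunked, so the root node should carry links
  expect(node.links.length).to.be.above(0)
  expect(UnixFS.unmarshal(node.data).type).to.equal('file')
})
```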

}))
})
})

@@ -55,9 +61,12 @@
const bs = new BlockService(repo)
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('file', (data) => {
expect(data.stream).to.exist
done()
testExport.on('data', (file) => {
expect(file.path).to.equal('QmRQgufjp9vLE8XK2LGKZSsPCFCF6e4iynCQtNB5X2HBKE')
file.stream.pipe(bl((err, bldata) => {
expect(err).to.not.exist
done()
}))
})
})

@@ -67,8 +76,8 @@
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
var fsa = []
testExport.on('file', (data) => {
fsa.push(data)
testExport.on('data', (files) => {
fsa.push(files)
})
setTimeout(() => {
expect(fsa[0].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/200Bytes.txt')
@@ -78,5 +87,42 @@
done()
}, 1000)
})

it('returns a null stream for dir', (done) => {
const hash = 'QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn' // the empty-directory hash
const bs = new BlockService(repo)
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('data', (dir) => {
expect(dir.stream).to.equal(null)
done()
})
})

it('fails on non existent hash', (done) => {
const hash = 'QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKj3' // This hash doesn't exist in the repo
const bs = new BlockService(repo)
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('error', (err) => {
const error = err.toString()
expect(err).to.exist
Contributor: Can we check that it's the error we're expecting?

const browser = error.includes('Error: key not found:')
const node = error.includes('no such file or directory')
// the browser and node js return different errors
if (browser) {
expect(error).to.contain('Error: key not found:')
done()
}
if (node) {
expect(error).to.contain('no such file or directory')
done()
}
if (!node && !browser) {
expect(node).to.equal(true)
done()
}
})
})
})
}