This repository has been archived by the owner on Aug 12, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 20
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #64 from ipfs/pull
[WIP] Pull streams
- Loading branch information
Showing
133 changed files
with
868 additions
and
815 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ language: node_js | |
node_js: | ||
- 4 | ||
- 5 | ||
- stable | ||
|
||
# Make sure we have new NPM. | ||
before_install: | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
'use strict' | ||
|
||
const chunker = require('block-stream2') | ||
const block = require('pull-block') | ||
|
||
exports = module.exports = function (size) { | ||
return chunker({ size: size, zeroPadding: false }) | ||
return block(size, {zeroPadding: false}) | ||
} |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,124 +1,39 @@ | ||
'use strict' | ||
|
||
const debug = require('debug') | ||
const log = debug('unixfs') | ||
log.err = debug('unixfs:error') | ||
const isIPFS = require('is-ipfs') | ||
const UnixFS = require('ipfs-unixfs') | ||
const series = require('run-series') | ||
const Readable = require('readable-stream').Readable | ||
const pathj = require('path') | ||
const util = require('util') | ||
const fieldtrip = require('field-trip') | ||
const cleanMultihash = require('./clean-multihash') | ||
const traverse = require('pull-traverse') | ||
const pull = require('pull-stream') | ||
|
||
exports = module.exports = Exporter | ||
const util = require('./util') | ||
const switchType = util.switchType | ||
const cleanMultihash = util.cleanMultihash | ||
|
||
util.inherits(Exporter, Readable) | ||
const dirExporter = require('./exporters/dir') | ||
const fileExporter = require('./exporters/file') | ||
|
||
function Exporter (hash, dagService, options) { | ||
if (!(this instanceof Exporter)) { | ||
return new Exporter(hash, dagService, options) | ||
} | ||
|
||
// Sanitize hash | ||
if (!isIPFS.multihash(hash)) { | ||
throw new Error('not valid multihash') | ||
} | ||
module.exports = (hash, dagService, options) => { | ||
hash = cleanMultihash(hash) | ||
|
||
Readable.call(this, { objectMode: true }) | ||
|
||
this.options = options || {} | ||
|
||
this._read = (n) => {} | ||
|
||
let fileExporter = (node, name, done) => { | ||
if (!done) { | ||
throw new Error('done must be set') | ||
} | ||
|
||
const contentRS = new Readable() | ||
contentRS._read = () => {} | ||
|
||
// Logic to export a single (possibly chunked) unixfs file. | ||
if (node.links.length === 0) { | ||
const unmarshaledData = UnixFS.unmarshal(node.data) | ||
contentRS.push(unmarshaledData.data) | ||
contentRS.push(null) | ||
this.push({ content: contentRS, path: name }) | ||
done() | ||
} else { | ||
const array = node.links.map((link) => { | ||
return (cb) => { | ||
dagService.get(link.hash, (err, res) => { | ||
if (err) { | ||
return cb(err) | ||
} | ||
var unmarshaledData = UnixFS.unmarshal(res.data) | ||
contentRS.push(unmarshaledData.data) | ||
cb() | ||
}) | ||
} | ||
}) | ||
series(array, (err) => { | ||
if (err) { | ||
return contentRS.emit('error', err) | ||
} | ||
contentRS.push(null) | ||
}) | ||
this.push({ content: contentRS, path: name }) | ||
done() | ||
} | ||
} | ||
|
||
// Logic to export a unixfs directory. | ||
let dirExporter = (node, name, add, done) => { | ||
if (!add) { | ||
throw new Error('add must be set') | ||
} | ||
if (!done) { | ||
throw new Error('done must be set') | ||
} | ||
|
||
this.push({content: null, path: name}) | ||
|
||
// Directory has links | ||
if (node.links.length > 0) { | ||
node.links.forEach((link) => { | ||
add({ path: pathj.join(name, link.name), hash: link.hash }) | ||
}) | ||
} | ||
done() | ||
} | ||
|
||
// Traverse the DAG asynchronously | ||
fieldtrip([{path: hash, hash: hash}], visit.bind(this), (err) => { | ||
if (err) { | ||
return this.emit('error', err) | ||
} | ||
this.push(null) | ||
}) | ||
|
||
// Visit function: called once per node in the exported graph | ||
function visit (item, add, done) { | ||
dagService.get(item.hash, (err, node) => { | ||
if (err) { | ||
return this.emit('error', err) | ||
} | ||
|
||
const data = UnixFS.unmarshal(node.data) | ||
const type = data.type | ||
|
||
if (type === 'directory') { | ||
dirExporter(node, item.path, add, done) | ||
} | ||
|
||
if (type === 'file') { | ||
fileExporter(node, item.path, done) | ||
} | ||
}) | ||
options = options || {} | ||
|
||
function visitor (item) { | ||
return pull( | ||
dagService.getStream(item.hash), | ||
pull.map((node) => switchType( | ||
node, | ||
() => dirExporter(node, item.path, dagService), | ||
() => fileExporter(node, item.path, dagService) | ||
)), | ||
pull.flatten() | ||
) | ||
} | ||
|
||
return this | ||
// Traverse the DAG | ||
return pull( | ||
dagService.getStream(hash), | ||
pull.map((node) => switchType( | ||
node, | ||
() => traverse.widthFirst({path: hash, hash}, visitor), | ||
() => fileExporter(node, hash, dagService) | ||
)), | ||
pull.flatten() | ||
) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
'use strict' | ||
|
||
const path = require('path') | ||
const pull = require('pull-stream') | ||
|
||
const fileExporter = require('./file') | ||
const switchType = require('../util').switchType | ||
|
||
// Logic to export a unixfs directory. | ||
module.exports = (node, name, dagService) => { | ||
return pull( | ||
pull.values(node.links), | ||
pull.map((link) => ({ | ||
path: path.join(name, link.name), | ||
hash: link.hash | ||
})), | ||
pull.map((item) => pull( | ||
dagService.getStream(item.hash), | ||
pull.map((n) => switchType( | ||
n, | ||
() => pull.values([item]), | ||
() => fileExporter(n, item.path, dagService) | ||
)), | ||
pull.flatten() | ||
)), | ||
pull.flatten() | ||
) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
'use strict' | ||
|
||
const UnixFS = require('ipfs-unixfs') | ||
const pull = require('pull-stream') | ||
|
||
// Unwrap the raw file bytes carried inside a unixfs-encoded DAG node.
function extractContent (node) {
  const unmarshaled = UnixFS.unmarshal(node.data)
  return unmarshaled.data
}
|
||
// Logic to export a single (possibly chunked) unixfs file. | ||
module.exports = (node, name, ds) => { | ||
let content | ||
|
||
if (node.links.length === 0) { | ||
const c = extractContent(node) | ||
content = pull.values([c]) | ||
} else { | ||
content = pull( | ||
pull.values(node.links), | ||
pull.map((link) => ds.getStream(link.hash)), | ||
pull.flatten(), | ||
pull.map(extractContent) | ||
) | ||
} | ||
|
||
return pull.values([{ | ||
content: content, | ||
path: name | ||
}]) | ||
} |
Oops, something went wrong.