diff --git a/package.json b/package.json index d684499640..92346aa177 100644 --- a/package.json +++ b/package.json @@ -96,6 +96,7 @@ "promisify-es6": "^1.0.1", "pull-file": "^1.0.0", "pull-paramap": "^1.1.6", + "pull-pushable": "^2.0.1", "pull-sort": "^1.0.0", "pull-stream": "^3.4.5", "pull-stream-to-stream": "^1.3.3", diff --git a/src/core/ipfs/files.js b/src/core/ipfs/files.js index 5070398b17..4f829d5269 100644 --- a/src/core/ipfs/files.js +++ b/src/core/ipfs/files.js @@ -53,6 +53,7 @@ module.exports = function files (self) { pull( pull.values([hash]), pull.asyncMap(self._dagS.get.bind(self._dagS)), + pull.take(1), pull.map((node) => { const data = UnixFS.unmarshal(node.data) if (data.type === 'directory') { @@ -81,6 +82,10 @@ module.exports = function files (self) { return file }) ))) + }), + + getPull: promisify((hash, callback) => { + callback(null, exporter(hash, self._dagS)) }) } } diff --git a/src/http-api/resources/files.js b/src/http-api/resources/files.js index 46cff0093c..38e755fd11 100644 --- a/src/http-api/resources/files.js +++ b/src/http-api/resources/files.js @@ -1,13 +1,16 @@ 'use strict' const bs58 = require('bs58') -const ndjson = require('ndjson') const multipart = require('ipfs-multipart') const debug = require('debug') const tar = require('tar-stream') const log = debug('http-api:files') log.error = debug('http-api:files:error') -const async = require('async') +const pull = require('pull-stream') +const toStream = require('pull-stream-to-stream') +const toPull = require('stream-to-pull-stream') +const pushable = require('pull-pushable') +const EOL = require('os').EOL exports = module.exports @@ -37,8 +40,9 @@ exports.cat = { // main route handler which is called after the above `parseArgs`, but only if the args were valid handler: (request, reply) => { const key = request.pre.args.key + const ipfs = request.server.app.ipfs - request.server.app.ipfs.files.cat(key, (err, stream) => { + ipfs.files.cat(key, (err, stream) => { if (err) { log.error(err) return reply({ @@ -46,6 +50,13 @@ exports.cat = { Code: 0 }).code(500) } + + // hapi is not very clever and throws if no + // - _read method + // - _readableState object + // are there :( + stream._read = () => {} + stream._readableState = {} return reply(stream).header('X-Stream-Output', '1') }) } @@ -58,45 +69,44 @@ exports.get = { // main route handler which is called after the above `parseArgs`, but only if the args were valid handler: (request, reply) => { const key = request.pre.args.key - - request.server.app.ipfs.files.get(key, (err, stream) => { - if (err) { - log.error(err) - return reply({ - Message: 'Failed to get file: ' + err, - Code: 0 - }).code(500) - } - var pack = tar.pack() - const files = [] - stream.on('data', (data) => { - files.push(data) - }) - const processFile = (file) => { - return (callback) => { - if (!file.content) { // is directory - pack.entry({name: file.path, type: 'directory'}) - callback() - } else { // is file - const fileContents = [] - file.content.on('data', (data) => { - fileContents.push(data) - }) - file.content.on('end', () => { - pack.entry({name: file.path}, Buffer.concat(fileContents)) - callback() - }) + const ipfs = request.server.app.ipfs + const pack = tar.pack() + + ipfs.files.getPull(key, (err, stream) => { + if (err) return handleError(err) + + pull( + stream, + pull.asyncMap((file, cb) => { + const header = {name: file.path} + + if (!file.content) { + header.type = 'directory' + pack.entry(header) + cb() + } else { + header.size = file.size + toStream.source(file.content) + .pipe(pack.entry(header, cb)) } - } - } - stream.on('end', () => { - const callbacks = files.map(processFile) - async.series(callbacks, () => { + }), + pull.onEnd((err) => { + if (err) return handleError(err) + pack.finalize() reply(pack).header('X-Stream-Output', '1') }) - }) + ) }) + + function handleError (err) { + log.error(err) + + reply({ + Message: 'Failed to get file: ' + err, + Code: 0 + }).code(500) + } } } @@ -106,67 +116,66 @@ exports.add = { return reply('Array, Buffer, or String is required.').code(400).takeover() } + const ipfs = request.server.app.ipfs + // TODO: make pull-multipart const parser = multipart.reqParser(request.payload) + let filesParsed = false - var filesParsed = false - var filesAdded = 0 + const fileAdder = pushable() - var serialize = ndjson.serialize() - // hapi doesn't permit object streams: http://hapijs.com/api#replyerr-result - serialize._readableState.objectMode = false - - request.server.app.ipfs.files.createAddStream((err, fileAdder) => { - if (err) { - return reply({ - Message: err, - Code: 0 - }).code(500) + parser.on('file', (fileName, fileStream) => { + const filePair = { + path: fileName, + content: toPull(fileStream) } + filesParsed = true + fileAdder.push(filePair) + }) - fileAdder.on('data', (file) => { - const filePath = file.path ? file.path : file.hash - serialize.write({ - Name: filePath, - Hash: file.hash - }) - filesAdded++ + parser.on('directory', (directory) => { + fileAdder.push({ + path: directory, + content: '' }) + }) + + parser.on('end', () => { + if (!filesParsed) { + return reply("File argument 'data' is required.") + .code(400).takeover() + } + fileAdder.end() + }) - fileAdder.on('end', () => { - if (filesAdded === 0 && filesParsed) { + pull( + fileAdder, + ipfs.files.createAddPullStream(), + pull.map((file) => { + return { + Name: file.path ? file.path : file.hash, + Hash: file.hash + } + }), + pull.map((file) => JSON.stringify(file) + EOL), + pull.collect((err, files) => { + if (err) { return reply({ - Message: 'Failed to add files.', + Message: err, Code: 0 }).code(500) - } else { - serialize.end() - return reply(serialize) - .header('x-chunked-output', '1') - .header('content-type', 'application/json') } - }) - parser.on('file', (fileName, fileStream) => { - var filePair = { - path: fileName, - content: fileStream + if (files.length === 0 && filesParsed) { + return reply({ + Message: 'Failed to add files.', + Code: 0 + }).code(500) } - filesParsed = true - fileAdder.write(filePair) - }) - parser.on('directory', (directory) => { - fileAdder.write({ - path: directory, - content: '' - }) - }) - parser.on('end', () => { - if (!filesParsed) { - return reply("File argument 'data' is required.").code(400).takeover() - } - fileAdder.end() + reply(files.join('')) + .header('x-chunked-output', '1') + .header('content-type', 'application/json') }) - }) + ) } }