Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

send files HTTP request should stream #629

Merged
merged 5 commits into from
Nov 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
"lru-cache": "^4.1.1",
"multiaddr": "^3.0.1",
"multihashes": "~0.4.12",
"multipart-stream": "^2.0.1",
"ndjson": "^1.5.0",
"once": "^1.4.0",
"peer-id": "~0.10.2",
Expand Down
29 changes: 15 additions & 14 deletions src/block/put.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,35 @@
const promisify = require('promisify-es6')
const Block = require('ipfs-block')
const CID = require('cids')
const once = require('once')
const SendOneFile = require('../utils/send-one-file')

module.exports = (send) => {
return promisify((block, cid, callback) => {
const sendOneFile = SendOneFile(send, 'block/put')

return promisify((block, cid, _callback) => {
// TODO this needs to be adjusted with the new go-ipfs http-api
if (typeof cid === 'function') {
callback = cid
_callback = cid
cid = {}
}

const callback = once(_callback)

if (Array.isArray(block)) {
const err = new Error('block.put() only accepts 1 file')
return callback(err)
return callback(new Error('block.put accepts only one block'))
}

if (typeof block === 'object' && block.data) {
block = block.data
}

const request = {
path: 'block/put',
files: block
}

// Transform the response to a Block
const transform = (info, callback) => {
callback(null, new Block(block, new CID(info.Key)))
}
sendOneFile(block, {}, (err, result) => {
if (err) {
return callback(err) // early
}

send.andTransform(request, transform, callback)
callback(null, new Block(block, new CID(result.Key)))
})
})
}
30 changes: 3 additions & 27 deletions src/files/add-pull-stream.js
Original file line number Diff line number Diff line change
@@ -1,30 +1,6 @@
'use strict'

const addCmd = require('./add.js')
const pull = require('pull-stream')
const pushable = require('pull-pushable')
const SendFilesStream = require('../utils/send-files-stream')
const toPull = require('stream-to-pull-stream')

module.exports = (send) => {
const add = addCmd(send)

return (options) => {
options = options || {}

const source = pushable()
const sink = pull.collect((err, tuples) => {
if (err) { return source.end(err) }

add(tuples, options, (err, filesAdded) => {
if (err) { return source.end(err) }

filesAdded.forEach((file) => source.push(file))
source.end()
})
})

return {
sink: sink,
source: source
}
}
}
module.exports = (send) => (options) => toPull(SendFilesStream(send, 'add')(options))
30 changes: 2 additions & 28 deletions src/files/add-readable-stream.js
Original file line number Diff line number Diff line change
@@ -1,31 +1,5 @@
'use strict'

const addCmd = require('./add.js')
const Duplex = require('readable-stream').Duplex
const SendFilesStream = require('../utils/send-files-stream')

module.exports = (send) => {
const add = addCmd(send)

return (options) => {
options = options || {}

const tuples = []

const ds = new Duplex({ objectMode: true })
ds._read = (n) => {}

ds._write = (file, enc, next) => {
tuples.push(file)
next()
}

ds.end = () => add(tuples, options, (err, res) => {
if (err) { return ds.emit('error', err) }

res.forEach((tuple) => ds.push(tuple))
ds.push(null)
})

return ds
}
}
module.exports = (send) => SendFilesStream(send, 'add')
59 changes: 25 additions & 34 deletions src/files/add.js
Original file line number Diff line number Diff line change
@@ -1,51 +1,42 @@
'use strict'

const isStream = require('is-stream')
const promisify = require('promisify-es6')
const ProgressStream = require('../utils/progress-stream')
const converter = require('../utils/converter')
const ConcatStream = require('concat-stream')
const once = require('once')
const isStream = require('is-stream')
const SendFilesStream = require('../utils/send-files-stream')

module.exports = (send) => {
return promisify((files, opts, callback) => {
if (typeof opts === 'function') {
callback = opts
opts = {}
}
const createAddStream = SendFilesStream(send, 'add')

opts = opts || {}

const ok = Buffer.isBuffer(files) ||
isStream.readable(files) ||
Array.isArray(files)

if (!ok) {
return callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
return promisify((_files, options, _callback) => {
if (typeof options === 'function') {
_callback = options
options = null
}

const qs = {}
const callback = once(_callback)

if (opts['cid-version'] != null) {
qs['cid-version'] = opts['cid-version']
} else if (opts.cidVersion != null) {
qs['cid-version'] = opts.cidVersion
if (!options) {
options = {}
}

if (opts['raw-leaves'] != null) {
qs['raw-leaves'] = opts['raw-leaves']
} else if (opts.rawLeaves != null) {
qs['raw-leaves'] = opts.rawLeaves
}
const ok = Buffer.isBuffer(_files) ||
isStream.readable(_files) ||
Array.isArray(_files)

if (opts.hash != null) {
qs.hash = opts.hash
} else if (opts.hashAlg != null) {
qs.hash = opts.hashAlg
if (!ok) {
return callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
}

const request = { path: 'add', files: files, qs: qs, progress: opts.progress }
const files = [].concat(_files)

const stream = createAddStream(options)
const concat = ConcatStream((result) => callback(null, result))
stream.once('error', callback)
stream.pipe(concat)

send.andTransform(request, (response, cb) => {
converter(ProgressStream.fromStream(opts.progress, response), cb)
}, callback)
files.forEach((file) => stream.write(file))
stream.end()
})
}
34 changes: 24 additions & 10 deletions src/files/write.js
Original file line number Diff line number Diff line change
@@ -1,28 +1,42 @@
'use strict'

const promisify = require('promisify-es6')
const concatStream = require('concat-stream')
const once = require('once')
const SendFilesStream = require('../utils/send-files-stream')

module.exports = (send) => {
return promisify((pathDst, files, opts, callback) => {
const sendFilesStream = SendFilesStream(send, 'files/write')

return promisify((pathDst, _files, opts, _callback) => {
if (typeof opts === 'function' &&
!callback) {
callback = opts
!_callback) {
_callback = opts
opts = {}
}

// opts is the real callback --
// 'callback' is being injected by promisify
if (typeof opts === 'function' &&
typeof callback === 'function') {
callback = opts
typeof _callback === 'function') {
_callback = opts
opts = {}
}

send({
path: 'files/write',
const files = [].concat(_files)
const callback = once(_callback)

const options = {
args: pathDst,
qs: opts,
files: files
}, callback)
qs: opts
}

const stream = sendFilesStream(options)
const concat = concatStream((result) => callback(null, result))
stream.once('error', callback)
stream.pipe(concat)

files.forEach((file) => stream.write(file))
stream.end()
})
}
4 changes: 2 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const multiaddr = require('multiaddr')
const loadCommands = require('./utils/load-commands')
const getConfig = require('./utils/default-config')
const getRequestAPI = require('./utils/request-api')
const sendRequest = require('./utils/send-request')

function IpfsAPI (hostOrMultiaddr, port, opts) {
const config = getConfig()
Expand Down Expand Up @@ -35,7 +35,7 @@ function IpfsAPI (hostOrMultiaddr, port, opts) {
config.port = split[1]
}

const requestAPI = getRequestAPI(config)
const requestAPI = sendRequest(config)
const cmds = loadCommands(requestAPI)
cmds.send = requestAPI
cmds.Buffer = Buffer
Expand Down
15 changes: 8 additions & 7 deletions src/object/appendData.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
'use strict'

const promisify = require('promisify-es6')
const once = require('once')
const cleanMultihash = require('../utils/clean-multihash')
const SendOneFile = require('../utils/send-one-file')

module.exports = (send) => {
const objectGet = require('./get')(send)
const sendOneFile = SendOneFile(send, 'object/patch/append-data')

return promisify((multihash, data, opts, callback) => {
return promisify((multihash, data, opts, _callback) => {
if (typeof opts === 'function') {
callback = opts
_callback = opts
opts = {}
}
const callback = once(_callback)
if (!opts) {
opts = {}
}
Expand All @@ -21,14 +25,11 @@ module.exports = (send) => {
return callback(err)
}

send({
path: 'object/patch/append-data',
args: [multihash],
files: data
}, (err, result) => {
sendOneFile(data, { args: [multihash] }, (err, result) => {
if (err) {
return callback(err)
}

objectGet(result.Hash, { enc: 'base58' }, callback)
})
})
Expand Down
23 changes: 15 additions & 8 deletions src/object/put.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,20 @@ const lruOptions = {
}

const cache = LRU(lruOptions)
const SendOneFile = require('../utils/send-one-file')
const once = require('once')

module.exports = (send) => {
return promisify((obj, options, callback) => {
const sendOneFile = SendOneFile(send, 'object/put')

return promisify((obj, options, _callback) => {
if (typeof options === 'function') {
callback = options
_callback = options
options = {}
}

const callback = once(_callback)

if (!options) {
options = {}
}
Expand Down Expand Up @@ -56,13 +63,13 @@ module.exports = (send) => {
}
const enc = options.enc || 'json'

send({
path: 'object/put',
qs: { inputenc: enc },
files: buf
}, (err, result) => {
const sendOptions = {
qs: { inputenc: enc }
}

sendOneFile(buf, sendOptions, (err, result) => {
if (err) {
return callback(err)
return callback(err) // early
}

if (Buffer.isBuffer(obj)) {
Expand Down
14 changes: 7 additions & 7 deletions src/object/setData.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
'use strict'

const promisify = require('promisify-es6')
const once = require('once')
const cleanMultihash = require('../utils/clean-multihash')
const SendOneFile = require('../utils/send-one-file')

module.exports = (send) => {
const objectGet = require('./get')(send)
const sendOneFile = SendOneFile(send, 'object/patch/set-data')

return promisify((multihash, data, opts, callback) => {
return promisify((multihash, data, opts, _callback) => {
if (typeof opts === 'function') {
callback = opts
_callback = opts
opts = {}
}
const callback = once(_callback)
if (!opts) {
opts = {}
}
Expand All @@ -21,11 +25,7 @@ module.exports = (send) => {
return callback(err)
}

send({
path: 'object/patch/set-data',
args: [multihash],
files: data
}, (err, result) => {
sendOneFile(data, { args: [multihash] }, (err, result) => {
if (err) {
return callback(err)
}
Expand Down
Loading