This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

[WIP] Replacing wreck with fetch #355

Closed · wants to merge 5 commits
22 changes: 14 additions & 8 deletions package.json
@@ -5,18 +5,19 @@
"main": "lib/index.js",
"jsnext:main": "src/index.js",
"scripts": {
"test": "node --max_old_space_size=4096 node_modules/.bin/gulp test:node",
"test": "gulp test",
"test:node": "gulp test:node",
"test:browser": "node --max_old_space_size=4096 node_modules/.bin/gulp test:browser",
"test:browser": "gulp test:browser",
"lint": "aegir-lint",
"build": "gulp build",
"release": "node --max_old_space_size=4096 node_modules/.bin/gulp release",
"release-minor": "node --max_old_space_size=4096 node_modules/.bin/gulp release --type minor",
"release-major": "node --max_old_space_size=4096 node_modules/.bin/gulp release --type major",
"release": "gulp release",
"release-minor": "gulp release --type minor",
"release-major": "gulp release --type major",
"coverage": "gulp coverage",
"coverage-publish": "aegir-coverage publish"
},
"dependencies": {
"arraybuffer-to-buffer": "0.0.4",
"async": "^2.0.1",
"babel-runtime": "^6.11.6",
"bl": "^1.1.2",
@@ -26,14 +27,19 @@
"glob": "^7.0.5",
"ipfs-merkle-dag": "^0.6.0",
"is-ipfs": "^0.2.0",
"isomorphic-fetch": "^2.2.1",
"isstream": "^0.1.2",
"multiaddr": "^2.0.2",
"multipart-stream": "^2.0.1",
"ndjson": "^1.4.3",
"promisify-es6": "^1.0.1",
"pull-split": "^0.2.0",
"pull-stream": "^3.4.3",
"pull-stream-to-stream": "^1.3.0",
"qs": "^6.2.1",
"stream-to-pull-stream": "^1.7.0",
"streamifier": "^0.1.1",
"tar-stream": "^1.5.2",
"wreck": "^9.0.0"
"to-arraybuffer": "^1.0.1"
},
"engines": {
"node": ">=4.2.2"
@@ -99,4 +105,4 @@
"url": "https://github.com/ipfs/js-ipfs-api/issues"
},
"homepage": "https://github.com/ipfs/js-ipfs-api"
}
}
11 changes: 3 additions & 8 deletions src/api/log.js
@@ -1,19 +1,14 @@
'use strict'

const ndjson = require('ndjson')
const promisify = require('promisify-es6')

module.exports = (send) => {
return {
tail: promisify((callback) => {
send({
path: 'log/tail'
}, (err, response) => {
if (err) {
return callback(err)
}
callback(null, response.pipe(ndjson.parse()))
})
path: 'log/tail',
ndjson: true
}, callback)
})
}
}
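
For context: with `ndjson: true` set, request-api now hands back a stream that emits already-parsed objects, so `tail` no longer pipes the response through `ndjson.parse()` itself. A minimal consumption sketch (assuming a local daemon at the default API port; the multiaddr and log fields are illustrative):

```js
// Sketch: consuming log/tail after this change. Each 'data' event is
// one parsed ndjson log entry, not a raw byte chunk.
const ipfsAPI = require('ipfs-api')
const ipfs = ipfsAPI('/ip4/127.0.0.1/tcp/5001') // assumed local daemon

ipfs.log.tail((err, stream) => {
  if (err) throw err
  stream.once('data', (entry) => {
    console.log(entry) // e.g. { event: ..., system: ..., time: ... }
    stream.end() // stop tailing, as the updated tests in this PR do
  })
})
```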
4 changes: 2 additions & 2 deletions src/api/object.js
@@ -1,7 +1,7 @@
'use strict'

const DAGNode = require('ipfs-merkle-dag').DAGNode
const DAGLink = require('ipfs-merkle-dag').DAGLink
const DAGNode = require('ipfs-merkle-dag/lib/dag-node')
const DAGLink = require('ipfs-merkle-dag/lib/dag-link')
const promisify = require('promisify-es6')
const bs58 = require('bs58')
const bl = require('bl')
30 changes: 18 additions & 12 deletions src/api/util/url-add.js
@@ -1,7 +1,8 @@
/* globals fetch:false */
'use strict'

const Wreck = require('wreck')
const addToDagNodesTransform = require('./../../add-to-dagnode-transform')
const bufferReturn = require('../../buffer-return')

const promisify = require('promisify-es6')

@@ -28,16 +29,21 @@ module.exports = (send) => {

const sendWithTransform = send.withTransform(addToDagNodesTransform)

Wreck.request('GET', url, null, (err, res) => {
if (err) {
return callback(err)
}

sendWithTransform({
path: 'add',
qs: opts,
files: res
}, callback)
})
fetch(url)
.then((res) => {
if (!res.ok) {
throw new Error(`Failed to fetch: ${url}`)
}

return bufferReturn(res)
})
.then((content) => {
sendWithTransform({
path: 'add',
qs: opts,
files: content
}, callback)
})
.catch(callback)
})
}
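
In short, the wreck request is replaced by: `fetch` the URL, check `res.ok`, buffer the whole body via `bufferReturn` (defined below), then post the bytes to `add` as multipart. Buffering is simpler than streaming the download through, at the cost of holding the entire file in memory. A hedged usage sketch (assuming the command is exposed as `util.addFromURL`; the URL is illustrative):

```js
// Sketch: the client downloads the file itself via fetch, then uploads
// the buffered bytes to the daemon's add endpoint.
const ipfsAPI = require('ipfs-api')
const ipfs = ipfsAPI('/ip4/127.0.0.1/tcp/5001')

ipfs.util.addFromURL('http://example.com/file.txt', (err, result) => {
  if (err) throw err
  console.log(result) // DAG node(s) describing the added file
})
```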
13 changes: 13 additions & 0 deletions src/buffer-return.js
@@ -0,0 +1,13 @@
'use strict'

const toBuffer = require('arraybuffer-to-buffer')

// node-fetch has res.buffer
// window.fetch has res.arrayBuffer
module.exports = function bufferReturn (res) {
if (res.buffer) {
return res.buffer()
} else {
return res.arrayBuffer().then(toBuffer)
}
}
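
This helper papers over the two fetch flavors: node-fetch exposes a non-standard `res.buffer()`, while the WHATWG API only guarantees `res.arrayBuffer()`. Callers get a Node `Buffer` either way. A quick sketch (the endpoint is illustrative):

```js
// Sketch: bufferReturn always resolves to a Buffer, whether fetch came
// from node-fetch (Node) or the browser.
const bufferReturn = require('./buffer-return')

fetch('http://127.0.0.1:5001/api/v0/id')
  .then(bufferReturn)
  .then((buf) => console.log(Buffer.isBuffer(buf), buf.length))
  .catch(console.error)
```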
2 changes: 1 addition & 1 deletion src/get-dagnode.js
@@ -1,6 +1,6 @@
'use strict'

const DAGNode = require('ipfs-merkle-dag').DAGNode
const DAGNode = require('ipfs-merkle-dag/lib/dag-node')
const bl = require('bl')
const parallel = require('async/parallel')

1 change: 1 addition & 0 deletions src/index.js
@@ -1,5 +1,6 @@
'use strict'

require('isomorphic-fetch')
const multiaddr = require('multiaddr')
const loadCommands = require('./load-commands')
const getConfig = require('./default-config')
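
Requiring `isomorphic-fetch` once at the entry point installs a global `fetch` in Node (backed by node-fetch) and is a no-op in browsers that already provide one; that is why the other files can declare `/* globals fetch:false */` and call `fetch` directly. A minimal sketch:

```js
// Sketch: after the polyfill require, fetch exists as a global in Node too.
require('isomorphic-fetch')
console.log(typeof fetch) // 'function'
```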
163 changes: 119 additions & 44 deletions src/request-api.js
@@ -1,69 +1,125 @@
/* globals fetch:false */
'use strict'

const Wreck = require('wreck')
const Qs = require('qs')
const ndjson = require('ndjson')
const getFilesStream = require('./get-files-stream')

const isNode = require('detect-node')
const bl = require('bl')
const toArrayBuffer = require('to-arraybuffer')
const pull = require('pull-stream')
const toStream = require('pull-stream-to-stream')
const toPull = require('stream-to-pull-stream')
const toBuffer = require('arraybuffer-to-buffer')
const split = require('pull-split')

const getFilesStream = require('./get-files-stream')
const bufferReturn = require('./buffer-return')

// -- Internal

function parseChunkedJson (res, cb) {
const parsed = []
res
.pipe(ndjson.parse())
.on('data', (obj) => {
parsed.push(obj)
})
.on('end', () => {
cb(null, parsed)
})
}
function streamReturn (res, ndjson) {
let stream
if (res.body && res.body.getReader) {
// Chrome implements ReadableStream
const reader = res.body.getReader()
let ended = false
stream = (end, cb) => {
if (end) ended = end
if (ended) {
reader.cancel()
return cb(ended)
}

reader.read()
.then((result) => {
if (result.done) ended = true
if (ended) return cb(ended)

const val = result.value
cb(null, Buffer.isBuffer(val) ? val : toBuffer(val))
})
.catch((err) => {
ended = err
cb(ended)
})
}
}

// node-fetch has a PassThrough stream as body
if (res.body && res.body.readable) {
if (!ndjson) {
return res.body
}

stream = toPull.source(res.body)
}

function onRes (buffer, cb, uri) {
return (err, res) => {
if (err) {
return cb(err)
if (stream) {
if (ndjson) {
return toStream.source(pull(
stream,
split('\n', JSON.parse)
))
} else {
return toStream.source(stream)
}
}

throw new Error('Streaming is not supported by our browser')
}

const stream = Boolean(res.headers['x-stream-output'])
const chunkedObjects = Boolean(res.headers['x-chunked-output'])
const isJson = res.headers['content-type'] &&
res.headers['content-type'].indexOf('application/json') === 0
function onRes (buffer, ndjson) {
return (res) => {
const stream = Boolean(res.headers.get('x-stream-output'))
const chunkedObjects = Boolean(res.headers.get('x-chunked-output'))
const isJson = res.headers.has('content-type') &&
res.headers.get('content-type').indexOf('application/json') === 0

if (res.statusCode >= 400 || !res.statusCode) {
if (!res.ok) {
const error = new Error(`Server responded with ${res.statusCode}`)

return Wreck.read(res, {json: true}, (err, payload) => {
if (err) {
return cb(err)
}
if (payload) {
error.code = payload.Code
error.message = payload.Message || payload.toString()
}
cb(error)
})
return res.json()
.then((payload) => {
if (payload) {
error.code = payload.Code
error.message = payload.Message || payload.toString()
}

throw error
})
}

if (stream && !buffer) {
return cb(null, res)
return streamReturn(res, ndjson)
}

if (chunkedObjects) {
if (isJson) {
return parseChunkedJson(res, cb)
if (!isJson) {
return bufferReturn(res)
}

return Wreck.read(res, null, cb)
return bufferReturn(res)
.then((raw) => {
const parts = raw.toString().split('\n').filter(Boolean)
try {
return parts
.map(JSON.parse)
} catch (err) {
throw err
}
})
}

Wreck.read(res, {json: isJson}, cb)
// Can't use res.json() as it throws on empty responses
return res.text().then((raw) => {
if (raw) {
return JSON.parse(raw)
}
})
}
}

function requestAPI (config, options, callback) {
function requestAPI (config, options) {
options.qs = options.qs || {}

if (Array.isArray(options.files)) {
@@ -114,14 +170,31 @@ function requestAPI (config, options, callback) {

if (options.files) {
if (!stream.boundary) {
return callback(new Error('No boundary in multipart stream'))
return Promise.reject(new Error('No boundary in multipart stream'))
}

opts.headers['Content-Type'] = `multipart/form-data; boundary=${stream.boundary}`
opts.payload = stream
opts.body = stream
}

return Wreck.request(opts.method, opts.uri, opts, onRes(options.buffer, callback, opts.uri))
return Promise.resolve(opts.body)
.then((body) => {
if (!body || !body.pipe || isNode) return body

return new Promise((resolve, reject) => {
body.pipe(bl((err, buf) => {
if (err) return reject(err)
resolve(toArrayBuffer(buf))
}))
})
})
.then((body) => fetch(opts.uri, {
headers: opts.headers,
method: opts.method,
mode: 'cors',
body: body
}))
.then(onRes(options.buffer, options.ndjson))
}

//
@@ -142,7 +215,9 @@ exports = module.exports = function getRequestAPI (config) {
return callback(new Error('no options were passed'))
}

return requestAPI(config, options, callback)
return requestAPI(config, options)
.then((res) => callback(null, res))
.catch((err) => callback(err))
}

// Wraps the 'send' function such that an asynchronous
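
The subtle piece above is `streamReturn`: it bridges a WHATWG `ReadableStream` reader into a pull-stream source so browser and Node bodies can share the ndjson-splitting path (`pull-split` on `'\n'`). A standalone sketch of that bridge, mirroring the shape used in the diff (`readerToSource` is a hypothetical name; a pull-stream source is just a function `(end, cb)`):

```js
// Sketch: wrap a ReadableStream reader as a pull-stream source.
// Each call either aborts (end truthy) or requests the next chunk.
const pull = require('pull-stream')
const toBuffer = require('arraybuffer-to-buffer')

function readerToSource (reader) {
  let ended = false
  return (end, cb) => {
    if (end) ended = end
    if (ended) {
      reader.cancel() // release the underlying body stream
      return cb(ended)
    }
    reader.read().then((result) => {
      if (result.done) return cb((ended = true))
      const val = result.value // a Uint8Array in browsers
      cb(null, Buffer.isBuffer(val) ? val : toBuffer(val))
    }, (err) => cb((ended = err)))
  }
}

// e.g. pull(readerToSource(res.body.getReader()), pull.drain(console.log))
```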
4 changes: 4 additions & 0 deletions test/interface-ipfs-core/log.spec.js
@@ -10,6 +10,8 @@ const FactoryClient = require('../factory/factory-client')
const isPhantom = !isNode && typeof navigator !== 'undefined' && navigator.userAgent.match(/PhantomJS/)

if (!isPhantom) {
// How did this work before? Our polyfill
// didn't have real streaming support.
describe('.log', () => {
let ipfs
let fc
@@ -34,6 +36,7 @@ if (!isPhantom) {
expect(req).to.exist

res.once('data', (obj) => {
res.end()
expect(obj).to.be.an('object')
done()
})
@@ -45,6 +48,7 @@
return ipfs.log.tail()
.then((res) => {
res.once('data', (obj) => {
res.end()
expect(obj).to.be.an('object')
})
})