Skip to content

Commit

Permalink
fix: report ipfs.add progress over http (#3310)
Browse files Browse the repository at this point in the history
The browser fetch API doesn't allow reading any response data until the whole request has been sent, which means progress events only fire after the upload is complete — rather defeating the purpose of reporting upload progress.

Here we switch to XHR for uploads with progress that does allow reading response data before the request is complete.

Co-authored-by: achingbrain <alex@achingbrain.net>
  • Loading branch information
Gozala and achingbrain authored Nov 16, 2020
1 parent c281053 commit 16f754d
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 87 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
"debug": "^4.1.1",
"form-data": "^3.0.0",
"ipfs-core-utils": "^0.5.1",
"ipfs-utils": "^4.0.0",
"ipfs-utils": "^5.0.0",
"ipld-block": "^0.11.0",
"ipld-dag-cbor": "^0.17.0",
"ipld-dag-pb": "^0.20.0",
Expand Down
62 changes: 57 additions & 5 deletions src/add-all.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,20 @@ module.exports = configure((api) => {
* @type {import('.').Implements<typeof import('ipfs-core/src/components/add-all/index')>}
*/
async function * addAll (source, options = {}) {
const progressFn = options.progress

// allow aborting requests on body errors
const controller = new AbortController()
const signal = anySignal([controller.signal, options.signal])
const { headers, body, total, parts } =
await multipartRequest(source, controller, options.headers)

// In browser the response body only starts streaming once the upload is
// complete, at which point all the progress updates are invalid. If the
// length of the content is computable we can interpolate progress from the
// `{ total, loaded }` events passed to `onUploadProgress` using `multipart.total`,
// in which case we don't ask the server to write out progress updates.
const [progressFn, onUploadProgress] = typeof options.progress === 'function'
? createProgressHandler(total, parts, options.progress)
: [null, null]

const res = await api.post('add', {
searchParams: toUrlSearchParams({
Expand All @@ -26,10 +35,10 @@ module.exports = configure((api) => {
progress: Boolean(progressFn)
}),
timeout: options.timeout,
onUploadProgress,
signal,
...(
await multipartRequest(source, controller, options.headers)
)
headers,
body
})

for await (let file of res.ndjson()) {
Expand All @@ -45,6 +54,48 @@ module.exports = configure((api) => {
return addAll
})

/**
 * Returns simple progress callback when content length isn't computable or a
 * progress event handler that calculates progress from upload progress events.
 *
 * @param {number} total
 * @param {{name:string, start:number, end:number}[]|null} parts
 * @param {(n:number, name:string) => void} progress
 */
const createProgressHandler = (total, parts, progress) => {
  if (parts) {
    // Part offsets are known - per-file progress can be interpolated from
    // raw upload progress events instead of server-reported updates
    return [null, createOnUploadPrgress(total, parts, progress)]
  }

  // Content length was not computable - fall back to the plain callback
  return [progress, null]
}

/**
 * Creates a progress handler that interpolates progress from upload progress
 * events and total size of the content that is added.
 *
 * @param {number} size - actual content size
 * @param {{name:string, start:number, end:number}[]} parts
 * @param {(n:number, name:string) => void} progress
 * @returns {(event:{total:number, loaded: number}) => void}
 */
const createOnUploadPrgress = (size, parts, progress) => {
  const partCount = parts.length
  // Index of the part currently being uploaded; persists across events
  let partIndex = 0

  return ({ loaded, total }) => {
    // Map bytes sent on the wire onto a position within the actual content
    const position = Math.floor(loaded / total * size)

    for (; partIndex < partCount; partIndex += 1) {
      const { start, end, name } = parts[partIndex]

      if (position < end) {
        // Still inside this part - report progress relative to its start
        // and wait for the next event
        return progress(position - start, name)
      }

      // Moved past this part - report its final byte count and advance
      progress(end - start, name)
    }
  }
}

/**
* @param {any} input
* @returns {UnixFSEntry}
Expand All @@ -67,6 +118,7 @@ function toCoreInterface ({ name, hash, size, mode, mtime, mtimeNsecs }) {
}
}

// @ts-ignore
return output
}

Expand Down
9 changes: 9 additions & 0 deletions src/lib/multipart-request.browser.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
'use strict'

// Import browser version otherwise electron-renderer will end up with node
// version and fail.
const normaliseInput = require('ipfs-core-utils/src/files/normalise-input/index.browser')
const modeToString = require('./mode-to-string')
const mtimeToObject = require('./mtime-to-object')
const { File, FormData } = require('ipfs-utils/src/globalthis')

async function multipartRequest (source = '', abortController, headers = {}) {
const parts = []
const formData = new FormData()
let index = 0
let total = 0

for await (const { content, path, mode, mtime } of normaliseInput(source)) {
let fileSuffix = ''
Expand Down Expand Up @@ -41,6 +45,9 @@ async function multipartRequest (source = '', abortController, headers = {}) {

if (content) {
formData.set(fieldName, content, encodeURIComponent(path))
const end = total + content.size
parts.push({ name: path, start: total, end })
total = end
} else {
formData.set(fieldName, new File([''], encodeURIComponent(path), { type: 'application/x-directory' }))
}
Expand All @@ -49,6 +56,8 @@ async function multipartRequest (source = '', abortController, headers = {}) {
}

return {
total,
parts,
headers,
body: formData
}
Expand Down
85 changes: 4 additions & 81 deletions src/lib/multipart-request.js
Original file line number Diff line number Diff line change
@@ -1,87 +1,10 @@
'use strict'

const normaliseInput = require('ipfs-core-utils/src/files/normalise-input/index')
const { nanoid } = require('nanoid')
const modeToString = require('../lib/mode-to-string')
const mtimeToObject = require('../lib/mtime-to-object')
const merge = require('merge-options').bind({ ignoreUndefined: true })
const toStream = require('it-to-stream')
const { isElectronRenderer } = require('ipfs-utils/src/env')

/**
*
* @param {Object} source
* @param {AbortController} abortController
* @param {Headers|Record<string, string>} [headers]
* @param {string} [boundary]
*/
async function multipartRequest (source = '', abortController, headers = {}, boundary = `-----------------------------${nanoid()}`) {
async function * streamFiles (source) {
try {
let index = 0

for await (const { content, path, mode, mtime } of normaliseInput(source)) {
let fileSuffix = ''
const type = content ? 'file' : 'dir'

if (index > 0) {
yield '\r\n'

fileSuffix = `-${index}`
}

let fieldName = type + fileSuffix
const qs = []

if (mode !== null && mode !== undefined) {
qs.push(`mode=${modeToString(mode)}`)
}

const time = mtimeToObject(mtime)
if (time != null) {
const { secs, nsecs } = time

qs.push(`mtime=${secs}`)

if (nsecs != null) {
qs.push(`mtime-nsecs=${nsecs}`)
}
}

if (qs.length) {
fieldName = `${fieldName}?${qs.join('&')}`
}

yield `--${boundary}\r\n`
yield `Content-Disposition: form-data; name="${fieldName}"; filename="${encodeURIComponent(path)}"\r\n`
yield `Content-Type: ${content ? 'application/octet-stream' : 'application/x-directory'}\r\n`
yield '\r\n'

if (content) {
yield * content
}

index++
}
} catch (err) {
// workaround for https://github.com/node-fetch/node-fetch/issues/753
// @ts-ignore - abort does not expect an arguments
abortController.abort(err)
} finally {
yield `\r\n--${boundary}--\r\n`
}
}

return {
headers: merge(headers, {
'Content-Type': `multipart/form-data; boundary=${boundary}`
}),
body: await toStream(streamFiles(source))
}
}

module.exports = multipartRequest

// In electron-renderer we use native fetch and should encode body using native
// form data.
if (isElectronRenderer) {
module.exports = require('./multipart-request.browser')
} else {
module.exports = require('./multipart-request.node')
}
84 changes: 84 additions & 0 deletions src/lib/multipart-request.node.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
'use strict'

const normaliseInput = require('ipfs-core-utils/src/files/normalise-input')
const { nanoid } = require('nanoid')
const modeToString = require('./mode-to-string')
const mtimeToObject = require('./mtime-to-object')
const merge = require('merge-options').bind({ ignoreUndefined: true })
const toStream = require('it-to-stream')

/**
 * Streams files/directories as a `multipart/form-data` request body suitable
 * for POSTing to the IPFS HTTP API from node, without buffering content in
 * memory.
 *
 * @param {Object} source - content to add, normalised via `normaliseInput`
 * @param {AbortController} abortController - aborted with the error if body streaming fails
 * @param {Headers|Record<string, string>} [headers] - extra headers merged into the result
 * @param {string} [boundary] - multipart boundary, randomly generated by default
 */
async function multipartRequest (source = '', abortController, headers = {}, boundary = `-----------------------------${nanoid()}`) {
  // Lazily yields the multipart body chunk by chunk
  async function * streamFiles (source) {
    try {
      // Gives every part after the first a unique field-name suffix
      let index = 0

      for await (const { content, path, mode, mtime } of normaliseInput(source)) {
        let fileSuffix = ''
        // Entries without content represent directories
        const type = content ? 'file' : 'dir'

        if (index > 0) {
          // Terminate the previous part before starting the next
          yield '\r\n'

          fileSuffix = `-${index}`
        }

        let fieldName = type + fileSuffix
        // mode/mtime metadata travels as a query string on the field name
        const qs = []

        if (mode !== null && mode !== undefined) {
          qs.push(`mode=${modeToString(mode)}`)
        }

        const time = mtimeToObject(mtime)
        if (time != null) {
          const { secs, nsecs } = time

          qs.push(`mtime=${secs}`)

          if (nsecs != null) {
            qs.push(`mtime-nsecs=${nsecs}`)
          }
        }

        if (qs.length) {
          fieldName = `${fieldName}?${qs.join('&')}`
        }

        yield `--${boundary}\r\n`
        yield `Content-Disposition: form-data; name="${fieldName}"; filename="${encodeURIComponent(path)}"\r\n`
        yield `Content-Type: ${content ? 'application/octet-stream' : 'application/x-directory'}\r\n`
        yield '\r\n'

        if (content) {
          yield * content
        }

        index++
      }
    } catch (err) {
      // workaround for https://github.com/node-fetch/node-fetch/issues/753
      // @ts-ignore - abort does not expect an arguments
      abortController.abort(err)
    } finally {
      // Always emit the closing boundary - even after an error - so the
      // request body terminates cleanly
      yield `\r\n--${boundary}--\r\n`
    }
  }

  return {
    // Part offsets / total size are not computable for a streamed body;
    // null/-1 signal that to callers (the browser FormData variant fills
    // these in)
    parts: null,
    total: -1,
    headers: merge(headers, {
      'Content-Type': `multipart/form-data; boundary=${boundary}`
    }),
    body: await toStream(streamFiles(source))
  }
}

module.exports = multipartRequest

0 comments on commit 16f754d

Please sign in to comment.