Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
chore: move withTimeoutOption to core-utils (#3407)
Browse files Browse the repository at this point in the history
Pulls non-grpc changes out of #3403 to ease the continued merging of
master into that branch.

- Moves withTimeoutOption into core-utils
- Moves TimeoutError into core-utils
- Adds missing ts project links
- Adds more add-all tests to interface suite
- Ignores unpassable tests for non-grpc or core implementations
- Normalises mode and mtime in normalise-input function
- Dedupes mtime normalisation between core and http client
  • Loading branch information
achingbrain authored Nov 18, 2020
1 parent d38b0ab commit 288a259
Show file tree
Hide file tree
Showing 98 changed files with 425 additions and 281 deletions.
1 change: 1 addition & 0 deletions packages/interface-ipfs-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"is-ipfs": "^2.0.0",
"iso-random-stream": "^1.1.1",
"it-all": "^1.0.4",
"it-buffer-stream": "^1.0.5",
"it-concat": "^1.0.1",
"it-drain": "^1.0.3",
"it-last": "^1.0.4",
Expand Down
80 changes: 80 additions & 0 deletions packages/interface-ipfs-core/src/add-all.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const { isNode } = require('ipfs-utils/src/env')
const { getDescribe, getIt, expect } = require('./utils/mocha')
const testTimeout = require('./utils/test-timeout')
const uint8ArrayFromString = require('uint8arrays/from-string')
const bufferStream = require('it-buffer-stream')

/** @typedef { import("ipfsd-ctl/src/factory") } Factory */
/**
Expand Down Expand Up @@ -420,5 +421,84 @@ module.exports = (common, options) => {
expect(files[0].cid.codec).to.equal('dag-pb')
expect(files[0].size).to.equal(18)
})

it('should support bidirectional streaming', async function () {
let progressInvoked

const handler = (bytes, path) => {
progressInvoked = true
}

const source = async function * () {
yield {
content: 'hello',
path: '/file'
}

await new Promise((resolve) => {
const interval = setInterval(() => {
// we've received a progress result, that means we've received some
// data from the server before we're done sending data to the server
// so the streaming is bidirectional and we can finish up
if (progressInvoked) {
clearInterval(interval)
resolve()
}
}, 10)
})
}

await drain(ipfs.addAll(source(), {
progress: handler,
fileImportConcurrency: 1
}))

expect(progressInvoked).to.be.true()
})

it('should error during add-all stream', async function () {
const source = async function * () {
yield {
content: 'hello',
path: '/file'
}

yield {
content: 'hello',
path: '/file'
}
}

await expect(drain(ipfs.addAll(source(), {
fileImportConcurrency: 1,
chunker: 'rabin-2048--50' // invalid chunker parameters, validated after the stream starts moving
}))).to.eventually.be.rejectedWith(/Chunker parameter avg must be an integer/)
})

it('should add big files', async function () {
const totalSize = 1024 * 1024 * 200
const chunkSize = 1024 * 1024 * 99

const source = async function * () {
yield {
path: '/dir/file-200mb-1',
content: bufferStream(totalSize, {
chunkSize
})
}

yield {
path: '/dir/file-200mb-2',
content: bufferStream(totalSize, {
chunkSize
})
}
}

const results = await all(ipfs.addAll(source()))

expect(await ipfs.files.stat(`/ipfs/${results[0].cid}`)).to.have.property('size', totalSize)
expect(await ipfs.files.stat(`/ipfs/${results[1].cid}`)).to.have.property('size', totalSize)
})
})
}
7 changes: 5 additions & 2 deletions packages/ipfs-cli/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@
],
"references": [
{
"path": "../ipfs-core-utils"
"path": "../ipfs-core"
},
{
"path": "../ipfs-core"
"path": "../ipfs-core-utils"
},
{
"path": "../ipfs-http-client"
},
{
"path": "../ipfs-http-gateway"
},
{
"path": "../ipfs-http-server"
}
Expand Down
3 changes: 3 additions & 0 deletions packages/ipfs-core-utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
},
"license": "MIT",
"dependencies": {
"any-signal": "^2.0.0",
"blob-to-it": "^1.0.1",
"browser-readablestream-to-it": "^1.0.1",
"cids": "^1.0.0",
Expand All @@ -48,6 +49,8 @@
"it-peekable": "^1.0.1",
"multiaddr": "^8.0.0",
"multiaddr-to-uri": "^6.0.0",
"parse-duration": "^0.4.4",
"timeout-abort-controller": "^1.1.1",
"uint8arrays": "^1.1.0"
},
"devDependencies": {
Expand Down
12 changes: 12 additions & 0 deletions packages/ipfs-core-utils/src/errors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
'use strict'

class TimeoutError extends Error {
constructor (message = 'request timed out') {
super(message)
this.name = 'TimeoutError'
this.code = TimeoutError.code
}
}

TimeoutError.code = 'ERR_TIMEOUT'
exports.TimeoutError = TimeoutError
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ const {
isBytes,
isBlob,
isReadableStream,
isFileObject
isFileObject,
mtimeToObject,
modeToNumber
} = require('./utils')

// eslint-disable-next-line complexity
Expand Down Expand Up @@ -105,7 +107,8 @@ module.exports = async function * normaliseInput (input, normaliseContent) {
async function toFileObject (input, normaliseContent) {
// @ts-ignore - Those properties don't exist on most input types
const { path, mode, mtime, content } = input
const file = { path: path || '', mode, mtime }

const file = { path: path || '', mode: modeToNumber(mode), mtime: mtimeToObject(mtime) }

if (content) {
file.content = await normaliseContent(content)
Expand Down
87 changes: 86 additions & 1 deletion packages/ipfs-core-utils/src/files/normalise-input/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,94 @@ function isFileObject (obj) {
const isReadableStream = (value) =>
value && typeof value.getReader === 'function'

/**
* @param {any} mtime
* @returns {{secs:number, nsecs:number}|undefined}
*/
function mtimeToObject (mtime) {
if (mtime == null) {
return undefined
}

// Javascript Date
if (mtime instanceof Date) {
const ms = mtime.getTime()
const secs = Math.floor(ms / 1000)

return {
secs: secs,
nsecs: (ms - (secs * 1000)) * 1000
}
}

// { secs, nsecs }
if (Object.prototype.hasOwnProperty.call(mtime, 'secs')) {
return {
secs: mtime.secs,
nsecs: mtime.nsecs
}
}

// UnixFS TimeSpec
if (Object.prototype.hasOwnProperty.call(mtime, 'Seconds')) {
return {
secs: mtime.Seconds,
nsecs: mtime.FractionalNanoseconds
}
}

// process.hrtime()
if (Array.isArray(mtime)) {
return {
secs: mtime[0],
nsecs: mtime[1]
}
}
/*
TODO: https://github.com/ipfs/aegir/issues/487
// process.hrtime.bigint()
if (typeof mtime === 'bigint') {
const secs = mtime / BigInt(1e9)
const nsecs = mtime - (secs * BigInt(1e9))
return {
secs: parseInt(secs),
nsecs: parseInt(nsecs)
}
}
*/
}

/**
* @param {any} mode
* @returns {number|undefined}
*/
function modeToNumber (mode) {
if (mode == null) {
return undefined
}

if (typeof mode === 'number') {
return mode
}

mode = mode.toString()

if (mode.substring(0, 1) === '0') {
// octal string
return parseInt(mode, 8)
}

// decimal string
return parseInt(mode, 10)
}

module.exports = {
isBytes,
isBlob,
isFileObject,
isReadableStream
isReadableStream,
mtimeToObject,
modeToNumber
}
6 changes: 6 additions & 0 deletions packages/ipfs-core-utils/src/index.js
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
'use strict'

/**
* @template {any[]} ARGS
* @template R
* @typedef {(...args: ARGS) => R} Fn
*/
106 changes: 106 additions & 0 deletions packages/ipfs-core-utils/src/with-timeout-option.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/* eslint-disable no-unreachable */
'use strict'

const TimeoutController = require('timeout-abort-controller')
const { anySignal } = require('any-signal')
const parseDuration = require('parse-duration').default
const { TimeoutError } = require('./errors')

/**
* @template {any[]} ARGS
* @template {Promise<any> | AsyncIterable<any>} R - The return type of `fn`
* @param {Fn<ARGS, R>} fn
* @param {number} [optionsArgIndex]
* @returns {Fn<ARGS, R>}
*/
function withTimeoutOption (fn, optionsArgIndex) {
// eslint-disable-next-line
return /** @returns {R} */(/** @type {ARGS} */...args) => {
const options = args[optionsArgIndex == null ? args.length - 1 : optionsArgIndex]
if (!options || !options.timeout) return fn(...args)

const timeout = typeof options.timeout === 'string'
? parseDuration(options.timeout)
: options.timeout

const controller = new TimeoutController(timeout)

options.signal = anySignal([options.signal, controller.signal])

const fnRes = fn(...args)
// eslint-disable-next-line promise/param-names
const timeoutPromise = new Promise((_resolve, reject) => {
controller.signal.addEventListener('abort', () => {
reject(new TimeoutError())
})
})

const start = Date.now()

const maybeThrowTimeoutError = () => {
if (controller.signal.aborted) {
throw new TimeoutError()
}

const timeTaken = Date.now() - start

// if we have starved the event loop by adding microtasks, we could have
// timed out already but the TimeoutController will never know because it's
// setTimeout will not fire until we stop adding microtasks
if (timeTaken > timeout) {
controller.abort()
throw new TimeoutError()
}
}

if (fnRes[Symbol.asyncIterator]) {
// @ts-ignore
return (async function * () {
const it = fnRes[Symbol.asyncIterator]()

try {
while (true) {
const { value, done } = await Promise.race([it.next(), timeoutPromise])

if (done) {
break
}

maybeThrowTimeoutError()

yield value
}
} catch (err) {
maybeThrowTimeoutError()

throw err
} finally {
controller.clear()

if (it.return) {
it.return()
}
}
})()
}

// @ts-ignore
return (async () => {
try {
const res = await Promise.race([fnRes, timeoutPromise])

maybeThrowTimeoutError()

return res
} catch (err) {
maybeThrowTimeoutError()

throw err
} finally {
controller.clear()
}
})()
}
}

module.exports = withTimeoutOption
2 changes: 0 additions & 2 deletions packages/ipfs-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
"dep-check": "aegir dep-check -i typescript -i interface-ipfs-core"
},
"dependencies": {
"any-signal": "^2.0.0",
"array-shuffle": "^1.0.1",
"bignumber.js": "^9.0.0",
"cbor": "^5.1.0",
Expand Down Expand Up @@ -116,7 +115,6 @@
"parse-duration": "^0.4.4",
"peer-id": "^0.14.1",
"streaming-iterables": "^5.0.2",
"timeout-abort-controller": "^1.1.1",
"uint8arrays": "^1.1.0"
},
"devDependencies": {
Expand Down
Loading

0 comments on commit 288a259

Please sign in to comment.