Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
feat: EXPERIMENTAL ipfsx API - boot procedure and add API method
Browse files Browse the repository at this point in the history
This PR allows ipfsx to be used by calling `IPFS.create(options)` with `{ EXPERIMENTAL: { ipfsx: true } }` options.

It adds a single API method `add` that returns an iterator that yields objects of the form `{ cid, path, size }`. The iterator is decorated with a `first` and `last` function so users can conveniently `await` on the first or last item to be yielded as per the [proposal here](https://github.com/ipfs-shipyard/ipfsx/blob/master/API.md#add).

In order to boot up a new ipfsx node I refactored the boot procedure to enable the following:

1. **Remove the big stateful blob "`self`" - components are passed just the dependencies they need to operate.** Right now it is opaque as to which components require which parts of an IPFS node without inspecting the entirety of the component's code. This change makes it easier to look at a component and know what aspects of the IPFS stack it uses and consequently allows us to understand which APIs should be available at which points of the node's lifecycle. It makes the code easier to understand, more maintainable and easier to mock dependencies for unit tests.
1. **Restrict APIs to appropriate lifecycle stage(s).** This PR introduces an `ApiManager` that allows us to update the API that is exposed at any given point. It allows us to (for example) disallow `ipfs.add` before the node is initialized or access `libp2p` before the node is started. The lifecycle methods `init`, `start` and `stop` each define which API methods are available after they have run avoiding having to put boilerplate in every method to check if it can be called when the node is in a particular state. See #1438
1. **Safer and more flexible API usage.** The `ApiManager` allows us to temporarily change APIs to stop `init` from being called again while it is already running and has the facility to rollback to the previous API state if an operation fails. It also enables piggybacking so we don't attempt 2 or more concurrent start/stop calls at once. See #1061 #2257
1. **Enable config changes at runtime.** Having an API that can be updated during a node's lifecycle will enable this feature in the future.

**FEEDBACK REQUIRED**: The changes I've made here are a little...racy. They have a bunch of benefits, as I've outlined above but the `ApiManager` is implemented as a `Proxy`, allowing us to swap out the underlying API at will. How do y'all feel about that? Is there a better way or got a suggestion?

resolves #1438
resolves #1061
resolves #2257
refs #2509
refs #1670

License: MIT
Signed-off-by: Alan Shaw <alan.shaw@protocol.ai>
  • Loading branch information
Alan Shaw committed Oct 17, 2019
1 parent 069bf73 commit 30af67a
Show file tree
Hide file tree
Showing 19 changed files with 910 additions and 227 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"main": "src/core/index.js",
"browser": {
"./src/core/components/init-assets.js": false,
"./src/core/runtime/init-assets-nodejs.js": "./src/core/runtime/init-assets-browser.js",
"./src/core/runtime/add-from-fs-nodejs.js": "./src/core/runtime/add-from-fs-browser.js",
"./src/core/runtime/config-nodejs.js": "./src/core/runtime/config-browser.js",
"./src/core/runtime/dns-nodejs.js": "./src/core/runtime/dns-browser.js",
Expand Down
21 changes: 21 additions & 0 deletions src/core/api-manager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module.exports = class ApiManager {
constructor () {
this._api = {}
this._onUndef = () => undefined
this.api = new Proxy({}, {
get (target, prop) {
return target[prop] === undefined
? this._onUndef(prop)
: target[prop]
}
})
}

update (nextApi, onUndef) {
const prevApi = this._api
const prevUndef = this._onUndef
this._api = nextApi
if (onUndef) this._onUndef = onUndef
return { cancel: () => this.update(prevApi, prevUndef), api: this.api }
}
}
115 changes: 115 additions & 0 deletions src/core/components-ipfsx/add/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
'use strict'

const importer = require('ipfs-unixfs-importer')
const normaliseAddInput = require('ipfs-utils/src/files/normalise-input')
const { parseChunkerString } = require('./utils')
const pipe = require('it-pipe')
const { withFirstAndLast } = require('../../utils')

module.exports = ({ ipld, dag, gcLock, preload, pin, constructorOptions }) => {
return withFirstAndLast(async function * add (source, options) {
options = options || {}

const opts = {
shardSplitThreshold: constructorOptions.EXPERIMENTAL.sharding ? 1000 : Infinity,
...options,
...parseChunkerString(options.chunker)
}

// CID v0 is for multihashes encoded with sha2-256
if (opts.hashAlg && opts.cidVersion !== 1) {
opts.cidVersion = 1
}

if (opts.progress) {
let total = 0
const prog = opts.progress

opts.progress = (bytes) => {
total += bytes
prog(total)
}
}

const iterator = pipe(
normaliseAddInput(source),
source => importer(source, ipld, opts),
transformFile(dag, opts),
preloadFile(preload, opts),
pinFile(pin, opts)
)

const releaseLock = await gcLock.readLock()

try {
yield * iterator
} finally {
releaseLock()
}
})
}

function transformFile (dag, opts) {
return async function * (source) {
for await (const { cid, path, unixfs } of source) {
if (opts.onlyHash) {
yield {
cid,
path: path || cid.toString(),
size: unixfs.fileSize()
}

return
}

const node = await dag.get(cid, { ...opts, preload: false })

yield {
cid,
path: path || cid.toString(),
size: Buffer.isBuffer(node) ? node.length : node.size
}
}
}
}

function preloadFile (preload, opts) {
return async function * (source) {
for await (const file of source) {
const isRootFile = !file.path || opts.wrapWithDirectory
? file.path === ''
: !file.path.includes('/')

const shouldPreload = isRootFile && !opts.onlyHash && opts.preload !== false

if (shouldPreload) {
preload(file.hash)
}

yield file
}
}
}

function pinFile (pin, opts) {
return async function * (source) {
for await (const file of source) {
// Pin a file if it is the root dir of a recursive add or the single file
// of a direct add.
const pin = 'pin' in opts ? opts.pin : true
const isRootDir = !file.path.includes('/')
const shouldPin = pin && isRootDir && !opts.onlyHash

if (shouldPin) {
// Note: addAsyncIterator() has already taken a GC lock, so tell
// pin.add() not to take a (second) GC lock
await pin.add(file.hash, {
preload: false,
lock: false
})
}

yield file
}
}
}
87 changes: 87 additions & 0 deletions src/core/components-ipfsx/add/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
'use strict'

/**
* Parses chunker string into options used by DAGBuilder in ipfs-unixfs-engine
*
*
* @param {String} chunker Chunker algorithm supported formats:
* "size-{size}"
* "rabin"
* "rabin-{avg}"
* "rabin-{min}-{avg}-{max}"
*
* @return {Object} Chunker options for DAGBuilder
*/
const parseChunkerString = (chunker) => {
if (!chunker) {
return {
chunker: 'fixed'
}
} else if (chunker.startsWith('size-')) {
const sizeStr = chunker.split('-')[1]
const size = parseInt(sizeStr)
if (isNaN(size)) {
throw new Error('Chunker parameter size must be an integer')
}
return {
chunker: 'fixed',
chunkerOptions: {
maxChunkSize: size
}
}
} else if (chunker.startsWith('rabin')) {
return {
chunker: 'rabin',
chunkerOptions: parseRabinString(chunker)
}
} else {
throw new Error(`Unrecognized chunker option: ${chunker}`)
}
}

/**
* Parses rabin chunker string
*
* @param {String} chunker Chunker algorithm supported formats:
* "rabin"
* "rabin-{avg}"
* "rabin-{min}-{avg}-{max}"
*
* @return {Object} rabin chunker options
*/
const parseRabinString = (chunker) => {
const options = {}
const parts = chunker.split('-')
switch (parts.length) {
case 1:
options.avgChunkSize = 262144
break
case 2:
options.avgChunkSize = parseChunkSize(parts[1], 'avg')
break
case 4:
options.minChunkSize = parseChunkSize(parts[1], 'min')
options.avgChunkSize = parseChunkSize(parts[2], 'avg')
options.maxChunkSize = parseChunkSize(parts[3], 'max')
break
default:
throw new Error('Incorrect chunker format (expected "rabin" "rabin-[avg]" or "rabin-[min]-[avg]-[max]"')
}

return options
}

const parseChunkSize = (str, name) => {
const size = parseInt(str)
if (isNaN(size)) {
throw new Error(`Chunker parameter ${name} must be an integer`)
}

return size
}

module.exports = {
parseChunkSize,
parseRabinString,
parseChunkerString
}
15 changes: 15 additions & 0 deletions src/core/components-ipfsx/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict'

module.exports = {
add: require('./add'),
init: require('./init'),
start: require('./start'),
stop: require('./stop'),
legacy: {
config: require('../components/config'),
dag: require('../components/dag'),
libp2p: require('../components/libp2p'),
object: require('../components/object'),
pin: require('../components/pin')
}
}
Loading

0 comments on commit 30af67a

Please sign in to comment.