diff --git a/docs/core-api/DAG.md b/docs/core-api/DAG.md index 559065889c..2b997b22c5 100644 --- a/docs/core-api/DAG.md +++ b/docs/core-api/DAG.md @@ -27,6 +27,46 @@ _Explore the DAG API through interactive coding challenges in our ProtoSchool tu - _[P2P data links with content addressing](https://proto.school/#/basics/) (beginner)_ - _[Blogging on the Decentralized Web](https://proto.school/#/blog/) (intermediate)_ +## `ipfs.dag.export(cid, [options])` + +> Returns a stream of Uint8Arrays that make up a [CAR file][] + +Exports a CAR for the entire DAG available from the given root CID. The CAR will have a single +root and IPFS will attempt to fetch and bundle all blocks that are linked within the connected +DAG. + +### Parameters + +| Name | Type | Description | +| ---- | ---- | ----------- | +| cid | [CID][] | The root CID of the DAG we wish to export | + +### Options + +An optional object which may have the following keys: + +| Name | Type | Default | Description | +| ---- | ---- | ------- | ----------- | +| timeout | `Number` | `undefined` | A timeout in ms | +| signal | [AbortSignal][] | `undefined` | Can be used to cancel any long running requests started as a result of this call | + +### Returns + +| Type | Description | +| -------- | -------- | +| `AsyncIterable` | A stream containing the car file bytes | + +### Example + +```JavaScript +const { Readable } = require('stream') + +const out = await ipfs.dag.export(cid) + +Readable.from(out).pipe(fs.createWriteStream('example.car')) +``` + +A great source of [examples][] can be found in the tests for this API. ## `ipfs.dag.put(dagNode, [options])` > Store an IPLD format node @@ -146,6 +186,48 @@ await getAndLog(cid, '/c/ca/1') A great source of [examples][] can be found in the tests for this API. +## `ipfs.dag.import(source, [options])` + +> Adds one or more [CAR file][]s full of blocks to the repo for this node + +Import all blocks from one or more CARs and optionally recursively pin the roots identified +within the CARs. + +### Parameters + +| Name | Type | Description | +| ---- | ---- | ----------- | +| sources | `AsyncIterable` | `AsyncIterable>` | One or more [CAR file][] streams | + +### Options + +An optional object which may have the following keys: + +| Name | Type | Default | Description | +| ---- | ---- | ------- | ----------- | +| pinRoots | `boolean` | `true` | Whether to recursively pin each root to the blockstore | +| timeout | `Number` | `undefined` | A timeout in ms | +| signal | [AbortSignal][] | `undefined` | Can be used to cancel any long running requests started as a result of this call | + +### Returns + +| Type | Description | +| -------- | -------- | +| `AsyncIterable<{ cid: CID, pinErrorMsg?: string }>` | A stream containing the result of importing the car file(s) | + +### Example + +```JavaScript +const fs = require('fs') + +for await (const result of ipfs.dag.import(fs.createReadStream('./path/to/archive.car'))) { + console.info(result) + // Qmfoo +} +``` + +A great source of [examples][] can be found in the tests for this API. + ## `ipfs.dag.tree(cid, [options])` > Enumerate all the entries in a graph @@ -262,7 +344,7 @@ console.log(result) A great source of [examples][] can be found in the tests for this API. - [examples]: https://github.com/ipfs/js-ipfs/blob/master/packages/interface-ipfs-core/src/dag [cid]: https://www.npmjs.com/package/cids [AbortSignal]: https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal +[CAR file]: https://ipld.io/specs/transport/car/ diff --git a/examples/custom-ipfs-repo/package.json b/examples/custom-ipfs-repo/package.json index 946a458929..a7b6b61ab5 100644 --- a/examples/custom-ipfs-repo/package.json +++ b/examples/custom-ipfs-repo/package.json @@ -13,9 +13,9 @@ "@ipld/dag-cbor": "^6.0.5", "@ipld/dag-pb": "^2.1.3", "blockstore-datastore-adapter": "^1.0.0", - "datastore-fs": "^5.0.1", + "datastore-fs": "^5.0.2", "ipfs": "^0.55.4", - "ipfs-repo": "^11.0.0", + "ipfs-repo": "^11.0.1", "it-all": "^1.0.4", "multiformats": "^9.4.1" }, diff --git a/packages/interface-ipfs-core/package.json b/packages/interface-ipfs-core/package.json index d8b120c182..a76c243b95 100644 --- a/packages/interface-ipfs-core/package.json +++ b/packages/interface-ipfs-core/package.json @@ -36,6 +36,7 @@ ] }, "dependencies": { + "@ipld/car": "^3.1.6", "@ipld/dag-cbor": "^6.0.5", "@ipld/dag-pb": "^2.1.3", "abort-controller": "^3.0.0", @@ -56,7 +57,8 @@ "it-first": "^1.0.4", "it-last": "^1.0.4", "it-map": "^1.0.4", - "it-pushable": "^1.4.0", + "it-pushable": "^1.4.2", + "it-to-buffer": "^2.0.0", "libp2p-crypto": "^0.19.6", "libp2p-websockets": "^0.16.1", "multiaddr": "^10.0.0", diff --git a/packages/interface-ipfs-core/src/dag/export.js b/packages/interface-ipfs-core/src/dag/export.js new file mode 100644 index 0000000000..b1ff5fb299 --- /dev/null +++ b/packages/interface-ipfs-core/src/dag/export.js @@ -0,0 +1,90 @@ +/* eslint-env mocha */ +'use strict' + +const all = require('it-all') +const { getDescribe, getIt, expect } = require('../utils/mocha') +const { CarReader } = require('@ipld/car') +const uint8ArrayFromString = require('uint8arrays/from-string') +const dagPb = require('@ipld/dag-pb') +const dagCbor = require('@ipld/dag-cbor') +const loadFixture = require('aegir/utils/fixtures') +const toBuffer = require('it-to-buffer') + +/** @typedef { import("ipfsd-ctl/src/factory") } Factory */ +/** + * @param {Factory} common + * @param {Object} options + */ +module.exports = (common, options) => { + const describe = getDescribe(options) + const it = getIt(options) + + describe('.dag.export', () => { + let ipfs + before(async () => { + ipfs = (await common.spawn()).api + }) + + after(() => common.clean()) + + it('should export a car file', async () => { + const child = dagPb.encode({ + Data: uint8ArrayFromString('block-' + Math.random()), + Links: [] + }) + const childCid = await ipfs.block.put(child, { + format: 'dag-pb', + version: 0 + }) + const parent = dagPb.encode({ + Links: [{ + Hash: childCid, + Tsize: child.length, + Name: '' + }] + }) + const parentCid = await ipfs.block.put(parent, { + format: 'dag-pb', + version: 0 + }) + const grandParent = dagCbor.encode({ + parent: parentCid + }) + const grandParentCid = await await ipfs.block.put(grandParent, { + format: 'dag-cbor', + version: 1 + }) + + const expectedCids = [ + grandParentCid, + parentCid, + childCid + ] + + const reader = await CarReader.fromIterable(ipfs.dag.export(grandParentCid)) + const cids = await all(reader.cids()) + + expect(cids).to.deep.equal(expectedCids) + }) + + it('export of shuffled devnet export identical to canonical original', async function () { + this.timeout(360000) + + const input = loadFixture('test/fixtures/car/lotus_devnet_genesis.car', 'interface-ipfs-core') + const result = await all(ipfs.dag.import(async function * () { yield input }())) + const exported = await toBuffer(ipfs.dag.export(result[0].root.cid)) + + expect(exported).to.equalBytes(input) + }) + + it('export of shuffled testnet export identical to canonical original', async function () { + this.timeout(360000) + + const input = loadFixture('test/fixtures/car/lotus_testnet_export_128.car', 'interface-ipfs-core') + const result = await all(ipfs.dag.import(async function * () { yield input }())) + const exported = await toBuffer(ipfs.dag.export(result[0].root.cid)) + + expect(exported).to.equalBytes(input) + }) + }) +} diff --git a/packages/interface-ipfs-core/src/dag/import.js b/packages/interface-ipfs-core/src/dag/import.js new file mode 100644 index 0000000000..3c13bd3b5f --- /dev/null +++ b/packages/interface-ipfs-core/src/dag/import.js @@ -0,0 +1,156 @@ +/* eslint-env mocha */ +'use strict' + +const all = require('it-all') +const drain = require('it-drain') +const { CID } = require('multiformats/cid') +const { sha256 } = require('multiformats/hashes/sha2') +const { getDescribe, getIt, expect } = require('../utils/mocha') +const { CarWriter, CarReader } = require('@ipld/car') +const raw = require('multiformats/codecs/raw') +const uint8ArrayFromString = require('uint8arrays/from-string') +const loadFixture = require('aegir/utils/fixtures') + +/** + * + * @param {number} num + */ +async function createBlocks (num) { + const blocks = [] + + for (let i = 0; i < num; i++) { + const bytes = uint8ArrayFromString('block-' + Math.random()) + const digest = await sha256.digest(raw.encode(bytes)) + const cid = CID.create(1, raw.code, digest) + + blocks.push({ bytes, cid }) + } + + return blocks +} + +/** + * @param {{ cid: CID, bytes: Uint8Array }[]} blocks + * @returns {AsyncIterable} + */ +async function createCar (blocks) { + const rootBlock = blocks[0] + const { writer, out } = await CarWriter.create([rootBlock.cid]) + + writer.put(rootBlock) + .then(async () => { + for (const block of blocks.slice(1)) { + writer.put(block) + } + + await writer.close() + }) + + return out +} + +/** @typedef { import("ipfsd-ctl/src/factory") } Factory */ +/** + * @param {Factory} common + * @param {Object} options + */ +module.exports = (common, options) => { + const describe = getDescribe(options) + const it = getIt(options) + + describe('.dag.import', () => { + let ipfs + before(async () => { + ipfs = (await common.spawn()).api + }) + + after(() => common.clean()) + + it('should import a car file', async () => { + const blocks = await createBlocks(5) + const car = await createCar(blocks) + + const result = await all(ipfs.dag.import(car)) + expect(result).to.have.lengthOf(1) + expect(result).to.have.nested.deep.property('[0].root.cid', blocks[0].cid) + + for (const { cid } of blocks) { + await expect(ipfs.block.get(cid)).to.eventually.be.ok() + } + + await expect(all(ipfs.pin.ls({ paths: blocks[0].cid }))).to.eventually.have.lengthOf(1) + .and.have.nested.property('[0].type', 'recursive') + }) + + it('should import a car file without pinning the roots', async () => { + const blocks = await createBlocks(5) + const car = await createCar(blocks) + + await all(ipfs.dag.import(car, { + pinRoots: false + })) + + await expect(all(ipfs.pin.ls({ paths: blocks[0].cid }))).to.eventually.be.rejectedWith(/is not pinned/) + }) + + it('should import multiple car files', async () => { + const blocks1 = await createBlocks(5) + const car1 = await createCar(blocks1) + + const blocks2 = await createBlocks(5) + const car2 = await createCar(blocks2) + + const result = await all(ipfs.dag.import([car1, car2])) + expect(result).to.have.lengthOf(2) + expect(result).to.deep.include({ root: { cid: blocks1[0].cid, pinErrorMsg: '' } }) + expect(result).to.deep.include({ root: { cid: blocks2[0].cid, pinErrorMsg: '' } }) + + for (const { cid } of blocks1) { + await expect(ipfs.block.get(cid)).to.eventually.be.ok() + } + + for (const { cid } of blocks2) { + await expect(ipfs.block.get(cid)).to.eventually.be.ok() + } + }) + + it('should import car with roots but no blocks', async () => { + const input = loadFixture('test/fixtures/car/combined_naked_roots_genesis_and_128.car', 'interface-ipfs-core') + const reader = await CarReader.fromBytes(input) + const cids = await reader.getRoots() + + expect(cids).to.have.lengthOf(2) + + // naked roots car does not contain blocks + const result1 = await all(ipfs.dag.import(async function * () { yield input }())) + expect(result1).to.deep.include({ root: { cid: cids[0], pinErrorMsg: 'blockstore: block not found' } }) + expect(result1).to.deep.include({ root: { cid: cids[1], pinErrorMsg: 'blockstore: block not found' } }) + + await drain(ipfs.dag.import(async function * () { yield loadFixture('test/fixtures/car/lotus_devnet_genesis_shuffled_nulroot.car', 'interface-ipfs-core') }())) + + // have some of the blocks now, should be able to pin one root + const result2 = await all(ipfs.dag.import(async function * () { yield input }())) + expect(result2).to.deep.include({ root: { cid: cids[0], pinErrorMsg: '' } }) + expect(result2).to.deep.include({ root: { cid: cids[1], pinErrorMsg: 'blockstore: block not found' } }) + + await drain(ipfs.dag.import(async function * () { yield loadFixture('test/fixtures/car/lotus_testnet_export_128.car', 'interface-ipfs-core') }())) + + // have all of the blocks now, should be able to pin both + const result3 = await all(ipfs.dag.import(async function * () { yield input }())) + expect(result3).to.deep.include({ root: { cid: cids[0], pinErrorMsg: '' } }) + expect(result3).to.deep.include({ root: { cid: cids[1], pinErrorMsg: '' } }) + }) + + it('should import lotus devnet genesis shuffled nulroot', async () => { + const input = loadFixture('test/fixtures/car/lotus_devnet_genesis_shuffled_nulroot.car', 'interface-ipfs-core') + const reader = await CarReader.fromBytes(input) + const cids = await reader.getRoots() + + expect(cids).to.have.lengthOf(1) + expect(cids[0].toString()).to.equal('bafkqaaa') + + const result = await all(ipfs.dag.import(async function * () { yield input }())) + expect(result).to.have.nested.deep.property('[0].root.cid', cids[0]) + }) + }) +} diff --git a/packages/interface-ipfs-core/src/dag/index.js b/packages/interface-ipfs-core/src/dag/index.js index 097678e07f..18b62bd6b5 100644 --- a/packages/interface-ipfs-core/src/dag/index.js +++ b/packages/interface-ipfs-core/src/dag/index.js @@ -2,8 +2,10 @@ const { createSuite } = require('../utils/suite') const tests = { + export: require('./export'), get: require('./get'), put: require('./put'), + import: require('./import'), resolve: require('./resolve') } diff --git a/packages/interface-ipfs-core/test/fixtures/car/combined_naked_roots_genesis_and_128.car b/packages/interface-ipfs-core/test/fixtures/car/combined_naked_roots_genesis_and_128.car new file mode 100644 index 0000000000..20ded7ea83 Binary files /dev/null and b/packages/interface-ipfs-core/test/fixtures/car/combined_naked_roots_genesis_and_128.car differ diff --git a/packages/interface-ipfs-core/test/fixtures/car/lotus_devnet_genesis.car b/packages/interface-ipfs-core/test/fixtures/car/lotus_devnet_genesis.car new file mode 100644 index 0000000000..ef2fe2daba Binary files /dev/null and b/packages/interface-ipfs-core/test/fixtures/car/lotus_devnet_genesis.car differ diff --git a/packages/interface-ipfs-core/test/fixtures/car/lotus_devnet_genesis_shuffled_nulroot.car b/packages/interface-ipfs-core/test/fixtures/car/lotus_devnet_genesis_shuffled_nulroot.car new file mode 100644 index 0000000000..9cd8f03721 Binary files /dev/null and b/packages/interface-ipfs-core/test/fixtures/car/lotus_devnet_genesis_shuffled_nulroot.car differ diff --git a/packages/interface-ipfs-core/test/fixtures/car/lotus_testnet_export_128.car b/packages/interface-ipfs-core/test/fixtures/car/lotus_testnet_export_128.car new file mode 100644 index 0000000000..47a61c8c2a Binary files /dev/null and b/packages/interface-ipfs-core/test/fixtures/car/lotus_testnet_export_128.car differ diff --git a/packages/interface-ipfs-core/test/fixtures/car/lotus_testnet_export_256_multiroot.car b/packages/interface-ipfs-core/test/fixtures/car/lotus_testnet_export_256_multiroot.car new file mode 100644 index 0000000000..8ff8971d59 Binary files /dev/null and b/packages/interface-ipfs-core/test/fixtures/car/lotus_testnet_export_256_multiroot.car differ diff --git a/packages/ipfs-cli/package.json b/packages/ipfs-cli/package.json index 13a8fc6192..597bf01c1a 100644 --- a/packages/ipfs-cli/package.json +++ b/packages/ipfs-cli/package.json @@ -42,7 +42,7 @@ "ipfs-core-utils": "^0.8.3", "ipfs-daemon": "^0.7.2", "ipfs-http-client": "^50.1.2", - "ipfs-repo": "^11.0.0", + "ipfs-repo": "^11.0.1", "ipfs-utils": "^8.1.4", "it-all": "^1.0.4", "it-concat": "^2.0.0", diff --git a/packages/ipfs-cli/src/commands/dag/export.js b/packages/ipfs-cli/src/commands/dag/export.js new file mode 100644 index 0000000000..d77e5a0ed9 --- /dev/null +++ b/packages/ipfs-cli/src/commands/dag/export.js @@ -0,0 +1,37 @@ +'use strict' + +const { default: parseDuration } = require('parse-duration') +const { CID } = require('multiformats/cid') + +/** + * @typedef {import('ipfs-core-types').IPFS} IPFS + */ + +module.exports = { + command: 'export ', + + describe: 'Streams the DAG beginning at the given root CID as a CAR stream on stdout.', + + builder: { + timeout: { + type: 'string', + coerce: parseDuration + } + }, + + /** + * @param {object} argv + * @param {import('../../types').Context} argv.ctx + * @param {string} argv.rootcid + * @param {number} argv.timeout + */ + async handler ({ ctx: { ipfs, print }, rootcid, timeout }) { + const options = { timeout } + const cid = CID.parse(rootcid) + + const exporter = ipfs.dag.export(cid, options) + for await (const chunk of exporter) { + print.write(chunk) + } + } +} diff --git a/packages/ipfs-cli/src/commands/dag/import.js b/packages/ipfs-cli/src/commands/dag/import.js new file mode 100644 index 0000000000..5a5d2dd465 --- /dev/null +++ b/packages/ipfs-cli/src/commands/dag/import.js @@ -0,0 +1,70 @@ +'use strict' + +const fs = require('fs') +const { default: parseDuration } = require('parse-duration') + +/** + * @typedef {import('ipfs-core-types/src/dag').ImportResult} ImportResult + */ + +module.exports = { + command: 'import [path...]', + + describe: 'Import the contents of one or more CARs from files or stdin', + + builder: { + 'pin-roots': { + type: 'boolean', + default: true, + describe: 'Pin optional roots listed in the CAR headers after importing.' + }, + 'cid-base': { + describe: 'Number base to display CIDs in.', + type: 'string', + default: 'base58btc' + }, + timeout: { + type: 'string', + coerce: parseDuration + } + }, + + /** + * @param {object} argv + * @param {import('../../types').Context} argv.ctx + * @param {string[]} argv.path + * @param {boolean} argv.pinRoots + * @param {number} argv.timeout + * @param {string} argv.cidBase + */ + async handler ({ ctx: { ipfs, print, getStdin }, path, pinRoots, timeout, cidBase }) { + const handleResult = async (/** @type {ImportResult} */ { root }) => { + const base = await ipfs.bases.getBase(cidBase) + print(`pinned root\t${root.cid.toString(base.encoder)}\t${root.pinErrorMsg || 'success'}`) + } + + const options = { timeout, pinRoots } + + if (path) { // files + for await (const result of ipfs.dag.import(fromFiles(print, path), options)) { + await handleResult(result) + } + } else { // stdin + print('importing CAR from stdin...') + for await (const result of ipfs.dag.import([getStdin()], options)) { + await handleResult(result) + } + } + } +} + +/** + * @param {import('../../types').Context["print"]} print + * @param {string[]} paths + */ +function * fromFiles (print, paths) { + for (const path of paths) { + print(`importing from ${path}...`) + yield fs.createReadStream(path) + } +} diff --git a/packages/ipfs-cli/test/add.js b/packages/ipfs-cli/test/add.js index 97897c0db2..11bf3ab949 100644 --- a/packages/ipfs-cli/test/add.js +++ b/packages/ipfs-cli/test/add.js @@ -9,6 +9,7 @@ const first = require('it-first') const cli = require('./utils/cli') const sinon = require('sinon') const uint8ArrayFromString = require('uint8arrays/from-string') +const matchIterable = require('./utils/match-iterable') // TODO: Test against all algorithms Object.keys(mh.names) // This subset is known to work with both go-ipfs and js-ipfs as of 2017-09-05 @@ -39,10 +40,6 @@ const defaultOptions = { timeout: undefined } -function matchIterable () { - return sinon.match((thing) => Boolean(thing[Symbol.asyncIterator]) || Boolean(thing[Symbol.iterator])) -} - describe('add', () => { let ipfs diff --git a/packages/ipfs-cli/test/commands.js b/packages/ipfs-cli/test/commands.js index 0888528273..20561e5ffa 100644 --- a/packages/ipfs-cli/test/commands.js +++ b/packages/ipfs-cli/test/commands.js @@ -4,7 +4,7 @@ const { expect } = require('aegir/utils/chai') const cli = require('./utils/cli') -const commandCount = 115 +const commandCount = 117 describe('commands', () => { it('list the commands', async () => { diff --git a/packages/ipfs-cli/test/dag.js b/packages/ipfs-cli/test/dag.js index 3388705167..8b5f0855aa 100644 --- a/packages/ipfs-cli/test/dag.js +++ b/packages/ipfs-cli/test/dag.js @@ -12,6 +12,7 @@ const { base58btc } = require('multiformats/bases/base58') const { base64 } = require('multiformats/bases/base64') const uint8ArrayFromString = require('uint8arrays/from-string') const uint8ArrayToString = require('uint8arrays/to-string') +const matchIterable = require('./utils/match-iterable') describe('dag', () => { const dagPbCid = CID.parse('Qmaj2NmcyAXT8dFmZRRytE12wpcaHADzbChKToMEjBsj5Z') @@ -24,7 +25,9 @@ describe('dag', () => { dag: { get: sinon.stub(), resolve: sinon.stub(), - put: sinon.stub() + put: sinon.stub(), + import: sinon.stub(), + export: sinon.stub() }, bases: { getBase: sinon.stub() @@ -482,4 +485,149 @@ describe('dag', () => { expect(out).to.equal(`${dagCborCid.toString(base58btc)}\n`) }) }) + + describe('import', () => { + const defaultOptions = { + pinRoots: true, + timeout: undefined + } + + it('imports car from stdin', async () => { + const cid = CID.parse('QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB') + + ipfs.dag.import.withArgs([matchIterable()], { + ...defaultOptions + }).returns([{ root: { cid, pinErrorMsg: '' } }]) + ipfs.bases.getBase.withArgs('base58btc').returns(base58btc) + + const proc = cli('dag import', { + ipfs, + getStdin: function * () { + yield uint8ArrayFromString('hello\n') + } + }) + + const out = await proc + expect(out).to.equal(`importing CAR from stdin...\npinned root\t${cid}\tsuccess\n`) + }) + + it('imports car from path', async () => { + const cid = CID.parse('QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB') + + ipfs.dag.import.withArgs(matchIterable(), { + ...defaultOptions + }).returns([{ root: { cid, pinErrorMsg: '' } }]) + ipfs.bases.getBase.withArgs('base58btc').returns(base58btc) + + const proc = cli('dag import README.md', { + ipfs + }) + + const out = await proc + expect(out).to.equal(`pinned root\t${cid}\tsuccess\n`) + }) + + it('imports car from path and fails to pin', async () => { + const cid = CID.parse('QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB') + + ipfs.dag.import.withArgs(matchIterable(), { + ...defaultOptions + }).returns([{ root: { cid, pinErrorMsg: 'oh noes' } }]) + ipfs.bases.getBase.withArgs('base58btc').returns(base58btc) + + const proc = cli('dag import README.md', { + ipfs + }) + + const out = await proc + expect(out).to.equal(`pinned root\t${cid}\toh noes\n`) + }) + + it('imports car from path with no pin arg', async () => { + const cid = CID.parse('QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB') + + ipfs.dag.import.withArgs(matchIterable(), { + ...defaultOptions, + pinRoots: false + }).returns([{ root: { cid, pinErrorMsg: '' } }]) + ipfs.bases.getBase.withArgs('base58btc').returns(base58btc) + + const proc = cli('dag import README.md --pin-roots=false', { + ipfs + }) + + const out = await proc + expect(out).to.equal(`pinned root\t${cid}\tsuccess\n`) + }) + + it('imports car from path with different base', async () => { + const cid = CID.parse('QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB') + + ipfs.dag.import.withArgs(matchIterable(), { + ...defaultOptions + }).returns([{ root: { cid, pinErrorMsg: '' } }]) + ipfs.bases.getBase.withArgs('derp').returns(base58btc) + + const proc = cli('dag import README.md --cid-base=derp', { + ipfs + }) + + const out = await proc + expect(out).to.equal(`pinned root\t${cid}\tsuccess\n`) + }) + + it('imports car from path with timeout', async () => { + const cid = CID.parse('QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB') + + ipfs.dag.import.withArgs(matchIterable(), { + ...defaultOptions, + timeout: 1000 + }).returns([{ root: { cid, pinErrorMsg: '' } }]) + ipfs.bases.getBase.withArgs('base58btc').returns(base58btc) + + const proc = cli('dag import README.md --timeout=1s', { + ipfs + }) + + const out = await proc + expect(out).to.equal(`pinned root\t${cid}\tsuccess\n`) + }) + }) + + describe('export', () => { + const defaultOptions = { + timeout: undefined + } + + it('exports car', async () => { + const cid = CID.parse('QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB') + + ipfs.dag.export.withArgs(cid, { + ...defaultOptions + }).returns(['some bytes']) + + const proc = cli(`dag export ${cid}`, { + ipfs + }) + + const out = await proc + expect(out).to.equal('some bytes') + }) + + it('exports car with timeout', async () => { + const cid = CID.parse('QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB') + + ipfs.dag.export.withArgs(cid, { + ...defaultOptions, + timeout: 1000 + }).returns(['some bytes']) + + const proc = cli(`dag export ${cid} --timeout=1s`, { + ipfs + }) + + const out = await proc + expect(out).to.equal('some bytes') + }) + }) }) diff --git a/packages/ipfs-cli/test/utils/match-iterable.js b/packages/ipfs-cli/test/utils/match-iterable.js new file mode 100644 index 0000000000..2403ad21fb --- /dev/null +++ b/packages/ipfs-cli/test/utils/match-iterable.js @@ -0,0 +1,9 @@ +'use strict' + +const sinon = require('sinon') + +function matchIterable () { + return sinon.match((thing) => Boolean(thing[Symbol.asyncIterator]) || Boolean(thing[Symbol.iterator])) +} + +module.exports = matchIterable diff --git a/packages/ipfs-core-types/src/dag/index.d.ts b/packages/ipfs-core-types/src/dag/index.d.ts index 3db88b9eeb..9545e4041a 100644 --- a/packages/ipfs-core-types/src/dag/index.d.ts +++ b/packages/ipfs-core-types/src/dag/index.d.ts @@ -91,6 +91,19 @@ export interface API { * ``` */ resolve: (ipfsPath: IPFSPath, options?: ResolveOptions & OptionExtension) => Promise + + /** + * Exports a CAR for the entire DAG available from the given root CID. The CAR will have a single + * root and IPFS will attempt to fetch and bundle all blocks that are linked within the connected + * DAG. + */ + export: (root: CID, options?: ExportOptions & OptionExtension) => AsyncIterable + + /** + * Import all blocks from one or more CARs and optionally recursively pin the roots identified + * within the CARs. + */ + import: (sources: AsyncIterable> | Iterable>, options?: ImportOptions & OptionExtension) => AsyncIterable } export interface GetOptions extends AbortOptions, PreloadOptions { @@ -181,3 +194,32 @@ export interface ResolveResult { */ remainderPath?: string } + +export interface ExportOptions extends AbortOptions, PreloadOptions { +} + +export interface ImportOptions extends AbortOptions, PreloadOptions { + /** + * Recursively pin roots for the imported CARs, defaults to true. + */ + pinRoots?: boolean +} + +export interface ImportResult { + /** + * A list of roots and their pin status if `pinRoots` was set. + */ + root: ImportRootStatus +} + +export interface ImportRootStatus { + /** + * CID of a root that was recursively pinned. + */ + cid: CID + + /** + * The error message if the pin was unsuccessful. + */ + pinErrorMsg?: string +} diff --git a/packages/ipfs-core-utils/package.json b/packages/ipfs-core-utils/package.json index 343452667a..4b21e4ff81 100644 --- a/packages/ipfs-core-utils/package.json +++ b/packages/ipfs-core-utils/package.json @@ -50,7 +50,7 @@ "ipfs-utils": "^8.1.4", "it-all": "^1.0.4", "it-map": "^1.0.4", - "it-peekable": "^1.0.1", + "it-peekable": "^1.0.2", "multiaddr": "^10.0.0", "multiaddr-to-uri": "^8.0.0", "multiformats": "^9.4.1", diff --git a/packages/ipfs-core/package.json b/packages/ipfs-core/package.json index 33e129eb48..ce7164938a 100644 --- a/packages/ipfs-core/package.json +++ b/packages/ipfs-core/package.json @@ -56,13 +56,14 @@ "dep-check": "aegir dep-check -i interface-ipfs-core -i ipfs-core-types -i abort-controller" }, "dependencies": { + "@ipld/car": "^3.1.0", "@ipld/dag-cbor": "^6.0.5", "@ipld/dag-pb": "^2.1.3", "abort-controller": "^3.0.0", "array-shuffle": "^2.0.0", "blockstore-datastore-adapter": "1.0.0", - "datastore-core": "^5.0.0", - "datastore-fs": "^5.0.1", + "datastore-core": "^5.0.1", + "datastore-fs": "^5.0.2", "datastore-level": "^6.0.1", "datastore-pubsub": "^0.7.0", "debug": "^4.1.1", @@ -76,7 +77,7 @@ "ipfs-core-types": "^0.5.2", "ipfs-core-utils": "^0.8.3", "ipfs-http-client": "^50.1.2", - "ipfs-repo": "^11.0.0", + "ipfs-repo": "^11.0.1", "ipfs-unixfs": "^5.0.0", "ipfs-unixfs-exporter": "^6.0.0", "ipfs-unixfs-importer": "^8.0.0", @@ -89,7 +90,10 @@ "it-first": "^1.0.4", "it-last": "^1.0.4", "it-map": "^1.0.4", + "it-merge": "^1.0.2", + "it-peekable": "^1.0.2", "it-pipe": "^1.1.0", + "it-pushable": "^1.4.2", "just-safe-set": "^2.2.1", "libp2p": "^0.32.0", "libp2p-bootstrap": "^0.13.0", diff --git a/packages/ipfs-core/src/block-storage.js b/packages/ipfs-core/src/block-storage.js index 02b95073f7..d41a4357d6 100644 --- a/packages/ipfs-core/src/block-storage.js +++ b/packages/ipfs-core/src/block-storage.js @@ -1,6 +1,8 @@ 'use strict' const { BlockstoreAdapter } = require('interface-blockstore') +const merge = require('it-merge') +const pushable = require('it-pushable') /** * @typedef {import('interface-blockstore').Blockstore} Blockstore @@ -81,7 +83,7 @@ class BlockStorage extends BlockstoreAdapter { * @param {AbortOptions} [options] */ async get (cid, options = {}) { - if (this.bitswap.isStarted()) { + if (!(await this.has(cid)) && this.bitswap.isStarted()) { return this.bitswap.get(cid, options) } else { return this.child.get(cid, options) @@ -95,11 +97,26 @@ class BlockStorage extends BlockstoreAdapter { * @param {AbortOptions} [options] */ async * getMany (cids, options = {}) { - if (this.bitswap.isStarted()) { - yield * this.bitswap.getMany(cids, options) - } else { - yield * this.child.getMany(cids, options) - } + const getFromBitswap = pushable() + const getFromChild = pushable() + + Promise.resolve().then(async () => { + for await (const cid of cids) { + if (!(await this.has(cid)) && this.bitswap.isStarted()) { + getFromBitswap.push(cid) + } else { + getFromChild.push(cid) + } + } + + getFromBitswap.end() + getFromChild.end() + }) + + yield * merge( + this.bitswap.getMany(getFromBitswap, options), + this.child.getMany(getFromChild, options) + ) } /** diff --git a/packages/ipfs-core/src/components/dag/export.js b/packages/ipfs-core/src/components/dag/export.js new file mode 100644 index 0000000000..37d5719f4d --- /dev/null +++ b/packages/ipfs-core/src/components/dag/export.js @@ -0,0 +1,131 @@ +'use strict' + +const { CID } = require('multiformats/cid') +const Block = require('multiformats/block') +const { base58btc } = require('multiformats/bases/base58') +const { CarWriter } = require('@ipld/car/writer') +const withTimeoutOption = require('ipfs-core-utils/src/with-timeout-option') +const log = require('debug')('ipfs:components:dag:import') +const raw = require('multiformats/codecs/raw') +const json = require('multiformats/codecs/json') + +// blocks that we're OK with not inspecting for links +/** @type {number[]} */ +const NO_LINKS_CODECS = [ + raw.code, // raw + json.code // JSON +] + +/** + * @typedef {import('../../types').Preload} Preload + * @typedef {import('ipfs-repo').IPFSRepo} IPFSRepo + * @typedef {import('@ipld/car/api').BlockWriter} BlockWriter + * @typedef {import('ipfs-core-types/src/utils').AbortOptions} AbortOptions + */ + +/** + * @param {Object} config + * @param {IPFSRepo} config.repo + * @param {Preload} config.preload + * @param {import('ipfs-core-utils/src/multicodecs')} config.codecs + */ +module.exports = ({ repo, preload, codecs }) => { + /** + * @type {import('ipfs-core-types/src/dag').API["export"]} + */ + async function * dagExport (root, options = {}) { + if (options.preload !== false) { + preload(root) + } + + const cid = CID.asCID(root) + if (!cid) { + throw new Error(`Unexpected error converting CID type: ${root}`) + } + + log(`Exporting ${cid} as car`) + const { writer, out } = await CarWriter.create([cid]) + + // we need to write with one async channel and send the CarWriter output + // with another to the caller, but if the write causes an error we capture + // that and make sure it gets propagated + /** @type {Error|null} */ + let err = null + ;(async () => { + try { + await traverseWrite( + repo, + { signal: options.signal, timeout: options.timeout }, + cid, + writer, + codecs) + writer.close() + } catch (e) { + err = e + } + })() + + for await (const chunk of out) { + if (err) { + break + } + yield chunk + } + if (err) { + throw err + } + } + + return withTimeoutOption(dagExport) +} + +/** + * @param {IPFSRepo} repo + * @param {AbortOptions} options + * @param {CID} cid + * @param {BlockWriter} writer + * @param {import('ipfs-core-utils/src/multicodecs')} codecs + * @param {Set} seen + * @returns {Promise} + */ +async function traverseWrite (repo, options, cid, writer, codecs, seen = new Set()) { + const b58Cid = cid.toString(base58btc) + if (seen.has(b58Cid)) { + return + } + + const block = await getBlock(repo, options, cid, codecs) + + log(`Adding block ${cid} to car`) + await writer.put(block) + seen.add(b58Cid) + + // recursive traversal of all links + for (const link of block.links) { + await traverseWrite(repo, options, link, writer, codecs, seen) + } +} + +/** + * @param {IPFSRepo} repo + * @param {AbortOptions} options + * @param {CID} cid + * @param {import('ipfs-core-utils/src/multicodecs')} codecs + * @returns {Promise<{cid:CID, bytes:Uint8Array, links:CID[]}>} + */ +async function getBlock (repo, options, cid, codecs) { + const bytes = await repo.blocks.get(cid, options) + + /** @type {CID[]} */ + let links = [] + const codec = await codecs.getCodec(cid.code) + + if (codec) { + const block = Block.createUnsafe({ bytes, cid, codec }) + links = [...block.links()].map((l) => l[1]) + } else if (!NO_LINKS_CODECS.includes(cid.code)) { + throw new Error(`Can't decode links in block with codec 0x${cid.code.toString(16)} to form complete DAG`) + } + + return { cid, bytes, links } +} diff --git a/packages/ipfs-core/src/components/dag/import.js b/packages/ipfs-core/src/components/dag/import.js new file mode 100644 index 0000000000..4b095ad69a --- /dev/null +++ b/packages/ipfs-core/src/components/dag/import.js @@ -0,0 +1,106 @@ +'use strict' + +const { CarBlockIterator } = require('@ipld/car/iterator') +const withTimeoutOption = require('ipfs-core-utils/src/with-timeout-option') +const itPeekable = require('it-peekable') +const drain = require('it-drain') +const map = require('it-map') +const log = require('debug')('ipfs:components:dag:import') + +/** + * @typedef {import('multiformats/cid').CID} CID + * @typedef {import('ipfs-repo').IPFSRepo} IPFSRepo + * @typedef {import('ipfs-core-types/src/utils').AbortOptions} AbortOptions + * @typedef {import('ipfs-core-types/src/dag/').ImportRootStatus} RootStatus + */ + +/** + * @param {Object} config + * @param {IPFSRepo} config.repo + */ +module.exports = ({ repo }) => { + /** + * @type {import('ipfs-core-types/src/dag').API["import"]} + */ + async function * dagImport (sources, options = {}) { + const release = await repo.gcLock.readLock() + + try { + const abortOptions = { signal: options.signal, timeout: options.timeout } + const peekable = itPeekable(sources) + + const { value, done } = await peekable.peek() + + if (done) { + return + } + + if (value) { + peekable.push(value) + } + + /** + * @type {AsyncIterable> | Iterable>} + */ + let cars + + if (value instanceof Uint8Array) { + // @ts-ignore + cars = [peekable] + } else { + cars = peekable + } + + for await (const car of cars) { + const roots = await importCar(repo, abortOptions, car) + + if (options.pinRoots !== false) { // default=true + for (const cid of roots) { + let pinErrorMsg = '' + + try { // eslint-disable-line max-depth + if (await repo.blocks.has(cid)) { // eslint-disable-line max-depth + log(`Pinning root ${cid}`) + await repo.pins.pinRecursively(cid) + } else { + pinErrorMsg = 'blockstore: block not found' + } + } catch (err) { + pinErrorMsg = err.message + } + + yield { root: { cid, pinErrorMsg } } + } + } + } + } finally { + release() + } + } + + return withTimeoutOption(dagImport) +} + +/** + * @param {IPFSRepo} repo + * @param {AbortOptions} options + * @param {AsyncIterable} source + * @returns {Promise} + */ +async function importCar (repo, options, source) { + const reader = await CarBlockIterator.fromIterable(source) + const roots = await reader.getRoots() + + await drain( + repo.blocks.putMany( + map(reader, ({ cid: key, bytes: value }) => { + log(`Import block ${key}`) + + return { key, value } + }), + { signal: options.signal } + ) + ) + + return roots +} diff --git a/packages/ipfs-core/src/components/dag/index.js b/packages/ipfs-core/src/components/dag/index.js index a749082554..554643dc61 100644 --- a/packages/ipfs-core/src/components/dag/index.js +++ b/packages/ipfs-core/src/components/dag/index.js @@ -1,8 +1,10 @@ 'use strict' +const createExport = require('./export') const createGet = require('./get') -const createResolve = require('./resolve') +const createImport = require('./import') const createPut = require('./put') +const createResolve = require('./resolve') class DagAPI { /** @@ -13,7 +15,9 @@ class DagAPI { * @param {import('ipfs-repo').IPFSRepo} config.repo */ constructor ({ repo, codecs, hashers, preload }) { + this.export = createExport({ repo, preload, codecs }) this.get = createGet({ codecs, repo, preload }) + this.import = createImport({ repo }) this.resolve = createResolve({ repo, codecs, preload }) this.put = createPut({ repo, codecs, hashers, preload }) } diff --git a/packages/ipfs-grpc-client/package.json b/packages/ipfs-grpc-client/package.json index a4474643c4..63da9af4e3 100644 --- a/packages/ipfs-grpc-client/package.json +++ b/packages/ipfs-grpc-client/package.json @@ -41,7 +41,7 @@ "ipfs-grpc-protocol": "^0.3.0", "ipfs-unixfs": "^5.0.0", "it-first": "^1.0.4", - "it-pushable": "^1.4.0", + "it-pushable": "^1.4.2", "multiaddr": "^10.0.0", "multiformats": "^9.4.1", "protobufjs": "^6.10.2", diff --git a/packages/ipfs-grpc-server/package.json b/packages/ipfs-grpc-server/package.json index f0a5765d97..b9ae278c1f 100644 --- a/packages/ipfs-grpc-server/package.json +++ b/packages/ipfs-grpc-server/package.json @@ -38,9 +38,9 @@ "ipfs-grpc-protocol": "^0.3.0", "it-first": "^1.0.4", "it-map": "^1.0.4", - "it-peekable": "^1.0.1", + "it-peekable": "^1.0.2", "it-pipe": "^1.1.0", - "it-pushable": "^1.4.0", + "it-pushable": "^1.4.2", "multiaddr": "^10.0.0", "protobufjs": "^6.10.2", "ws": "^7.3.1" diff --git a/packages/ipfs-http-client/src/dag/export.js b/packages/ipfs-http-client/src/dag/export.js new file mode 100644 index 0000000000..ea048a2aae --- /dev/null +++ b/packages/ipfs-http-client/src/dag/export.js @@ -0,0 +1,29 @@ +'use strict' + +const configure = require('../lib/configure') +const toUrlSearchParams = require('../lib/to-url-search-params') + +/** + * @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions + * @typedef {import('ipfs-core-types/src/dag').API} DAGAPI + */ + +module.exports = configure(api => { + /** + * @type {DAGAPI["export"]} + */ + async function * dagExport (root, options = {}) { + const res = await api.post('dag/export', { + timeout: options.timeout, + signal: options.signal, + searchParams: toUrlSearchParams({ + arg: root.toString() + }), + headers: options.headers + }) + + yield * res.iterator() + } + + return dagExport +}) diff --git a/packages/ipfs-http-client/src/dag/import.js b/packages/ipfs-http-client/src/dag/import.js new file mode 100644 index 0000000000..4f56a44a2d --- /dev/null +++ b/packages/ipfs-http-client/src/dag/import.js @@ -0,0 +1,47 @@ +'use strict' + +const configure = require('../lib/configure') +const toUrlSearchParams = require('../lib/to-url-search-params') +const abortSignal = require('../lib/abort-signal') +const multipartRequest = require('../lib/multipart-request') +const { AbortController } = require('native-abort-controller') +const { CID } = require('multiformats/cid') + +/** + * @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions + * @typedef {import('ipfs-core-types/src/dag').API} DAGAPI + */ + +module.exports = configure(api => { + /** + * @type {DAGAPI["import"]} + */ + async function * dagImport (source, options = {}) { + const controller = new AbortController() + const signal = abortSignal(controller.signal, options.signal) + const { headers, body } = await multipartRequest(source, controller, options.headers) + + const res = await api.post('dag/import', { + timeout: options.timeout, + signal, + headers, + body, + searchParams: toUrlSearchParams({ 'pin-roots': options.pinRoots }) + }) + + for await (const { Root } of res.ndjson()) { + if (Root !== undefined) { + const { Cid: { '/': Cid }, PinErrorMsg } = Root + + yield { + root: { + cid: CID.parse(Cid), + pinErrorMsg: PinErrorMsg + } + } + } + } + } + + return dagImport +}) diff --git a/packages/ipfs-http-client/src/dag/index.js b/packages/ipfs-http-client/src/dag/index.js index ec7f0ac5c3..13be3bb70d 100644 --- a/packages/ipfs-http-client/src/dag/index.js +++ b/packages/ipfs-http-client/src/dag/index.js @@ -5,7 +5,9 @@ * @param {import('../types').Options} config */ module.exports = (codecs, config) => ({ + export: require('./export')(config), get: require('./get')(codecs, config), + import: require('./import')(config), put: require('./put')(codecs, config), resolve: require('./resolve')(config) }) diff --git a/packages/ipfs-http-server/package.json b/packages/ipfs-http-server/package.json index 9459b93e46..fe031c8226 100644 --- a/packages/ipfs-http-server/package.json +++ b/packages/ipfs-http-server/package.json @@ -49,10 +49,10 @@ "it-first": "^1.0.4", "it-last": "^1.0.4", "it-map": "^1.0.4", - "it-merge": "^1.0.1", + "it-merge": "^1.0.2", "it-multipart": "^2.0.0", "it-pipe": "^1.1.0", - "it-pushable": "^1.4.0", + "it-pushable": "^1.4.2", "it-reduce": "^1.0.5", "it-tar": "^3.0.0", "joi": "^17.2.1", diff --git a/packages/ipfs-http-server/src/api/resources/dag.js b/packages/ipfs-http-server/src/api/resources/dag.js index fec23bf3c9..351e1bae4e 100644 --- a/packages/ipfs-http-server/src/api/resources/dag.js +++ b/packages/ipfs-http-server/src/api/resources/dag.js @@ -1,9 +1,11 @@ 'use strict' const multipart = require('../../utils/multipart-request-parser') +const streamResponse = require('../../utils/stream-response') const Joi = require('../../utils/joi') const Boom = require('@hapi/boom') const all = require('it-all') +const { pipe } = require('it-pipe') const uint8ArrayToString = require('uint8arrays/to-string') /** @@ -335,3 +337,150 @@ exports.resolve = { } } } + +exports.export = { + options: { + validate: { + options: { + allowUnknown: true, + stripUnknown: true + }, + query: Joi.object().keys({ + root: Joi.cid().required(), + timeout: Joi.timeout() + }) + .rename('arg', 'root', { + override: true, + ignoreUndefined: true + }) + } + }, + + /** + * @param {import('../../types').Request} request + * @param {import('@hapi/hapi').ResponseToolkit} h + */ + async handler (request, h) { + const { + app: { + signal + }, + server: { + app: { + ipfs + } + }, + query: { + root, + timeout + } + } = request + + return streamResponse(request, h, () => ipfs.dag.export(root, { + timeout, + signal + }), { + onError (err) { + err.message = 'Failed to export DAG: ' + err.message + } + }) + } +} + +exports.import = { + options: { + payload: { + parse: false, + output: 'stream', + maxBytes: Number.MAX_SAFE_INTEGER + }, + validate: { + options: { + allowUnknown: true, + stripUnknown: true + }, + query: Joi.object().keys({ + pinRoots: Joi.boolean().default(true), + timeout: Joi.timeout() + }) + .rename('pin-roots', 'pinRoots', { + override: true, + ignoreUndefined: true + }) + } + }, + + /** + * @param {import('../../types').Request} request + * @param {import('@hapi/hapi').ResponseToolkit} h + */ + async handler (request, h) { + const { + app: { + signal + }, + server: { + app: { + ipfs + } + }, + query: { + pinRoots, + timeout + } + } = request + + let filesParsed = false + + return streamResponse(request, h, () => pipe( + multipart(request.raw.req), + /** + * @param {AsyncIterable} source + */ + async function * (source) { + for await (const entry of source) { + if (entry.type !== 'file') { + throw Boom.badRequest('Unexpected upload type') + } + + filesParsed = true + yield entry.content + } + }, + /** + * @param {AsyncIterable>} source + */ + async function * (source) { + yield * ipfs.dag.import(source, { + pinRoots, + timeout, + signal + }) + }, + /** + * @param {AsyncIterable} source + */ + async function * (source) { + for await (const res of source) { + yield { + Root: { + Cid: { + '/': res.root.cid.toString() + }, + PinErrorMsg: res.root.pinErrorMsg + } + } + } + } + ), { + onError (err) { + err.message = 'Failed to import DAG: ' + err.message + }, + onEnd () { + if (!filesParsed) { + throw Boom.badRequest("File argument 'data' is required.") + } + } + }) + } +} diff --git a/packages/ipfs-http-server/src/api/resources/files-regular.js b/packages/ipfs-http-server/src/api/resources/files-regular.js index 60f19dedec..b2359491b6 100644 --- a/packages/ipfs-http-server/src/api/resources/files-regular.js +++ b/packages/ipfs-http-server/src/api/resources/files-regular.js @@ -252,14 +252,14 @@ exports.add = { } } = request + let filesParsed = false + return streamResponse(request, h, () => pipe( multipart(request.raw.req), /** * @param {AsyncIterable} source */ async function * (source) { - let filesParsed = false - for await (const entry of source) { if (entry.type === 'file') { filesParsed = true @@ -282,10 +282,6 @@ exports.add = { } } } - - if (!filesParsed) { - throw new Error("File argument 'data' is required.") - } }, /** * @param {import('ipfs-core-types/src/utils').ImportCandidateStream} source @@ -346,7 +342,13 @@ exports.add = { ) ) } - )) + ), { + onEnd () { + if (!filesParsed) { + throw Boom.badRequest("File argument 'data' is required.") + } + } + }) } } @@ -449,7 +451,6 @@ exports.ls = { throw Boom.boomify(err, { message: 'Failed to list dir' }) } } - return streamResponse(request, h, () => pipe( ipfs.ls(path, { recursive, diff --git a/packages/ipfs-http-server/src/api/routes/dag.js b/packages/ipfs-http-server/src/api/routes/dag.js index 2b352bceca..6f1804ebac 100644 --- a/packages/ipfs-http-server/src/api/routes/dag.js +++ b/packages/ipfs-http-server/src/api/routes/dag.js @@ -3,11 +3,21 @@ const resources = require('../resources') module.exports = [ + { + method: 'POST', + path: '/api/v0/dag/export', + ...resources.dag.export + }, { method: 'POST', path: '/api/v0/dag/get', ...resources.dag.get }, + { + method: 'POST', + path: '/api/v0/dag/import', + ...resources.dag.import + }, { method: 'POST', path: '/api/v0/dag/put', diff --git a/packages/ipfs-http-server/src/utils/stream-response.js b/packages/ipfs-http-server/src/utils/stream-response.js index 1838ea1987..4d3bc92ce9 100644 --- a/packages/ipfs-http-server/src/utils/stream-response.js +++ b/packages/ipfs-http-server/src/utils/stream-response.js @@ -13,7 +13,7 @@ const ERROR_TRAILER = 'X-Stream-Error' * @param {import('../types').Request} request * @param {import('@hapi/hapi').ResponseToolkit} h * @param {() => AsyncIterable} getSource - * @param {{ onError?: (error: Error) => void }} [options] + * @param {{ onError?: (error: Error) => void, onEnd?: () => void }} [options] */ async function streamResponse (request, h, getSource, options = {}) { // eslint-disable-next-line no-async-promise-executor @@ -38,6 +38,10 @@ async function streamResponse (request, h, getSource, options = {}) { } } + if (options.onEnd) { + options.onEnd() + } + if (!started) { // Maybe it was an empty source? started = true resolve(stream) diff --git a/packages/ipfs-http-server/test/inject/dag.js b/packages/ipfs-http-server/test/inject/dag.js index 5f847329e3..604b564ff5 100644 --- a/packages/ipfs-http-server/test/inject/dag.js +++ b/packages/ipfs-http-server/test/inject/dag.js @@ -9,10 +9,12 @@ const streamToPromise = require('stream-to-promise') const { CID } = require('multiformats/cid') const testHttpMethod = require('../utils/test-http-method') const http = require('../utils/http') +const matchIterable = require('../utils/match-iterable') const sinon = require('sinon') const { AbortSignal } = require('native-abort-controller') const { base58btc } = require('multiformats/bases/base58') const { base32 } = require('multiformats/bases/base32') +const drain = require('it-drain') const toHeadersAndPayload = async (thing) => { const stream = new Readable() @@ -37,7 +39,9 @@ describe('/dag', () => { dag: { get: sinon.stub(), put: sinon.stub(), - resolve: sinon.stub() + resolve: sinon.stub(), + import: sinon.stub(), + export: sinon.stub() }, block: { put: sinon.stub() @@ -519,4 +523,163 @@ describe('/dag', () => { expect(res).to.have.nested.property('result.RemPath', '') }) }) + + describe('/import', () => { + const defaultOptions = { + signal: sinon.match.instanceOf(AbortSignal), + timeout: undefined, + pinRoots: true + } + + it('only accepts POST', () => { + return testHttpMethod('/api/v0/dag/import') + }) + + it('imports car', async () => { + const cid = CID.parse('QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB') + + ipfs.dag.import.withArgs(matchIterable(), { + ...defaultOptions + }) + .callsFake(async function * (source) { + await drain(source) + yield { root: { cid, pinErrorMsg: '' } } + }) + + const res = await http({ + method: 'POST', + url: '/api/v0/dag/import', + ...await toHeadersAndPayload('car content') + }, { ipfs }) + + expect(res).to.have.property('statusCode', 200) + + const response = JSON.parse(res.result) + expect(response).to.have.nested.property('Root.Cid./', cid.toString()) + expect(response).to.have.nested.property('Root.PinErrorMsg').that.is.empty() + }) + + it('imports car with pin error', async () => { + const cid = CID.parse('QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB') + + ipfs.dag.import.withArgs(matchIterable(), { + ...defaultOptions + }) + .callsFake(async function * (source) { + await drain(source) + yield { root: { cid, pinErrorMsg: 'derp' } } + }) + + const res = await http({ + method: 'POST', + url: '/api/v0/dag/import', + ...await toHeadersAndPayload('car content') + }, { ipfs }) + + expect(res).to.have.property('statusCode', 200) + + const response = JSON.parse(res.result) + expect(response).to.have.nested.property('Root.Cid./', cid.toString()) + expect(response).to.have.nested.property('Root.PinErrorMsg').that.equals('derp') + }) + + it('imports car without pinning', async () => { + ipfs.dag.import.withArgs(matchIterable(), { + ...defaultOptions, + pinRoots: false + }) + .callsFake(async function * (source) { // eslint-disable-line require-yield + await drain(source) + }) + + const res = await http({ + method: 'POST', + url: '/api/v0/dag/import?pin-roots=false', + ...await toHeadersAndPayload('car content') + }, { ipfs }) + + expect(res).to.have.property('statusCode', 200) + expect(res.result).to.be.empty() + }) + + it('imports car with timeout', async () => { + const cid = CID.parse('QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB') + + ipfs.dag.import.withArgs(matchIterable(), { + ...defaultOptions, + timeout: 1000 + }) + .callsFake(async function * (source) { + await drain(source) + yield { root: { cid, pinErrorMsg: '' } } + }) + + const res = await http({ + method: 'POST', + url: '/api/v0/dag/import?timeout=1s', + ...await toHeadersAndPayload('car content') + }, { ipfs }) + + expect(res).to.have.property('statusCode', 200) + + const response = JSON.parse(res.result) + expect(response).to.have.nested.property('Root.Cid./', cid.toString()) + expect(response).to.have.nested.property('Root.PinErrorMsg').that.equals('') + }) + }) + + describe('/export', () => { + const defaultOptions = { + signal: sinon.match.instanceOf(AbortSignal), + timeout: undefined + } + + it('only accepts POST', () => { + return testHttpMethod('/api/v0/dag/export') + }) + + it('returns error for request without root', async () => { + const res = await http({ + method: 'POST', + url: '/api/v0/dag/export' + }, { ipfs }) + + expect(res).to.have.property('statusCode', 400) + }) + + it('exports car', async () => { + const cid = CID.parse('QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB') + + ipfs.dag.export.withArgs(cid, { + ...defaultOptions + }) + .returns(['some data']) + + const res = await http({ + method: 'POST', + url: `/api/v0/dag/export?arg=${cid}` + }, { ipfs }) + + expect(res).to.have.property('statusCode', 200) + expect(res).to.have.nested.property('result', 'some data') + }) + + it('exports car with a timeout', async () => { + const cid = CID.parse('QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB') + + ipfs.dag.export.withArgs(cid, { + ...defaultOptions, + timeout: 1000 + }) + .returns(['some data']) + + const res = await http({ + method: 'POST', + url: `/api/v0/dag/export?arg=${cid}&timeout=1s` + }, { ipfs }) + + expect(res).to.have.property('statusCode', 200) + expect(res).to.have.nested.property('result', 'some data') + }) + }) }) diff --git a/packages/ipfs-http-server/test/inject/files.js b/packages/ipfs-http-server/test/inject/files.js index 5b13c74d8b..b6cdb05c57 100644 --- a/packages/ipfs-http-server/test/inject/files.js +++ b/packages/ipfs-http-server/test/inject/files.js @@ -15,10 +15,8 @@ const toBuffer = require('it-to-buffer') const { AbortSignal } = require('native-abort-controller') const { base58btc } = require('multiformats/bases/base58') const { base64 } = require('multiformats/bases/base64') - -function matchIterable () { - return sinon.match((thing) => Boolean(thing[Symbol.asyncIterator]) || Boolean(thing[Symbol.iterator])) -} +const matchIterable = require('../utils/match-iterable') +const drain = require('it-drain') describe('/files', () => { const cid = CID.parse('QmUBdnXXPyoDFXj3Hj39dNJ5VkN3QFRskXxcGaYFBB8CNR') @@ -116,16 +114,20 @@ describe('/files', () => { '------------287032381131322--' ].join('\r\n')) - ipfs.addAll.withArgs(matchIterable(), defaultOptions).returns([{ - path: cid.toString(), - cid, - size: 1024 * 1024 * 2, - mode: 0o420, - mtime: { - secs: 100, - nsecs: 0 - } - }]) + ipfs.addAll.withArgs(matchIterable(), defaultOptions) + .callsFake(async function * (source) { + await drain(source) + yield { + path: cid.toString(), + cid, + size: 1024 * 1024 * 2, + mode: 0o420, + mtime: { + secs: 100, + nsecs: 0 + } + } + }) const res = await http({ method: 'POST', @@ -143,16 +145,20 @@ describe('/files', () => { ipfs.bases.getBase.withArgs('base64').returns(base64) const content = Buffer.from('TEST' + Date.now()) - ipfs.addAll.withArgs(matchIterable(), defaultOptions).returns([{ - path: cid.toString(), - cid: cid.toV1(), - size: content.byteLength, - mode: 0o420, - mtime: { - secs: 100, - nsecs: 0 - } - }]) + ipfs.addAll.withArgs(matchIterable(), defaultOptions) + .callsFake(async function * (source) { + await drain(source) + yield { + path: cid.toString(), + cid: cid.toV1(), + size: content.byteLength, + mode: 0o420, + mtime: { + secs: 100, + nsecs: 0 + } + } + }) const form = new FormData() form.append('data', content) diff --git a/packages/ipfs-http-server/test/inject/mfs/ls.js b/packages/ipfs-http-server/test/inject/mfs/ls.js index 12f7512c13..9d2bae7552 100644 --- a/packages/ipfs-http-server/test/inject/mfs/ls.js +++ b/packages/ipfs-http-server/test/inject/mfs/ls.js @@ -98,7 +98,13 @@ describe('/files/ls', () => { }) it('should stream a path', async () => { - ipfs.files.ls.withArgs(path, defaultOptions).returns([file]) + ipfs.files.ls.withArgs(path, { + ...defaultOptions + }) + .callsFake(async function * () { + yield file + }) + ipfs.bases.getBase.withArgs('base58btc').returns(base58btc) await http({ method: 'POST', @@ -131,7 +137,10 @@ describe('/files/ls', () => { ipfs.files.ls.withArgs(path, { ...defaultOptions, timeout: 1000 - }).returns([file]) + }).callsFake(async function * () { + yield file + }) + ipfs.bases.getBase.withArgs('base58btc').returns(base58btc) await http({ method: 'POST', diff --git a/packages/ipfs-http-server/test/inject/mfs/write.js b/packages/ipfs-http-server/test/inject/mfs/write.js index bc654ad468..3563cb2bda 100644 --- a/packages/ipfs-http-server/test/inject/mfs/write.js +++ b/packages/ipfs-http-server/test/inject/mfs/write.js @@ -3,6 +3,7 @@ const { expect } = require('aegir/utils/chai') const http = require('../../utils/http') +const matchIterable = require('../../utils/match-iterable') const sinon = require('sinon') const FormData = require('form-data') const streamToPromise = require('stream-to-promise') @@ -56,10 +57,6 @@ async function send (text, options = {}) { } } -function matchIterable () { - return sinon.match((thing) => Boolean(thing[Symbol.asyncIterator]) || Boolean(thing[Symbol.iterator])) -} - describe('/files/write', () => { const path = '/foo' let ipfs diff --git a/packages/ipfs-http-server/test/inject/ping.js b/packages/ipfs-http-server/test/inject/ping.js index 2bda3c2150..ad0afc59bb 100644 --- a/packages/ipfs-http-server/test/inject/ping.js +++ b/packages/ipfs-http-server/test/inject/ping.js @@ -39,7 +39,10 @@ describe('/ping', function () { }) it('returns error for incorrect Peer Id', async () => { - ipfs.ping.withArgs(peerId).throws(new Error('derp')) + ipfs.ping.withArgs(peerId) + .callsFake(async function * () { // eslint-disable-line require-yield + throw new Error('derp') + }) const res = await http({ method: 'POST', @@ -53,7 +56,7 @@ describe('/ping', function () { ipfs.ping.withArgs(peerId, { ...defaultOptions, count: 5 - }).returns([]) + }).callsFake(async function * () {}) const res = await http({ method: 'POST', @@ -67,7 +70,7 @@ describe('/ping', function () { ipfs.ping.withArgs(peerId, { ...defaultOptions, count: 5 - }).returns([]) + }).callsFake(async function * () {}) const res = await http({ method: 'POST', @@ -78,15 +81,19 @@ describe('/ping', function () { }) it('pings a remote peer', async () => { - ipfs.ping.withArgs(peerId, defaultOptions).returns([{ - success: true, - time: 1, - text: 'hello' - }, { - success: true, - time: 2, - text: 'world' - }]) + ipfs.ping.withArgs(peerId, defaultOptions) + .callsFake(async function * () { + yield { + success: true, + time: 1, + text: 'hello' + } + yield { + success: true, + time: 2, + text: 'world' + } + }) const res = await http({ method: 'POST', @@ -109,7 +116,7 @@ describe('/ping', function () { ipfs.ping.withArgs(peerId, { ...defaultOptions, timeout: 1000 - }).returns([]) + }).callsFake(async function * () {}) const res = await http({ method: 'POST', diff --git a/packages/ipfs-http-server/test/utils/match-iterable.js b/packages/ipfs-http-server/test/utils/match-iterable.js new file mode 100644 index 0000000000..2403ad21fb --- /dev/null +++ b/packages/ipfs-http-server/test/utils/match-iterable.js @@ -0,0 +1,9 @@ +'use strict' + +const sinon = require('sinon') + +function matchIterable () { + return sinon.match((thing) => Boolean(thing[Symbol.asyncIterator]) || Boolean(thing[Symbol.iterator])) +} + +module.exports = matchIterable diff --git a/packages/ipfs-message-port-client/test/interface-message-port-client.js b/packages/ipfs-message-port-client/test/interface-message-port-client.js index 9381c955dd..c704ce9d8b 100644 --- a/packages/ipfs-message-port-client/test/interface-message-port-client.js +++ b/packages/ipfs-message-port-client/test/interface-message-port-client.js @@ -114,6 +114,14 @@ describe('interface-ipfs-core tests', () => { { name: 'should get tree with CID and path as String', reason: 'Passing CID as strings is not supported' + }, + { + name: '.dag.export', + reason: 'Not implemented yet' + }, + { + name: '.dag.import', + reason: 'Not implemented yet' } ] })