diff --git a/index.js b/index.js
index 250b6a2..19074ff 100644
--- a/index.js
+++ b/index.js
@@ -3,7 +3,7 @@
 import { CID } from 'multiformats/cid'
 import * as dagPB from '@ipld/dag-pb'
 import * as Block from 'multiformats/block'
 import { exporter, walkPath } from 'ipfs-unixfs-exporter'
-import { transform } from 'streaming-iterables'
+import { parallelMap, transform } from 'streaming-iterables'
 import { Decoders, Hashers } from './defaults.js'
 import { identity } from 'multiformats/hashes/identity'
@@ -38,9 +38,10 @@
    */
   async * get (cid, options = {}) {
     cid = typeof cid === 'string' ? CID.parse(cid) : cid
+    const order = options.order ?? 'unk'
     log('getting DAG %s', cid)
     let cids = Array.isArray(cid) ? cid : [cid]
-    const search = options.search || breadthFirstSearch()
+    const search = options.search || blockLinks()

     /** @type {AbortController[]} */
     let aborters = []
@@ -49,7 +50,8 @@

     while (cids.length > 0) {
       log('fetching %d CIDs', cids.length)
-      const fetchBlocks = transform(cids.length, async cid => {
+      const parallelFn = order === 'dfs' ? parallelMap : transform
+      const fetchBlocks = parallelFn(cids.length, async cid => {
         if (signal) {
           const aborter = new AbortController()
           aborters.push(aborter)
@@ -77,7 +79,12 @@
         // createUnsafe here.
         const block = await Block.create({ bytes, cid, codec: decoder, hasher })
         yield block
-        nextCids = nextCids.concat(search(block))
+        const blockCids = search(block)
+        if (order === 'dfs') {
+          yield * this.get(blockCids, options)
+        } else {
+          nextCids = nextCids.concat(blockCids)
+        }
       }
       log('%d CIDs in links', nextCids.length)
       cids = nextCids
@@ -94,6 +101,7 @@
    * @param {string} cidPath
    * @param {object} [options]
    * @param {AbortSignal} [options.signal]
+   * @param {'dfs'|'unk'} [options.order] Specify desired block ordering. `dfs` - Depth First Search, `unk` - unknown ordering.
    * @param {'all'|'file'|'block'} [options.carScope] control how many layers of the dag are returned
    *    'all': return the entire dag starting at path. (default)
    *    'block': return the block identified by the path.
@@ -142,7 +150,7 @@
       const links = getLinks(base, this.#decoders)
       // fetch the entire dag rooted at the end of the provided path
       if (links.length) {
-        yield * this.get(links, { signal: options.signal })
+        yield * this.get(links, { signal: options.signal, order: options.order })
       }
     }
     // non-files, like directories, and IPLD Maps only return blocks necessary for their enumeration
@@ -152,7 +160,7 @@
       if (base.unixfs.type === 'hamt-sharded-directory') {
         const hamtLinks = base.node.Links?.filter(l => l.Name.length === 2).map(l => l.Hash) || []
         if (hamtLinks.length) {
-          yield * this.get(hamtLinks, { search: hamtSearch, signal: options.signal })
+          yield * this.get(hamtLinks, { search: hamtSearch, signal: options.signal, order: options.order })
         }
       }
     }
@@ -221,7 +229,7 @@
  *
  * @param {([name, cid]: [string, Link]) => boolean} linkFilter
  */
-export function breadthFirstSearch (linkFilter = () => true) {
+export function blockLinks (linkFilter = () => true) {
   /**
    * @param {import('multiformats').BlockView} block
    */
@@ -245,7 +253,7 @@
   }
 }

-export const hamtSearch = breadthFirstSearch(([name]) => name.length === 2)
+export const hamtSearch = blockLinks(([name]) => name.length === 2)

 /**
  * Get links as array of CIDs for a UnixFS entry.
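For context, here is a minimal sketch of how the new `order` option would be consumed from the public API. The `dagula` instance setup is elided (any of the constructors used in the tests below would do), and `rootCid` is a hypothetical CID, not one from this PR:

```js
// Sketch only: assumes an initialized Dagula instance named `dagula`.
const rootCid = 'bafybeihypotheticalrootcid' // hypothetical CID

// order: 'dfs' walks each child's subtree to completion before moving
// to its sibling, so blocks stream out in depth-first order.
for await (const block of dagula.get(rootCid, { order: 'dfs' })) {
  console.log('dfs block', block.cid.toString())
}

// The default ('unk') keeps the previous behavior: links are gathered
// level by level and blocks arrive in whatever order the parallel
// fetches complete, so no ordering is guaranteed.
for await (const block of dagula.get(rootCid)) {
  console.log('unordered block', block.cid.toString())
}
```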
diff --git a/test/getPath.test.js b/test/getPath.test.js
index b06e5ab..83f09d1 100644
--- a/test/getPath.test.js
+++ b/test/getPath.test.js
@@ -200,6 +200,171 @@ test('should getPath on file with carScope=file', async t => {
   t.deepEqual(blocks.at(3).bytes, filePart2.bytes)
 })

+test('should getPath on large file with carScope=file, default ordering', async t => {
+  // return all blocks in path and all blocks for resolved target of path
+  const filePart1 = await Block.decode({ codec: raw, bytes: fromString(`MORE TEST DATA ${Date.now()}`), hasher: sha256 })
+  const filePart2 = await Block.decode({ codec: raw, bytes: fromString(`EVEN MORE TEST DATA ${Date.now()}`), hasher: sha256 })
+  const filePart3 = await Block.decode({ codec: raw, bytes: fromString(`SO MUCH TEST DATA ${Date.now()}`), hasher: sha256 })
+  const filePart4 = await Block.decode({ codec: raw, bytes: fromString(`TEST DATA DOING THE MOST ${Date.now()}`), hasher: sha256 })
+  const fileSubNode1 = await Block.encode({
+    codec: dagPB,
+    hasher: sha256,
+    value: {
+      Data: new UnixFSv1({ type: 'file' }).marshal(),
+      Links: [
+        { Name: '0', Hash: filePart1.cid },
+        { Name: '1', Hash: filePart2.cid }
+      ]
+    }
+  })
+  const fileSubNode2 = await Block.encode({
+    codec: dagPB,
+    hasher: sha256,
+    value: {
+      Data: new UnixFSv1({ type: 'file' }).marshal(),
+      Links: [
+        { Name: '0', Hash: filePart3.cid },
+        { Name: '1', Hash: filePart4.cid }
+      ]
+    }
+  })
+
+  const fileNode = await Block.encode({
+    codec: dagPB,
+    hasher: sha256,
+    value: {
+      Data: new UnixFSv1({ type: 'file' }).marshal(),
+      Links: [
+        { Name: '0', Hash: fileSubNode1.cid },
+        { Name: '1', Hash: fileSubNode2.cid }
+      ]
+    }
+  })
+
+  const dirNode = await Block.encode({
+    codec: dagPB,
+    hasher: sha256,
+    value: {
+      Data: new UnixFSv1({ type: 'directory' }).marshal(),
+      Links: [
+        { Name: 'foo', Hash: fileNode.cid },
+        { Name: 'other', Hash: CID.parse('QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn') }
+      ]
+    }
+  })
+
+  const peer = await startBitswapPeer([filePart1, filePart2, filePart3, filePart4, fileSubNode1, fileSubNode2, fileNode, dirNode])
+
+  const libp2p = await getLibp2p()
+  const dagula = await fromNetwork(libp2p, { peer: peer.libp2p.getMultiaddrs()[0] })
+
+  const blocks = []
+  const carScope = 'file'
+  for await (const entry of dagula.getPath(`${dirNode.cid}/foo`, { carScope })) {
+    blocks.push(entry)
+  }
+  // did not try to return block for `other`
+  t.is(blocks.length, 8)
+  t.deepEqual(blocks.at(0).cid, dirNode.cid)
+  t.deepEqual(blocks.at(0).bytes, dirNode.bytes)
+  t.deepEqual(blocks.at(1).cid, fileNode.cid)
+  t.deepEqual(blocks.at(1).bytes, fileNode.bytes)
+  t.deepEqual(blocks.at(2).cid, fileSubNode1.cid)
+  t.deepEqual(blocks.at(2).bytes, fileSubNode1.bytes)
+  t.deepEqual(blocks.at(3).cid, fileSubNode2.cid)
+  t.deepEqual(blocks.at(3).bytes, fileSubNode2.bytes)
+  t.deepEqual(blocks.at(4).cid, filePart1.cid)
+  t.deepEqual(blocks.at(4).bytes, filePart1.bytes)
+  t.deepEqual(blocks.at(5).cid, filePart2.cid)
+  t.deepEqual(blocks.at(5).bytes, filePart2.bytes)
+  t.deepEqual(blocks.at(6).cid, filePart3.cid)
+  t.deepEqual(blocks.at(6).bytes, filePart3.bytes)
+  t.deepEqual(blocks.at(7).cid, filePart4.cid)
+  t.deepEqual(blocks.at(7).bytes, filePart4.bytes)
+})
+
+test('should getPath on large file with carScope=file, dfs ordering', async t => {
+  // return all blocks in path and all blocks for resolved target of path
+  const filePart1 = await Block.decode({ codec: raw, bytes: fromString(`MORE TEST DATA ${Date.now()}`), hasher: sha256 })
+  const filePart2 = await Block.decode({ codec: raw, bytes: fromString(`EVEN MORE TEST DATA ${Date.now()}`), hasher: sha256 })
+  const filePart3 = await Block.decode({ codec: raw, bytes: fromString(`SO MUCH TEST DATA ${Date.now()}`), hasher: sha256 })
+  const filePart4 = await Block.decode({ codec: raw, bytes: fromString(`TEST DATA DOING THE MOST ${Date.now()}`), hasher: sha256 })
+  const fileSubNode1 = await Block.encode({
+    codec: dagPB,
+    hasher: sha256,
+    value: {
+      Data: new UnixFSv1({ type: 'file' }).marshal(),
+      Links: [
+        { Name: '0', Hash: filePart1.cid },
+        { Name: '1', Hash: filePart2.cid }
+      ]
+    }
+  })
+  const fileSubNode2 = await Block.encode({
+    codec: dagPB,
+    hasher: sha256,
+    value: {
+      Data: new UnixFSv1({ type: 'file' }).marshal(),
+      Links: [
+        { Name: '0', Hash: filePart3.cid },
+        { Name: '1', Hash: filePart4.cid }
+      ]
+    }
+  })
+
+  const fileNode = await Block.encode({
+    codec: dagPB,
+    hasher: sha256,
+    value: {
+      Data: new UnixFSv1({ type: 'file' }).marshal(),
+      Links: [
+        { Name: '0', Hash: fileSubNode1.cid },
+        { Name: '1', Hash: fileSubNode2.cid }
+      ]
+    }
+  })
+
+  const dirNode = await Block.encode({
+    codec: dagPB,
+    hasher: sha256,
+    value: {
+      Data: new UnixFSv1({ type: 'directory' }).marshal(),
+      Links: [
+        { Name: 'foo', Hash: fileNode.cid },
+        { Name: 'other', Hash: CID.parse('QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn') }
+      ]
+    }
+  })
+
+  const peer = await startBitswapPeer([filePart1, filePart2, filePart3, filePart4, fileSubNode1, fileSubNode2, fileNode, dirNode])
+
+  const libp2p = await getLibp2p()
+  const dagula = await fromNetwork(libp2p, { peer: peer.libp2p.getMultiaddrs()[0] })
+
+  const blocks = []
+  const carScope = 'file'
+  for await (const entry of dagula.getPath(`${dirNode.cid}/foo`, { carScope, order: 'dfs' })) {
+    blocks.push(entry)
+  }
+  // did not try to return block for `other`
+  t.is(blocks.length, 8)
+  t.deepEqual(blocks.at(0).cid, dirNode.cid)
+  t.deepEqual(blocks.at(0).bytes, dirNode.bytes)
+  t.deepEqual(blocks.at(1).cid, fileNode.cid)
+  t.deepEqual(blocks.at(1).bytes, fileNode.bytes)
+  t.deepEqual(blocks.at(2).cid, fileSubNode1.cid)
+  t.deepEqual(blocks.at(2).bytes, fileSubNode1.bytes)
+  t.deepEqual(blocks.at(3).cid, filePart1.cid)
+  t.deepEqual(blocks.at(3).bytes, filePart1.bytes)
+  t.deepEqual(blocks.at(4).cid, filePart2.cid)
+  t.deepEqual(blocks.at(4).bytes, filePart2.bytes)
+  t.deepEqual(blocks.at(5).cid, fileSubNode2.cid)
+  t.deepEqual(blocks.at(5).bytes, fileSubNode2.bytes)
+  t.deepEqual(blocks.at(6).cid, filePart3.cid)
+  t.deepEqual(blocks.at(6).bytes, filePart3.bytes)
+  t.deepEqual(blocks.at(7).cid, filePart4.cid)
+  t.deepEqual(blocks.at(7).bytes, filePart4.bytes)
+})
 test('should getPath on file with carScope=block', async t => {
   // return all blocks in path and all blocks for resolved target of path
   const filePart1 = await Block.decode({ codec: raw, bytes: fromString(`MORE TEST DATA ${Date.now()}`), hasher: sha256 })
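Since `breadthFirstSearch` is renamed to `blockLinks`, one note for consumers: the helper still takes an optional link filter and still composes with the pre-existing `search` option, in either ordering. A hedged sketch follows; the `startsWith('data')` predicate, the `dagula` instance, and `rootCid` are all hypothetical, and the import path assumes the package root re-exports index.js:

```js
import { blockLinks } from 'dagula' // assumption: index.js is the package entry

// Follow only links whose name starts with 'data' -- illustrative filter.
// The filter receives [name, cid] tuples, just as hamtSearch's does.
const dataLinksOnly = blockLinks(([name]) => (name ?? '').startsWith('data'))

for await (const block of dagula.get(rootCid, { search: dataLinksOnly, order: 'dfs' })) {
  // blocks arrive depth-first, restricted to the filtered links
  console.log(block.cid.toString())
}
```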