From 7d5aa247e063ade5ab6e6ebe54a52d8b248ee88d Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Wed, 20 Sep 2023 15:50:21 +0100
Subject: [PATCH 1/3] feat: add blockReadConcurrency option to exporter

By default we attempt to load all siblings in a given layer of a DAG at
once to allow slow/async loading routines extra time to fetch data
before it is needed.

Some blockstores (e.g. CAR files) require the exporter to only request
the next sequential CID in a DAG.

Add a `blockReadConcurrency` option (named similarly to the importer's
`blockWriteConcurrency` option) to control this behaviour.

Fixes #359
---
 packages/ipfs-unixfs-exporter/README.md      |  8 ++-
 packages/ipfs-unixfs-exporter/src/index.ts   | 28 ++++++++
 .../src/resolvers/unixfs-v1/content/file.ts  |  3 +-
 .../test/exporter.spec.ts                    | 68 +++++++++++++++++++
 4 files changed, 103 insertions(+), 4 deletions(-)

diff --git a/packages/ipfs-unixfs-exporter/README.md b/packages/ipfs-unixfs-exporter/README.md
index 66655aa0..ea38f531 100644
--- a/packages/ipfs-unixfs-exporter/README.md
+++ b/packages/ipfs-unixfs-exporter/README.md
@@ -17,7 +17,7 @@
   - [UnixFSEntry](#unixfsentry)
     - [Raw entries](#raw-entries)
     - [CBOR entries](#cbor-entries)
-    - [`entry.content({ offset, length })`](#entrycontent-offset-length-)
+    - [`entry.content({ offset, length, blockReadConcurrency })`](#entrycontent-offset-length-blockreadconcurrency-)
   - [`walkPath(cid, blockstore)`](#walkpathcid-blockstore)
   - [`recursive(cid, blockstore)`](#recursivecid-blockstore)
 - [API Docs](#api-docs)
@@ -168,9 +168,11 @@ Entries with a `dag-cbor` codec `CID` return JavaScript object entries:
 
 There is no `content` function for a `CBOR` node.
 
-### `entry.content({ offset, length })`
+### `entry.content({ offset, length, blockReadConcurrency })`
 
-When `entry` is a file or a `raw` node, `offset` and/or `length` arguments can be passed to `entry.content()` to return slices of data:
+When `entry` is a file or a `raw` node, `offset` and/or `length` arguments can be passed to `entry.content()` to return slices of data.
+
+`blockReadConcurrency` is an advanced option that lets you control how many blocks are loaded from the blockstore at once. By default it will attempt to load all siblings from the current DAG layer in one go, but this can be reduced if, for example, your blockstore requires data to be accessed in a prescribed order.
 
 ```javascript
 const length = 5
diff --git a/packages/ipfs-unixfs-exporter/src/index.ts b/packages/ipfs-unixfs-exporter/src/index.ts
index e0293c53..9e9759ec 100644
--- a/packages/ipfs-unixfs-exporter/src/index.ts
+++ b/packages/ipfs-unixfs-exporter/src/index.ts
@@ -47,9 +47,37 @@ export type ExporterProgressEvents =
   ProgressEvent<'unixfs:exporter:walk:raw', ExportWalk>
 
 export interface ExporterOptions extends ProgressOptions<ExporterProgressEvents> {
+  /**
+   * An optional offset to start reading at.
+   *
+   * If the CID resolves to a file this will be a byte offset within that file,
+   * otherwise if it's a directory it will be a directory entry offset within
+   * the directory listing. (default: undefined)
+   */
   offset?: number
+
+  /**
+   * An optional length to read.
+   *
+   * If the CID resolves to a file this will be the number of bytes read from
+   * the file, otherwise if it's a directory it will be the number of directory
+   * entries read from the directory listing. (default: undefined)
+   */
   length?: number
+
+  /**
+   * This signal can be used to abort any long-lived operations such as fetching
+   * blocks from the network. (default: undefined)
+   */
   signal?: AbortSignal
+
+  /**
+   * When a DAG layer is encountered, all child nodes are loaded in parallel but
+   * processed as they arrive. This allows us to load sibling nodes in advance
+   * of yielding their bytes. Pass a value here to control the amount of blocks
+   * loaded in parallel. (default: undefined)
+   */
+  blockReadConcurrency?: number
 }
 
 export interface Exportable<T> {
diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
index 1da18056..f65a449a 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
@@ -84,7 +84,8 @@ async function walkDAG (blockstore: ReadableStorage, node: dagPb.PBNode | Uint8A
       }
     }),
     (source) => parallel(source, {
-      ordered: true
+      ordered: true,
+      concurrency: options.blockReadConcurrency
     }),
     async (source) => {
       for await (const { link, block, blockStart } of source) {
diff --git a/packages/ipfs-unixfs-exporter/test/exporter.spec.ts b/packages/ipfs-unixfs-exporter/test/exporter.spec.ts
index af28be2e..ef600a43 100644
--- a/packages/ipfs-unixfs-exporter/test/exporter.spec.ts
+++ b/packages/ipfs-unixfs-exporter/test/exporter.spec.ts
@@ -19,6 +19,7 @@ import * as raw from 'multiformats/codecs/raw'
 import { identity } from 'multiformats/hashes/identity'
 import { sha256 } from 'multiformats/hashes/sha2'
 import { Readable } from 'readable-stream'
+import Sinon from 'sinon'
 import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
 import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
 import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
@@ -1309,4 +1310,71 @@ describe('exporter', () => {
       dataSizeInBytes *= 10
     }
   })
+
+  it('should allow control of block read concurrency', async () => {
+    // create a multi-layered DAG of a manageable size
+    const imported = await first(importer([{
+      path: '1.2MiB.txt',
+      content: asAsyncIterable(smallFile)
+    }], block, {
+      rawLeaves: true,
+      chunker: fixedSize({ chunkSize: 50 }),
+      layout: balanced({ maxChildrenPerNode: 2 })
+    }))
+
+    if (imported == null) {
+      throw new Error('Nothing imported')
+    }
+
+    const node = dagPb.decode(await block.get(imported.cid))
+    expect(node.Links).to.have.lengthOf(2, 'imported node had too many children')
+
+    const child1 = dagPb.decode(await block.get(node.Links[0].Hash))
+    expect(child1.Links).to.have.lengthOf(2, 'layer 1 node had too many children')
+
+    const child2 = dagPb.decode(await block.get(node.Links[1].Hash))
+    expect(child2.Links).to.have.lengthOf(2, 'layer 1 node had too many children')
+
+    // should be raw nodes
+    expect(child1.Links[0].Hash.code).to.equal(raw.code, 'layer 2 node had wrong codec')
+    expect(child1.Links[1].Hash.code).to.equal(raw.code, 'layer 2 node had wrong codec')
+    expect(child2.Links[0].Hash.code).to.equal(raw.code, 'layer 2 node had wrong codec')
+    expect(child2.Links[1].Hash.code).to.equal(raw.code, 'layer 2 node had wrong codec')
+
+    // export file
+    const file = await exporter(imported.cid, block)
+
+    // export file data with default settings
+    const blockReadSpy = Sinon.spy(block, 'get')
+    const contentWithDefaultBlockConcurrency = await toBuffer(file.content())
+
+    // blocks should be loaded in default order - a whole level of sibling nodes at a time
+    expect(blockReadSpy.getCalls().map(call => call.args[0].toString())).to.deep.equal([
+      node.Links[0].Hash.toString(),
+      node.Links[1].Hash.toString(),
+      child1.Links[0].Hash.toString(),
+      child1.Links[1].Hash.toString(),
+      child2.Links[0].Hash.toString(),
+      child2.Links[1].Hash.toString()
+    ])
+
+    // export file data overriding read concurrency
+    blockReadSpy.resetHistory()
+    const contentWithSmallBlockConcurrency = await toBuffer(file.content({
+      blockReadConcurrency: 1
+    }))
+
+    // blocks should be loaded in traversal order
+    expect(blockReadSpy.getCalls().map(call => call.args[0].toString())).to.deep.equal([
+      node.Links[0].Hash.toString(),
+      child1.Links[0].Hash.toString(),
+      child1.Links[1].Hash.toString(),
+      node.Links[1].Hash.toString(),
+      child2.Links[0].Hash.toString(),
+      child2.Links[1].Hash.toString()
+    ])
+
+    // ensure exported bytes are the same
+    expect(contentWithDefaultBlockConcurrency).to.equalBytes(contentWithSmallBlockConcurrency)
+  })
 })

From a2ea15a08de8377cebaa6c692556f6de5451347e Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Fri, 19 Jan 2024 09:22:23 +0100
Subject: [PATCH 2/3] chore: ensure dirs/shards can be read in series and add tests

---
 packages/ipfs-unixfs-exporter/package.json    |   1 +
 .../resolvers/unixfs-v1/content/directory.ts  |   5 +-
 .../content/hamt-sharded-directory.ts         |   5 +-
 .../test/exporter.spec.ts                     | 160 +++++++++++++++++-
 4 files changed, 168 insertions(+), 3 deletions(-)

diff --git a/packages/ipfs-unixfs-exporter/package.json b/packages/ipfs-unixfs-exporter/package.json
index 84d0c8e1..f4d86986 100644
--- a/packages/ipfs-unixfs-exporter/package.json
+++ b/packages/ipfs-unixfs-exporter/package.json
@@ -79,6 +79,7 @@
     "iso-random-stream": "^2.0.2",
     "it-all": "^3.0.2",
     "it-buffer-stream": "^3.0.0",
+    "it-drain": "^3.0.5",
     "it-first": "^3.0.2",
     "it-to-buffer": "^4.0.2",
     "merge-options": "^3.0.4",
diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/directory.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/directory.ts
index bfa1d61d..afab2634 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/directory.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/directory.ts
@@ -25,7 +25,10 @@ const directoryContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, de
           return result.entry
         }
       }),
-      source => parallel(source, { ordered: true }),
+      source => parallel(source, {
+        ordered: true,
+        concurrency: options.blockReadConcurrency
+      }),
       source => filter(source, entry => entry != null)
     )
   }
diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts
index 9e59d7c9..1c482c68 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts
@@ -62,7 +62,10 @@ async function * listDirectory (node: PBNode, path: string, resolve: Resolve, de
         }
       }
     }),
-    source => parallel(source, { ordered: true })
+    source => parallel(source, {
+      ordered: true,
+      concurrency: options.blockReadConcurrency
+    })
   )
 
   for await (const { entries } of results) {
diff --git a/packages/ipfs-unixfs-exporter/test/exporter.spec.ts b/packages/ipfs-unixfs-exporter/test/exporter.spec.ts
index 22ab4d0b..a8c5aa74 100644
--- a/packages/ipfs-unixfs-exporter/test/exporter.spec.ts
+++ b/packages/ipfs-unixfs-exporter/test/exporter.spec.ts
@@ -12,6 +12,7 @@ import { fixedSize } from 'ipfs-unixfs-importer/chunker'
 import { balanced, type FileLayout, flat, trickle } from 'ipfs-unixfs-importer/layout'
 import all from 'it-all'
 import randomBytes from 'it-buffer-stream'
+import drain from 'it-drain'
 import first from 'it-first'
 import last from 'it-last'
 import toBuffer from 'it-to-buffer'
@@ -1345,7 +1346,7 @@ describe('exporter', () => {
     }
   })
 
-  it('should allow control of block read concurrency', async () => {
+  it('should allow control of block read concurrency for files', async () => {
     // create a multi-layered DAG of a manageable size
     const imported = await first(importer([{
       path: '1.2MiB.txt',
@@ -1411,4 +1412,161 @@ describe('exporter', () => {
     // ensure exported bytes are the same
     expect(contentWithDefaultBlockConcurrency).to.equalBytes(contentWithSmallBlockConcurrency)
   })
+
+  it('should allow control of block read concurrency for directories', async () => {
+    const entries = 1024
+
+    // create a largeish directory
+    const imported = await last(importer((async function * () {
+      for (let i = 0; i < entries; i++) {
+        yield {
+          path: `file-${i}.txt`,
+          content: Uint8Array.from([i])
+        }
+      }
+    })(), block, {
+      wrapWithDirectory: true
+    }))
+
+    if (imported == null) {
+      throw new Error('Nothing imported')
+    }
+
+    const node = dagPb.decode(await block.get(imported.cid))
+    expect(node.Links).to.have.lengthOf(entries, 'imported node had too many children')
+
+    for (const link of node.Links) {
+      // should be raw nodes
+      expect(link.Hash.code).to.equal(raw.code, 'child node had wrong codec')
+    }
+
+    // export directory
+    const directory = await exporter(imported.cid, block)
+
+    // export file data with default settings
+    const originalGet = block.get.bind(block)
+
+    const expectedInvocations: string[] = []
+
+    for (const link of node.Links) {
+      expectedInvocations.push(`${link.Hash.toString()}-start`)
+      expectedInvocations.push(`${link.Hash.toString()}-end`)
+    }
+
+    const actualInvocations: string[] = []
+
+    block.get = async (cid) => {
+      actualInvocations.push(`${cid.toString()}-start`)
+
+      // introduce a small delay - if running in parallel actualInvocations will
+      // be:
+      // `foo-start`, `bar-start`, `baz-start`, `foo-end`, `bar-end`, `baz-end`
+      // if in series it will be:
+      // `foo-start`, `foo-end`, `bar-start`, `bar-end`, `baz-start`, `baz-end`
+      await delay(1)
+
+      actualInvocations.push(`${cid.toString()}-end`)
+
+      return originalGet(cid)
+    }
+
+    const blockReadSpy = Sinon.spy(block, 'get')
+    await drain(directory.content({
+      blockReadConcurrency: 1
+    }))
+
+    // blocks should be loaded in series, in directory listing order
+    expect(blockReadSpy.getCalls().map(call => call.args[0].toString())).to.deep.equal(
+      node.Links.map(link => link.Hash.toString())
+    )
+
+    expect(actualInvocations).to.deep.equal(expectedInvocations)
+  })
+
+  it('should allow control of block read concurrency for HAMT sharded directories', async () => {
+    const entries = 1024
+
+    // create a sharded directory
+    const imported = await last(importer((async function * () {
+      for (let i = 0; i < entries; i++) {
+        yield {
+          path: `file-${i}.txt`,
+          content: Uint8Array.from([i])
+        }
+      }
+    })(), block, {
+      wrapWithDirectory: true,
+      shardSplitThresholdBytes: 10
+    }))
+
+    if (imported == null) {
+      throw new Error('Nothing imported')
+    }
+
+    const node = dagPb.decode(await block.get(imported.cid))
+    const data = UnixFS.unmarshal(node.Data ?? new Uint8Array(0))
+    expect(data.type).to.equal('hamt-sharded-directory')
+
+    // traverse the shard, collect all the CIDs
+    async function collectCIDs (node: PBNode): Promise<CID[]> {
+      const children: CID[] = []
+
+      for (const link of node.Links) {
+        children.push(link.Hash)
+
+        if (link.Hash.code === dagPb.code) {
+          const buf = await block.get(link.Hash)
+          const childNode = dagPb.decode(buf)
+
+          children.push(...(await collectCIDs(childNode)))
+        }
+      }
+
+      return children
+    }
+
+    const children: CID[] = await collectCIDs(node)
+
+    // export directory
+    const directory = await exporter(imported.cid, block)
+
+    // export file data with default settings
+    const originalGet = block.get.bind(block)
+
+    const expectedInvocations: string[] = []
+
+    for (const cid of children) {
+      expectedInvocations.push(`${cid.toString()}-start`)
+      expectedInvocations.push(`${cid.toString()}-end`)
+    }
+
+    const actualInvocations: string[] = []
+
+    block.get = async (cid) => {
+      actualInvocations.push(`${cid.toString()}-start`)
+
+      // introduce a small delay - if running in parallel actualInvocations will
+      // be:
+      // `foo-start`, `bar-start`, `baz-start`, `foo-end`, `bar-end`, `baz-end`
+      // if in series it will be:
+      // `foo-start`, `foo-end`, `bar-start`, `bar-end`, `baz-start`, `baz-end`
+      await delay(1)
+
+      actualInvocations.push(`${cid.toString()}-end`)
+
+      return originalGet(cid)
+    }
+
+    const blockReadSpy = Sinon.spy(block, 'get')
+    await drain(directory.content({
+      blockReadConcurrency: 1
+    }))
+
+    // blocks should be loaded in series, in traversal order
+    expect(blockReadSpy.getCalls().map(call => call.args[0].toString())).to.deep.equal(
+      children.map(link => link.toString())
+    )
+
+    expect(actualInvocations).to.deep.equal(expectedInvocations)
+  })
 })

From 8df5e892b8dc1f3622c53bd13264546a449367d5 Mon Sep 17 00:00:00 2001
From: Alex Potsides
Date: Fri, 19 Jan 2024 09:57:57 +0100
Subject: [PATCH 3/3] chore: apply suggestions from code review

Co-authored-by: Rod Vagg
---
 packages/ipfs-unixfs-exporter/src/index.ts | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/packages/ipfs-unixfs-exporter/src/index.ts b/packages/ipfs-unixfs-exporter/src/index.ts
index d8e2e30f..b1adc319 100644
--- a/packages/ipfs-unixfs-exporter/src/index.ts
+++ b/packages/ipfs-unixfs-exporter/src/index.ts
@@ -121,8 +121,11 @@ export interface ExporterOptions extends ProgressOptions<ExporterProgressEvents>
   /**
    * When a DAG layer is encountered, all child nodes are loaded in parallel but
    * processed as they arrive. This allows us to load sibling nodes in advance
-   * of yielding their bytes. Pass a value here to control the amount of blocks
-   * loaded in parallel. (default: undefined)
+   * of yielding their bytes. Pass a value here to control the number of blocks
+   * loaded in parallel. If a strict depth-first traversal is required, this
+   * value should be set to `1`, otherwise the traversal order will tend to
+   * resemble a breadth-first fan-out and will not yield a stable ordering.
+   * (default: undefined)
    */
   blockReadConcurrency?: number
 }
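For reference, a minimal usage sketch of the new option; it is not part of the patches above and assumes a `cid` and a `blockstore` that must be read sequentially (for example a CAR file consumed as a stream):

```javascript
// Sketch only: `cid` and `blockstore` are assumed to already exist in the
// calling code and are not defined by the patches above.
import { exporter } from 'ipfs-unixfs-exporter'

const entry = await exporter(cid, blockstore)

const chunks = []

for await (const chunk of entry.content({
  // request one block at a time so blocks are read in strict traversal order
  blockReadConcurrency: 1
})) {
  chunks.push(chunk)
}
```

Leaving `blockReadConcurrency` unset keeps the default behaviour of loading a whole layer of sibling blocks at once.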