feat: adds progress events to the importer and exporter #302

Merged · 6 commits · Mar 17, 2023
27 changes: 27 additions & 0 deletions benchmarks/import/README.md
@@ -0,0 +1,27 @@
# Import Benchmark

How much memory does the importer use while importing files?

Memory usage should stay relatively flat, so that files larger than physical memory can be imported.

## Usage

```console
$ npm i
$ npm start

> benchmarks-gc@1.0.0 start
> npm run build && node dist/src/index.js


> benchmarks-gc@1.0.0 build
> aegir build --bundle false

[14:51:28] tsc [started]
[14:51:33] tsc [completed]
generating Ed25519 keypair...
┌─────────┬────────────────┬─────────┬───────────┬──────┐
│ (index) │ Implementation │ ops/s │ ms/op │ runs │
├─────────┼────────────────┼─────────┼───────────┼──────┤
//... results here
```
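The benchmark reports timing, but the flat-memory claim can also be checked directly by sampling the heap during an import. A minimal sketch, assuming the `ipfs-unixfs-importer` API used elsewhere in this PR (the `importWithHeapSamples` helper and 100ms interval are illustrative, not part of this benchmark):

```typescript
import { importer } from 'ipfs-unixfs-importer'
import { MemoryBlockstore } from 'blockstore-core'
import drain from 'it-drain'

// sample heap usage on an interval while a single file is imported
async function importWithHeapSamples (content: AsyncIterable<Uint8Array>): Promise<number[]> {
  const samples: number[] = []
  const timer = setInterval(() => {
    samples.push(process.memoryUsage().heapUsed)
  }, 100)

  try {
    await drain(importer([{ path: 'file.bin', content }], new MemoryBlockstore()))
  } finally {
    clearInterval(timer)
  }

  // a roughly constant series indicates flat memory use
  return samples
}
```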
42 changes: 42 additions & 0 deletions benchmarks/import/package.json
@@ -0,0 +1,42 @@
{
"name": "ipfs-unixfs-memory-benchmark",
"version": "0.0.0",
"description": "Memory benchmarks for ipfs-unixfs-importer",
"license": "Apache-2.0 OR MIT",
"private": true,
"type": "module",
"types": "./dist/src/index.d.ts",
"files": [
"src",
"dist",
"!dist/test",
"!**/*.tsbuildinfo"
],
"exports": {
".": {
"types": "./dist/src/index.d.ts",
"import": "./dist/src/index.js"
}
},
"eslintConfig": {
"extends": "ipfs",
"parserOptions": {
"sourceType": "module"
}
},
"scripts": {
"build": "aegir build --bundle false",
"clean": "aegir clean",
"lint": "aegir lint",
"dep-check": "aegir dep-check",
"start": "npm run build && node --expose-gc ./dist/test/index.spec.js"
},
"devDependencies": {
"aegir": "^38.1.2",
"blockstore-core": "^4.0.1",
"blockstore-fs": "^1.0.0",
"ipfs-unixfs-importer": "../../packages/ipfs-unixfs-importer",
"it-buffer-stream": "^3.0.1",
"it-drain": "^2.0.1"
}
}
1 change: 1 addition & 0 deletions benchmarks/import/src/index.ts
@@ -0,0 +1 @@
export {}
61 changes: 61 additions & 0 deletions benchmarks/import/test/index.spec.ts
@@ -0,0 +1,61 @@
/* eslint-env mocha */

import { importer, ImporterOptions } from 'ipfs-unixfs-importer'
import bufferStream from 'it-buffer-stream'
import { MemoryBlockstore } from 'blockstore-core'
import drain from 'it-drain'

const REPEATS = 10
const FILE_SIZE = Math.pow(2, 20) * 500 // 500MiB
const CHUNK_SIZE = 65536

async function main (): Promise<void> {
const block = new MemoryBlockstore()
const times: number[] = []

for (let i = 0; i < REPEATS; i++) {
const size = FILE_SIZE
let read = 0
let lastDate = Date.now()
let lastPercent = 0

const options: Partial<ImporterOptions> = {
onProgress: (evt) => {
if (evt.type === 'unixfs:importer:progress:file:read') {
read += Number(evt.detail.bytesRead)

const percent = Math.round((read / size) * 100)

if (percent > lastPercent) {
times[percent] = (times[percent] ?? 0) + (Date.now() - lastDate)

lastDate = Date.now()
lastPercent = percent
}
}
}
}

const buf = new Uint8Array(CHUNK_SIZE).fill(0)

await drain(importer([{
path: '200Bytes.txt',
content: bufferStream(size, {
chunkSize: CHUNK_SIZE,
generator: () => {
return buf
}
})
}], block, options))
}

console.info('Percent\tms') // eslint-disable-line no-console
times.forEach((time, index) => {
console.info(`${index}\t${Math.round(time / REPEATS)}`) // eslint-disable-line no-console
})
}

main().catch(err => {
console.error(err) // eslint-disable-line no-console
process.exit(1)
})
10 changes: 10 additions & 0 deletions benchmarks/import/tsconfig.json
@@ -0,0 +1,10 @@
{
"extends": "aegir/src/config/tsconfig.aegir.json",
"compilerOptions": {
"outDir": "dist"
},
"include": [
"src",
"test"
]
}
42 changes: 40 additions & 2 deletions packages/ipfs-unixfs-exporter/src/index.ts
@@ -6,9 +6,47 @@ import type { UnixFS } from 'ipfs-unixfs'
import type { PBNode } from '@ipld/dag-pb'
import type { Blockstore } from 'interface-blockstore'
import type { Bucket } from 'hamt-sharding'
import type { ProgressOptions, ProgressEvent } from 'progress-events'

export interface ExportProgress {
/**
* How many bytes of the file have been read
*/
bytesRead: bigint

/**
* How many bytes of the file will be read - n.b. this may be
* smaller than `fileSize` if `offset`/`length` have been
* specified
*/
totalBytes: bigint

/**
* The size of the file being read - n.b. this may be
* larger than `totalBytes` if `offset`/`length` has been
* specified
*/
fileSize: bigint
}

export interface ExportWalk {
cid: CID
}

/**
* Progress events emitted by the exporter
*/
export type ExporterProgressEvents =
ProgressEvent<'unixfs:exporter:progress:unixfs:file', ExportProgress> |
ProgressEvent<'unixfs:exporter:progress:unixfs:raw', ExportProgress> |
ProgressEvent<'unixfs:exporter:progress:raw', ExportProgress> |
ProgressEvent<'unixfs:exporter:progress:identity', ExportProgress> |
ProgressEvent<'unixfs:exporter:walk:file', ExportWalk> |
ProgressEvent<'unixfs:exporter:walk:directory', ExportWalk> |
ProgressEvent<'unixfs:exporter:walk:hamt-sharded-directory', ExportWalk> |
ProgressEvent<'unixfs:exporter:walk:raw', ExportWalk>

export interface ExporterOptions extends ProgressOptions<ExporterProgressEvents> {
offset?: number
length?: number
signal?: AbortSignal
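Together these types give callers a fully typed progress handler. A minimal consumer sketch, assuming `cid` and `blockstore` are already in scope (the percent arithmetic is illustrative):

```typescript
import { exporter } from 'ipfs-unixfs-exporter'

const entry = await exporter(cid, blockstore)

if (entry.type === 'file') {
  for await (const chunk of entry.content({
    onProgress: (evt) => {
      // narrowing on `type` also narrows `detail`
      if (evt.type === 'unixfs:exporter:progress:unixfs:file') {
        const { bytesRead, totalBytes } = evt.detail

        if (totalBytes > 0n) {
          console.info(`${(bytesRead * 100n) / totalBytes}% read`)
        }
      }
    }
  })) {
    // consume `chunk`
  }
}
```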
13 changes: 11 additions & 2 deletions packages/ipfs-unixfs-exporter/src/resolvers/identity.ts
@@ -2,7 +2,8 @@ import errCode from 'err-code'
import extractDataFromBlock from '../utils/extract-data-from-block.js'
import validateOffsetAndLength from '../utils/validate-offset-and-length.js'
import * as mh from 'multiformats/hashes/digest'
import type { ExporterOptions, Resolver, ExportProgress } from '../index.js'
import { CustomProgressEvent } from 'progress-events'

const rawContent = (node: Uint8Array): ((options?: ExporterOptions) => AsyncGenerator<Uint8Array, void, undefined>) => {
async function * contentGenerator (options: ExporterOptions = {}): AsyncGenerator<Uint8Array, void, undefined> {
@@ -11,7 +12,15 @@ const rawContent = (node: Uint8Array): ((options?: ExporterOptions) => AsyncGenerator<Uint8Array, void, undefined>) => {
length
} = validateOffsetAndLength(node.length, options.offset, options.length)

const buf = extractDataFromBlock(node, 0n, offset, offset + length)

options.onProgress?.(new CustomProgressEvent<ExportProgress>('unixfs:exporter:progress:identity', {
bytesRead: BigInt(buf.byteLength),
totalBytes: length - offset,
fileSize: BigInt(node.byteLength)
}))

yield buf
}

return contentGenerator
13 changes: 11 additions & 2 deletions packages/ipfs-unixfs-exporter/src/resolvers/raw.ts
@@ -1,7 +1,8 @@
import errCode from 'err-code'
import type { ExporterOptions, Resolver, ExportProgress } from '../index.js'
import extractDataFromBlock from '../utils/extract-data-from-block.js'
import validateOffsetAndLength from '../utils/validate-offset-and-length.js'
import { CustomProgressEvent } from 'progress-events'

const rawContent = (node: Uint8Array): ((options?: ExporterOptions) => AsyncGenerator<Uint8Array, void, undefined>) => {
async function * contentGenerator (options: ExporterOptions = {}): AsyncGenerator<Uint8Array, void, undefined> {
@@ -10,7 +11,15 @@ const rawContent = (node: Uint8Array): ((options?: ExporterOptions) => AsyncGenerator<Uint8Array, void, undefined>) => {
length
} = validateOffsetAndLength(node.length, options.offset, options.length)

const buf = extractDataFromBlock(node, 0n, offset, offset + length)

options.onProgress?.(new CustomProgressEvent<ExportProgress>('unixfs:exporter:progress:raw', {
bytesRead: BigInt(buf.byteLength),
totalBytes: length - offset,
fileSize: BigInt(node.byteLength)
}))

yield buf
}

return contentGenerator
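The `identity` and `raw` resolvers share a single pattern: slice the block, report the read via a typed `CustomProgressEvent`, then yield the bytes. A stripped-down sketch of that pattern (the `example:read` event name and `readSlice` helper are illustrative, not part of the exporter API):

```typescript
import { CustomProgressEvent } from 'progress-events'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

interface ReadProgress { bytesRead: bigint, totalBytes: bigint, fileSize: bigint }
type ReadEvents = ProgressEvent<'example:read', ReadProgress>

// yield a slice of `data`, reporting how much was read
async function * readSlice (data: Uint8Array, offset: number, length: number, options: ProgressOptions<ReadEvents> = {}): AsyncGenerator<Uint8Array, void, undefined> {
  const buf = data.subarray(offset, offset + length)

  options.onProgress?.(new CustomProgressEvent<ReadProgress>('example:read', {
    bytesRead: BigInt(buf.byteLength),
    totalBytes: BigInt(length),
    fileSize: BigInt(data.byteLength)
  }))

  yield buf
}
```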
packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/directory.ts
@@ -2,14 +2,19 @@ import parallel from 'it-parallel'
import { pipe } from 'it-pipe'
import map from 'it-map'
import filter from 'it-filter'
import type { ExporterOptions, ExportWalk, UnixfsV1DirectoryContent, UnixfsV1Resolver } from '../../../index.js'
import { CustomProgressEvent } from 'progress-events'

const directoryContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, blockstore) => {
async function * yieldDirectoryContent (options: ExporterOptions = {}): UnixfsV1DirectoryContent {
const offset = options.offset ?? 0
const length = options.length ?? node.Links.length
const links = node.Links.slice(offset, length)

options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:directory', {
cid
}))

yield * pipe(
links,
source => map(source, link => {
packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
@@ -9,7 +9,8 @@ import parallel from 'it-parallel'
import { pipe } from 'it-pipe'
import map from 'it-map'
import PQueue from 'p-queue'
import type { ExporterOptions, UnixfsV1FileContent, UnixfsV1Resolver, ReadableStorage, ExportProgress, ExportWalk } from '../../../index.js'
import { CustomProgressEvent } from 'progress-events'

async function walkDAG (blockstore: ReadableStorage, node: dagPb.PBNode | Uint8Array, queue: Pushable<Uint8Array>, streamPosition: bigint, start: bigint, end: bigint, options: ExporterOptions): Promise<void> {
// a `raw` node
@@ -110,6 +111,10 @@ async function walkDAG (blockstore: ReadableStorage, node: dagPb.PBNode | Uint8Array, queue: Pushable<Uint8Array>, streamPosition: bigint, start: bigint, end: bigint, options: ExporterOptions): Promise<void> {

// if the job rejects the 'error' event will be emitted on the child queue
void childQueue.add(async () => {
options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:file', {
cid: link.Hash
}))

await walkDAG(blockstore, child, queue, blockStart, start, end, options)
})

@@ -138,12 +143,15 @@ const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, blockstore) => {
}

let read = 0n
const wanted = length - offset
const queue = pushable()

options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:file', {
cid
}))

void walkDAG(blockstore, node, queue, 0n, offset, offset + length, options)
.then(() => {
if (read < wanted) {
throw errCode(new Error('Traversed entire DAG but did not read enough bytes'), 'ERR_UNDER_READ')
}
@@ -169,6 +177,12 @@ const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, blockstore) => {
queue.end()
}

options.onProgress?.(new CustomProgressEvent<ExportProgress>('unixfs:exporter:progress:unixfs:file', {
bytesRead: read,
totalBytes: wanted,
fileSize
}))

yield buf
}
}
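Two distinct event families appear in this resolver: `unixfs:exporter:walk:file` fires once per DAG node visited, while `unixfs:exporter:progress:unixfs:file` reports bytes as chunks are yielded. A consumer can track both, as in this sketch (reusing the `entry` from the earlier exporter example):

```typescript
const visited = new Set<string>()
let bytesRead = 0n

for await (const chunk of entry.content({
  onProgress: (evt) => {
    if (evt.type === 'unixfs:exporter:walk:file') {
      // one event per DAG node walked
      visited.add(evt.detail.cid.toString())
    } else if (evt.type === 'unixfs:exporter:progress:unixfs:file') {
      bytesRead = evt.detail.bytesRead
    }
  }
})) {
  // consume `chunk`
}

console.info(`read ${bytesRead} bytes across ${visited.size} DAG nodes`)
```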
packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts
@@ -2,10 +2,15 @@ import parallel from 'it-parallel'
import { pipe } from 'it-pipe'
import map from 'it-map'
import { decode, PBNode } from '@ipld/dag-pb'
import type { ExporterOptions, Resolve, UnixfsV1DirectoryContent, UnixfsV1Resolver, ReadableStorage, ExportWalk } from '../../../index.js'
import { CustomProgressEvent } from 'progress-events'

const hamtShardedDirectoryContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, blockstore) => {
function yieldHamtDirectoryContent (options: ExporterOptions = {}): UnixfsV1DirectoryContent {
options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:hamt-sharded-directory', {
cid
}))

return listDirectory(node, path, resolve, depth, blockstore, options)
}

@@ -30,6 +35,10 @@ async function * listDirectory (node: PBNode, path: string, resolve: Resolve, depth: number, blockstore: ReadableStorage, options: ExporterOptions): UnixfsV1DirectoryContent {
const block = await blockstore.get(link.Hash, options)
node = decode(block)

options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:hamt-sharded-directory', {
cid: link.Hash
}))

return { entries: listDirectory(node, path, resolve, depth, blockstore, options) }
}
}
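Because `listDirectory` recurses into sub-shards and emits the walk event again for each shard it loads, counting events gives a rough measure of shard traversal. A sketch, assuming `entry` resolves to a HAMT-sharded directory:

```typescript
let shardsVisited = 0

for await (const child of entry.content({
  onProgress: (evt) => {
    if (evt.type === 'unixfs:exporter:walk:hamt-sharded-directory') {
      shardsVisited++
    }
  }
})) {
  console.info(child.name)
}

console.info(`visited ${shardsVisited} shard nodes`)
```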
packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/raw.ts
@@ -1,4 +1,5 @@
import { CustomProgressEvent } from 'progress-events'
import type { ExporterOptions, ExportProgress, ExportWalk, UnixfsV1Resolver } from '../../../index.js'
import extractDataFromBlock from '../../../utils/extract-data-from-block.js'
import validateOffsetAndLength from '../../../utils/validate-offset-and-length.js'

@@ -15,7 +16,19 @@ const rawContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, blockstore) => {
length
} = validateOffsetAndLength(size, options.offset, options.length)

options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:raw', {
cid
}))

const buf = extractDataFromBlock(unixfs.data, 0n, offset, offset + length)

options.onProgress?.(new CustomProgressEvent<ExportProgress>('unixfs:exporter:progress:unixfs:raw', {
bytesRead: BigInt(buf.byteLength),
totalBytes: length - offset,
fileSize: BigInt(unixfs.data.byteLength)
}))

yield buf
}

return yieldRawContent
4 changes: 2 additions & 2 deletions packages/ipfs-unixfs-exporter/test/importer.spec.ts
@@ -652,8 +652,8 @@ strategies.forEach((strategy) => {
}], block, options))

expect(onProgress.called).to.equal(true)
expect(onProgress.getCall(0).args[0]).to.have.property('type', 'unixfs:importer:progress:file:read')
expect(onProgress.getCall(0).args[0]).to.have.deep.property('detail', { bytesRead: BigInt(chunkSize), chunkSize: BigInt(chunkSize), path })
})

it('will import files with CID version 1', async () => {
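A standalone version of what the updated assertion checks might look like the following (the sinon spy and the 262144-byte chunk size are assumptions mirroring the suite):

```typescript
import sinon from 'sinon'
import drain from 'it-drain'
import { importer } from 'ipfs-unixfs-importer'
import { MemoryBlockstore } from 'blockstore-core'

const onProgress = sinon.spy()
const path = 'file.bin'
const chunkSize = 262144

await drain(importer([{
  path,
  content: new Uint8Array(chunkSize)
}], new MemoryBlockstore(), { onProgress }))

// event types are now namespaced and byte counts are bigints
const evt = onProgress.getCall(0).args[0]
// expected: evt.type === 'unixfs:importer:progress:file:read'
// expected: evt.detail deep-equals { bytesRead: BigInt(chunkSize), chunkSize: BigInt(chunkSize), path }
```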