Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

feat: expose progress events from importer, blockstore and bitswap #13

Merged
merged 2 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/interop/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@
"go-ipfs": "^0.18.1",
"helia": "next",
"ipfs-core-types": "^0.14.0",
"ipfs-unixfs-importer": "^15.0.0",
"ipfs-unixfs-importer": "^15.0.1",
"ipfsd-ctl": "^13.0.0",
"it-to-buffer": "^3.0.1",
"kubo-rpc-client": "^3.0.0",
"libp2p": "next",
"merge-options": "^3.0.4",
Expand Down
82 changes: 82 additions & 0 deletions packages/interop/test/bitswap.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/* eslint-env mocha */

import { expect } from 'aegir/chai'
import { createHeliaNode } from './fixtures/create-helia.js'
import { createKuboNode } from './fixtures/create-kubo.js'
import type { Helia } from '@helia/interface'
import type { Controller } from 'ipfsd-ctl'
import { UnixFS, unixfs } from '@helia/unixfs'
import type { FileCandidate } from 'ipfs-unixfs-importer'
import toBuffer from 'it-to-buffer'

describe('unixfs bitswap interop', () => {
let helia: Helia
let unixFs: UnixFS
let kubo: Controller

beforeEach(async () => {
helia = await createHeliaNode()
unixFs = unixfs(helia)
kubo = await createKuboNode()

// connect helia to kubo
await helia.libp2p.peerStore.addressBook.add(kubo.peer.id, kubo.peer.addresses)
await helia.libp2p.dial(kubo.peer.id)
})

afterEach(async () => {
if (helia != null) {
await helia.stop()
}

if (kubo != null) {
await kubo.stop()
}
})

it('should add a large file to helia and fetch it from kubo', async () => {
const chunkSize = 1024 * 1024
const size = chunkSize * 10
const input: Uint8Array[] = []

const candidate: FileCandidate = {
content: (async function * () {
for (let i = 0; i < size; i += chunkSize) {
const buf = new Uint8Array(chunkSize)
input.push(buf)

yield buf
}
}())
}

const cid = await unixFs.addFile(candidate)

const bytes = await toBuffer(kubo.api.cat(cid))

expect(bytes).to.equalBytes(await toBuffer(input))
})

it('should add a large file to kubo and fetch it from helia', async () => {
const chunkSize = 1024 * 1024
const size = chunkSize * 10
const input: Uint8Array[] = []

const candidate: FileCandidate = {
content: (async function * () {
for (let i = 0; i < size; i += chunkSize) {
const buf = new Uint8Array(chunkSize)
input.push(buf)

yield buf
}
}())
}

const { cid } = await kubo.api.add(candidate.content)

const bytes = await toBuffer(unixFs.cat(cid))

expect(bytes).to.equalBytes(await toBuffer(input))
})
})
12 changes: 6 additions & 6 deletions packages/interop/test/files.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,31 @@ import { createHeliaNode } from './fixtures/create-helia.js'
import { createKuboNode } from './fixtures/create-kubo.js'
import type { Helia } from '@helia/interface'
import type { Controller } from 'ipfsd-ctl'
import { UnixFS, unixfs } from '@helia/unixfs'
import { AddOptions, UnixFS, unixfs } from '@helia/unixfs'
import { balanced } from 'ipfs-unixfs-importer/layout'
import { fixedSize } from 'ipfs-unixfs-importer/chunker'
import type { FileCandidate, ImporterOptions } from 'ipfs-unixfs-importer'
import type { FileCandidate } from 'ipfs-unixfs-importer'
import type { CID } from 'multiformats/cid'
import type { AddOptions } from 'ipfs-core-types/src/root.js'
import type { AddOptions as KuboAddOptions } from 'ipfs-core-types/src/root.js'

describe('unixfs interop', () => {
let helia: Helia
let unixFs: UnixFS
let kubo: Controller

async function importToHelia (data: FileCandidate, opts?: Partial<ImporterOptions>): Promise<CID> {
async function importToHelia (data: FileCandidate, opts?: Partial<AddOptions>): Promise<CID> {
const cid = await unixFs.addFile(data, opts)

return cid
}

async function importToKubo (data: FileCandidate, opts?: AddOptions): Promise<CID> {
async function importToKubo (data: FileCandidate, opts?: KuboAddOptions): Promise<CID> {
const result = await kubo.api.add(data.content, opts)

return result.cid
}

async function expectSameCid (data: () => FileCandidate, heliaOpts: Partial<ImporterOptions> = {}, kuboOpts: AddOptions = {}): Promise<void> {
async function expectSameCid (data: () => FileCandidate, heliaOpts: Partial<AddOptions> = {}, kuboOpts: KuboAddOptions = {}): Promise<void> {
const heliaCid = await importToHelia(data(), {
// these are the default kubo options
cidVersion: 0,
Expand Down
8 changes: 6 additions & 2 deletions packages/unixfs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,13 @@
"hamt-sharding": "^3.0.2",
"interface-blockstore": "^5.0.0",
"ipfs-unixfs": "^11.0.0",
"ipfs-unixfs-exporter": "^13.0.0",
"ipfs-unixfs-importer": "^15.0.0",
"ipfs-unixfs-exporter": "^13.0.1",
"ipfs-unixfs-importer": "^15.0.1",
"it-last": "^2.0.0",
"it-pipe": "^2.0.5",
"merge-options": "^3.0.4",
"multiformats": "^11.0.1",
"progress-events": "^1.0.0",
"sparse-array": "^1.3.2"
},
"devDependencies": {
Expand All @@ -164,5 +165,8 @@
"it-first": "^2.0.0",
"it-to-buffer": "^3.0.0",
"uint8arrays": "^4.0.3"
},
"typedoc": {
"entryPoint": "./src/index.ts"
}
}
3 changes: 2 additions & 1 deletion packages/unixfs/src/commands/chmod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export async function chmod (cid: CID, mode: number, blockstore: Blocks, options
// but do not reimport files, only manipulate dag-pb nodes
const root = await pipe(
async function * () {
for await (const entry of recursive(resolved.cid, blockstore)) {
for await (const entry of recursive(resolved.cid, blockstore, options)) {
let metadata: UnixFS
let links: PBLink[] = []

Expand All @@ -63,6 +63,7 @@ export async function chmod (cid: CID, mode: number, blockstore: Blocks, options
}
}
},
// @ts-expect-error cannot combine progress types
(source) => importer(source, blockstore, {
...opts,
dagBuilder: async function * (source, block) {
Expand Down
50 changes: 29 additions & 21 deletions packages/unixfs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
*/

import type { CID, Version } from 'multiformats/cid'
import type { Blocks } from '@helia/interface/blocks'
import type { Blocks, GetBlockProgressEvents, PutBlockProgressEvents } from '@helia/interface/blocks'
import type { AbortOptions } from '@libp2p/interfaces'
import { addAll, addBytes, addByteStream, addDirectory, addFile } from './commands/add.js'
import { cat } from './commands/cat.js'
Expand All @@ -45,16 +45,24 @@ import { touch } from './commands/touch.js'
import { chmod } from './commands/chmod.js'
import type { UnixFSEntry } from 'ipfs-unixfs-exporter'
import { ls } from './commands/ls.js'
import type { ByteStream, DirectoryCandidate, FileCandidate, ImportCandidateStream, ImporterOptions, ImportResult } from 'ipfs-unixfs-importer'
import type { ByteStream, DirectoryCandidate, FileCandidate, ImportCandidateStream, ImporterOptions, ImportProgressEvents, ImportResult } from 'ipfs-unixfs-importer'
import type { ProgressOptions } from 'progress-events'

export interface UnixFSComponents {
blockstore: Blocks
}

export type AddEvents = PutBlockProgressEvents
| ImportProgressEvents

export interface AddOptions extends AbortOptions, Omit<ImporterOptions, 'onProgress'>, ProgressOptions<AddEvents> {

}

/**
* Options to pass to the cat command
*/
export interface CatOptions extends AbortOptions {
export interface CatOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents> {
/**
* Start reading the file at this offset
*/
Expand All @@ -74,7 +82,7 @@ export interface CatOptions extends AbortOptions {
/**
* Options to pass to the chmod command
*/
export interface ChmodOptions extends AbortOptions {
export interface ChmodOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents | PutBlockProgressEvents> {
/**
* If the target of the operation is a directory and this is true,
* apply the new mode to all directory contents
Expand All @@ -96,7 +104,7 @@ export interface ChmodOptions extends AbortOptions {
/**
* Options to pass to the cp command
*/
export interface CpOptions extends AbortOptions {
export interface CpOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents | PutBlockProgressEvents> {
/**
* If true, allow overwriting existing directory entries (default: false)
*/
Expand All @@ -112,7 +120,7 @@ export interface CpOptions extends AbortOptions {
/**
* Options to pass to the ls command
*/
export interface LsOptions extends AbortOptions {
export interface LsOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents> {
/**
* Optional path to list subdirectory contents if the target CID resolves to
* a directory
Expand All @@ -133,7 +141,7 @@ export interface LsOptions extends AbortOptions {
/**
* Options to pass to the mkdir command
*/
export interface MkdirOptions extends AbortOptions {
export interface MkdirOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents | PutBlockProgressEvents> {
/**
* The CID version to create the new directory with - defaults to the same
* version as the containing directory
Expand Down Expand Up @@ -165,7 +173,7 @@ export interface MkdirOptions extends AbortOptions {
/**
* Options to pass to the rm command
*/
export interface RmOptions extends AbortOptions {
export interface RmOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents | PutBlockProgressEvents> {
/**
* DAGs with a root block larger than this value will be sharded. Blocks
* smaller than this value will be regular UnixFS directories.
Expand All @@ -176,7 +184,7 @@ export interface RmOptions extends AbortOptions {
/**
* Options to pass to the stat command
*/
export interface StatOptions extends AbortOptions {
export interface StatOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents> {
/**
* An optional path to allow statting paths inside directories
*/
Expand Down Expand Up @@ -292,7 +300,7 @@ export interface UnixFS {
* }
* ```
*/
addAll: (source: ImportCandidateStream, options?: Partial<ImporterOptions>) => AsyncIterable<ImportResult>
addAll: (source: ImportCandidateStream, options?: Partial<AddOptions>) => AsyncIterable<ImportResult>

/**
* Add a single `Uint8Array` to your Helia node as a file.
Expand All @@ -305,7 +313,7 @@ export interface UnixFS {
* console.info(cid)
* ```
*/
addBytes: (bytes: Uint8Array, options?: Partial<ImporterOptions>) => Promise<CID>
addBytes: (bytes: Uint8Array, options?: Partial<AddOptions>) => Promise<CID>

/**
* Add a stream of `Uint8Array` to your Helia node as a file.
Expand All @@ -321,7 +329,7 @@ export interface UnixFS {
* console.info(cid)
* ```
*/
addByteStream: (bytes: ByteStream, options?: Partial<ImporterOptions>) => Promise<CID>
addByteStream: (bytes: ByteStream, options?: Partial<AddOptions>) => Promise<CID>

/**
* Add a file to your Helia node with optional metadata.
Expand All @@ -342,7 +350,7 @@ export interface UnixFS {
* console.info(cid)
* ```
*/
addFile: (file: FileCandidate, options?: Partial<ImporterOptions>) => Promise<CID>
addFile: (file: FileCandidate, options?: Partial<AddOptions>) => Promise<CID>

/**
* Add a directory to your Helia node.
Expand All @@ -355,7 +363,7 @@ export interface UnixFS {
* console.info(cid)
* ```
*/
addDirectory: (dir?: Partial<DirectoryCandidate>, options?: Partial<ImporterOptions>) => Promise<CID>
addDirectory: (dir?: Partial<DirectoryCandidate>, options?: Partial<AddOptions>) => Promise<CID>

/**
* Retrieve the contents of a file from your Helia node.
Expand All @@ -368,7 +376,7 @@ export interface UnixFS {
* }
* ```
*/
cat: (cid: CID, options?: Partial<CatOptions>) => AsyncIterable<Uint8Array>
cat: (cid: CID, options?: Partial<CatOptions> & ProgressOptions<GetBlockProgressEvents>) => AsyncIterable<Uint8Array>

/**
* Change the permissions on a file or directory in a DAG
Expand Down Expand Up @@ -415,7 +423,7 @@ export interface UnixFS {
* }
* ```
*/
ls: (cid: CID, options?: Partial<LsOptions>) => AsyncIterable<UnixFSEntry>
ls: (cid: CID, options?: Partial<LsOptions> & ProgressOptions<GetBlockProgressEvents>) => AsyncIterable<UnixFSEntry>

/**
* Make a new directory under an existing directory.
Expand Down Expand Up @@ -489,23 +497,23 @@ class DefaultUnixFS implements UnixFS {
this.components = components
}

async * addAll (source: ImportCandidateStream, options: Partial<ImporterOptions> = {}): AsyncIterable<ImportResult> {
async * addAll (source: ImportCandidateStream, options: Partial<AddOptions> = {}): AsyncIterable<ImportResult> {
yield * addAll(source, this.components.blockstore, options)
}

async addBytes (bytes: Uint8Array, options: Partial<ImporterOptions> = {}): Promise<CID> {
async addBytes (bytes: Uint8Array, options: Partial<AddOptions> = {}): Promise<CID> {
return await addBytes(bytes, this.components.blockstore, options)
}

async addByteStream (bytes: ByteStream, options: Partial<ImporterOptions> = {}): Promise<CID> {
async addByteStream (bytes: ByteStream, options: Partial<AddOptions> = {}): Promise<CID> {
return await addByteStream(bytes, this.components.blockstore, options)
}

async addFile (file: FileCandidate, options: Partial<ImporterOptions> = {}): Promise<CID> {
async addFile (file: FileCandidate, options: Partial<AddOptions> = {}): Promise<CID> {
return await addFile(file, this.components.blockstore, options)
}

async addDirectory (dir: Partial<DirectoryCandidate> = {}, options: Partial<ImporterOptions> = {}): Promise<CID> {
async addDirectory (dir: Partial<DirectoryCandidate> = {}, options: Partial<AddOptions> = {}): Promise<CID> {
return await addDirectory(dir, this.components.blockstore, options)
}

Expand Down