Skip to content

Commit

Permalink
fix: pass options to blockstore.get during pin.add (#148)
Browse files Browse the repository at this point in the history
To allow cancelling pin operations and getting progress events, pass
the options to blockstore.get
  • Loading branch information
achingbrain authored Jun 7, 2023
1 parent 3323a5c commit 3a5234e
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 6 deletions.
6 changes: 6 additions & 0 deletions packages/helia/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import type { PubSub } from '@libp2p/interface-pubsub'
import type { DualKadDHT } from '@libp2p/kad-dht'
import type { Blockstore } from 'interface-blockstore'
import type { Datastore } from 'interface-datastore'
import type { AbortOptions } from 'interface-store'
import type { Libp2pOptions } from 'libp2p'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'
Expand All @@ -47,6 +48,11 @@ export interface DAGWalker {
walk: (block: Uint8Array) => AsyncGenerator<CID, void, undefined>
}

export interface Thing {
fetch: (cid: CID, options?: AbortOptions) => Promise<Uint8Array>
notify?: (cid: CID, block: Uint8Array, options?: AbortOptions) => void
}

/**
* Options used to create a Helia node.
*/
Expand Down
11 changes: 7 additions & 4 deletions packages/helia/src/pins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import { base36 } from 'multiformats/bases/base36'
import { CID, type Version } from 'multiformats/cid'
import defer from 'p-defer'
import PQueue from 'p-queue'
import { CustomProgressEvent, type ProgressOptions } from 'progress-events'
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
import { cborWalker, dagPbWalker, jsonWalker, rawWalker } from './utils/dag-walkers.js'
import type { DAGWalker } from './index.js'
import type { AddOptions, IsPinnedOptions, LsOptions, Pin, Pins, RmOptions } from '@helia/interface/pins'
import type { AddOptions, AddPinEvents, IsPinnedOptions, LsOptions, Pin, Pins, RmOptions } from '@helia/interface/pins'
import type { GetBlockProgressEvents } from '@helia/interface/src/blocks.js'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Blockstore } from 'interface-blockstore'

Expand Down Expand Up @@ -41,7 +43,7 @@ const DATASTORE_ENCODING = base36
// const DAG_WALK_MAX_QUEUE_LENGTH = 10
const DAG_WALK_QUEUE_CONCURRENCY = 1

interface WalkDagOptions extends AbortOptions {
interface WalkDagOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents | AddPinEvents> {
depth: number
}

Expand Down Expand Up @@ -142,7 +144,7 @@ export class PinsImpl implements Pins {
throw new Error(`No dag walker found for cid codec ${cid.code}`)
}

const block = await this.blockstore.get(cid)
const block = await this.blockstore.get(cid, options)

await this.#updatePinnedBlock(cid, withPinnedBlock, options)

Expand All @@ -160,7 +162,7 @@ export class PinsImpl implements Pins {
/**
* Update the pin count for the CID
*/
async #updatePinnedBlock (cid: CID, withPinnedBlock: (pinnedBlock: DatastorePinnedBlock) => void, options: AbortOptions): Promise<void> {
async #updatePinnedBlock (cid: CID, withPinnedBlock: (pinnedBlock: DatastorePinnedBlock) => void, options: AddOptions): Promise<void> {
const blockKey = new Key(`${DATASTORE_BLOCK_PREFIX}${DATASTORE_ENCODING.encode(cid.multihash.bytes)}`)

let pinnedBlock: DatastorePinnedBlock = {
Expand All @@ -186,6 +188,7 @@ export class PinsImpl implements Pins {
}

await this.datastore.put(blockKey, cborg.encode(pinnedBlock), options)
options.onProgress?.(new CustomProgressEvent<CID>('helia:pin:add', { detail: cid }))
}

async rm (cid: CID<unknown, number, number, Version>, options: RmOptions = {}): Promise<Pin> {
Expand Down
18 changes: 18 additions & 0 deletions packages/helia/test/pins.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import * as raw from 'multiformats/codecs/raw'
import { createAndPutBlock } from './fixtures/create-block.js'
import { createHelia } from './fixtures/create-helia.js'
import type { Helia } from '@helia/interface'
import type { ProgressEvent } from 'progress-events'

describe('pins', () => {
let helia: Helia
Expand Down Expand Up @@ -37,6 +38,23 @@ describe('pins', () => {
await expect(helia.pins.isPinned(cidV0)).to.eventually.be.true('did not pin v0 CID')
})

it('pins a block with progress events', async () => {
const cidV1 = await createAndPutBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore)

const events: ProgressEvent[] = []

await helia.pins.add(cidV1, {
onProgress: (evt) => {
events.push(evt)
}
})

expect(events.map(e => e.type)).to.include.members([
'blocks:get:blockstore:get',
'helia:pin:add'
])
})

it('unpins a block', async () => {
const cidV1 = await createAndPutBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore)
const cidV0 = CID.createV0(cidV1.multihash)
Expand Down
5 changes: 3 additions & 2 deletions packages/interface/src/pins.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { GetBlockProgressEvents } from './blocks'
import type { AbortOptions } from '@libp2p/interfaces'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'
Expand All @@ -11,9 +12,9 @@ export interface Pin {
}

export type AddPinEvents =
ProgressEvent<'helia:pin:add', unknown>
ProgressEvent<'helia:pin:add', CID>

export interface AddOptions extends AbortOptions, ProgressOptions<AddPinEvents> {
export interface AddOptions extends AbortOptions, ProgressOptions<AddPinEvents | GetBlockProgressEvents> {
/**
* How deeply to pin the DAG, defaults to Infinity
*/
Expand Down

0 comments on commit 3a5234e

Please sign in to comment.