From 7460719be44b4ff9bad629654efa29c56242e03a Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Wed, 15 Mar 2023 07:27:37 +0100 Subject: [PATCH] feat: add bitswap progress events (#50) Upgrades all deps in order to update progress handler types to relay progress messages from bitswap and the blockstore when fetching blocks. Also adds tests for block storage. Fixes #27 --- benchmarks/gc/package.json | 8 +- packages/helia/package.json | 20 +- packages/helia/src/helia.ts | 15 +- packages/helia/src/index.ts | 19 +- packages/helia/src/storage.ts | 151 +++++++-------- packages/helia/test/fixtures/create-block.ts | 14 +- packages/helia/test/fixtures/create-dag.ts | 16 +- packages/helia/test/gc.spec.ts | 32 ++-- packages/helia/test/pins.spec.ts | 18 +- packages/helia/test/storage.spec.ts | 178 ++++++++++++++++++ packages/interface/package.json | 9 +- packages/interface/src/blocks.ts | 144 ++++++++++++++ packages/interface/src/index.ts | 6 +- packages/interop/package.json | 6 +- .../test/fixtures/create-helia.browser.ts | 3 - .../interop/test/fixtures/create-helia.ts | 3 - 16 files changed, 488 insertions(+), 154 deletions(-) create mode 100644 packages/helia/test/storage.spec.ts create mode 100644 packages/interface/src/blocks.ts diff --git a/benchmarks/gc/package.json b/benchmarks/gc/package.json index 7d17a1a39..bbee27912 100644 --- a/benchmarks/gc/package.json +++ b/benchmarks/gc/package.json @@ -18,9 +18,9 @@ "@libp2p/websockets": "^5.0.3", "aegir": "^38.1.5", "blockstore-datastore-adapter": "^5.0.0", - "datastore-core": "^8.0.4", - "datastore-fs": "^8.0.0", - "datastore-level": "^9.0.4", + "datastore-core": "^9.0.0", + "datastore-fs": "^9.0.0", + "datastore-level": "^10.0.1", "execa": "^7.0.0", "go-ipfs": "^0.18.1", "helia": "~0.0.0", @@ -29,7 +29,7 @@ "it-all": "^2.0.0", "it-drain": "^2.0.0", "kubo-rpc-client": "^3.0.1", - "libp2p": "^0.42.2", + "libp2p": "next", "multiformats": "^11.0.1", "tinybench": "^2.4.0" } diff --git a/packages/helia/package.json b/packages/helia/package.json index 0b12be83d..2818d0a09 100644 --- a/packages/helia/package.json +++ b/packages/helia/package.json @@ -143,18 +143,17 @@ "@libp2p/interface-libp2p": "^1.1.0", "@libp2p/interfaces": "^3.3.1", "@libp2p/logger": "^2.0.5", - "blockstore-core": "^3.0.0", + "blockstore-core": "^4.0.0", "cborg": "^1.10.0", - "datastore-core": "^8.0.4", - "interface-blockstore": "^4.0.1", - "interface-datastore": "^7.0.3", - "interface-store": "^3.0.4", - "ipfs-bitswap": "^16.0.0", + "datastore-core": "^9.0.0", + "interface-blockstore": "^5.0.0", + "interface-datastore": "^8.0.0", + "interface-store": "^4.0.0", + "ipfs-bitswap": "^17.0.0", "it-all": "^2.0.0", "it-drain": "^2.0.0", "it-filter": "^2.0.0", - "it-merge": "^2.0.0", - "it-pushable": "^3.1.2", + "it-foreach": "^1.0.1", "mortice": "^3.0.1", "multiformats": "^11.0.1", "p-defer": "^4.0.0", @@ -169,7 +168,10 @@ "@ipld/dag-json": "^10.0.1", "@libp2p/websockets": "^5.0.3", "aegir": "^38.1.0", - "libp2p": "^0.42.2" + "delay": "^5.0.0", + "libp2p": "next", + "sinon": "^15.0.2", + "sinon-ts": "^1.0.0" }, "typedoc": { "entryPoint": "./src/index.ts" diff --git a/packages/helia/src/helia.ts b/packages/helia/src/helia.ts index 22a2e28b6..deb41ccaa 100644 --- a/packages/helia/src/helia.ts +++ b/packages/helia/src/helia.ts @@ -59,8 +59,6 @@ export class HeliaImpl implements Helia { } }) - this.pins = new PinsImpl(datastore, blockstore, init.dagWalkers ?? []) - if (init.libp2p != null) { this.#bitswap = createBitswap(libp2p, blockstore, { hashLoader: { @@ -79,21 +77,26 @@ export class HeliaImpl implements Helia { }) } + this.pins = new PinsImpl(datastore, blockstore, init.dagWalkers ?? []) + this.libp2p = libp2p - this.blockstore = new BlockStorage(blockstore, this.pins, this.#bitswap) + this.blockstore = new BlockStorage(blockstore, this.pins, { + bitswap: this.#bitswap, + holdGcLock: init.holdGcLock + }) this.datastore = datastore } async start (): Promise { await assertDatastoreVersionIsCurrent(this.datastore) - this.#bitswap?.start() + await this.#bitswap?.start() await this.libp2p.start() } async stop (): Promise { - this.#bitswap?.stop() await this.libp2p.stop() + await this.#bitswap?.stop() } async gc (options: GCOptions = {}): Promise { @@ -106,7 +109,7 @@ export class HeliaImpl implements Helia { log('gc start') await drain(blockstore.deleteMany((async function * () { - for await (const cid of blockstore.queryKeys({})) { + for await (const { cid } of blockstore.getAll()) { try { if (await helia.pins.isPinned(cid, options)) { continue diff --git a/packages/helia/src/index.ts b/packages/helia/src/index.ts index 202630dbf..9b49d0e2e 100644 --- a/packages/helia/src/index.ts +++ b/packages/helia/src/index.ts @@ -75,9 +75,26 @@ export interface HeliaInit { dagWalkers?: DAGWalker[] /** - * Pass `false` to not start the helia node + * Pass `false` to not start the Helia node */ start?: boolean + + /** + * Garbage collection requires preventing blockstore writes during searches + * for unpinned blocks as DAGs are typically pinned after they've been + * imported - without locking this could lead to the deletion of blocks while + * they are being added to the blockstore. + * + * By default this lock is held on the main process (e.g. node cluster's + * primary process, the renderer thread in browsers) and other processes will + * contact the main process for access (worker processes in node cluster, + * webworkers in the browser). + * + * If Helia is being run wholly in a non-primary process, with no other process + * expected to access the blockstore (e.g. being run in the background in a + * webworker), pass true here to hold the gc lock in this process. + */ + holdGcLock?: boolean } /** diff --git a/packages/helia/src/storage.ts b/packages/helia/src/storage.ts index ef56fc805..527f9849c 100644 --- a/packages/helia/src/storage.ts +++ b/packages/helia/src/storage.ts @@ -1,8 +1,6 @@ -import { BaseBlockstore } from 'blockstore-core' -import merge from 'it-merge' -import { pushable } from 'it-pushable' import filter from 'it-filter' -import type { Blockstore, KeyQuery, Query } from 'interface-blockstore' +import type { Blockstore } from 'interface-blockstore' +import type { Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents } from '@helia/interface/blocks' import type { Bitswap } from 'ipfs-bitswap' import type { CID } from 'multiformats/cid' import type { AbortOptions } from '@libp2p/interfaces' @@ -10,8 +8,15 @@ import type { AwaitIterable } from 'interface-store' import type { Mortice } from 'mortice' import createMortice from 'mortice' import type { Pins } from '@helia/interface/pins' +import forEach from 'it-foreach' +import { CustomProgressEvent, ProgressOptions } from 'progress-events' -export interface BlockStorageOptions extends AbortOptions { +export interface BlockStorageInit { + holdGcLock?: boolean + bitswap?: Bitswap +} + +export interface GetOptions extends AbortOptions { progress?: (evt: Event) => void } @@ -20,7 +25,7 @@ export interface BlockStorageOptions extends AbortOptions { * blockstore (that may be on disk, s3, or something else). If the blocks are * not present Bitswap will be used to fetch them from network peers. */ -export class BlockStorage extends BaseBlockstore implements Blockstore { +export class BlockStorage implements Blocks { public lock: Mortice private readonly child: Blockstore private readonly bitswap?: Bitswap @@ -29,21 +34,13 @@ export class BlockStorage extends BaseBlockstore implements Blockstore { /** * Create a new BlockStorage */ - constructor (blockstore: Blockstore, pins: Pins, bitswap?: Bitswap) { - super() - + constructor (blockstore: Blockstore, pins: Pins, options: BlockStorageInit = {}) { this.child = blockstore - this.bitswap = bitswap + this.bitswap = options.bitswap this.pins = pins - this.lock = createMortice() - } - - async open (): Promise { - await this.child.open() - } - - async close (): Promise { - await this.child.close() + this.lock = createMortice({ + singleProcess: options.holdGcLock + }) } unwrap (): Blockstore { @@ -53,15 +50,22 @@ export class BlockStorage extends BaseBlockstore implements Blockstore { /** * Put a block to the underlying datastore */ - async put (cid: CID, block: Uint8Array, options: AbortOptions = {}): Promise { + async put (cid: CID, block: Uint8Array, options: AbortOptions & ProgressOptions = {}): Promise { const releaseLock = await this.lock.readLock() try { + if (await this.child.has(cid)) { + options.onProgress?.(new CustomProgressEvent('blocks:put:duplicate', cid)) + return + } + if (this.bitswap?.isStarted() === true) { - await this.bitswap.put(cid, block, options) - } else { - await this.child.put(cid, block, options) + options.onProgress?.(new CustomProgressEvent('blocks:put:bitswap:notify', cid)) + this.bitswap.notify(cid, block, options) } + + options.onProgress?.(new CustomProgressEvent('blocks:put:blockstore:put', cid)) + await this.child.put(cid, block, options) } finally { releaseLock() } @@ -70,17 +74,27 @@ export class BlockStorage extends BaseBlockstore implements Blockstore { /** * Put a multiple blocks to the underlying datastore */ - async * putMany (blocks: AwaitIterable<{ key: CID, value: Uint8Array }>, options: AbortOptions = {}): AsyncGenerator<{ key: CID, value: Uint8Array }, void, undefined> { + async * putMany (blocks: AwaitIterable<{ cid: CID, block: Uint8Array }>, options: AbortOptions & ProgressOptions = {}): AsyncIterable { const releaseLock = await this.lock.readLock() try { - const missingBlocks = filter(blocks, async ({ key }) => { - return !(await this.child.has(key)) + const missingBlocks = filter(blocks, async ({ cid }) => { + const has = await this.child.has(cid) + + if (has) { + options.onProgress?.(new CustomProgressEvent('blocks:put-many:duplicate', cid)) + } + + return !has }) - const store = this.bitswap?.isStarted() === true ? this.bitswap : this.child + const notifyEach = forEach(missingBlocks, ({ cid, block }) => { + options.onProgress?.(new CustomProgressEvent('blocks:put-many:bitswap:notify', cid)) + this.bitswap?.notify(cid, block, options) + }) - yield * store.putMany(missingBlocks, options) + options.onProgress?.(new CustomProgressEvent('blocks:put-many:blockstore:put-many')) + yield * this.child.putMany(notifyEach, options) } finally { releaseLock() } @@ -89,54 +103,43 @@ export class BlockStorage extends BaseBlockstore implements Blockstore { /** * Get a block by cid */ - async get (cid: CID, options: BlockStorageOptions = {}): Promise { + async get (cid: CID, options: AbortOptions & ProgressOptions = {}): Promise { const releaseLock = await this.lock.readLock() try { - if (!(await this.has(cid)) && this.bitswap?.isStarted() === true) { - return await this.bitswap?.get(cid, options) - } else { - return await this.child.get(cid, options) + if (this.bitswap?.isStarted() != null && !(await this.child.has(cid))) { + options.onProgress?.(new CustomProgressEvent('blocks:get:bitswap:get', cid)) + const block = await this.bitswap.want(cid, options) + + options.onProgress?.(new CustomProgressEvent('blocks:get:blockstore:put', cid)) + await this.child.put(cid, block, options) + + return block } + + options.onProgress?.(new CustomProgressEvent('blocks:get:blockstore:get', cid)) + return await this.child.get(cid, options) } finally { releaseLock() } } /** - * Get multiple blocks back from an array of cids + * Get multiple blocks back from an (async) iterable of cids */ - async * getMany (cids: AwaitIterable, options: BlockStorageOptions = {}): AsyncGenerator { + async * getMany (cids: AwaitIterable, options: AbortOptions & ProgressOptions = {}): AsyncIterable { const releaseLock = await this.lock.readLock() try { - const getFromBitswap = pushable({ objectMode: true }) - const getFromChild = pushable({ objectMode: true }) - - void Promise.resolve().then(async () => { - for await (const cid of cids) { - if (!(await this.has(cid)) && this.bitswap?.isStarted() === true) { - getFromBitswap.push(cid) - } else { - getFromChild.push(cid) - } + options.onProgress?.(new CustomProgressEvent('blocks:get-many:blockstore:get-many')) + yield * this.child.getMany(forEach(cids, async (cid) => { + if (this.bitswap?.isStarted() === true && !(await this.child.has(cid))) { + options.onProgress?.(new CustomProgressEvent('blocks:get-many:bitswap:get', cid)) + const block = await this.bitswap.want(cid, options) + options.onProgress?.(new CustomProgressEvent('blocks:get-many:blockstore:put', cid)) + await this.child.put(cid, block, options) } - - getFromBitswap.end() - getFromChild.end() - }).catch(err => { - getFromBitswap.throw(err) - }) - - const streams = [ - this.child.getMany(getFromChild, options) - ] - - if (this.bitswap?.isStarted() === true) { - streams.push(this.bitswap.getMany(getFromBitswap, options)) - } - - yield * merge(...streams) + })) } finally { releaseLock() } @@ -145,7 +148,7 @@ export class BlockStorage extends BaseBlockstore implements Blockstore { /** * Delete a block from the blockstore */ - async delete (cid: CID, options: AbortOptions = {}): Promise { + async delete (cid: CID, options: AbortOptions & ProgressOptions = {}): Promise { const releaseLock = await this.lock.writeLock() try { @@ -153,6 +156,7 @@ export class BlockStorage extends BaseBlockstore implements Blockstore { throw new Error('CID was pinned') } + options.onProgress?.(new CustomProgressEvent('blocks:delete:blockstore:delete', cid)) await this.child.delete(cid, options) } finally { releaseLock() @@ -162,12 +166,13 @@ export class BlockStorage extends BaseBlockstore implements Blockstore { /** * Delete multiple blocks from the blockstore */ - async * deleteMany (cids: AwaitIterable, options: AbortOptions = {}): AsyncGenerator { + async * deleteMany (cids: AwaitIterable, options: AbortOptions & ProgressOptions = {}): AsyncIterable { const releaseLock = await this.lock.writeLock() try { const storage = this + options.onProgress?.(new CustomProgressEvent('blocks:delete-many:blockstore:delete-many')) yield * this.child.deleteMany((async function * () { for await (const cid of cids) { if (await storage.pins.isPinned(cid)) { @@ -191,24 +196,4 @@ export class BlockStorage extends BaseBlockstore implements Blockstore { releaseLock() } } - - async * query (q: Query, options: AbortOptions = {}): AsyncGenerator<{ key: CID, value: Uint8Array }, void, undefined> { - const releaseLock = await this.lock.readLock() - - try { - yield * this.child.query(q, options) - } finally { - releaseLock() - } - } - - async * queryKeys (q: KeyQuery, options: AbortOptions = {}): AsyncGenerator { - const releaseLock = await this.lock.readLock() - - try { - yield * this.child.queryKeys(q, options) - } finally { - releaseLock() - } - } } diff --git a/packages/helia/test/fixtures/create-block.ts b/packages/helia/test/fixtures/create-block.ts index 1d13b6295..ec2e20348 100644 --- a/packages/helia/test/fixtures/create-block.ts +++ b/packages/helia/test/fixtures/create-block.ts @@ -1,12 +1,18 @@ -import type { Blockstore } from 'blockstore-core/dist/src/base' +import type { Blocks } from '@helia/interface/blocks' import { CID } from 'multiformats/cid' import { sha256 } from 'multiformats/hashes/sha2' -export async function createBlock (codec: Codec, block: Uint8Array, blockstore: Blockstore): Promise> { +export async function createBlock (codec: Codec, block: Uint8Array): Promise<{ cid: CID, block: Uint8Array }> { const mh = await sha256.digest(block) const cid = CID.createV1(codec, mh) - await blockstore.put(cid, block) + return { cid, block } +} + +export async function createAndPutBlock (codec: Codec, block: Uint8Array, blockstore: Blocks): Promise> { + const result = await createBlock(codec, block) + + await blockstore.put(result.cid, block) - return cid + return result.cid } diff --git a/packages/helia/test/fixtures/create-dag.ts b/packages/helia/test/fixtures/create-dag.ts index ae41d98bb..132002a52 100644 --- a/packages/helia/test/fixtures/create-dag.ts +++ b/packages/helia/test/fixtures/create-dag.ts @@ -1,7 +1,7 @@ -import type { Blockstore } from 'interface-blockstore' +import type { Blocks } from '@helia/interface/blocks' import type { CID } from 'multiformats/cid' import { fromString as uint8arrayFromString } from 'uint8arrays/from-string' -import { createBlock } from './create-block.js' +import { createAndPutBlock } from './create-block.js' export interface DAGNode { cid: CID @@ -58,16 +58,16 @@ export interface DAGNode { * } * ``` */ -export async function createDag (codec: number, blockstore: Blockstore, depth: number, children: number): Promise> { +export async function createDag (codec: number, blocks: Blocks, depth: number, children: number): Promise> { const dag: Record = {} - const root = await createBlock(codec, uint8arrayFromString('level-0'), blockstore) + const root = await createAndPutBlock(codec, uint8arrayFromString('level-0'), blocks) - await addChildren(root, 'level', 0, 0, depth, children, dag, codec, blockstore) + await addChildren(root, 'level', 0, 0, depth, children, dag, codec, blocks) return dag } -async function addChildren (cid: CID, name: string, level: number, index: number, depth: number, children: number, dag: Record, codec: number, blockstore: Blockstore): Promise { +async function addChildren (cid: CID, name: string, level: number, index: number, depth: number, children: number, dag: Record, codec: number, blocks: Blocks): Promise { if (depth === 0) { return } @@ -81,10 +81,10 @@ async function addChildren (cid: CID, name: string, level: number, index: number } for (let i = 0; i < children; i++) { - const subChild = await createBlock(codec, uint8arrayFromString(`${name}-${i}`), blockstore) + const subChild = await createAndPutBlock(codec, uint8arrayFromString(`${name}-${i}`), blocks) dag[name].links.push(subChild) - await addChildren(subChild, name, level + 1, index + i, depth - 1, children, dag, codec, blockstore) + await addChildren(subChild, name, level + 1, index + i, depth - 1, children, dag, codec, blocks) } } diff --git a/packages/helia/test/gc.spec.ts b/packages/helia/test/gc.spec.ts index d9122218f..9b7d2a1be 100644 --- a/packages/helia/test/gc.spec.ts +++ b/packages/helia/test/gc.spec.ts @@ -9,7 +9,7 @@ import { yamux } from '@chainsafe/libp2p-yamux' import { createHelia } from '../src/index.js' import type { GcEvents, Helia } from '@helia/interface' import * as raw from 'multiformats/codecs/raw' -import { createBlock } from './fixtures/create-block.js' +import { createAndPutBlock } from './fixtures/create-block.js' import * as dagPb from '@ipld/dag-pb' import * as dagCbor from '@ipld/dag-cbor' import * as dagJson from '@ipld/dag-json' @@ -42,16 +42,16 @@ describe('gc', () => { }) it('pins a dag-pb node and does not garbage collect it or it\'s children', async () => { - const child1 = await createBlock(dagPb.code, dagPb.encode({ + const child1 = await createAndPutBlock(dagPb.code, dagPb.encode({ Data: Uint8Array.from([0, 1, 2, 3]), Links: [] }), helia.blockstore) - const child2 = await createBlock(dagPb.code, dagPb.encode({ + const child2 = await createAndPutBlock(dagPb.code, dagPb.encode({ Data: Uint8Array.from([4, 5, 6, 7]), Links: [] }), helia.blockstore) - const node = await createBlock(dagPb.code, dagPb.encode({ + const node = await createAndPutBlock(dagPb.code, dagPb.encode({ Links: [{ Hash: child1, Name: 'child1' @@ -64,7 +64,7 @@ describe('gc', () => { await helia.pins.add(node) // this block will be garbage collected - const doomed = await createBlock(dagPb.code, dagPb.encode({ + const doomed = await createAndPutBlock(dagPb.code, dagPb.encode({ Data: Uint8Array.from([8, 9, 0, 1]), Links: [] }), helia.blockstore) @@ -83,14 +83,14 @@ describe('gc', () => { }) it('pins a dag-cbor node and does not garbage collect it or it\'s children', async () => { - const child1 = await createBlock(dagCbor.code, dagCbor.encode({ + const child1 = await createAndPutBlock(dagCbor.code, dagCbor.encode({ foo: 'bar' }), helia.blockstore) - const child2 = await createBlock(dagCbor.code, dagCbor.encode({ + const child2 = await createAndPutBlock(dagCbor.code, dagCbor.encode({ baz: 'qux' }), helia.blockstore) - const node = await createBlock(dagCbor.code, dagCbor.encode({ + const node = await createAndPutBlock(dagCbor.code, dagCbor.encode({ children: [ child1, child2 @@ -100,7 +100,7 @@ describe('gc', () => { await helia.pins.add(node) // this block will be garbage collected - const doomed = await createBlock(dagCbor.code, dagJson.encode({ + const doomed = await createAndPutBlock(dagCbor.code, dagJson.encode({ quux: 'garply' }), helia.blockstore) @@ -118,14 +118,14 @@ describe('gc', () => { }) it('pins a dag-json node and does not garbage collect it or it\'s children', async () => { - const child1 = await createBlock(dagJson.code, dagJson.encode({ + const child1 = await createAndPutBlock(dagJson.code, dagJson.encode({ foo: 'bar' }), helia.blockstore) - const child2 = await createBlock(dagJson.code, dagJson.encode({ + const child2 = await createAndPutBlock(dagJson.code, dagJson.encode({ baz: 'qux' }), helia.blockstore) - const node = await createBlock(dagJson.code, dagJson.encode({ + const node = await createAndPutBlock(dagJson.code, dagJson.encode({ children: [ child1, child2 @@ -135,7 +135,7 @@ describe('gc', () => { await helia.pins.add(node) // this block will be garbage collected - const doomed = await createBlock(dagJson.code, dagJson.encode({ + const doomed = await createAndPutBlock(dagJson.code, dagJson.encode({ quux: 'garply' }), helia.blockstore) @@ -153,12 +153,12 @@ describe('gc', () => { }) it('pins a raw node and does not garbage collect it', async () => { - const cid = await createBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) + const cid = await createAndPutBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) await helia.pins.add(cid) // this block will be garbage collected - const doomed = await createBlock(raw.code, Uint8Array.from([4, 5, 6, 7]), helia.blockstore) + const doomed = await createAndPutBlock(raw.code, Uint8Array.from([4, 5, 6, 7]), helia.blockstore) await expect(helia.blockstore.has(cid)).to.eventually.be.true() await expect(helia.blockstore.has(doomed)).to.eventually.be.true() @@ -170,7 +170,7 @@ describe('gc', () => { }) it('can garbage collect around a CID that causes an error', async () => { - const cid = await createBlock(0x10, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) + const cid = await createAndPutBlock(0x10, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) await expect(helia.blockstore.has(cid)).to.eventually.be.true('did not have cid') diff --git a/packages/helia/test/pins.spec.ts b/packages/helia/test/pins.spec.ts index b7a8afdf6..07d236f07 100644 --- a/packages/helia/test/pins.spec.ts +++ b/packages/helia/test/pins.spec.ts @@ -10,7 +10,7 @@ import { createHelia } from '../src/index.js' import type { Helia } from '@helia/interface' import { CID } from 'multiformats/cid' import * as raw from 'multiformats/codecs/raw' -import { createBlock } from './fixtures/create-block.js' +import { createAndPutBlock } from './fixtures/create-block.js' import all from 'it-all' describe('pins', () => { @@ -41,7 +41,7 @@ describe('pins', () => { }) it('pins a block', async () => { - const cidV1 = await createBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) + const cidV1 = await createAndPutBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) const cidV0 = CID.createV0(cidV1.multihash) await helia.pins.add(cidV1) @@ -51,7 +51,7 @@ describe('pins', () => { }) it('unpins a block', async () => { - const cidV1 = await createBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) + const cidV1 = await createAndPutBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) const cidV0 = CID.createV0(cidV1.multihash) await helia.pins.add(cidV1) @@ -66,7 +66,7 @@ describe('pins', () => { }) it('does not delete a pinned block', async () => { - const cid = await createBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) + const cid = await createAndPutBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) await helia.pins.add(cid) @@ -75,7 +75,7 @@ describe('pins', () => { }) it('lists pins created with default args', async () => { - const cidV1 = await createBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) + const cidV1 = await createAndPutBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) await helia.pins.add(cidV1) @@ -88,7 +88,7 @@ describe('pins', () => { }) it('lists pins with depth', async () => { - const cidV1 = await createBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) + const cidV1 = await createAndPutBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) await helia.pins.add(cidV1, { depth: 5 @@ -102,7 +102,7 @@ describe('pins', () => { }) it('lists pins with metadata', async () => { - const cidV1 = await createBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) + const cidV1 = await createAndPutBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) const metadata = { foo: 'bar', baz: 5, @@ -121,8 +121,8 @@ describe('pins', () => { }) it('lists pins directly', async () => { - const cid1 = await createBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) - const cid2 = await createBlock(raw.code, Uint8Array.from([4, 5, 6, 7]), helia.blockstore) + const cid1 = await createAndPutBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) + const cid2 = await createAndPutBlock(raw.code, Uint8Array.from([4, 5, 6, 7]), helia.blockstore) await helia.pins.add(cid1) await helia.pins.add(cid2) diff --git a/packages/helia/test/storage.spec.ts b/packages/helia/test/storage.spec.ts new file mode 100644 index 000000000..aabd6891a --- /dev/null +++ b/packages/helia/test/storage.spec.ts @@ -0,0 +1,178 @@ +/* eslint-env mocha */ + +import { expect } from 'aegir/chai' +import { MemoryBlockstore } from 'blockstore-core' +import { MemoryDatastore } from 'datastore-core' +import { BlockStorage } from '../src/storage.js' +import type { Blockstore } from 'interface-blockstore' +import type { Bitswap } from 'ipfs-bitswap' +import { StubbedInstance, stubInterface } from 'sinon-ts' +import type { Pins } from '@helia/interface/pins' +import { PinsImpl } from '../src/pins.js' +import type { CID } from 'multiformats/cid' +import { createBlock } from './fixtures/create-block.js' +import * as raw from 'multiformats/codecs/raw' +import all from 'it-all' +import delay from 'delay' +import drain from 'it-drain' +import Sinon from 'sinon' + +describe('storage', () => { + let storage: BlockStorage + let blockstore: Blockstore + let bitswap: StubbedInstance + let pins: Pins + let blocks: Array<{ cid: CID, block: Uint8Array }> + + beforeEach(async () => { + blocks = [] + + for (let i = 0; i < 10; i++) { + blocks.push(await createBlock(raw.code, Uint8Array.from([0, 1, 2, i]))) + } + + const datastore = new MemoryDatastore() + + blockstore = new MemoryBlockstore() + bitswap = stubInterface() + pins = new PinsImpl(datastore, blockstore, []) + storage = new BlockStorage(blockstore, pins, { + bitswap, + holdGcLock: true + }) + }) + + it('gets a block from the blockstore', async () => { + const { cid, block } = blocks[0] + await blockstore.put(cid, block) + + const retrieved = await storage.get(cid) + expect(retrieved).to.equalBytes(block) + }) + + it('gets a block from the blockstore with progress', async () => { + const { cid, block } = blocks[0] + await blockstore.put(cid, block) + + const onProgress = Sinon.stub() + + await storage.get(cid, { + onProgress + }) + expect(onProgress.called).to.be.true() + }) + + it('gets many blocks from the blockstore', async () => { + const count = 5 + + for (let i = 0; i < count; i++) { + const { cid, block } = blocks[i] + await blockstore.put(cid, block) + } + + const retrieved = await all(storage.getMany(async function * () { + for (let i = 0; i < count; i++) { + yield blocks[i].cid + await delay(10) + } + }())) + + expect(retrieved).to.deep.equal(new Array(count).fill(0).map((_, i) => blocks[i].block)) + }) + + it('puts a block into the blockstore', async () => { + const { cid, block } = blocks[0] + await storage.put(cid, block) + + const retrieved = await blockstore.get(cid) + expect(retrieved).to.equalBytes(block) + }) + + it('puts many blocks into the blockstore', async () => { + const count = 5 + + await drain(storage.putMany(async function * () { + for (let i = 0; i < count; i++) { + yield { cid: blocks[i].cid, block: blocks[i].block } + await delay(10) + } + }())) + + const retrieved = await all(blockstore.getMany(new Array(count).fill(0).map((_, i) => blocks[i].cid))) + expect(retrieved).to.deep.equal(retrieved) + }) + + it('gets a block from bitswap when it is not in the blockstore', async () => { + const { cid, block } = blocks[0] + + bitswap.isStarted.returns(true) + bitswap.want.withArgs(cid).resolves(block) + + await expect(blockstore.has(cid)).to.eventually.be.false() + + const returned = await storage.get(cid) + + await expect(blockstore.has(cid)).to.eventually.be.true() + expect(returned).to.equalBytes(block) + expect(bitswap.want.called).to.be.true() + }) + + it('gets many blocks from bitswap when they are not in the blockstore', async () => { + bitswap.isStarted.returns(true) + + const count = 5 + + for (let i = 0; i < count; i++) { + const { cid, block } = blocks[i] + bitswap.want.withArgs(cid).resolves(block) + + await expect(blockstore.has(cid)).to.eventually.be.false() + } + + const retrieved = await all(storage.getMany(async function * () { + for (let i = 0; i < count; i++) { + yield blocks[i].cid + await delay(10) + } + }())) + + expect(retrieved).to.deep.equal(new Array(count).fill(0).map((_, i) => blocks[i].block)) + + for (let i = 0; i < count; i++) { + const { cid } = blocks[i] + expect(bitswap.want.calledWith(cid)).to.be.true() + await expect(blockstore.has(cid)).to.eventually.be.true() + } + }) + + it('gets some blocks from bitswap when they are not in the blockstore', async () => { + bitswap.isStarted.returns(true) + + const count = 5 + + // blocks 0,1,3,4 are in the blockstore + await blockstore.put(blocks[0].cid, blocks[0].block) + await blockstore.put(blocks[1].cid, blocks[1].block) + await blockstore.put(blocks[3].cid, blocks[3].block) + await blockstore.put(blocks[4].cid, blocks[4].block) + + // block #2 comes from bitswap but slowly + bitswap.want.withArgs(blocks[2].cid).callsFake(async () => { + await delay(100) + return blocks[2].block + }) + + const retrieved = await all(storage.getMany(async function * () { + for (let i = 0; i < count; i++) { + yield blocks[i].cid + await delay(10) + } + }())) + + expect(retrieved).to.deep.equal(new Array(count).fill(0).map((_, i) => blocks[i].block)) + + for (let i = 0; i < count; i++) { + await expect(blockstore.has(blocks[i].cid)).to.eventually.be.true() + } + }) +}) diff --git a/packages/interface/package.json b/packages/interface/package.json index 833160fcf..f2a62f270 100644 --- a/packages/interface/package.json +++ b/packages/interface/package.json @@ -47,6 +47,10 @@ "types": "./dist/src/index.d.ts", "import": "./dist/src/index.js" }, + "./blocks": { + "types": "./dist/src/blocks.d.ts", + "import": "./dist/src/blocks.js" + }, "./pins": { "types": "./dist/src/pins.d.ts", "import": "./dist/src/pins.js" @@ -154,9 +158,8 @@ "@libp2p/interface-libp2p": "^1.1.0", "@libp2p/interface-peer-id": "^2.0.1", "@libp2p/interfaces": "^3.3.1", - "@multiformats/multiaddr": "^11.1.5", - "interface-blockstore": "^4.0.1", - "interface-datastore": "^7.0.3", + "interface-datastore": "^8.0.0", + "ipfs-bitswap": "^17.0.0", "multiformats": "^11.0.1", "progress-events": "^1.0.0" }, diff --git a/packages/interface/src/blocks.ts b/packages/interface/src/blocks.ts new file mode 100644 index 000000000..c2da33312 --- /dev/null +++ b/packages/interface/src/blocks.ts @@ -0,0 +1,144 @@ +import type { AbortOptions } from '@libp2p/interfaces' +import type { ProgressEvent, ProgressOptions } from 'progress-events' +import type { CID } from 'multiformats/cid' +import type { BitswapNotifyProgressEvents, BitswapWantProgressEvents } from 'ipfs-bitswap' +import type { AwaitIterable } from './index.js' + +export interface Pair { + cid: CID + block: Uint8Array +} + +export type PutBlockProgressEvents = + ProgressEvent<'blocks:put:duplicate', CID> | + ProgressEvent<'blocks:put:bitswap:notify', CID> | + ProgressEvent<'blocks:put:blockstore:put', CID> | + BitswapNotifyProgressEvents + +export type PutManyBlocksProgressEvents = + ProgressEvent<'blocks:put-many:duplicate', CID> | + ProgressEvent<'blocks:put-many:bitswap:notify', CID> | + ProgressEvent<'blocks:put-many:blockstore:put-many'> | + BitswapNotifyProgressEvents + +export type GetBlockProgressEvents = + ProgressEvent<'blocks:get:bitswap:want', CID> | + ProgressEvent<'blocks:get:blockstore:get', CID> | + ProgressEvent<'blocks:get:blockstore:put', CID> | + BitswapWantProgressEvents + +export type GetManyBlocksProgressEvents = + ProgressEvent<'blocks:get-many:blockstore:get-many'> | + ProgressEvent<'blocks:get-many:bitswap:want', CID> | + ProgressEvent<'blocks:get-many:blockstore:put', CID> | + BitswapWantProgressEvents + +export type DeleteBlockProgressEvents = + ProgressEvent<'blocks:delete:blockstore:delete', CID> + +export type DeleteManyBlocksProgressEvents = + ProgressEvent<'blocks:delete-many:blockstore:delete-many'> + +export interface Blocks { + /** + * Store the passed block under the passed CID + * + * @example + * + * ```js + * await store.put([{ key: new Key('awesome'), value: new Uint8Array([0, 1, 2, 3]) }]) + * ``` + */ + put: (key: CID, val: Uint8Array, options?: AbortOptions & ProgressOptions) => Promise + + /** + * Retrieve the value stored under the given key + * + * @example + * ```js + * const value = await store.get(new Key('awesome')) + * console.log('got content: %s', value.toString('utf8')) + * // => got content: datastore + * ``` + */ + get: (key: CID, options?: AbortOptions & ProgressOptions) => Promise + + /** + * Check for the existence of a value for the passed key + * + * @example + * ```js + *const exists = await store.has(new Key('awesome')) + * + *if (exists) { + * console.log('it is there') + *} else { + * console.log('it is not there') + *} + *``` + */ + has: (key: CID, options?: AbortOptions) => Promise + + /** + * Remove the record for the passed key + * + * @example + * + * ```js + * await store.delete(new Key('awesome')) + * console.log('deleted awesome content :(') + * ``` + */ + delete: (key: CID, options?: AbortOptions & ProgressOptions) => Promise + + /** + * Store the given key/value pairs + * + * @example + * ```js + * const source = [{ key: new Key('awesome'), value: new Uint8Array([0, 1, 2, 3]) }] + * + * for await (const { key, value } of store.putMany(source)) { + * console.info(`put content for key ${key}`) + * } + * ``` + */ + putMany: ( + source: AwaitIterable, + options?: AbortOptions & ProgressOptions + ) => AsyncIterable + + /** + * Retrieve values for the passed keys + * + * @example + * ```js + * for await (const value of store.getMany([new Key('awesome')])) { + * console.log('got content:', new TextDecoder('utf8').decode(value)) + * // => got content: datastore + * } + * ``` + */ + getMany: ( + source: AwaitIterable, + options?: AbortOptions & ProgressOptions + ) => AsyncIterable + + /** + * Remove values for the passed keys + * + * @example + * + * ```js + * const source = [new Key('awesome')] + * + * for await (const key of store.deleteMany(source)) { + * console.log(`deleted content with key ${key}`) + * } + * ``` + */ + deleteMany: ( + source: AwaitIterable, + options?: AbortOptions & ProgressOptions + ) => AsyncIterable +} diff --git a/packages/interface/src/index.ts b/packages/interface/src/index.ts index 12ccaa3e4..ff7dc26ce 100644 --- a/packages/interface/src/index.ts +++ b/packages/interface/src/index.ts @@ -15,13 +15,15 @@ */ import type { Libp2p } from '@libp2p/interface-libp2p' -import type { Blockstore } from 'interface-blockstore' import type { AbortOptions } from '@libp2p/interfaces' import type { PeerId } from '@libp2p/interface-peer-id' import type { Datastore } from 'interface-datastore' import type { Pins } from './pins.js' import type { ProgressEvent, ProgressOptions } from 'progress-events' import type { CID } from 'multiformats/cid' +import type { Blocks } from './blocks.js' + +export type AwaitIterable = Iterable | AsyncIterable /** * The API presented by a Helia node. @@ -35,7 +37,7 @@ export interface Helia { /** * Where the blocks are stored */ - blockstore: Blockstore + blockstore: Blocks /** * A key/value store diff --git a/packages/interop/package.json b/packages/interop/package.json index d5802dc69..ff744f2c1 100644 --- a/packages/interop/package.json +++ b/packages/interop/package.json @@ -145,14 +145,14 @@ "@libp2p/websockets": "^5.0.3", "@multiformats/sha3": "^2.0.15", "aegir": "^38.1.0", - "blockstore-core": "^3.0.0", - "datastore-core": "^8.0.4", + "blockstore-core": "^4.0.0", + "datastore-core": "^9.0.0", "go-ipfs": "^0.18.1", "helia": "~0.0.0", "ipfsd-ctl": "^13.0.0", "it-to-buffer": "^3.0.0", "kubo-rpc-client": "^3.0.0", - "libp2p": "^0.42.2", + "libp2p": "next", "multiformats": "^11.0.1" }, "browser": { diff --git a/packages/interop/test/fixtures/create-helia.browser.ts b/packages/interop/test/fixtures/create-helia.browser.ts index da35e7f2b..83fe40d02 100644 --- a/packages/interop/test/fixtures/create-helia.browser.ts +++ b/packages/interop/test/fixtures/create-helia.browser.ts @@ -28,9 +28,6 @@ export async function createHeliaNode (init?: Partial): Promise): Promise