From 0749cbf99745ea6ab4363f1b5d635634ca0ddcfa Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Tue, 10 Oct 2023 17:59:12 +0100 Subject: [PATCH] feat: configurable block brokers (#280) Adds configurable block providers to allow using bitswap but also other methods such as trustless gateways and any yet-to-be-invented way of resolving a CID to a block.. --- packages/helia/package.json | 25 ++++ .../src/block-brokers/bitswap-block-broker.ts | 58 ++++++++ packages/helia/src/block-brokers/index.ts | 2 + .../trustless-gateway-block-broker.ts | 78 ++++++++++ packages/helia/src/helia.ts | 38 +---- packages/helia/src/index.ts | 37 ++++- packages/helia/src/storage.ts | 19 ++- packages/helia/src/utils/default-hashers.ts | 12 ++ packages/helia/src/utils/networked-storage.ts | 137 +++++++++++++++--- .../test/utils/networked-storage.spec.ts | 22 +-- packages/helia/typedoc.json | 3 +- packages/interface/src/blocks.ts | 27 +++- 12 files changed, 386 insertions(+), 72 deletions(-) create mode 100644 packages/helia/src/block-brokers/bitswap-block-broker.ts create mode 100644 packages/helia/src/block-brokers/index.ts create mode 100644 packages/helia/src/block-brokers/trustless-gateway-block-broker.ts create mode 100644 packages/helia/src/utils/default-hashers.ts diff --git a/packages/helia/package.json b/packages/helia/package.json index 81622ea4..f518a660 100644 --- a/packages/helia/package.json +++ b/packages/helia/package.json @@ -16,6 +16,22 @@ ], "type": "module", "types": "./dist/src/index.d.ts", + "typesVersions": { + "*": { + "*": [ + "*", + "dist/*", + "dist/src/*", + "dist/src/*/index" + ], + "src/*": [ + "*", + "dist/*", + "dist/src/*", + "dist/src/*/index" + ] + } + }, "files": [ "src", "dist", @@ -26,6 +42,14 @@ ".": { "types": "./dist/src/index.d.ts", "import": "./dist/src/index.js" + }, + "./block-brokers": { + "types": "./dist/src/block-brokers/index.d.ts", + "import": "./dist/src/block-brokers/index.js" + }, + "./hashers": { + "types": "./dist/src/utils/default-hashers.d.ts", + "import": "./dist/src/utils/default-hashers.js" } }, "eslintConfig": { @@ -68,6 +92,7 @@ "@libp2p/webrtc": "^3.1.3", "@libp2p/websockets": "^7.0.2", "@libp2p/webtransport": "^3.0.3", + "any-signal": "^4.1.1", "blockstore-core": "^4.0.0", "cborg": "^4.0.1", "datastore-core": "^9.0.0", diff --git a/packages/helia/src/block-brokers/bitswap-block-broker.ts b/packages/helia/src/block-brokers/bitswap-block-broker.ts new file mode 100644 index 00000000..20e07f24 --- /dev/null +++ b/packages/helia/src/block-brokers/bitswap-block-broker.ts @@ -0,0 +1,58 @@ +import { createBitswap } from 'ipfs-bitswap' +import type { BlockAnnouncer, BlockRetriever } from '@helia/interface/blocks' +import type { Libp2p } from '@libp2p/interface' +import type { Startable } from '@libp2p/interface/startable' +import type { Blockstore } from 'interface-blockstore' +import type { AbortOptions } from 'interface-store' +import type { Bitswap, BitswapNotifyProgressEvents, BitswapWantBlockProgressEvents } from 'ipfs-bitswap' +import type { CID } from 'multiformats/cid' +import type { MultihashHasher } from 'multiformats/hashes/interface' +import type { ProgressOptions } from 'progress-events' + +export class BitswapBlockBroker implements BlockAnnouncer>, BlockRetriever< +ProgressOptions +>, Startable { + private readonly bitswap: Bitswap + private started: boolean + + constructor (libp2p: Libp2p, blockstore: Blockstore, hashers: MultihashHasher[]) { + this.bitswap = createBitswap(libp2p, blockstore, { + hashLoader: { + getHasher: async (codecOrName: string | number): Promise> => { + const hasher = hashers.find(hasher => { + return hasher.code === codecOrName || hasher.name === codecOrName + }) + + if (hasher != null) { + return hasher + } + + throw new Error(`Could not load hasher for code/name "${codecOrName}"`) + } + } + }) + this.started = false + } + + isStarted (): boolean { + return this.started + } + + async start (): Promise { + await this.bitswap.start() + this.started = true + } + + async stop (): Promise { + await this.bitswap.stop() + this.started = false + } + + announce (cid: CID, block: Uint8Array, options?: ProgressOptions): void { + this.bitswap.notify(cid, block, options) + } + + async retrieve (cid: CID, options?: AbortOptions & ProgressOptions): Promise { + return this.bitswap.want(cid, options) + } +} diff --git a/packages/helia/src/block-brokers/index.ts b/packages/helia/src/block-brokers/index.ts new file mode 100644 index 00000000..58de6077 --- /dev/null +++ b/packages/helia/src/block-brokers/index.ts @@ -0,0 +1,2 @@ +export { BitswapBlockBroker } from './bitswap-block-broker.js' +export { TrustedGatewayBlockBroker } from './trustless-gateway-block-broker.js' diff --git a/packages/helia/src/block-brokers/trustless-gateway-block-broker.ts b/packages/helia/src/block-brokers/trustless-gateway-block-broker.ts new file mode 100644 index 00000000..23ebb4bf --- /dev/null +++ b/packages/helia/src/block-brokers/trustless-gateway-block-broker.ts @@ -0,0 +1,78 @@ +import { logger } from '@libp2p/logger' +import type { BlockRetriever } from '@helia/interface/blocks' +import type { AbortOptions } from 'interface-store' +import type { CID } from 'multiformats/cid' +import type { ProgressEvent, ProgressOptions } from 'progress-events' + +const log = logger('helia:trustless-gateway-block-provider') + +export type TrustlessGatewayGetBlockProgressEvents = + ProgressEvent<'trustless-gateway:get-block:fetch', URL> + +/** + * A class that accepts a list of trustless gateways that are queried + * for blocks. + */ +export class TrustedGatewayBlockBroker implements BlockRetriever< +ProgressOptions +> { + private readonly gateways: URL[] + + constructor (urls: Array) { + this.gateways = urls.map(url => new URL(url.toString())) + } + + async retrieve (cid: CID, options: AbortOptions & ProgressOptions = {}): Promise { + // choose a gateway + const url = this.gateways[Math.floor(Math.random() * this.gateways.length)] + + log('getting block for %c from %s', cid, url) + + try { + const block = await getRawBlockFromGateway(url, cid, options.signal) + log('got block for %c from %s', cid, url) + + return block + } catch (err: any) { + log.error('failed to get block for %c from %s', cid, url, err) + + throw err + } + } +} + +async function getRawBlockFromGateway (url: URL, cid: CID, signal?: AbortSignal): Promise { + const gwUrl = new URL(url) + gwUrl.pathname = `/ipfs/${cid.toString()}` + + // necessary as not every gateway supports dag-cbor, but every should support + // sending raw block as-is + gwUrl.search = '?format=raw' + + if (signal?.aborted === true) { + throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted prior to fetch`) + } + + try { + const res = await fetch(gwUrl.toString(), { + signal, + headers: { + // also set header, just in case ?format= is filtered out by some + // reverse proxy + Accept: 'application/vnd.ipld.raw' + }, + cache: 'force-cache' + }) + if (!res.ok) { + throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()}`) + } + return new Uint8Array(await res.arrayBuffer()) + } catch (cause) { + // @ts-expect-error - TS thinks signal?.aborted can only be false now + // because it was checked for true above. + if (signal?.aborted === true) { + throw new Error(`fetching raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted`) + } + throw new Error(`unable to fetch raw block for CID ${cid}`) + } +} diff --git a/packages/helia/src/helia.ts b/packages/helia/src/helia.ts index dd8ba172..9dc64446 100644 --- a/packages/helia/src/helia.ts +++ b/packages/helia/src/helia.ts @@ -1,8 +1,6 @@ +import { start, stop } from '@libp2p/interface/startable' import { logger } from '@libp2p/logger' -import { type Bitswap, createBitswap } from 'ipfs-bitswap' import drain from 'it-drain' -import { identity } from 'multiformats/hashes/identity' -import { sha256, sha512 } from 'multiformats/hashes/sha2' import { CustomProgressEvent } from 'progress-events' import { PinsImpl } from './pins.js' import { BlockStorage } from './storage.js' @@ -15,7 +13,6 @@ import type { Libp2p } from '@libp2p/interface' import type { Blockstore } from 'interface-blockstore' import type { Datastore } from 'interface-datastore' import type { CID } from 'multiformats/cid' -import type { MultihashHasher } from 'multiformats/hashes/interface' const log = logger('helia') @@ -31,34 +28,10 @@ export class HeliaImpl implements Helia { public datastore: Datastore public pins: Pins - #bitswap?: Bitswap - constructor (init: HeliaImplInit) { - const hashers: MultihashHasher[] = [ - sha256, - sha512, - identity, - ...(init.hashers ?? []) - ] - - this.#bitswap = createBitswap(init.libp2p, init.blockstore, { - hashLoader: { - getHasher: async (codecOrName: string | number): Promise> => { - const hasher = hashers.find(hasher => { - return hasher.code === codecOrName || hasher.name === codecOrName - }) - - if (hasher != null) { - return hasher - } - - throw new Error(`Could not load hasher for code/name "${codecOrName}"`) - } - } - }) - const networkedStorage = new NetworkedStorage(init.blockstore, { - bitswap: this.#bitswap + blockBrokers: init.blockBrokers, + hashers: init.hashers }) this.pins = new PinsImpl(init.datastore, networkedStorage, init.dagWalkers ?? []) @@ -72,14 +45,13 @@ export class HeliaImpl implements Helia { async start (): Promise { await assertDatastoreVersionIsCurrent(this.datastore) - - await this.#bitswap?.start() + await start(this.blockstore) await this.libp2p.start() } async stop (): Promise { await this.libp2p.stop() - await this.#bitswap?.stop() + await stop(this.blockstore) } async gc (options: GCOptions = {}): Promise { diff --git a/packages/helia/src/index.ts b/packages/helia/src/index.ts index 47606f2d..86beb8ae 100644 --- a/packages/helia/src/index.ts +++ b/packages/helia/src/index.ts @@ -24,11 +24,14 @@ import { logger } from '@libp2p/logger' import { MemoryBlockstore } from 'blockstore-core' import { MemoryDatastore } from 'datastore-core' +import { BitswapBlockBroker, TrustedGatewayBlockBroker } from './block-brokers/index.js' import { HeliaImpl } from './helia.js' +import { defaultHashers } from './utils/default-hashers.js' import { createLibp2p } from './utils/libp2p.js' import { name, version } from './version.js' import type { DefaultLibp2pServices } from './utils/libp2p-defaults.js' import type { Helia } from '@helia/interface' +import type { BlockBroker } from '@helia/interface/blocks' import type { Libp2p } from '@libp2p/interface' import type { Blockstore } from 'interface-blockstore' import type { Datastore } from 'interface-datastore' @@ -91,6 +94,12 @@ export interface HeliaInit { */ dagWalkers?: DAGWalker[] + /** + * A list of strategies used to fetch blocks when they are not present in + * the local blockstore + */ + blockBrokers?: BlockBroker[] + /** * Pass `false` to not start the Helia node */ @@ -114,6 +123,23 @@ export interface HeliaInit { holdGcLock?: boolean } +const DEFAULT_TRUSTLESS_GATEWAYS = [ + // 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/ + 'https://dweb.link', + + // 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/ + 'https://cf-ipfs.com', + + // 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/ + 'https://4everland.io', + + // 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/ + 'https://w3s.link', + + // 2023-10-03: IPNS, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/ + 'https://cloudflare-ipfs.com' +] + /** * Create and return a Helia node */ @@ -131,11 +157,20 @@ export async function createHelia (init: HeliaInit = {}): Promise libp2p = await createLibp2p(datastore, init.libp2p) } + const hashers = defaultHashers(init.hashers) + + const blockBrokers = init.blockBrokers ?? [ + new BitswapBlockBroker(libp2p, blockstore, hashers), + new TrustedGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS) + ] + const helia = new HeliaImpl({ ...init, datastore, blockstore, - libp2p + libp2p, + blockBrokers, + hashers }) if (init.start !== false) { diff --git a/packages/helia/src/storage.ts b/packages/helia/src/storage.ts index e4e41508..0575cdf6 100644 --- a/packages/helia/src/storage.ts +++ b/packages/helia/src/storage.ts @@ -1,3 +1,4 @@ +import { start, stop, type Startable } from '@libp2p/interface/startable' import createMortice from 'mortice' import type { Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions } from '@helia/interface/blocks' import type { Pins } from '@helia/interface/pins' @@ -21,10 +22,11 @@ export interface GetOptions extends AbortOptions { * blockstore (that may be on disk, s3, or something else). If the blocks are * not present Bitswap will be used to fetch them from network peers. */ -export class BlockStorage implements Blocks { +export class BlockStorage implements Blocks, Startable { public lock: Mortice private readonly child: Blockstore private readonly pins: Pins + private started: boolean /** * Create a new BlockStorage @@ -35,6 +37,21 @@ export class BlockStorage implements Blocks { this.lock = createMortice({ singleProcess: options.holdGcLock }) + this.started = false + } + + isStarted (): boolean { + return this.started + } + + async start (): Promise { + await start(this.child) + this.started = true + } + + async stop (): Promise { + await stop(this.child) + this.started = false } unwrap (): Blockstore { diff --git a/packages/helia/src/utils/default-hashers.ts b/packages/helia/src/utils/default-hashers.ts new file mode 100644 index 00000000..9352e870 --- /dev/null +++ b/packages/helia/src/utils/default-hashers.ts @@ -0,0 +1,12 @@ +import { identity } from 'multiformats/hashes/identity' +import { sha256, sha512 } from 'multiformats/hashes/sha2' +import type { MultihashHasher } from 'multiformats/hashes/interface' + +export function defaultHashers (hashers: MultihashHasher[] = []): MultihashHasher[] { + return [ + sha256, + sha512, + identity, + ...hashers + ] +} diff --git a/packages/helia/src/utils/networked-storage.ts b/packages/helia/src/utils/networked-storage.ts index 4ef96a8f..871f10af 100644 --- a/packages/helia/src/utils/networked-storage.ts +++ b/packages/helia/src/utils/networked-storage.ts @@ -1,36 +1,71 @@ +import { CodeError } from '@libp2p/interface/errors' +import { start, stop, type Startable } from '@libp2p/interface/startable' +import { logger } from '@libp2p/logger' +import { anySignal } from 'any-signal' import filter from 'it-filter' import forEach from 'it-foreach' import { CustomProgressEvent, type ProgressOptions } from 'progress-events' -import type { Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions } from '@helia/interface/blocks' +import { equals as uint8ArrayEquals } from 'uint8arrays/equals' +import type { BlockBroker, Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions, BlockRetriever, BlockAnnouncer } from '@helia/interface/blocks' import type { AbortOptions } from '@libp2p/interface' import type { Blockstore } from 'interface-blockstore' import type { AwaitIterable } from 'interface-store' -import type { Bitswap } from 'ipfs-bitswap' import type { CID } from 'multiformats/cid' +import type { MultihashHasher } from 'multiformats/hashes/interface' -export interface BlockStorageInit { - holdGcLock?: boolean - bitswap?: Bitswap +const log = logger('helia:networked-storage') + +export interface NetworkedStorageStorageInit { + blockBrokers?: BlockBroker[] + hashers?: MultihashHasher[] } export interface GetOptions extends AbortOptions { progress?(evt: Event): void } +function isBlockRetriever (b: any): b is BlockRetriever { + return typeof b.retrieve === 'function' +} + +function isBlockAnnouncer (b: any): b is BlockAnnouncer { + return typeof b.announce === 'function' +} + /** * Networked storage wraps a regular blockstore - when getting blocks if the * blocks are not present Bitswap will be used to fetch them from network peers. */ -export class NetworkedStorage implements Blocks { +export class NetworkedStorage implements Blocks, Startable { private readonly child: Blockstore - private readonly bitswap?: Bitswap + private readonly blockRetrievers: BlockRetriever[] + private readonly blockAnnouncers: BlockAnnouncer[] + private readonly hashers: MultihashHasher[] + private started: boolean /** * Create a new BlockStorage */ - constructor (blockstore: Blockstore, options: BlockStorageInit = {}) { + constructor (blockstore: Blockstore, init: NetworkedStorageStorageInit) { this.child = blockstore - this.bitswap = options.bitswap + this.blockRetrievers = (init.blockBrokers ?? []).filter(isBlockRetriever) + this.blockAnnouncers = (init.blockBrokers ?? []).filter(isBlockAnnouncer) + this.hashers = init.hashers ?? [] + this.started = false + } + + isStarted (): boolean { + return this.started + } + + async start (): Promise { + await start(this.child, ...new Set([...this.blockRetrievers, ...this.blockAnnouncers])) + this.started = true + } + + async stop (): Promise { + await stop(this.child, ...new Set([...this.blockRetrievers, ...this.blockAnnouncers])) + this.started = false } unwrap (): Blockstore { @@ -46,10 +81,11 @@ export class NetworkedStorage implements Blocks { return cid } - if (this.bitswap?.isStarted() === true) { - options.onProgress?.(new CustomProgressEvent('blocks:put:bitswap:notify', cid)) - this.bitswap.notify(cid, block, options) - } + options.onProgress?.(new CustomProgressEvent('blocks:put:providers:notify', cid)) + + this.blockAnnouncers.forEach(provider => { + provider.announce(cid, block, options) + }) options.onProgress?.(new CustomProgressEvent('blocks:put:blockstore:put', cid)) @@ -71,8 +107,10 @@ export class NetworkedStorage implements Blocks { }) const notifyEach = forEach(missingBlocks, ({ cid, block }): void => { - options.onProgress?.(new CustomProgressEvent('blocks:put-many:bitswap:notify', cid)) - this.bitswap?.notify(cid, block, options) + options.onProgress?.(new CustomProgressEvent('blocks:put-many:providers:notify', cid)) + this.blockAnnouncers.forEach(provider => { + provider.announce(cid, block, options) + }) }) options.onProgress?.(new CustomProgressEvent('blocks:put-many:blockstore:put-many')) @@ -83,13 +121,19 @@ export class NetworkedStorage implements Blocks { * Get a block by cid */ async get (cid: CID, options: GetOfflineOptions & AbortOptions & ProgressOptions = {}): Promise { - if (options.offline !== true && this.bitswap?.isStarted() != null && !(await this.child.has(cid))) { - options.onProgress?.(new CustomProgressEvent('blocks:get:bitswap:get', cid)) - const block = await this.bitswap.want(cid, options) - + if (options.offline !== true && !(await this.child.has(cid))) { + // we do not have the block locally, get it from a block provider + options.onProgress?.(new CustomProgressEvent('blocks:get:providers:get', cid)) + const block = await raceBlockRetrievers(cid, this.blockRetrievers, this.hashers, options) options.onProgress?.(new CustomProgressEvent('blocks:get:blockstore:put', cid)) await this.child.put(cid, block, options) + // notify other block providers of the new block + options.onProgress?.(new CustomProgressEvent('blocks:get:providers:notify', cid)) + this.blockAnnouncers.forEach(provider => { + provider.announce(cid, block, options) + }) + return block } @@ -105,11 +149,18 @@ export class NetworkedStorage implements Blocks { options.onProgress?.(new CustomProgressEvent('blocks:get-many:blockstore:get-many')) yield * this.child.getMany(forEach(cids, async (cid): Promise => { - if (options.offline !== true && this.bitswap?.isStarted() === true && !(await this.child.has(cid))) { - options.onProgress?.(new CustomProgressEvent('blocks:get-many:bitswap:get', cid)) - const block = await this.bitswap.want(cid, options) + if (options.offline !== true && !(await this.child.has(cid))) { + // we do not have the block locally, get it from a block provider + options.onProgress?.(new CustomProgressEvent('blocks:get-many:providers:get', cid)) + const block = await raceBlockRetrievers(cid, this.blockRetrievers, this.hashers, options) options.onProgress?.(new CustomProgressEvent('blocks:get-many:blockstore:put', cid)) await this.child.put(cid, block, options) + + // notify other block providers of the new block + options.onProgress?.(new CustomProgressEvent('blocks:get-many:providers:notify', cid)) + this.blockAnnouncers.forEach(provider => { + provider.announce(cid, block, options) + }) } })) } @@ -144,3 +195,45 @@ export class NetworkedStorage implements Blocks { yield * this.child.getAll(options) } } + +/** + * Race block providers cancelling any pending requests once the block has been + * found. + */ +async function raceBlockRetrievers (cid: CID, providers: BlockRetriever[], hashers: MultihashHasher[], options: AbortOptions): Promise { + const hasher = hashers.find(hasher => hasher.code === cid.multihash.code) + + if (hasher == null) { + throw new CodeError(`No hasher configured for multihash code 0x${cid.multihash.code.toString(16)}, please configure one. You can look up which hash this is at https://github.com/multiformats/multicodec/blob/master/table.csv`, 'ERR_UNKNOWN_HASH_ALG') + } + + const controller = new AbortController() + const signal = anySignal([controller.signal, options.signal]) + + try { + return await Promise.any( + providers.map(async provider => { + try { + const block = await provider.retrieve(cid, { + ...options, + signal + }) + + // verify block + const hash = await hasher.digest(block) + + if (!uint8ArrayEquals(hash.digest, cid.multihash.digest)) { + throw new CodeError('Hash of downloaded block did not match multihash from passed CID', 'ERR_HASH_MISMATCH') + } + + return block + } catch (err) { + log.error('could not retrieve block for %c', cid, err) + throw err + } + }) + ) + } finally { + signal.clear() + } +} diff --git a/packages/helia/test/utils/networked-storage.spec.ts b/packages/helia/test/utils/networked-storage.spec.ts index 8ce33f64..5c993ed5 100644 --- a/packages/helia/test/utils/networked-storage.spec.ts +++ b/packages/helia/test/utils/networked-storage.spec.ts @@ -8,16 +8,17 @@ import drain from 'it-drain' import * as raw from 'multiformats/codecs/raw' import Sinon from 'sinon' import { type StubbedInstance, stubInterface } from 'sinon-ts' +import { defaultHashers } from '../../src/utils/default-hashers.js' import { NetworkedStorage } from '../../src/utils/networked-storage.js' import { createBlock } from '../fixtures/create-block.js' +import type { BitswapBlockBroker } from '../../src/block-brokers/bitswap-block-broker.js' import type { Blockstore } from 'interface-blockstore' -import type { Bitswap } from 'ipfs-bitswap' import type { CID } from 'multiformats/cid' describe('storage', () => { let storage: NetworkedStorage let blockstore: Blockstore - let bitswap: StubbedInstance + let bitswap: StubbedInstance let blocks: Array<{ cid: CID, block: Uint8Array }> beforeEach(async () => { @@ -28,9 +29,12 @@ describe('storage', () => { } blockstore = new MemoryBlockstore() - bitswap = stubInterface() + bitswap = stubInterface() storage = new NetworkedStorage(blockstore, { - bitswap + blockBrokers: [ + bitswap + ], + hashers: defaultHashers() }) }) @@ -114,7 +118,7 @@ describe('storage', () => { const { cid, block } = blocks[0] bitswap.isStarted.returns(true) - bitswap.want.withArgs(cid).resolves(block) + bitswap.retrieve.withArgs(cid).resolves(block) expect(await blockstore.has(cid)).to.be.false() @@ -122,7 +126,7 @@ describe('storage', () => { expect(await blockstore.has(cid)).to.be.true() expect(returned).to.equalBytes(block) - expect(bitswap.want.called).to.be.true() + expect(bitswap.retrieve.called).to.be.true() }) it('gets many blocks from bitswap when they are not in the blockstore', async () => { @@ -132,7 +136,7 @@ describe('storage', () => { for (let i = 0; i < count; i++) { const { cid, block } = blocks[i] - bitswap.want.withArgs(cid).resolves(block) + bitswap.retrieve.withArgs(cid).resolves(block) expect(await blockstore.has(cid)).to.be.false() } @@ -148,7 +152,7 @@ describe('storage', () => { for (let i = 0; i < count; i++) { const { cid } = blocks[i] - expect(bitswap.want.calledWith(cid)).to.be.true() + expect(bitswap.retrieve.calledWith(cid)).to.be.true() expect(await blockstore.has(cid)).to.be.true() } }) @@ -165,7 +169,7 @@ describe('storage', () => { await blockstore.put(blocks[4].cid, blocks[4].block) // block #2 comes from bitswap but slowly - bitswap.want.withArgs(blocks[2].cid).callsFake(async () => { + bitswap.retrieve.withArgs(blocks[2].cid).callsFake(async () => { await delay(100) return blocks[2].block }) diff --git a/packages/helia/typedoc.json b/packages/helia/typedoc.json index f599dc72..e233a1ff 100644 --- a/packages/helia/typedoc.json +++ b/packages/helia/typedoc.json @@ -1,5 +1,6 @@ { "entryPoints": [ - "./src/index.ts" + "./src/index.ts", + "./src/block-providers/index.ts" ] } diff --git a/packages/interface/src/blocks.ts b/packages/interface/src/blocks.ts index b0d49406..742a0484 100644 --- a/packages/interface/src/blocks.ts +++ b/packages/interface/src/blocks.ts @@ -1,4 +1,5 @@ import type { Blockstore } from 'interface-blockstore' +import type { AbortOptions } from 'interface-store' import type { BitswapNotifyProgressEvents, BitswapWantProgressEvents } from 'ipfs-bitswap' import type { CID } from 'multiformats/cid' import type { ProgressEvent, ProgressOptions } from 'progress-events' @@ -10,31 +11,31 @@ export interface Pair { export type HasBlockProgressEvents = ProgressEvent<'blocks:put:duplicate', CID> | - ProgressEvent<'blocks:put:bitswap:notify', CID> | + ProgressEvent<'blocks:put:providers:notify', CID> | ProgressEvent<'blocks:put:blockstore:put', CID> | BitswapNotifyProgressEvents export type PutBlockProgressEvents = ProgressEvent<'blocks:put:duplicate', CID> | - ProgressEvent<'blocks:put:bitswap:notify', CID> | + ProgressEvent<'blocks:put:providers:notify', CID> | ProgressEvent<'blocks:put:blockstore:put', CID> | BitswapNotifyProgressEvents export type PutManyBlocksProgressEvents = ProgressEvent<'blocks:put-many:duplicate', CID> | - ProgressEvent<'blocks:put-many:bitswap:notify', CID> | + ProgressEvent<'blocks:put-many:providers:notify', CID> | ProgressEvent<'blocks:put-many:blockstore:put-many'> | BitswapNotifyProgressEvents export type GetBlockProgressEvents = - ProgressEvent<'blocks:get:bitswap:want', CID> | + ProgressEvent<'blocks:get:providers:want', CID> | ProgressEvent<'blocks:get:blockstore:get', CID> | ProgressEvent<'blocks:get:blockstore:put', CID> | BitswapWantProgressEvents export type GetManyBlocksProgressEvents = ProgressEvent<'blocks:get-many:blockstore:get-many'> | - ProgressEvent<'blocks:get-many:bitswap:want', CID> | + ProgressEvent<'blocks:get-many:providers:want', CID> | ProgressEvent<'blocks:get-many:blockstore:put', CID> | BitswapWantProgressEvents @@ -61,3 +62,19 @@ ProgressOptions, ProgressOptions { } + +export interface BlockRetriever { + /** + * Retrieve a block from a source + */ + retrieve(cid: CID, options?: AbortOptions & GetProgressOptions): Promise +} + +export interface BlockAnnouncer { + /** + * Make a new block available to peers + */ + announce(cid: CID, block: Uint8Array, options?: NotifyProgressOptions): void +} + +export type BlockBroker = BlockRetriever | BlockAnnouncer