From 4dd83b75dd77eb428a9813f1249811ecd4695b31 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 9 Oct 2023 16:01:49 +0100 Subject: [PATCH 1/7] feat: configurable block providers Adds configurable block providers to allow using bitswap but also other methods such as trustless gateways and any yet-to-be-invented way of resolving a CID to a block. --- package.json | 3 + packages/helia/package.json | 21 ++++ .../block-providers/bitswap-block-provider.ts | 59 +++++++++ packages/helia/src/block-providers/index.ts | 2 + .../trustless-gateway-block-provider.ts | 83 ++++++++++++ packages/helia/src/helia.ts | 38 +----- packages/helia/src/index.ts | 43 ++++++- packages/helia/src/storage.ts | 19 ++- packages/helia/src/utils/networked-storage.ts | 119 ++++++++++++++---- .../test/utils/networked-storage.spec.ts | 20 +-- packages/helia/typedoc.json | 3 +- packages/interface/src/blocks.ts | 26 +++- 12 files changed, 364 insertions(+), 72 deletions(-) create mode 100644 packages/helia/src/block-providers/bitswap-block-provider.ts create mode 100644 packages/helia/src/block-providers/index.ts create mode 100644 packages/helia/src/block-providers/trustless-gateway-block-provider.ts diff --git a/package.json b/package.json index bda639ad..6e5a2c76 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,9 @@ "docs": "NODE_OPTIONS=--max_old_space_size=8192 aegir docs -- --exclude packages/interop --excludeExternals", "docs:no-publish": "NODE_OPTIONS=--max_old_space_size=8192 aegir docs --publish false -- --exclude packages/interop" }, + "dependencies": { + "any-signal": "^4.1.1" + }, "devDependencies": { "aegir": "^41.0.0", "npm-run-all": "^4.1.5", diff --git a/packages/helia/package.json b/packages/helia/package.json index 81622ea4..5962501d 100644 --- a/packages/helia/package.json +++ b/packages/helia/package.json @@ -16,6 +16,22 @@ ], "type": "module", "types": "./dist/src/index.d.ts", + "typesVersions": { + "*": { + "*": [ + "*", + "dist/*", + "dist/src/*", + 
"dist/src/*/index" + ], + "src/*": [ + "*", + "dist/*", + "dist/src/*", + "dist/src/*/index" + ] + } + }, "files": [ "src", "dist", @@ -26,6 +42,10 @@ ".": { "types": "./dist/src/index.d.ts", "import": "./dist/src/index.js" + }, + "./block-providers": { + "types": "./dist/src/block-providers/index.d.ts", + "import": "./dist/src/block-providers/index.js" } }, "eslintConfig": { @@ -68,6 +88,7 @@ "@libp2p/webrtc": "^3.1.3", "@libp2p/websockets": "^7.0.2", "@libp2p/webtransport": "^3.0.3", + "any-signal": "^4.1.1", "blockstore-core": "^4.0.0", "cborg": "^4.0.1", "datastore-core": "^9.0.0", diff --git a/packages/helia/src/block-providers/bitswap-block-provider.ts b/packages/helia/src/block-providers/bitswap-block-provider.ts new file mode 100644 index 00000000..2da48251 --- /dev/null +++ b/packages/helia/src/block-providers/bitswap-block-provider.ts @@ -0,0 +1,59 @@ +import { createBitswap } from 'ipfs-bitswap' +import type { BlockProvider } from '@helia/interface/blocks' +import type { Libp2p } from '@libp2p/interface' +import type { Startable } from '@libp2p/interface/startable' +import type { Blockstore } from 'interface-blockstore' +import type { AbortOptions } from 'interface-store' +import type { Bitswap, BitswapNotifyProgressEvents, BitswapWantBlockProgressEvents } from 'ipfs-bitswap' +import type { CID } from 'multiformats/cid' +import type { MultihashHasher } from 'multiformats/hashes/interface' +import type { ProgressOptions } from 'progress-events' + +export class BitswapBlockProvider implements BlockProvider< +ProgressOptions, +ProgressOptions +>, Startable { + private readonly bitswap: Bitswap + private started: boolean + + constructor (libp2p: Libp2p, blockstore: Blockstore, hashers: MultihashHasher[]) { + this.bitswap = createBitswap(libp2p, blockstore, { + hashLoader: { + getHasher: async (codecOrName: string | number): Promise> => { + const hasher = hashers.find(hasher => { + return hasher.code === codecOrName || hasher.name === codecOrName + }) + + if 
(hasher != null) { + return hasher + } + + throw new Error(`Could not load hasher for code/name "${codecOrName}"`) + } + } + }) + this.started = false + } + + isStarted (): boolean { + return this.started + } + + async start (): Promise { + await this.bitswap.start() + this.started = true + } + + async stop (): Promise { + await this.bitswap.stop() + this.started = false + } + + notify (cid: CID, block: Uint8Array, options?: ProgressOptions): void { + this.bitswap.notify(cid, block, options) + } + + async get (cid: CID, options?: AbortOptions & ProgressOptions): Promise { + return this.bitswap.want(cid, options) + } +} diff --git a/packages/helia/src/block-providers/index.ts b/packages/helia/src/block-providers/index.ts new file mode 100644 index 00000000..6d5ab76f --- /dev/null +++ b/packages/helia/src/block-providers/index.ts @@ -0,0 +1,2 @@ +export { BitswapBlockProvider } from './bitswap-block-provider.js' +export { TrustedGatewayBlockProvider } from './trustless-gateway-block-provider.js' diff --git a/packages/helia/src/block-providers/trustless-gateway-block-provider.ts b/packages/helia/src/block-providers/trustless-gateway-block-provider.ts new file mode 100644 index 00000000..b4ddb4f2 --- /dev/null +++ b/packages/helia/src/block-providers/trustless-gateway-block-provider.ts @@ -0,0 +1,83 @@ +import { logger } from '@libp2p/logger' +import type { BlockProvider } from '@helia/interface/blocks' +import type { AbortOptions } from 'interface-store' +import type { CID } from 'multiformats/cid' +import type { ProgressEvent, ProgressOptions } from 'progress-events' + +const log = logger('helia:trustless-gateway-block-provider') + +export type TrustlessGatewayGetBlockProgressEvents = + ProgressEvent<'trustless-gateway:get-block:fetch', URL> + +/** + * A BlockProvider that accepts a list of trustless gateways that are queried + * for blocks. Individual gateways are randomly chosen. 
+ */ +export class TrustedGatewayBlockProvider implements BlockProvider< +ProgressOptions, +ProgressOptions +> { + private readonly gateways: URL[] + + constructor (urls: string[]) { + this.gateways = urls.map(url => new URL(url.toString())) + } + + notify (cid: CID, block: Uint8Array, options?: ProgressOptions): void { + // no-op + } + + async get (cid: CID, options: AbortOptions & ProgressOptions = {}): Promise { + // choose a gateway + const url = this.gateways[Math.floor(Math.random() * this.gateways.length)] + + log('getting block for %c from %s', cid, url) + + try { + const block = await getRawBlockFromGateway(url, cid, options.signal) + log('got block for %c from %s', cid, url) + + return block + } catch (err: any) { + log.error('failed to get block for %c from %s', cid, url, err) + + throw err + } + } +} + +async function getRawBlockFromGateway (url: URL, cid: CID, signal?: AbortSignal): Promise { + const gwUrl = new URL(url) + gwUrl.pathname = `/ipfs/${cid.toString()}` + + // necessary as not every gateway supports dag-cbor, but every should support + // sending raw block as-is + gwUrl.search = '?format=raw' + + if (signal?.aborted === true) { + throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted prior to fetch`) + } + + try { + const res = await fetch(gwUrl.toString(), { + signal, + headers: { + // also set header, just in case ?format= is filtered out by some + // reverse proxy + Accept: 'application/vnd.ipld.raw' + }, + cache: 'force-cache' + }) + if (!res.ok) { + throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()}`) + } + return new Uint8Array(await res.arrayBuffer()) + } catch (cause) { + // @ts-expect-error - TS thinks signal?.aborted can only be false now + // because it was checked for true above. 
+ if (signal?.aborted === true) { + throw new Error(`fetching raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted`) + } + throw new Error(`unable to fetch raw block for CID ${cid}`) + } +} diff --git a/packages/helia/src/helia.ts b/packages/helia/src/helia.ts index dd8ba172..85b80ae3 100644 --- a/packages/helia/src/helia.ts +++ b/packages/helia/src/helia.ts @@ -1,8 +1,6 @@ +import { start, stop } from '@libp2p/interface/startable' import { logger } from '@libp2p/logger' -import { type Bitswap, createBitswap } from 'ipfs-bitswap' import drain from 'it-drain' -import { identity } from 'multiformats/hashes/identity' -import { sha256, sha512 } from 'multiformats/hashes/sha2' import { CustomProgressEvent } from 'progress-events' import { PinsImpl } from './pins.js' import { BlockStorage } from './storage.js' @@ -15,7 +13,6 @@ import type { Libp2p } from '@libp2p/interface' import type { Blockstore } from 'interface-blockstore' import type { Datastore } from 'interface-datastore' import type { CID } from 'multiformats/cid' -import type { MultihashHasher } from 'multiformats/hashes/interface' const log = logger('helia') @@ -31,34 +28,10 @@ export class HeliaImpl implements Helia { public datastore: Datastore public pins: Pins - #bitswap?: Bitswap - constructor (init: HeliaImplInit) { - const hashers: MultihashHasher[] = [ - sha256, - sha512, - identity, - ...(init.hashers ?? 
[]) - ] - - this.#bitswap = createBitswap(init.libp2p, init.blockstore, { - hashLoader: { - getHasher: async (codecOrName: string | number): Promise> => { - const hasher = hashers.find(hasher => { - return hasher.code === codecOrName || hasher.name === codecOrName - }) - - if (hasher != null) { - return hasher - } - - throw new Error(`Could not load hasher for code/name "${codecOrName}"`) - } - } - }) - const networkedStorage = new NetworkedStorage(init.blockstore, { - bitswap: this.#bitswap + blockProviders: init.blockProviders, + hashers: init.hashers }) this.pins = new PinsImpl(init.datastore, networkedStorage, init.dagWalkers ?? []) @@ -72,14 +45,13 @@ export class HeliaImpl implements Helia { async start (): Promise { await assertDatastoreVersionIsCurrent(this.datastore) - - await this.#bitswap?.start() + await start(this.blockstore) await this.libp2p.start() } async stop (): Promise { await this.libp2p.stop() - await this.#bitswap?.stop() + await stop(this.blockstore) } async gc (options: GCOptions = {}): Promise { diff --git a/packages/helia/src/index.ts b/packages/helia/src/index.ts index 47606f2d..d2239481 100644 --- a/packages/helia/src/index.ts +++ b/packages/helia/src/index.ts @@ -24,11 +24,16 @@ import { logger } from '@libp2p/logger' import { MemoryBlockstore } from 'blockstore-core' import { MemoryDatastore } from 'datastore-core' +import { identity } from 'multiformats/hashes/identity' +import { sha256, sha512 } from 'multiformats/hashes/sha2' +import { BitswapBlockProvider } from './block-providers/bitswap-block-provider.js' +import { TrustedGatewayBlockProvider } from './block-providers/trustless-gateway-block-provider.js' import { HeliaImpl } from './helia.js' import { createLibp2p } from './utils/libp2p.js' import { name, version } from './version.js' import type { DefaultLibp2pServices } from './utils/libp2p-defaults.js' import type { Helia } from '@helia/interface' +import type { BlockProvider } from '@helia/interface/blocks' import type { 
Libp2p } from '@libp2p/interface' import type { Blockstore } from 'interface-blockstore' import type { Datastore } from 'interface-datastore' @@ -91,6 +96,12 @@ export interface HeliaInit { */ dagWalkers?: DAGWalker[] + /** + * A list of strategies used to fetch blocks when they are not present in + * the local blockstore + */ + blockProviders?: BlockProvider[] + /** * Pass `false` to not start the Helia node */ @@ -114,6 +125,23 @@ export interface HeliaInit { holdGcLock?: boolean } +const DEFAULT_TRUSTLESS_GATEWAYS = [ + // 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/ + 'https://dweb.link', + + // 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/ + 'https://cf-ipfs.com', + + // 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/ + 'https://4everland.io', + + // 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/ + 'https://w3s.link', + + // 2023-10-03: IPNS, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/ + 'https://cloudflare-ipfs.com' +] + /** * Create and return a Helia node */ @@ -131,11 +159,24 @@ export async function createHelia (init: HeliaInit = {}): Promise libp2p = await createLibp2p(datastore, init.libp2p) } + const hashers: MultihashHasher[] = [ + sha256, + sha512, + identity, + ...(init.hashers ?? []) + ] + + const blockProviders = init.blockProviders ?? 
[ + new BitswapBlockProvider(libp2p, blockstore, hashers), + new TrustedGatewayBlockProvider(DEFAULT_TRUSTLESS_GATEWAYS) + ] + const helia = new HeliaImpl({ ...init, datastore, blockstore, - libp2p + libp2p, + blockProviders }) if (init.start !== false) { diff --git a/packages/helia/src/storage.ts b/packages/helia/src/storage.ts index e4e41508..0575cdf6 100644 --- a/packages/helia/src/storage.ts +++ b/packages/helia/src/storage.ts @@ -1,3 +1,4 @@ +import { start, stop, type Startable } from '@libp2p/interface/startable' import createMortice from 'mortice' import type { Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions } from '@helia/interface/blocks' import type { Pins } from '@helia/interface/pins' @@ -21,10 +22,11 @@ export interface GetOptions extends AbortOptions { * blockstore (that may be on disk, s3, or something else). If the blocks are * not present Bitswap will be used to fetch them from network peers. 
*/ -export class BlockStorage implements Blocks { +export class BlockStorage implements Blocks, Startable { public lock: Mortice private readonly child: Blockstore private readonly pins: Pins + private started: boolean /** * Create a new BlockStorage @@ -35,6 +37,21 @@ export class BlockStorage implements Blocks { this.lock = createMortice({ singleProcess: options.holdGcLock }) + this.started = false + } + + isStarted (): boolean { + return this.started + } + + async start (): Promise { + await start(this.child) + this.started = true + } + + async stop (): Promise { + await stop(this.child) + this.started = false } unwrap (): Blockstore { diff --git a/packages/helia/src/utils/networked-storage.ts b/packages/helia/src/utils/networked-storage.ts index 4ef96a8f..3e0dd5c8 100644 --- a/packages/helia/src/utils/networked-storage.ts +++ b/packages/helia/src/utils/networked-storage.ts @@ -1,16 +1,20 @@ +import { CodeError } from '@libp2p/interface/errors' +import { start, stop, type Startable } from '@libp2p/interface/startable' +import { anySignal } from 'any-signal' import filter from 'it-filter' import forEach from 'it-foreach' import { CustomProgressEvent, type ProgressOptions } from 'progress-events' -import type { Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions } from '@helia/interface/blocks' +import { equals as uint8ArrayEquals } from 'uint8arrays/equals' +import type { BlockProvider, Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions } from '@helia/interface/blocks' import type { AbortOptions } from '@libp2p/interface' import type { Blockstore } from 'interface-blockstore' import type { AwaitIterable } from 
'interface-store' -import type { Bitswap } from 'ipfs-bitswap' import type { CID } from 'multiformats/cid' +import type { MultihashHasher } from 'multiformats/hashes/interface' -export interface BlockStorageInit { - holdGcLock?: boolean - bitswap?: Bitswap +export interface NetworkedStorageStorageInit { + blockProviders?: BlockProvider[] + hashers?: MultihashHasher[] } export interface GetOptions extends AbortOptions { @@ -21,16 +25,34 @@ export interface GetOptions extends AbortOptions { * Networked storage wraps a regular blockstore - when getting blocks if the * blocks are not present Bitswap will be used to fetch them from network peers. */ -export class NetworkedStorage implements Blocks { +export class NetworkedStorage implements Blocks, Startable { private readonly child: Blockstore - private readonly bitswap?: Bitswap + private readonly blockProviders: BlockProvider[] + private readonly hashers: MultihashHasher[] + private started: boolean /** * Create a new BlockStorage */ - constructor (blockstore: Blockstore, options: BlockStorageInit = {}) { + constructor (blockstore: Blockstore, init: NetworkedStorageStorageInit) { this.child = blockstore - this.bitswap = options.bitswap + this.blockProviders = init.blockProviders ?? [] + this.hashers = init.hashers ?? 
[] + this.started = false + } + + isStarted (): boolean { + return this.started + } + + async start (): Promise { + await start(this.child, ...this.blockProviders) + this.started = true + } + + async stop (): Promise { + await stop(this.child, ...this.blockProviders) + this.started = false } unwrap (): Blockstore { @@ -46,10 +68,11 @@ export class NetworkedStorage implements Blocks { return cid } - if (this.bitswap?.isStarted() === true) { - options.onProgress?.(new CustomProgressEvent('blocks:put:bitswap:notify', cid)) - this.bitswap.notify(cid, block, options) - } + options.onProgress?.(new CustomProgressEvent('blocks:put:providers:notify', cid)) + + this.blockProviders.forEach(provider => { + provider.notify(cid, block, options) + }) options.onProgress?.(new CustomProgressEvent('blocks:put:blockstore:put', cid)) @@ -71,8 +94,10 @@ export class NetworkedStorage implements Blocks { }) const notifyEach = forEach(missingBlocks, ({ cid, block }): void => { - options.onProgress?.(new CustomProgressEvent('blocks:put-many:bitswap:notify', cid)) - this.bitswap?.notify(cid, block, options) + options.onProgress?.(new CustomProgressEvent('blocks:put-many:providers:notify', cid)) + this.blockProviders.forEach(provider => { + provider.notify(cid, block, options) + }) }) options.onProgress?.(new CustomProgressEvent('blocks:put-many:blockstore:put-many')) @@ -83,13 +108,19 @@ export class NetworkedStorage implements Blocks { * Get a block by cid */ async get (cid: CID, options: GetOfflineOptions & AbortOptions & ProgressOptions = {}): Promise { - if (options.offline !== true && this.bitswap?.isStarted() != null && !(await this.child.has(cid))) { - options.onProgress?.(new CustomProgressEvent('blocks:get:bitswap:get', cid)) - const block = await this.bitswap.want(cid, options) - + if (options.offline !== true && !(await this.child.has(cid))) { + // we do not have the block locally, get it from a block provider + options.onProgress?.(new 
CustomProgressEvent('blocks:get:providers:get', cid)) + const block = await raceBlockProviders(cid, this.blockProviders, this.hashers, options) options.onProgress?.(new CustomProgressEvent('blocks:get:blockstore:put', cid)) await this.child.put(cid, block, options) + // notify other block providers of the new block + options.onProgress?.(new CustomProgressEvent('blocks:get:providers:notify', cid)) + this.blockProviders.forEach(provider => { + provider.notify(cid, block, options) + }) + return block } @@ -105,11 +136,18 @@ export class NetworkedStorage implements Blocks { options.onProgress?.(new CustomProgressEvent('blocks:get-many:blockstore:get-many')) yield * this.child.getMany(forEach(cids, async (cid): Promise => { - if (options.offline !== true && this.bitswap?.isStarted() === true && !(await this.child.has(cid))) { - options.onProgress?.(new CustomProgressEvent('blocks:get-many:bitswap:get', cid)) - const block = await this.bitswap.want(cid, options) + if (options.offline !== true && !(await this.child.has(cid))) { + // we do not have the block locally, get it from a block provider + options.onProgress?.(new CustomProgressEvent('blocks:get-many:providers:get', cid)) + const block = await raceBlockProviders(cid, this.blockProviders, this.hashers, options) options.onProgress?.(new CustomProgressEvent('blocks:get-many:blockstore:put', cid)) await this.child.put(cid, block, options) + + // notify other block providers of the new block + options.onProgress?.(new CustomProgressEvent('blocks:get-many:providers:notify', cid)) + this.blockProviders.forEach(provider => { + provider.notify(cid, block, options) + }) } })) } @@ -144,3 +182,40 @@ export class NetworkedStorage implements Blocks { yield * this.child.getAll(options) } } + +/** + * Race block providers cancelling any pending requests once the block has been + * found. 
+ */ +async function raceBlockProviders (cid: CID, providers: BlockProvider[], hashers: MultihashHasher[], options: AbortOptions): Promise { + const hasher = hashers.find(hasher => hasher.code === cid.multihash.code) + + if (hasher == null) { + throw new CodeError(`No hasher configured for multihash code ${cid.code}, please configure one`, 'ERR_UNKNOWN_HASH_ALG') + } + + const controller = new AbortController() + const signal = anySignal([controller.signal, options.signal]) + + try { + return await Promise.any( + providers.map(async provider => { + const block = await provider.get(cid, { + ...options, + signal + }) + + // verify block + const hash = await hasher.digest(block) + + if (!uint8ArrayEquals(hash.digest, cid.multihash.digest)) { + throw new CodeError('Hash of downloaded block did not match multihash from passed CID', 'ERR_HASH_MISMATCH') + } + + return block + }) + ) + } finally { + signal.clear() + } +} diff --git a/packages/helia/test/utils/networked-storage.spec.ts b/packages/helia/test/utils/networked-storage.spec.ts index 8ce33f64..606f7b17 100644 --- a/packages/helia/test/utils/networked-storage.spec.ts +++ b/packages/helia/test/utils/networked-storage.spec.ts @@ -10,14 +10,14 @@ import Sinon from 'sinon' import { type StubbedInstance, stubInterface } from 'sinon-ts' import { NetworkedStorage } from '../../src/utils/networked-storage.js' import { createBlock } from '../fixtures/create-block.js' +import type { BitswapBlockProvider } from '../../src/block-providers/bitswap-block-provider.js' import type { Blockstore } from 'interface-blockstore' -import type { Bitswap } from 'ipfs-bitswap' import type { CID } from 'multiformats/cid' describe('storage', () => { let storage: NetworkedStorage let blockstore: Blockstore - let bitswap: StubbedInstance + let bitswap: StubbedInstance let blocks: Array<{ cid: CID, block: Uint8Array }> beforeEach(async () => { @@ -28,9 +28,11 @@ describe('storage', () => { } blockstore = new MemoryBlockstore() - bitswap = 
stubInterface() + bitswap = stubInterface() storage = new NetworkedStorage(blockstore, { - bitswap + blockProviders: [ + bitswap + ] }) }) @@ -114,7 +116,7 @@ describe('storage', () => { const { cid, block } = blocks[0] bitswap.isStarted.returns(true) - bitswap.want.withArgs(cid).resolves(block) + bitswap.get.withArgs(cid).resolves(block) expect(await blockstore.has(cid)).to.be.false() @@ -122,7 +124,7 @@ describe('storage', () => { expect(await blockstore.has(cid)).to.be.true() expect(returned).to.equalBytes(block) - expect(bitswap.want.called).to.be.true() + expect(bitswap.get.called).to.be.true() }) it('gets many blocks from bitswap when they are not in the blockstore', async () => { @@ -132,7 +134,7 @@ describe('storage', () => { for (let i = 0; i < count; i++) { const { cid, block } = blocks[i] - bitswap.want.withArgs(cid).resolves(block) + bitswap.get.withArgs(cid).resolves(block) expect(await blockstore.has(cid)).to.be.false() } @@ -148,7 +150,7 @@ describe('storage', () => { for (let i = 0; i < count; i++) { const { cid } = blocks[i] - expect(bitswap.want.calledWith(cid)).to.be.true() + expect(bitswap.get.calledWith(cid)).to.be.true() expect(await blockstore.has(cid)).to.be.true() } }) @@ -165,7 +167,7 @@ describe('storage', () => { await blockstore.put(blocks[4].cid, blocks[4].block) // block #2 comes from bitswap but slowly - bitswap.want.withArgs(blocks[2].cid).callsFake(async () => { + bitswap.get.withArgs(blocks[2].cid).callsFake(async () => { await delay(100) return blocks[2].block }) diff --git a/packages/helia/typedoc.json b/packages/helia/typedoc.json index f599dc72..e233a1ff 100644 --- a/packages/helia/typedoc.json +++ b/packages/helia/typedoc.json @@ -1,5 +1,6 @@ { "entryPoints": [ - "./src/index.ts" + "./src/index.ts", + "./src/block-providers/index.ts" ] } diff --git a/packages/interface/src/blocks.ts b/packages/interface/src/blocks.ts index b0d49406..4beba0e2 100644 --- a/packages/interface/src/blocks.ts +++ b/packages/interface/src/blocks.ts 
@@ -1,4 +1,5 @@ import type { Blockstore } from 'interface-blockstore' +import type { AbortOptions } from 'interface-store' import type { BitswapNotifyProgressEvents, BitswapWantProgressEvents } from 'ipfs-bitswap' import type { CID } from 'multiformats/cid' import type { ProgressEvent, ProgressOptions } from 'progress-events' @@ -10,31 +11,31 @@ export interface Pair { export type HasBlockProgressEvents = ProgressEvent<'blocks:put:duplicate', CID> | - ProgressEvent<'blocks:put:bitswap:notify', CID> | + ProgressEvent<'blocks:put:providers:notify', CID> | ProgressEvent<'blocks:put:blockstore:put', CID> | BitswapNotifyProgressEvents export type PutBlockProgressEvents = ProgressEvent<'blocks:put:duplicate', CID> | - ProgressEvent<'blocks:put:bitswap:notify', CID> | + ProgressEvent<'blocks:put:providers:notify', CID> | ProgressEvent<'blocks:put:blockstore:put', CID> | BitswapNotifyProgressEvents export type PutManyBlocksProgressEvents = ProgressEvent<'blocks:put-many:duplicate', CID> | - ProgressEvent<'blocks:put-many:bitswap:notify', CID> | + ProgressEvent<'blocks:put-many:providers:notify', CID> | ProgressEvent<'blocks:put-many:blockstore:put-many'> | BitswapNotifyProgressEvents export type GetBlockProgressEvents = - ProgressEvent<'blocks:get:bitswap:want', CID> | + ProgressEvent<'blocks:get:providers:want', CID> | ProgressEvent<'blocks:get:blockstore:get', CID> | ProgressEvent<'blocks:get:blockstore:put', CID> | BitswapWantProgressEvents export type GetManyBlocksProgressEvents = ProgressEvent<'blocks:get-many:blockstore:get-many'> | - ProgressEvent<'blocks:get-many:bitswap:want', CID> | + ProgressEvent<'blocks:get-many:providers:want', CID> | ProgressEvent<'blocks:get-many:blockstore:put', CID> | BitswapWantProgressEvents @@ -61,3 +62,18 @@ ProgressOptions, ProgressOptions { } + +export interface BlockProvider< + NotifyProgressOptions extends ProgressOptions = ProgressOptions, + WantProgressOptions extends ProgressOptions = ProgressOptions +> { + /** + * Notify a 
block provider that a new block is available + */ + notify(cid: CID, block: Uint8Array, options?: NotifyProgressOptions): void + + /** + * Retrieve a block + */ + get(cid: CID, options?: AbortOptions & WantProgressOptions): Promise +} From ca3d68735fd269d38a45638ccd0c1cc87a37f3e2 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Mon, 9 Oct 2023 16:21:20 -0700 Subject: [PATCH 2/7] feat: rank trustless gateways --- packages/helia/src/block-providers/index.ts | 2 +- .../trustless-gateway-block-provider.ts | 171 ++++++++++++------ packages/helia/src/index.ts | 4 +- packages/helia/src/utils/networked-storage.ts | 8 +- packages/interface/src/blocks.ts | 2 +- 5 files changed, 126 insertions(+), 61 deletions(-) diff --git a/packages/helia/src/block-providers/index.ts b/packages/helia/src/block-providers/index.ts index 6d5ab76f..6c67d83a 100644 --- a/packages/helia/src/block-providers/index.ts +++ b/packages/helia/src/block-providers/index.ts @@ -1,2 +1,2 @@ export { BitswapBlockProvider } from './bitswap-block-provider.js' -export { TrustedGatewayBlockProvider } from './trustless-gateway-block-provider.js' +export { TrustlessGatewayBlockProvider } from './trustless-gateway-block-provider.js' diff --git a/packages/helia/src/block-providers/trustless-gateway-block-provider.ts b/packages/helia/src/block-providers/trustless-gateway-block-provider.ts index b4ddb4f2..1378807e 100644 --- a/packages/helia/src/block-providers/trustless-gateway-block-provider.ts +++ b/packages/helia/src/block-providers/trustless-gateway-block-provider.ts @@ -6,78 +6,143 @@ import type { ProgressEvent, ProgressOptions } from 'progress-events' const log = logger('helia:trustless-gateway-block-provider') -export type TrustlessGatewayGetBlockProgressEvents = - ProgressEvent<'trustless-gateway:get-block:fetch', URL> - /** - * A BlockProvider that accepts a list of trustless gateways that are queried - * for blocks. Individual gateways are randomly chosen. 
+ * A `TrustlessGateway` is constructed by the block provider for each gateway + * url and keeps track of the number of attempts, errors, and successes for + * that gateway url. */ -export class TrustedGatewayBlockProvider implements BlockProvider< -ProgressOptions, -ProgressOptions -> { - private readonly gateways: URL[] +class TrustlessGateway { + public readonly url: URL + /** + * The number of times this gateway has been attempted to be used to fetch a + * block. This includes successful, errored, and aborted attempts. By counting + * even aborted attempts, slow gateways that are out-raced by others will be + * considered less reliable. + */ + #attempts = 0 - constructor (urls: string[]) { - this.gateways = urls.map(url => new URL(url.toString())) - } + /** + * The number of times this gateway has errored while attempting to fetch a + * block. This includes `response.ok === false` and any other errors that + * throw while attempting to fetch a block. + */ + #errors = 0 - notify (cid: CID, block: Uint8Array, options?: ProgressOptions): void { - // no-op + /** + * The number of times this gateway has successfully fetched a block. + */ + #successes = 0 + constructor (url: URL | string) { + this.url = url instanceof URL ? 
url : new URL(url) } - async get (cid: CID, options: AbortOptions & ProgressOptions = {}): Promise { - // choose a gateway - const url = this.gateways[Math.floor(Math.random() * this.gateways.length)] + /** + * Fetch a raw block from `this.url` following the specification defined at + * https://specs.ipfs.tech/http-gateways/trustless-gateway/ + */ + async getRawBlock (cid: CID, signal?: AbortSignal): Promise { + const gwUrl = this.url + gwUrl.pathname = `/ipfs/${cid.toString()}` - log('getting block for %c from %s', cid, url) + // necessary as not every gateway supports dag-cbor, but every should support + // sending raw block as-is + gwUrl.search = '?format=raw' + + if (signal?.aborted === true) { + throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted prior to fetch`) + } try { - const block = await getRawBlockFromGateway(url, cid, options.signal) - log('got block for %c from %s', cid, url) + this.#attempts++ + const res = await fetch(gwUrl.toString(), { + signal, + headers: { + // also set header, just in case ?format= is filtered out by some + // reverse proxy + Accept: 'application/vnd.ipld.raw' + }, + cache: 'force-cache' + }) + if (!res.ok) { + this.#errors++ + throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()}`) + } + this.#successes++ + return new Uint8Array(await res.arrayBuffer()) + } catch (cause) { + // @ts-expect-error - TS thinks signal?.aborted can only be false now + // because it was checked for true above. + if (signal?.aborted === true) { + throw new Error(`fetching raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted`) + } + this.#errors++ + throw new Error(`unable to fetch raw block for CID ${cid}`) + } + } - return block - } catch (err: any) { - log.error('failed to get block for %c from %s', cid, url, err) + /** + * Encapsulate the logic for determining whether a gateway is considered + * reliable, for prioritization. 
This is based on the number of successful attempts made + * and the number of errors encountered. + * + * * Unused gateways have 100% reliability + * * Gateways that have never errored have 100% reliability + */ + get reliability (): number { + // if we have never tried to use this gateway, it is considered the most + // reliable until we determine otherwise + // (prioritize unused gateways) + if (this.#attempts === 0) { + return 1 + } - throw err + // The gateway has > 0 attempts; If we have never encountered an error, consider it 100% reliable. + // Even if a gateway has no successes, it is still considered more reliable than a gateway with errors, + // because it may have been used and aborted, or beaten by another BlockProvider. + if (this.#errors === 0) { + return 1 } + + // We have encountered errors, so we need to calculate the reliability + // based on the number of attempts, errors, and successes. Gateways that + // return a single error should drop their reliability score more than a + // success increases it. + // Play around with the below reliability function at https://www.desmos.com/calculator/d6hfhf5ukm + return this.#successes / (this.#attempts + (this.#errors * 3)) } } -async function getRawBlockFromGateway (url: URL, cid: CID, signal?: AbortSignal): Promise { - const gwUrl = new URL(url) - gwUrl.pathname = `/ipfs/${cid.toString()}` +export type TrustlessGatewayGetBlockProgressEvents = + ProgressEvent<'trustless-gateway:get-block:fetch', URL> - // necessary as not every gateway supports dag-cbor, but every should support - // sending raw block as-is - gwUrl.search = '?format=raw' +/** + * A BlockProvider that accepts a list of trustless gateways that are queried + * for blocks. Individual gateways are randomly chosen. 
+ */ +export class TrustlessGatewayBlockProvider implements BlockProvider< +ProgressOptions, +ProgressOptions +> { + private readonly gateways: TrustlessGateway[] - if (signal?.aborted === true) { - throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted prior to fetch`) + constructor (urls: Array) { + this.gateways = urls.map((url) => new TrustlessGateway(url)) } - try { - const res = await fetch(gwUrl.toString(), { - signal, - headers: { - // also set header, just in case ?format= is filtered out by some - // reverse proxy - Accept: 'application/vnd.ipld.raw' - }, - cache: 'force-cache' - }) - if (!res.ok) { - throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()}`) - } - return new Uint8Array(await res.arrayBuffer()) - } catch (cause) { - // @ts-expect-error - TS thinks signal?.aborted can only be false now - // because it was checked for true above. - if (signal?.aborted === true) { - throw new Error(`fetching raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted`) + async get (cid: CID, options: AbortOptions & ProgressOptions = {}): Promise { + // Loop through the gateways until we get a block or run out of gateways + for (const gateway of this.gateways.sort((a, b) => b.reliability - a.reliability)) { + log('getting block for %c from %s', cid, gateway.url) + try { + const block = await gateway.getRawBlock(cid, options.signal) + log('got block for %c from %s', cid, gateway.url) + + return block + } catch (err) { + log.error('failed to get block for %c from %s', cid, gateway.url, err) + } } - throw new Error(`unable to fetch raw block for CID ${cid}`) + + throw new Error(`unable to fetch raw block for CID ${cid} from any gateway`) } } diff --git a/packages/helia/src/index.ts b/packages/helia/src/index.ts index d2239481..c4480746 100644 --- a/packages/helia/src/index.ts +++ b/packages/helia/src/index.ts @@ -27,7 +27,7 @@ import { MemoryDatastore } from 
'datastore-core' import { identity } from 'multiformats/hashes/identity' import { sha256, sha512 } from 'multiformats/hashes/sha2' import { BitswapBlockProvider } from './block-providers/bitswap-block-provider.js' -import { TrustedGatewayBlockProvider } from './block-providers/trustless-gateway-block-provider.js' +import { TrustlessGatewayBlockProvider } from './block-providers/trustless-gateway-block-provider.js' import { HeliaImpl } from './helia.js' import { createLibp2p } from './utils/libp2p.js' import { name, version } from './version.js' @@ -168,7 +168,7 @@ export async function createHelia (init: HeliaInit = {}): Promise const blockProviders = init.blockProviders ?? [ new BitswapBlockProvider(libp2p, blockstore, hashers), - new TrustedGatewayBlockProvider(DEFAULT_TRUSTLESS_GATEWAYS) + new TrustlessGatewayBlockProvider(DEFAULT_TRUSTLESS_GATEWAYS) ] const helia = new HeliaImpl({ diff --git a/packages/helia/src/utils/networked-storage.ts b/packages/helia/src/utils/networked-storage.ts index 3e0dd5c8..46da0dfe 100644 --- a/packages/helia/src/utils/networked-storage.ts +++ b/packages/helia/src/utils/networked-storage.ts @@ -71,7 +71,7 @@ export class NetworkedStorage implements Blocks, Startable { options.onProgress?.(new CustomProgressEvent('blocks:put:providers:notify', cid)) this.blockProviders.forEach(provider => { - provider.notify(cid, block, options) + provider.notify?.(cid, block, options) }) options.onProgress?.(new CustomProgressEvent('blocks:put:blockstore:put', cid)) @@ -96,7 +96,7 @@ export class NetworkedStorage implements Blocks, Startable { const notifyEach = forEach(missingBlocks, ({ cid, block }): void => { options.onProgress?.(new CustomProgressEvent('blocks:put-many:providers:notify', cid)) this.blockProviders.forEach(provider => { - provider.notify(cid, block, options) + provider.notify?.(cid, block, options) }) }) @@ -118,7 +118,7 @@ export class NetworkedStorage implements Blocks, Startable { // notify other block providers of the new 
block options.onProgress?.(new CustomProgressEvent('blocks:get:providers:notify', cid)) this.blockProviders.forEach(provider => { - provider.notify(cid, block, options) + provider.notify?.(cid, block, options) }) return block @@ -146,7 +146,7 @@ export class NetworkedStorage implements Blocks, Startable { // notify other block providers of the new block options.onProgress?.(new CustomProgressEvent('blocks:get-many:providers:notify', cid)) this.blockProviders.forEach(provider => { - provider.notify(cid, block, options) + provider.notify?.(cid, block, options) }) } })) diff --git a/packages/interface/src/blocks.ts b/packages/interface/src/blocks.ts index 4beba0e2..d6e1ab7e 100644 --- a/packages/interface/src/blocks.ts +++ b/packages/interface/src/blocks.ts @@ -70,7 +70,7 @@ export interface BlockProvider< /** * Notify a block provider that a new block is available */ - notify(cid: CID, block: Uint8Array, options?: NotifyProgressOptions): void + notify?(cid: CID, block: Uint8Array, options?: NotifyProgressOptions): void /** * Retrieve a block From c5253a4908c3f49daaabc769639c98218d6f1280 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Mon, 9 Oct 2023 16:26:07 -0700 Subject: [PATCH 3/7] chore: update TrustlessGatewayBlockProvider jsdoc --- .../src/block-providers/trustless-gateway-block-provider.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/helia/src/block-providers/trustless-gateway-block-provider.ts b/packages/helia/src/block-providers/trustless-gateway-block-provider.ts index 1378807e..a385c09b 100644 --- a/packages/helia/src/block-providers/trustless-gateway-block-provider.ts +++ b/packages/helia/src/block-providers/trustless-gateway-block-provider.ts @@ -117,7 +117,8 @@ export type TrustlessGatewayGetBlockProgressEvents = /** * A BlockProvider that accepts a list of trustless gateways that are queried - * for blocks. Individual gateways are randomly chosen. + * for blocks. 
Gateways are queried in order of reliability, with the most + * reliable gateways being queried first. */ export class TrustlessGatewayBlockProvider implements BlockProvider< ProgressOptions, From 17eb4cc0c990c025b88a9864c87e172103f9040e Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Tue, 10 Oct 2023 10:30:56 -0700 Subject: [PATCH 4/7] refactor: use new block-broker interface --- .../trustless-gateway-block-broker.ts | 176 +++++++++++++----- .../block-providers/bitswap-block-provider.ts | 59 ------ packages/helia/src/block-providers/index.ts | 2 - .../trustless-gateway-block-provider.ts | 149 --------------- 4 files changed, 125 insertions(+), 261 deletions(-) delete mode 100644 packages/helia/src/block-providers/bitswap-block-provider.ts delete mode 100644 packages/helia/src/block-providers/index.ts delete mode 100644 packages/helia/src/block-providers/trustless-gateway-block-provider.ts diff --git a/packages/helia/src/block-brokers/trustless-gateway-block-broker.ts b/packages/helia/src/block-brokers/trustless-gateway-block-broker.ts index 23ebb4bf..ae710788 100644 --- a/packages/helia/src/block-brokers/trustless-gateway-block-broker.ts +++ b/packages/helia/src/block-brokers/trustless-gateway-block-broker.ts @@ -4,75 +4,149 @@ import type { AbortOptions } from 'interface-store' import type { CID } from 'multiformats/cid' import type { ProgressEvent, ProgressOptions } from 'progress-events' -const log = logger('helia:trustless-gateway-block-provider') - -export type TrustlessGatewayGetBlockProgressEvents = - ProgressEvent<'trustless-gateway:get-block:fetch', URL> +const log = logger('helia:trustless-gateway-block-broker') /** - * A class that accepts a list of trustless gateways that are queried - * for blocks. + * A BlockProvider constructs instances of `TrustlessGateway` + * keeps track of the number of attempts, errors, and successes for a given + * gateway url. 
*/ -export class TrustedGatewayBlockBroker implements BlockRetriever< -ProgressOptions -> { - private readonly gateways: URL[] +class TrustlessGateway { + public readonly url: URL + /** + * The number of times this gateway has been attempted to be used to fetch a + * block. This includes successful, errored, and aborted attempts. By counting + * even aborted attempts, slow gateways that are out-raced by others will be + * considered less reliable. + */ + #attempts = 0 - constructor (urls: Array) { - this.gateways = urls.map(url => new URL(url.toString())) + /** + * The number of times this gateway has errored while attempting to fetch a + * block. This includes `response.ok === false` and any other errors that + * throw while attempting to fetch a block. + */ + #errors = 0 + + /** + * The number of times this gateway has successfully fetched a block. + */ + #successes = 0 + constructor (url: URL | string) { + this.url = url instanceof URL ? url : new URL(url) } - async retrieve (cid: CID, options: AbortOptions & ProgressOptions = {}): Promise { - // choose a gateway - const url = this.gateways[Math.floor(Math.random() * this.gateways.length)] + /** + * Fetch a raw block from `this.url` following the specification defined at + * https://specs.ipfs.tech/http-gateways/trustless-gateway/ + */ + async getRawBlock (cid: CID, signal?: AbortSignal): Promise { + const gwUrl = this.url + gwUrl.pathname = `/ipfs/${cid.toString()}` - log('getting block for %c from %s', cid, url) + // necessary as not every gateway supports dag-cbor, but every should support + // sending raw block as-is + gwUrl.search = '?format=raw' - try { - const block = await getRawBlockFromGateway(url, cid, options.signal) - log('got block for %c from %s', cid, url) + if (signal?.aborted === true) { + throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${this.url} was aborted prior to fetch`) + } - return block - } catch (err: any) { - log.error('failed to get block for %c from %s', cid, 
url, err) + try { + this.#attempts++ + const res = await fetch(gwUrl.toString(), { + signal, + headers: { + // also set header, just in case ?format= is filtered out by some + // reverse proxy + Accept: 'application/vnd.ipld.raw' + }, + cache: 'force-cache' + }) + if (!res.ok) { + this.#errors++ + throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${this.url}`) + } + this.#successes++ + return new Uint8Array(await res.arrayBuffer()) + } catch (cause) { + // @ts-expect-error - TS thinks signal?.aborted can only be false now + // because it was checked for true above. + if (signal?.aborted === true) { + throw new Error(`fetching raw block for CID ${cid} from gateway ${this.url} was aborted`) + } + this.#errors++ + throw new Error(`unable to fetch raw block for CID ${cid}`) + } + } - throw err + /** + * Encapsulate the logic for determining whether a gateway is considered + * reliable, for prioritization. This is based on the number of successful attempts made + * and the number of errors encountered. + * + * * Unused gateways have 100% reliability + * * Gateways that have never errored have 100% reliability + */ + get reliability (): number { + // if we have never tried to use this gateway, it is considered the most + // reliable until we determine otherwise + // (prioritize unused gateways) + if (this.#attempts === 0) { + return 1 } + + // We have attempted the gateway, so we need to calculate the reliability + // based on the number of attempts, errors, and successes. Gateways that + // return a single error should drop their reliability score more than a + // success increases it. 
+ // Play around with the below reliability function at https://www.desmos.com/calculator/d6hfhf5ukm + return this.#successes / (this.#attempts + (this.#errors * 3)) } } -async function getRawBlockFromGateway (url: URL, cid: CID, signal?: AbortSignal): Promise { - const gwUrl = new URL(url) - gwUrl.pathname = `/ipfs/${cid.toString()}` +export type TrustlessGatewayGetBlockProgressEvents = + ProgressEvent<'trustless-gateway:get-block:fetch', URL> - // necessary as not every gateway supports dag-cbor, but every should support - // sending raw block as-is - gwUrl.search = '?format=raw' +/** + * A class that accepts a list of trustless gateways that are queried + * for blocks. + */ +export class TrustedGatewayBlockBroker implements BlockRetriever< +ProgressOptions +> { + private readonly gateways: TrustlessGateway[] - if (signal?.aborted === true) { - throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted prior to fetch`) + constructor (urls: Array) { + this.gateways = urls.map((url) => new TrustlessGateway(url)) } - try { - const res = await fetch(gwUrl.toString(), { - signal, - headers: { - // also set header, just in case ?format= is filtered out by some - // reverse proxy - Accept: 'application/vnd.ipld.raw' - }, - cache: 'force-cache' - }) - if (!res.ok) { - throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()}`) - } - return new Uint8Array(await res.arrayBuffer()) - } catch (cause) { - // @ts-expect-error - TS thinks signal?.aborted can only be false now - // because it was checked for true above. 
- if (signal?.aborted === true) { - throw new Error(`fetching raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted`) + async retrieve (cid: CID, options: AbortOptions & ProgressOptions = {}): Promise { + // Loop through the gateways until we get a block or run out of gateways + const sortedGateways = this.gateways.sort((a, b) => b.reliability - a.reliability) + const aggregateErrors: Error[] = [] + for (const gateway of sortedGateways) { + log('getting block for %c from %s', cid, gateway.url) + try { + const block = await gateway.getRawBlock(cid, options.signal) + log.trace('got block for %c from %s', cid, gateway.url) + + return block + } catch (err: unknown) { + log.error('failed to get block for %c from %s', cid, gateway.url, err) + if (err instanceof Error) { + aggregateErrors.push(err) + } else { + aggregateErrors.push(new Error(`unable to fetch raw block for CID ${cid} from gateway ${gateway.url}`)) + } + // if signal was aborted, exit the loop + if (options.signal?.aborted === true) { + log.trace('request aborted while fetching raw block for CID %c from gateway %s', cid, gateway.url) + break + } + } } - throw new Error(`unable to fetch raw block for CID ${cid}`) + + throw new AggregateError(aggregateErrors, `unable to fetch raw block for CID ${cid} from any gateway`) } } diff --git a/packages/helia/src/block-providers/bitswap-block-provider.ts b/packages/helia/src/block-providers/bitswap-block-provider.ts deleted file mode 100644 index 2da48251..00000000 --- a/packages/helia/src/block-providers/bitswap-block-provider.ts +++ /dev/null @@ -1,59 +0,0 @@ -import { createBitswap } from 'ipfs-bitswap' -import type { BlockProvider } from '@helia/interface/blocks' -import type { Libp2p } from '@libp2p/interface' -import type { Startable } from '@libp2p/interface/startable' -import type { Blockstore } from 'interface-blockstore' -import type { AbortOptions } from 'interface-store' -import type { Bitswap, BitswapNotifyProgressEvents, 
BitswapWantBlockProgressEvents } from 'ipfs-bitswap' -import type { CID } from 'multiformats/cid' -import type { MultihashHasher } from 'multiformats/hashes/interface' -import type { ProgressOptions } from 'progress-events' - -export class BitswapBlockProvider implements BlockProvider< -ProgressOptions, -ProgressOptions ->, Startable { - private readonly bitswap: Bitswap - private started: boolean - - constructor (libp2p: Libp2p, blockstore: Blockstore, hashers: MultihashHasher[]) { - this.bitswap = createBitswap(libp2p, blockstore, { - hashLoader: { - getHasher: async (codecOrName: string | number): Promise> => { - const hasher = hashers.find(hasher => { - return hasher.code === codecOrName || hasher.name === codecOrName - }) - - if (hasher != null) { - return hasher - } - - throw new Error(`Could not load hasher for code/name "${codecOrName}"`) - } - } - }) - this.started = false - } - - isStarted (): boolean { - return this.started - } - - async start (): Promise { - await this.bitswap.start() - this.started = true - } - - async stop (): Promise { - await this.bitswap.stop() - this.started = false - } - - notify (cid: CID, block: Uint8Array, options?: ProgressOptions): void { - this.bitswap.notify(cid, block, options) - } - - async get (cid: CID, options?: AbortOptions & ProgressOptions): Promise { - return this.bitswap.want(cid, options) - } -} diff --git a/packages/helia/src/block-providers/index.ts b/packages/helia/src/block-providers/index.ts deleted file mode 100644 index 6c67d83a..00000000 --- a/packages/helia/src/block-providers/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export { BitswapBlockProvider } from './bitswap-block-provider.js' -export { TrustlessGatewayBlockProvider } from './trustless-gateway-block-provider.js' diff --git a/packages/helia/src/block-providers/trustless-gateway-block-provider.ts b/packages/helia/src/block-providers/trustless-gateway-block-provider.ts deleted file mode 100644 index a385c09b..00000000 --- 
a/packages/helia/src/block-providers/trustless-gateway-block-provider.ts +++ /dev/null @@ -1,149 +0,0 @@ -import { logger } from '@libp2p/logger' -import type { BlockProvider } from '@helia/interface/blocks' -import type { AbortOptions } from 'interface-store' -import type { CID } from 'multiformats/cid' -import type { ProgressEvent, ProgressOptions } from 'progress-events' - -const log = logger('helia:trustless-gateway-block-provider') - -/** - * A BlockProvider constructs instances of `TrustlessGateway` - * keeps track of the number of attempts, errors, and successes for a given - * gateway url. - */ -class TrustlessGateway { - public readonly url: URL - /** - * The number of times this gateway has been attempted to be used to fetch a - * block. This includes successful, errored, and aborted attempts. By counting - * even aborted attempts, slow gateways that are out-raced by others will be - * considered less reliable. - */ - #attempts = 0 - - /** - * The number of times this gateway has errored while attempting to fetch a - * block. This includes `response.ok === false` and any other errors that - * throw while attempting to fetch a block. - */ - #errors = 0 - - /** - * The number of times this gateway has successfully fetched a block. - */ - #successes = 0 - constructor (url: URL | string) { - this.url = url instanceof URL ? 
url : new URL(url) - } - - /** - * Fetch a raw block from `this.url` following the specification defined at - * https://specs.ipfs.tech/http-gateways/trustless-gateway/ - */ - async getRawBlock (cid: CID, signal?: AbortSignal): Promise { - const gwUrl = this.url - gwUrl.pathname = `/ipfs/${cid.toString()}` - - // necessary as not every gateway supports dag-cbor, but every should support - // sending raw block as-is - gwUrl.search = '?format=raw' - - if (signal?.aborted === true) { - throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted prior to fetch`) - } - - try { - this.#attempts++ - const res = await fetch(gwUrl.toString(), { - signal, - headers: { - // also set header, just in case ?format= is filtered out by some - // reverse proxy - Accept: 'application/vnd.ipld.raw' - }, - cache: 'force-cache' - }) - if (!res.ok) { - this.#errors++ - throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()}`) - } - this.#successes++ - return new Uint8Array(await res.arrayBuffer()) - } catch (cause) { - // @ts-expect-error - TS thinks signal?.aborted can only be false now - // because it was checked for true above. - if (signal?.aborted === true) { - throw new Error(`fetching raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted`) - } - this.#errors++ - throw new Error(`unable to fetch raw block for CID ${cid}`) - } - } - - /** - * Encapsulate the logic for determining whether a gateway is considered - * reliable, for prioritization. This is based on the number of successful attempts made - * and the number of errors encountered. 
- * - * * Unused gateways have 100% reliability - * * Gateways that have never errored have 100% reliability - */ - get reliability (): number { - // if we have never tried to use this gateway, it is considered the most - // reliable until we determine otherwise - // (prioritize unused gateways) - if (this.#attempts === 0) { - return 1 - } - - // The gateway has > 0 attempts; If we have never encountered an error, consider it 100% reliable. - // Even if a gateway has no successes, it is still considered more reliable than a gateway with errors, - // because it may have been used and aborted, or beaten by another BlockProvider. - if (this.#errors === 0) { - return 1 - } - - // We have encountered errors, so we need to calculate the reliability - // based on the number of attempts, errors, and successes. Gateways that - // return a single error should drop their reliability score more than a - // success increases it. - // Play around with the below reliability function at https://www.desmos.com/calculator/d6hfhf5ukm - return this.#successes / (this.#attempts + (this.#errors * 3)) - } -} - -export type TrustlessGatewayGetBlockProgressEvents = - ProgressEvent<'trustless-gateway:get-block:fetch', URL> - -/** - * A BlockProvider that accepts a list of trustless gateways that are queried - * for blocks. Gateways are queried in order of reliability, with the most - * reliable gateways being queried first. 
- */ -export class TrustlessGatewayBlockProvider implements BlockProvider< -ProgressOptions, -ProgressOptions -> { - private readonly gateways: TrustlessGateway[] - - constructor (urls: Array) { - this.gateways = urls.map((url) => new TrustlessGateway(url)) - } - - async get (cid: CID, options: AbortOptions & ProgressOptions = {}): Promise { - // Loop through the gateways until we get a block or run out of gateways - for (const gateway of this.gateways.sort((a, b) => b.reliability - a.reliability)) { - log('getting block for %c from %s', cid, gateway.url) - try { - const block = await gateway.getRawBlock(cid, options.signal) - log('got block for %c from %s', cid, gateway.url) - - return block - } catch (err) { - log.error('failed to get block for %c from %s', cid, gateway.url, err) - } - } - - throw new Error(`unable to fetch raw block for CID ${cid} from any gateway`) - } -} From e1656e41eecc78449ee201e654cf4438307b30e2 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Tue, 10 Oct 2023 10:31:51 -0700 Subject: [PATCH 5/7] chore: remove any-signal from monorepo deps --- package.json | 3 --- 1 file changed, 3 deletions(-) diff --git a/package.json b/package.json index 6e5a2c76..bda639ad 100644 --- a/package.json +++ b/package.json @@ -36,9 +36,6 @@ "docs": "NODE_OPTIONS=--max_old_space_size=8192 aegir docs -- --exclude packages/interop --excludeExternals", "docs:no-publish": "NODE_OPTIONS=--max_old_space_size=8192 aegir docs --publish false -- --exclude packages/interop" }, - "dependencies": { - "any-signal": "^4.1.1" - }, "devDependencies": { "aegir": "^41.0.0", "npm-run-all": "^4.1.5", From e91f91783016246f65b5627a94e5c7880a98fcdc Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Tue, 10 Oct 2023 10:40:36 -0700 Subject: [PATCH 6/7] chore: cleanup trustless-gateway-block-broker --- .../trustless-gateway-block-broker.ts | 35 +++++++++++-------- 1 file changed, 21 
insertions(+), 14 deletions(-) diff --git a/packages/helia/src/block-brokers/trustless-gateway-block-broker.ts b/packages/helia/src/block-brokers/trustless-gateway-block-broker.ts index ae710788..fb0c3ced 100644 --- a/packages/helia/src/block-brokers/trustless-gateway-block-broker.ts +++ b/packages/helia/src/block-brokers/trustless-gateway-block-broker.ts @@ -7,9 +7,10 @@ import type { ProgressEvent, ProgressOptions } from 'progress-events' const log = logger('helia:trustless-gateway-block-broker') /** - * A BlockProvider constructs instances of `TrustlessGateway` - * keeps track of the number of attempts, errors, and successes for a given - * gateway url. + * A `TrustlessGateway` keeps track of the number of attempts, errors, and + * successes for a given gateway url so that we can prioritize gateways that + * have been more reliable in the past, and ensure that requests are distributed + * across all gateways within a given `TrustedGatewayBlockBroker` instance. */ class TrustlessGateway { public readonly url: URL @@ -24,7 +25,8 @@ class TrustlessGateway { /** * The number of times this gateway has errored while attempting to fetch a * block. This includes `response.ok === false` and any other errors that - * throw while attempting to fetch a block. + * throw while attempting to fetch a block. This does not include aborted + * attempts. */ #errors = 0 @@ -32,6 +34,7 @@ class TrustlessGateway { * The number of times this gateway has successfully fetched a block. */ #successes = 0 + constructor (url: URL | string) { this.url = url instanceof URL ? url : new URL(url) } @@ -85,22 +88,26 @@ class TrustlessGateway { * reliable, for prioritization. This is based on the number of successful attempts made * and the number of errors encountered. 
* - * * Unused gateways have 100% reliability - * * Gateways that have never errored have 100% reliability + * Unused gateways have 100% reliability; They will be prioritized over + * gateways with a 100% success rate to ensure that we attempt all gateways. */ get reliability (): number { - // if we have never tried to use this gateway, it is considered the most - // reliable until we determine otherwise - // (prioritize unused gateways) + /** + * if we have never tried to use this gateway, it is considered the most + * reliable until we determine otherwise (prioritize unused gateways) + */ if (this.#attempts === 0) { return 1 } - // We have attempted the gateway, so we need to calculate the reliability - // based on the number of attempts, errors, and successes. Gateways that - // return a single error should drop their reliability score more than a - // success increases it. - // Play around with the below reliability function at https://www.desmos.com/calculator/d6hfhf5ukm + /** + * We have attempted the gateway, so we need to calculate the reliability + * based on the number of attempts, errors, and successes. Gateways that + * return a single error should drop their reliability score more than a + * single success increases it. 
+ * + * Play around with the below reliability function at https://www.desmos.com/calculator/d6hfhf5ukm + */ return this.#successes / (this.#attempts + (this.#errors * 3)) } } From a3260ff10b71143678df980d7f2fd6e45608d34a Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Thu, 12 Oct 2023 19:04:17 -0700 Subject: [PATCH 7/7] feat: BlockBroker factory support (#284) --- packages/helia/.aegir.js | 4 + .../src/block-brokers/bitswap-block-broker.ts | 14 +- packages/helia/src/block-brokers/index.ts | 4 +- .../trustless-gateway-block-broker.ts | 53 ++++-- packages/helia/src/helia.ts | 2 + packages/helia/src/index.ts | 20 ++- packages/helia/src/utils/networked-storage.ts | 44 +++-- .../test/block-brokers/block-broker.spec.ts | 136 ++++++++++++++++ .../trustless-gateway-block-broker.spec.ts | 151 ++++++++++++++++++ packages/helia/test/fixtures/create-helia.ts | 4 + .../helia/test/pins.depth-limited.spec.ts | 1 + packages/helia/test/pins.recursive.spec.ts | 1 + .../test/utils/networked-storage.spec.ts | 2 +- packages/interface/src/blocks.ts | 12 +- packages/interface/src/index.ts | 18 ++- .../test/fixtures/create-helia.browser.ts | 4 + .../interop/test/fixtures/create-helia.ts | 4 + 17 files changed, 438 insertions(+), 36 deletions(-) create mode 100644 packages/helia/test/block-brokers/block-broker.spec.ts create mode 100644 packages/helia/test/block-brokers/trustless-gateway-block-broker.spec.ts diff --git a/packages/helia/.aegir.js b/packages/helia/.aegir.js index 6361cc84..fa0c2d9d 100644 --- a/packages/helia/.aegir.js +++ b/packages/helia/.aegir.js @@ -11,8 +11,12 @@ const options = { before: async () => { // use dynamic import otherwise the source may not have been built yet const { createHelia } = await import('./dist/src/index.js') + const { BitswapBlockBrokerFactory } = await import('./dist/src/block-brokers/index.js') const helia = await createHelia({ + blockBrokers: [ + BitswapBlockBrokerFactory + ], libp2p: { 
addresses: { listen: [ diff --git a/packages/helia/src/block-brokers/bitswap-block-broker.ts b/packages/helia/src/block-brokers/bitswap-block-broker.ts index 20e07f24..96f29b01 100644 --- a/packages/helia/src/block-brokers/bitswap-block-broker.ts +++ b/packages/helia/src/block-brokers/bitswap-block-broker.ts @@ -1,9 +1,9 @@ import { createBitswap } from 'ipfs-bitswap' -import type { BlockAnnouncer, BlockRetriever } from '@helia/interface/blocks' +import type { BlockBrokerFactoryFunction } from '@helia/interface' +import type { BlockAnnouncer, BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks' import type { Libp2p } from '@libp2p/interface' import type { Startable } from '@libp2p/interface/startable' import type { Blockstore } from 'interface-blockstore' -import type { AbortOptions } from 'interface-store' import type { Bitswap, BitswapNotifyProgressEvents, BitswapWantBlockProgressEvents } from 'ipfs-bitswap' import type { CID } from 'multiformats/cid' import type { MultihashHasher } from 'multiformats/hashes/interface' @@ -52,7 +52,15 @@ ProgressOptions this.bitswap.notify(cid, block, options) } - async retrieve (cid: CID, options?: AbortOptions & ProgressOptions): Promise { + async retrieve (cid: CID, { validateFn, ...options }: BlockRetrievalOptions> = {}): Promise { return this.bitswap.want(cid, options) } } + +/** + * A helper factory for users who want to override Helia `blockBrokers` but + * still want to use the default `BitswapBlockBroker`. 
+ */ +export const BitswapBlockBrokerFactory: BlockBrokerFactoryFunction = (components): BitswapBlockBroker => { + return new BitswapBlockBroker(components.libp2p, components.blockstore, components.hashers) +} diff --git a/packages/helia/src/block-brokers/index.ts b/packages/helia/src/block-brokers/index.ts index 58de6077..90dc1151 100644 --- a/packages/helia/src/block-brokers/index.ts +++ b/packages/helia/src/block-brokers/index.ts @@ -1,2 +1,2 @@ -export { BitswapBlockBroker } from './bitswap-block-broker.js' -export { TrustedGatewayBlockBroker } from './trustless-gateway-block-broker.js' +export { BitswapBlockBroker, BitswapBlockBrokerFactory } from './bitswap-block-broker.js' +export { TrustlessGatewayBlockBroker } from './trustless-gateway-block-broker.js' diff --git a/packages/helia/src/block-brokers/trustless-gateway-block-broker.ts b/packages/helia/src/block-brokers/trustless-gateway-block-broker.ts index fb0c3ced..29ecaace 100644 --- a/packages/helia/src/block-brokers/trustless-gateway-block-broker.ts +++ b/packages/helia/src/block-brokers/trustless-gateway-block-broker.ts @@ -1,6 +1,5 @@ import { logger } from '@libp2p/logger' -import type { BlockRetriever } from '@helia/interface/blocks' -import type { AbortOptions } from 'interface-store' +import type { BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks' import type { CID } from 'multiformats/cid' import type { ProgressEvent, ProgressOptions } from 'progress-events' @@ -10,9 +9,9 @@ const log = logger('helia:trustless-gateway-block-broker') * A `TrustlessGateway` keeps track of the number of attempts, errors, and * successes for a given gateway url so that we can prioritize gateways that * have been more reliable in the past, and ensure that requests are distributed - * across all gateways within a given `TrustedGatewayBlockBroker` instance. + * across all gateways within a given `TrustlessGatewayBlockBroker` instance. 
*/ -class TrustlessGateway { +export class TrustlessGateway { public readonly url: URL /** * The number of times this gateway has been attempted to be used to fetch a @@ -30,6 +29,13 @@ class TrustlessGateway { */ #errors = 0 + /** + * The number of times this gateway has returned an invalid block. A gateway + * that returns the wrong blocks for a CID should be considered for removal + * from the list of gateways to fetch blocks from. + */ + #invalidBlocks = 0 + /** * The number of times this gateway has successfully fetched a block. */ @@ -91,7 +97,7 @@ class TrustlessGateway { * Unused gateways have 100% reliability; They will be prioritized over * gateways with a 100% success rate to ensure that we attempt all gateways. */ - get reliability (): number { + reliability (): number { /** * if we have never tried to use this gateway, it is considered the most * reliable until we determine otherwise (prioritize unused gateways) @@ -100,6 +106,11 @@ class TrustlessGateway { return 1 } + if (this.#invalidBlocks > 0) { + // this gateway may not be trustworthy.. + return -Infinity + } + /** * We have attempted the gateway, so we need to calculate the reliability * based on the number of attempts, errors, and successes. Gateways that @@ -110,6 +121,13 @@ class TrustlessGateway { */ return this.#successes / (this.#attempts + (this.#errors * 3)) } + + /** + * Increment the number of invalid blocks returned by this gateway. + */ + incrementInvalidBlocks (): void { + this.#invalidBlocks++ + } } export type TrustlessGatewayGetBlockProgressEvents = @@ -119,24 +137,39 @@ export type TrustlessGatewayGetBlockProgressEvents = * A class that accepts a list of trustless gateways that are queried * for blocks. 
*/ -export class TrustedGatewayBlockBroker implements BlockRetriever< +export class TrustlessGatewayBlockBroker implements BlockRetriever< ProgressOptions > { private readonly gateways: TrustlessGateway[] - constructor (urls: Array) { - this.gateways = urls.map((url) => new TrustlessGateway(url)) + constructor (gatewaysOrUrls: Array) { + this.gateways = gatewaysOrUrls.map((gatewayOrUrl) => { + if (gatewayOrUrl instanceof TrustlessGateway || Object.prototype.hasOwnProperty.call(gatewayOrUrl, 'getRawBlock')) { + return gatewayOrUrl as TrustlessGateway + } + // eslint-disable-next-line no-console + console.trace('creating new TrustlessGateway for %s', gatewayOrUrl) + return new TrustlessGateway(gatewayOrUrl) + }) } - async retrieve (cid: CID, options: AbortOptions & ProgressOptions = {}): Promise { + async retrieve (cid: CID, options: BlockRetrievalOptions> = {}): Promise { // Loop through the gateways until we get a block or run out of gateways - const sortedGateways = this.gateways.sort((a, b) => b.reliability - a.reliability) + const sortedGateways = this.gateways.sort((a, b) => b.reliability() - a.reliability()) const aggregateErrors: Error[] = [] for (const gateway of sortedGateways) { log('getting block for %c from %s', cid, gateway.url) try { const block = await gateway.getRawBlock(cid, options.signal) log.trace('got block for %c from %s', cid, gateway.url) + try { + await options.validateFn?.(block) + } catch (err) { + log.error('failed to validate block for %c from %s', cid, gateway.url, err) + gateway.incrementInvalidBlocks() + + throw new Error(`unable to validate block for CID ${cid} from gateway ${gateway.url}`) + } return block } catch (err: unknown) { diff --git a/packages/helia/src/helia.ts b/packages/helia/src/helia.ts index 9dc64446..b0735d8d 100644 --- a/packages/helia/src/helia.ts +++ b/packages/helia/src/helia.ts @@ -1,3 +1,4 @@ +import { type BlockBroker } from '@helia/interface/blocks' import { start, stop } from '@libp2p/interface/startable' 
import { logger } from '@libp2p/logger' import drain from 'it-drain' @@ -19,6 +20,7 @@ const log = logger('helia') interface HeliaImplInit extends HeliaInit { libp2p: T blockstore: Blockstore + blockBrokers: BlockBroker[] datastore: Datastore } diff --git a/packages/helia/src/index.ts b/packages/helia/src/index.ts index 86beb8ae..6334eaf9 100644 --- a/packages/helia/src/index.ts +++ b/packages/helia/src/index.ts @@ -24,13 +24,13 @@ import { logger } from '@libp2p/logger' import { MemoryBlockstore } from 'blockstore-core' import { MemoryDatastore } from 'datastore-core' -import { BitswapBlockBroker, TrustedGatewayBlockBroker } from './block-brokers/index.js' +import { BitswapBlockBroker, TrustlessGatewayBlockBroker } from './block-brokers/index.js' import { HeliaImpl } from './helia.js' import { defaultHashers } from './utils/default-hashers.js' import { createLibp2p } from './utils/libp2p.js' import { name, version } from './version.js' import type { DefaultLibp2pServices } from './utils/libp2p-defaults.js' -import type { Helia } from '@helia/interface' +import type { Helia, BlockBrokerFactoryFunction } from '@helia/interface' import type { BlockBroker } from '@helia/interface/blocks' import type { Libp2p } from '@libp2p/interface' import type { Blockstore } from 'interface-blockstore' @@ -98,7 +98,7 @@ export interface HeliaInit { * A list of strategies used to fetch blocks when they are not present in * the local blockstore */ - blockBrokers?: BlockBroker[] + blockBrokers?: Array /** * Pass `false` to not start the Helia node @@ -159,9 +159,19 @@ export async function createHelia (init: HeliaInit = {}): Promise const hashers = defaultHashers(init.hashers) - const blockBrokers = init.blockBrokers ?? 
[ + const blockBrokers: BlockBroker[] = init.blockBrokers?.map((blockBroker: BlockBroker | BlockBrokerFactoryFunction): BlockBroker => { + if (typeof blockBroker !== 'function') { + return blockBroker satisfies BlockBroker + } + return blockBroker({ + blockstore, + datastore, + libp2p, + hashers + }) satisfies BlockBroker + }) ?? [ new BitswapBlockBroker(libp2p, blockstore, hashers), - new TrustedGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS) + new TrustlessGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS) ] const helia = new HeliaImpl({ diff --git a/packages/helia/src/utils/networked-storage.ts b/packages/helia/src/utils/networked-storage.ts index 871f10af..bf02e5ed 100644 --- a/packages/helia/src/utils/networked-storage.ts +++ b/packages/helia/src/utils/networked-storage.ts @@ -6,7 +6,7 @@ import filter from 'it-filter' import forEach from 'it-foreach' import { CustomProgressEvent, type ProgressOptions } from 'progress-events' import { equals as uint8ArrayEquals } from 'uint8arrays/equals' -import type { BlockBroker, Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions, BlockRetriever, BlockAnnouncer } from '@helia/interface/blocks' +import type { BlockBroker, Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions, BlockRetriever, BlockAnnouncer, BlockRetrievalOptions } from '@helia/interface/blocks' import type { AbortOptions } from '@libp2p/interface' import type { Blockstore } from 'interface-blockstore' import type { AwaitIterable } from 'interface-store' @@ -196,17 +196,31 @@ export class NetworkedStorage implements Blocks, Startable { } } -/** - * Race block providers cancelling any pending requests once the block has been - 
* found. - */ -async function raceBlockRetrievers (cid: CID, providers: BlockRetriever[], hashers: MultihashHasher[], options: AbortOptions): Promise { +export const getCidBlockVerifierFunction = (cid: CID, hashers: MultihashHasher[]): Required['validateFn'] => { const hasher = hashers.find(hasher => hasher.code === cid.multihash.code) if (hasher == null) { throw new CodeError(`No hasher configured for multihash code 0x${cid.multihash.code.toString(16)}, please configure one. You can look up which hash this is at https://github.com/multiformats/multicodec/blob/master/table.csv`, 'ERR_UNKNOWN_HASH_ALG') } + return async (block: Uint8Array): Promise => { + // verify block + const hash = await hasher.digest(block) + + if (!uint8ArrayEquals(hash.digest, cid.multihash.digest)) { + // if a hash mismatch occurs for a TrustlessGatewayBlockBroker, we should try another gateway + throw new CodeError('Hash of downloaded block did not match multihash from passed CID', 'ERR_HASH_MISMATCH') + } + } +} + +/** + * Race block providers cancelling any pending requests once the block has been + * found. 
+ */ +async function raceBlockRetrievers (cid: CID, providers: BlockRetriever[], hashers: MultihashHasher[], options: AbortOptions): Promise { + const validateFn = getCidBlockVerifierFunction(cid, hashers) + const controller = new AbortController() const signal = anySignal([controller.signal, options.signal]) @@ -214,21 +228,25 @@ async function raceBlockRetrievers (cid: CID, providers: BlockRetriever[], hashe return await Promise.any( providers.map(async provider => { try { + let blocksWereValidated = false const block = await provider.retrieve(cid, { ...options, - signal + signal, + validateFn: async (block: Uint8Array): Promise => { + await validateFn(block) + blocksWereValidated = true + } }) - // verify block - const hash = await hasher.digest(block) - - if (!uint8ArrayEquals(hash.digest, cid.multihash.digest)) { - throw new CodeError('Hash of downloaded block did not match multihash from passed CID', 'ERR_HASH_MISMATCH') + if (!blocksWereValidated) { + // the blockBroker either did not throw an error when attempting to validate the block + // or did not call the validateFn at all. 
We should validate the block ourselves + await validateFn(block) } return block } catch (err) { - log.error('could not retrieve block for %c', cid, err) + log.error('could not retrieve verified block for %c', cid, err) throw err } }) diff --git a/packages/helia/test/block-brokers/block-broker.spec.ts b/packages/helia/test/block-brokers/block-broker.spec.ts new file mode 100644 index 00000000..36293bd1 --- /dev/null +++ b/packages/helia/test/block-brokers/block-broker.spec.ts @@ -0,0 +1,136 @@ +/* eslint-env mocha */ + +import { expect } from 'aegir/chai' +import { MemoryBlockstore } from 'blockstore-core' +import delay from 'delay' +import all from 'it-all' +import * as raw from 'multiformats/codecs/raw' +import Sinon from 'sinon' +import { type StubbedInstance, stubInterface } from 'sinon-ts' +import { defaultHashers } from '../../src/utils/default-hashers.js' +import { NetworkedStorage } from '../../src/utils/networked-storage.js' +import { createBlock } from '../fixtures/create-block.js' +import type { BitswapBlockBroker, TrustlessGatewayBlockBroker } from '../../src/block-brokers/index.js' +import type { Blockstore } from 'interface-blockstore' +import type { CID } from 'multiformats/cid' + +describe('block-provider', () => { + let storage: NetworkedStorage + let blockstore: Blockstore + let bitswapBlockBroker: StubbedInstance + let blocks: Array<{ cid: CID, block: Uint8Array }> + let gatewayBlockBroker: StubbedInstance + + beforeEach(async () => { + blocks = [] + + for (let i = 0; i < 10; i++) { + blocks.push(await createBlock(raw.code, Uint8Array.from([0, 1, 2, i]))) + } + + blockstore = new MemoryBlockstore() + bitswapBlockBroker = stubInterface() + gatewayBlockBroker = stubInterface() + storage = new NetworkedStorage(blockstore, { + blockBrokers: [ + bitswapBlockBroker, + gatewayBlockBroker + ], + hashers: defaultHashers() + }) + }) + + it('gets a block from the gatewayBlockBroker when it is not in the blockstore', async () => { + const { cid, block } = 
blocks[0] + + gatewayBlockBroker.retrieve.withArgs(cid, Sinon.match.any).resolves(block) + + expect(await blockstore.has(cid)).to.be.false() + + const returned = await storage.get(cid) + + expect(await blockstore.has(cid)).to.be.true() + expect(returned).to.equalBytes(block) + expect(gatewayBlockBroker.retrieve.calledWith(cid)).to.be.true() + }) + + it('gets many blocks from gatewayBlockBroker when they are not in the blockstore', async () => { + const count = 5 + + for (let i = 0; i < count; i++) { + const { cid, block } = blocks[i] + gatewayBlockBroker.retrieve.withArgs(cid, Sinon.match.any).resolves(block) + + expect(await blockstore.has(cid)).to.be.false() + } + + const retrieved = await all(storage.getMany(async function * () { + for (let i = 0; i < count; i++) { + yield blocks[i].cid + await delay(10) + } + }())) + + expect(retrieved).to.deep.equal(new Array(count).fill(0).map((_, i) => blocks[i])) + + for (let i = 0; i < count; i++) { + const { cid } = blocks[i] + expect(gatewayBlockBroker.retrieve.calledWith(cid)).to.be.true() + expect(await blockstore.has(cid)).to.be.true() + } + }) + + it('gets some blocks from gatewayBlockBroker when they are not in the blockstore', async () => { + const count = 5 + + // blocks 0,1,3,4 are in the blockstore + await blockstore.put(blocks[0].cid, blocks[0].block) + await blockstore.put(blocks[1].cid, blocks[1].block) + await blockstore.put(blocks[3].cid, blocks[3].block) + await blockstore.put(blocks[4].cid, blocks[4].block) + + // block #2 comes from gatewayBlockBroker but slowly + gatewayBlockBroker.retrieve.withArgs(blocks[2].cid).callsFake(async () => { + await delay(100) + return blocks[2].block + }) + + const retrieved = await all(storage.getMany(async function * () { + for (let i = 0; i < count; i++) { + yield blocks[i].cid + await delay(10) + } + }())) + + expect(retrieved).to.deep.equal(new Array(count).fill(0).map((_, i) => blocks[i])) + + for (let i = 0; i < count; i++) { + expect(await 
blockstore.has(blocks[i].cid)).to.be.true() + } + }) + + it('handles incorrect bytes from a gateway', async () => { + const { cid } = blocks[0] + const block = blocks[1].block + storage = new NetworkedStorage(blockstore, { + blockBrokers: [ + gatewayBlockBroker + ], + hashers: defaultHashers() + }) + + gatewayBlockBroker.retrieve.withArgs(cid, Sinon.match.any).resolves(block) + + expect(await blockstore.has(cid)).to.be.false() + + try { + await storage.get(cid) + throw new Error('should have thrown') + } catch (err) { + const error = err as AggregateError & { errors: Error & { code: string } } + expect(error).to.be.an('error') + expect(error.errors).to.be.an('array').with.lengthOf(1) + expect(error.errors[0]).to.be.an('error').with.property('code', 'ERR_HASH_MISMATCH') + } + }) +}) diff --git a/packages/helia/test/block-brokers/trustless-gateway-block-broker.spec.ts b/packages/helia/test/block-brokers/trustless-gateway-block-broker.spec.ts new file mode 100644 index 00000000..b12a70df --- /dev/null +++ b/packages/helia/test/block-brokers/trustless-gateway-block-broker.spec.ts @@ -0,0 +1,151 @@ +/* eslint-env mocha */ +import { expect } from 'aegir/chai' +import * as raw from 'multiformats/codecs/raw' +import Sinon from 'sinon' +import { type StubbedInstance, stubConstructor } from 'sinon-ts' +import { TrustlessGatewayBlockBroker } from '../../src/block-brokers/index.js' +import { TrustlessGateway } from '../../src/block-brokers/trustless-gateway-block-broker.js' +import { createBlock } from '../fixtures/create-block.js' +import type { CID } from 'multiformats/cid' + +describe('trustless-gateway-block-broker', () => { + let blocks: Array<{ cid: CID, block: Uint8Array }> + let gatewayBlockBroker: TrustlessGatewayBlockBroker + let gateways: Array> + + // take a Record) => void> and stub the gateways + // Record.default is the default handler + function stubGateways (handlers: Record, index?: number) => void> & { default(gateway: StubbedInstance, index: number): void 
}): void { + for (let i = 0; i < gateways.length; i++) { + if (handlers[i] != null) { + handlers[i](gateways[i]) + continue + } + handlers.default(gateways[i], i) + } + } + + beforeEach(async () => { + blocks = [] + + for (let i = 0; i < 10; i++) { + blocks.push(await createBlock(raw.code, Uint8Array.from([0, 1, 2, i]))) + } + + gateways = [ + stubConstructor(TrustlessGateway, 'http://localhost:8080'), + stubConstructor(TrustlessGateway, 'http://localhost:8081'), + stubConstructor(TrustlessGateway, 'http://localhost:8082'), + stubConstructor(TrustlessGateway, 'http://localhost:8083') + ] + gatewayBlockBroker = new TrustlessGatewayBlockBroker(gateways) + }) + + it('tries all gateways before failing', async () => { + // stub all gateway responses to fail + for (const gateway of gateways) { + gateway.getRawBlock.rejects(new Error('failed')) + } + try { + await gatewayBlockBroker.retrieve(blocks[0].cid) + throw new Error('should have failed') + } catch (err: unknown) { + expect(err).to.exist() + expect(err).to.be.an.instanceOf(AggregateError) + expect((err as AggregateError).errors).to.have.lengthOf(gateways.length) + } + for (const gateway of gateways) { + expect(gateway.getRawBlock.calledWith(blocks[0].cid)).to.be.true() + } + }) + + it('prioritizes gateways based on reliability', async () => { + const callOrder: number[] = [] + + // stub all gateway responses to fail, and set reliabilities to known values. 
+ stubGateways({ + default: (gateway, i) => { + gateway.getRawBlock.withArgs(blocks[1].cid, Sinon.match.any).callsFake(async () => { + callOrder.push(i) + throw new Error('failed') + }) + gateway.reliability.returns(i) // known reliability of 0, 1, 2, 3 + } + }) + + try { + await gatewayBlockBroker.retrieve(blocks[1].cid) + } catch { + // ignore + } + // all gateways were called + expect(gateways[0].getRawBlock.calledWith(blocks[1].cid)).to.be.true() + expect(gateways[1].getRawBlock.calledWith(blocks[1].cid)).to.be.true() + expect(gateways[2].getRawBlock.calledWith(blocks[1].cid)).to.be.true() + expect(gateways[3].getRawBlock.calledWith(blocks[1].cid)).to.be.true() + // and in the correct order. + expect(callOrder).to.have.ordered.members([3, 2, 1, 0]) + }) + + it('tries other gateways if it receives invalid blocks', async () => { + const { cid: cid1, block: block1 } = blocks[0] + const { block: block2 } = blocks[1] + stubGateways({ + // return valid block for only one gateway + 0: (gateway) => { + gateway.getRawBlock.withArgs(cid1, Sinon.match.any).resolves(block1) + gateway.reliability.returns(0) // make sure it's called last + }, + // return invalid blocks for all other gateways + default: (gateway) => { // default stub function + gateway.getRawBlock.withArgs(cid1, Sinon.match.any).resolves(block2) // invalid block for the CID + gateway.reliability.returns(1) // make sure other gateways are called first + } + }) + // try { + const block = await gatewayBlockBroker.retrieve(cid1, { + validateFn: async (block) => { + if (block !== block1) { + throw new Error('invalid block') + } + } + }) + expect(block).to.equal(block1) + + // expect that all gateways are called, because everyone returned invalid blocks except the last one + for (const gateway of gateways) { + expect(gateway.getRawBlock.calledWith(cid1, Sinon.match.any)).to.be.true() + } + }) + + it('doesnt call other gateways if the first gateway returns a valid block', async () => { + const { cid: cid1, block: 
block1 } = blocks[0] + const { block: block2 } = blocks[1] + + stubGateways({ + // return valid block for only one gateway + 3: (gateway) => { + gateway.getRawBlock.withArgs(cid1, Sinon.match.any).resolves(block1) + gateway.reliability.returns(1) // make sure it's called first + }, + // return invalid blocks for all other gateways + default: (gateway) => { // default stub function + gateway.getRawBlock.withArgs(cid1, Sinon.match.any).resolves(block2) // invalid block for the CID + gateway.reliability.returns(0) // make sure other gateways are called last + } + }) + const block = await gatewayBlockBroker.retrieve(cid1, { + validateFn: async (block) => { + if (block !== block1) { + throw new Error('invalid block') + } + } + }) + expect(block).to.equal(block1) + expect(gateways[3].getRawBlock.calledWith(cid1, Sinon.match.any)).to.be.true() + // expect that other gateways are not called, because the first gateway returned a valid block + expect(gateways[0].getRawBlock.calledWith(cid1, Sinon.match.any)).to.be.false() + expect(gateways[1].getRawBlock.calledWith(cid1, Sinon.match.any)).to.be.false() + expect(gateways[2].getRawBlock.calledWith(cid1, Sinon.match.any)).to.be.false() + }) +}) diff --git a/packages/helia/test/fixtures/create-helia.ts b/packages/helia/test/fixtures/create-helia.ts index a8a0dd53..8fed714f 100644 --- a/packages/helia/test/fixtures/create-helia.ts +++ b/packages/helia/test/fixtures/create-helia.ts @@ -2,11 +2,15 @@ import { webSockets } from '@libp2p/websockets' import * as Filters from '@libp2p/websockets/filters' import { circuitRelayTransport } from 'libp2p/circuit-relay' import { identifyService } from 'libp2p/identify' +import { BitswapBlockBrokerFactory } from '../../src/block-brokers/index.js' import { createHelia as createNode } from '../../src/index.js' import type { Helia } from '@helia/interface' export async function createHelia (): Promise { return createNode({ + blockBrokers: [ + BitswapBlockBrokerFactory + ], libp2p: { addresses: { 
listen: [ diff --git a/packages/helia/test/pins.depth-limited.spec.ts b/packages/helia/test/pins.depth-limited.spec.ts index 201983ab..6c084d5c 100644 --- a/packages/helia/test/pins.depth-limited.spec.ts +++ b/packages/helia/test/pins.depth-limited.spec.ts @@ -27,6 +27,7 @@ describe('pins (depth limited)', () => { dag = await createDag(codec, blockstore, MAX_DEPTH, 3) helia = await createHelia({ + blockBrokers: [], datastore: new MemoryDatastore(), blockstore, libp2p: await createLibp2p({ diff --git a/packages/helia/test/pins.recursive.spec.ts b/packages/helia/test/pins.recursive.spec.ts index 1bc4a7bd..8b451bd9 100644 --- a/packages/helia/test/pins.recursive.spec.ts +++ b/packages/helia/test/pins.recursive.spec.ts @@ -25,6 +25,7 @@ describe('pins (recursive)', () => { dag = await createDag(codec, blockstore, 2, 3) helia = await createHelia({ + blockBrokers: [], datastore: new MemoryDatastore(), blockstore, libp2p: await createLibp2p({ diff --git a/packages/helia/test/utils/networked-storage.spec.ts b/packages/helia/test/utils/networked-storage.spec.ts index 5c993ed5..40124628 100644 --- a/packages/helia/test/utils/networked-storage.spec.ts +++ b/packages/helia/test/utils/networked-storage.spec.ts @@ -15,7 +15,7 @@ import type { BitswapBlockBroker } from '../../src/block-brokers/bitswap-block-b import type { Blockstore } from 'interface-blockstore' import type { CID } from 'multiformats/cid' -describe('storage', () => { +describe('networked-storage', () => { let storage: NetworkedStorage let blockstore: Blockstore let bitswap: StubbedInstance diff --git a/packages/interface/src/blocks.ts b/packages/interface/src/blocks.ts index 742a0484..0b8356ea 100644 --- a/packages/interface/src/blocks.ts +++ b/packages/interface/src/blocks.ts @@ -63,11 +63,21 @@ ProgressOptions, ProgressOptions = AbortOptions & GetProgressOptions & { + /** + * A function that blockBrokers should call prior to returning a block to ensure it can maintain control + * of the block request flow. 
e.g. TrustedGatewayBlockBroker will use this to ensure that the block + * is valid from one of the gateways before assuming it's work is done. If the block is not valid, it should try another gateway + * and WILL consider the gateway that returned the invalid blocks completely unreliable. + */ + validateFn?(block: Uint8Array): Promise +} + export interface BlockRetriever { /** * Retrieve a block from a source */ - retrieve(cid: CID, options?: AbortOptions & GetProgressOptions): Promise + retrieve(cid: CID, options?: BlockRetrievalOptions): Promise } export interface BlockAnnouncer { diff --git a/packages/interface/src/index.ts b/packages/interface/src/index.ts index a6245b18..edd46b95 100644 --- a/packages/interface/src/index.ts +++ b/packages/interface/src/index.ts @@ -14,11 +14,12 @@ * ``` */ -import type { Blocks } from './blocks.js' +import type { BlockBroker, Blocks } from './blocks.js' import type { Pins } from './pins.js' import type { Libp2p, AbortOptions } from '@libp2p/interface' import type { Datastore } from 'interface-datastore' import type { CID } from 'multiformats/cid' +import type { MultihashHasher } from 'multiformats/hashes/interface' import type { ProgressEvent, ProgressOptions } from 'progress-events' export type { Await, AwaitIterable } from 'interface-store' @@ -70,3 +71,18 @@ export type GcEvents = export interface GCOptions extends AbortOptions, ProgressOptions { } +export type BlockBrokerFactoryComponents = Pick & { + hashers: MultihashHasher[] +} + +/** + * A function that receives some {@link Helia} components and returns a + * {@link BlockBroker}. + * + * This is needed in order to re-use some of the internal components Helia + * constructs without having to hoist each required component into the top-level + * scope. 
+ */ +export interface BlockBrokerFactoryFunction { + (heliaComponents: BlockBrokerFactoryComponents): BlockBroker +} diff --git a/packages/interop/test/fixtures/create-helia.browser.ts b/packages/interop/test/fixtures/create-helia.browser.ts index 38864c44..1c544b21 100644 --- a/packages/interop/test/fixtures/create-helia.browser.ts +++ b/packages/interop/test/fixtures/create-helia.browser.ts @@ -5,6 +5,7 @@ import { all } from '@libp2p/websockets/filters' import { MemoryBlockstore } from 'blockstore-core' import { MemoryDatastore } from 'datastore-core' import { createHelia, type HeliaInit } from 'helia' +import { BitswapBlockBrokerFactory } from 'helia/block-brokers' import { createLibp2p } from 'libp2p' import { identifyService } from 'libp2p/identify' import type { Helia } from '@helia/interface' @@ -38,6 +39,9 @@ export async function createHeliaNode (init?: Partial): Promise): Promise