Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: TrustlessGatewayBlockBroker prioritizes & tries all gateways #281

Merged
merged 8 commits into from
Oct 13, 2023
Merged
4 changes: 4 additions & 0 deletions packages/helia/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ const options = {
before: async () => {
// use dynamic import otherwise the source may not have been built yet
const { createHelia } = await import('./dist/src/index.js')
const { BitswapBlockBrokerFactory } = await import('./dist/src/block-brokers/index.js')

const helia = await createHelia({
blockBrokers: [
BitswapBlockBrokerFactory
],
libp2p: {
addresses: {
listen: [
Expand Down
14 changes: 11 additions & 3 deletions packages/helia/src/block-brokers/bitswap-block-broker.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { createBitswap } from 'ipfs-bitswap'
import type { BlockAnnouncer, BlockRetriever } from '@helia/interface/blocks'
import type { BlockBrokerFactoryFunction } from '@helia/interface'
import type { BlockAnnouncer, BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Startable } from '@libp2p/interface/startable'
import type { Blockstore } from 'interface-blockstore'
import type { AbortOptions } from 'interface-store'
import type { Bitswap, BitswapNotifyProgressEvents, BitswapWantBlockProgressEvents } from 'ipfs-bitswap'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'
Expand Down Expand Up @@ -52,7 +52,15 @@ ProgressOptions<BitswapWantBlockProgressEvents>
this.bitswap.notify(cid, block, options)
}

async retrieve (cid: CID, options?: AbortOptions & ProgressOptions<BitswapWantBlockProgressEvents>): Promise<Uint8Array> {
/**
 * Ask bitswap for the block identified by `cid`.
 *
 * `validateFn` is removed from the options before they are handed to
 * `bitswap.want`; every other option is forwarded untouched.
 */
async retrieve (cid: CID, options: BlockRetrievalOptions<ProgressOptions<BitswapWantBlockProgressEvents>> = {}): Promise<Uint8Array> {
  const { validateFn, ...wantOptions } = options

  return this.bitswap.want(cid, wantOptions)
}
}

/**
 * Factory that builds a `BitswapBlockBroker` from the components handed to a
 * block broker factory function (`libp2p`, `blockstore` and `hashers`).
 *
 * Intended for users who supply their own `blockBrokers` list to Helia but
 * still want the default bitswap broker to be part of it.
 */
export const BitswapBlockBrokerFactory: BlockBrokerFactoryFunction = ({ libp2p, blockstore, hashers }): BitswapBlockBroker =>
  new BitswapBlockBroker(libp2p, blockstore, hashers)
4 changes: 2 additions & 2 deletions packages/helia/src/block-brokers/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export { BitswapBlockBroker } from './bitswap-block-broker.js'
export { TrustedGatewayBlockBroker } from './trustless-gateway-block-broker.js'
export { BitswapBlockBroker, BitswapBlockBrokerFactory } from './bitswap-block-broker.js'
export { TrustlessGatewayBlockBroker } from './trustless-gateway-block-broker.js'
218 changes: 166 additions & 52 deletions packages/helia/src/block-brokers/trustless-gateway-block-broker.ts
Original file line number Diff line number Diff line change
@@ -1,78 +1,192 @@
import { logger } from '@libp2p/logger'
import type { BlockRetriever } from '@helia/interface/blocks'
import type { AbortOptions } from 'interface-store'
import type { BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

const log = logger('helia:trustless-gateway-block-provider')

export type TrustlessGatewayGetBlockProgressEvents =
ProgressEvent<'trustless-gateway:get-block:fetch', URL>
const log = logger('helia:trustless-gateway-block-broker')

/**
* A class that accepts a list of trustless gateways that are queried
* for blocks.
* A `TrustlessGateway` keeps track of the number of attempts, errors, and
* successes for a given gateway url so that we can prioritize gateways that
* have been more reliable in the past, and ensure that requests are distributed
* across all gateways within a given `TrustlessGatewayBlockBroker` instance.
*/
export class TrustedGatewayBlockBroker implements BlockRetriever<
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
> {
private readonly gateways: URL[]
export class TrustlessGateway {
  /**
   * The base URL of the gateway. It is never modified by this class: every
   * request derives its own URL from a copy.
   */
  public readonly url: URL

  /**
   * The number of times this gateway has been attempted to be used to fetch a
   * block. This includes successful, errored, and aborted attempts. By counting
   * even aborted attempts, slow gateways that are out-raced by others will be
   * considered less reliable.
   */
  #attempts = 0

  /**
   * The number of times this gateway has errored while attempting to fetch a
   * block. This includes `response.ok === false` and any other errors that
   * throw while attempting to fetch a block. This does not include aborted
   * attempts. Each failed attempt is counted exactly once.
   */
  #errors = 0

  /**
   * The number of times this gateway has returned an invalid block. A gateway
   * that returns the wrong blocks for a CID should be considered for removal
   * from the list of gateways to fetch blocks from.
   */
  #invalidBlocks = 0

  /**
   * The number of times this gateway has successfully fetched a block, i.e.
   * the response was OK and its body was read completely.
   */
  #successes = 0

  /**
   * @param url - base URL of the gateway; strings are parsed with `new URL`
   */
  constructor (url: URL | string) {
    this.url = url instanceof URL ? url : new URL(url)
  }

  /**
   * Fetch a raw block from `this.url` following the specification defined at
   * https://specs.ipfs.tech/http-gateways/trustless-gateway/
   *
   * @param cid - the CID of the block to fetch
   * @param signal - optional signal used to abort the request
   * @returns the bytes of the response body
   * @throws Error if the signal is already aborted, if the request is aborted
   * while in flight, if the gateway answers with a non-OK status, or if the
   * request/body read fails for any other reason
   */
  async getRawBlock (cid: CID, signal?: AbortSignal): Promise<Uint8Array> {
    // work on a copy - assigning `pathname`/`search` on `this.url` itself would
    // permanently rewrite the gateway's base URL (and the caller's URL object)
    const gwUrl = new URL(this.url.toString())
    gwUrl.pathname = `/ipfs/${cid.toString()}`

    // necessary as not every gateway supports dag-cbor, but every gateway
    // should support sending the raw block as-is
    gwUrl.search = '?format=raw'

    if (signal?.aborted ?? false) {
      throw new Error(`Signal to fetch raw block for CID ${cid.toString()} from gateway ${gwUrl.toString()} was aborted prior to fetch`)
    }

    this.#attempts++

    try {
      const res = await fetch(gwUrl.toString(), {
        signal,
        headers: {
          // also set header, just in case ?format= is filtered out by some
          // reverse proxy
          Accept: 'application/vnd.ipld.raw'
        },
        cache: 'force-cache'
      })

      if (!res.ok) {
        // accounted for (once) in the catch block below
        throw new Error(`unable to fetch raw block for CID ${cid.toString()} from gateway ${gwUrl.toString()}`)
      }

      // only count a success once the whole body has been read
      const block = new Uint8Array(await res.arrayBuffer())
      this.#successes++

      return block
    } catch {
      // a truthiness check is used (rather than `=== true`) so that the
      // narrowing from the pre-fetch check above does not need to be suppressed
      if (signal?.aborted ?? false) {
        // aborted attempts are not counted as errors
        throw new Error(`fetching raw block for CID ${cid.toString()} from gateway ${gwUrl.toString()} was aborted`)
      }

      this.#errors++
      throw new Error(`unable to fetch raw block for CID ${cid.toString()}`)
    }
  }

  /**
   * Encapsulate the logic for determining whether a gateway is considered
   * reliable, for prioritization. This is based on the number of attempts
   * made, the number of successes and the number of errors encountered.
   *
   * Gateways that have never been attempted score `1`, the same score as a
   * gateway with a perfect record, so they are not starved by gateways that
   * are already known to work. Gateways that have returned at least one
   * invalid block score `-Infinity`.
   *
   * @returns a score where higher means more reliable
   */
  reliability (): number {
    /**
     * if we have never tried to use this gateway, it is considered the most
     * reliable until we determine otherwise (prioritize unused gateways)
     */
    if (this.#attempts === 0) {
      return 1
    }

    if (this.#invalidBlocks > 0) {
      // this gateway may not be trustworthy..
      return -Infinity
    }

    /**
     * We have attempted the gateway, so we need to calculate the reliability
     * based on the number of attempts, errors, and successes. Gateways that
     * return a single error should drop their reliability score more than a
     * single success increases it.
     *
     * Play around with the below reliability function at https://www.desmos.com/calculator/d6hfhf5ukm
     */
    return this.#successes / (this.#attempts + (this.#errors * 3))
  }

  /**
   * Increment the number of invalid blocks returned by this gateway.
   */
  incrementInvalidBlocks (): void {
    this.#invalidBlocks++
  }
}

async function getRawBlockFromGateway (url: URL, cid: CID, signal?: AbortSignal): Promise<Uint8Array> {
const gwUrl = new URL(url)
gwUrl.pathname = `/ipfs/${cid.toString()}`
/**
 * Progress events declared for the trustless gateway block broker: a single
 * `'trustless-gateway:get-block:fetch'` event whose detail is a `URL`.
 *
 * NOTE(review): presumably the URL is the gateway URL being fetched from;
 * confirm where the event is emitted.
 */
export type TrustlessGatewayGetBlockProgressEvents =
ProgressEvent<'trustless-gateway:get-block:fetch', URL>

// necessary as not every gateway supports dag-cbor, but every should support
// sending raw block as-is
gwUrl.search = '?format=raw'
/**
 * A class that accepts a list of trustless gateways that are queried
 * for blocks.
 *
 * Gateways are tried one at a time, most reliable first (see
 * `TrustlessGateway.reliability`), until one of them returns a block that
 * passes validation.
 */
export class TrustlessGatewayBlockBroker implements BlockRetriever<
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
> {
  private readonly gateways: TrustlessGateway[]

  /**
   * @param gatewaysOrUrls - gateway URLs (which are wrapped in a new
   * `TrustlessGateway`) and/or ready-made gateway objects (which are used
   * as-is)
   */
  constructor (gatewaysOrUrls: Array<string | URL | TrustlessGateway>) {
    this.gateways = gatewaysOrUrls.map((gatewayOrUrl) => {
      // accept real instances as well as objects carrying their own
      // `getRawBlock` property
      if (gatewayOrUrl instanceof TrustlessGateway || Object.prototype.hasOwnProperty.call(gatewayOrUrl, 'getRawBlock')) {
        return gatewayOrUrl as TrustlessGateway
      }

      // use the module logger instead of `console.trace`, which would print a
      // stack trace for every configured gateway
      log.trace('creating new TrustlessGateway for %s', gatewayOrUrl)

      return new TrustlessGateway(gatewayOrUrl)
    })
  }

  /**
   * Try each gateway in descending order of reliability until one yields a
   * block that passes `options.validateFn` (when one is supplied).
   *
   * @param cid - the CID of the block to retrieve
   * @param options - may carry an abort `signal` and a `validateFn`
   * @returns the bytes of the first block that was fetched and validated
   * @throws AggregateError holding the error from every gateway that was
   * tried, when no gateway produced a valid block or the signal was aborted
   */
  async retrieve (cid: CID, options: BlockRetrievalOptions<ProgressOptions<TrustlessGatewayGetBlockProgressEvents>> = {}): Promise<Uint8Array> {
    // Sort a copy: `Array.prototype.sort` works in place, and re-ordering
    // `this.gateways` while another `retrieve` call is suspended mid-iteration
    // over it could make that call skip or repeat gateways.
    const sortedGateways = [...this.gateways].sort((a, b) => b.reliability() - a.reliability())
    const aggregateErrors: Error[] = []

    // Loop through the gateways until we get a block or run out of gateways
    for (const gateway of sortedGateways) {
      log('getting block for %c from %s', cid, gateway.url)

      try {
        const block = await gateway.getRawBlock(cid, options.signal)
        log.trace('got block for %c from %s', cid, gateway.url)

        try {
          await options.validateFn?.(block)
        } catch (err) {
          log.error('failed to validate block for %c from %s', cid, gateway.url, err)
          // the gateway is penalised so it is sorted last in future attempts
          gateway.incrementInvalidBlocks()

          throw new Error(`unable to validate block for CID ${cid} from gateway ${gateway.url}`)
        }

        return block
      } catch (err: unknown) {
        log.error('failed to get block for %c from %s', cid, gateway.url, err)

        if (err instanceof Error) {
          aggregateErrors.push(err)
        } else {
          aggregateErrors.push(new Error(`unable to fetch raw block for CID ${cid} from gateway ${gateway.url}`))
        }

        // if signal was aborted, exit the loop
        if (options.signal?.aborted === true) {
          log.trace('request aborted while fetching raw block for CID %c from gateway %s', cid, gateway.url)
          break
        }
      }
    }

    throw new AggregateError(aggregateErrors, `unable to fetch raw block for CID ${cid} from any gateway`)
  }
}
2 changes: 2 additions & 0 deletions packages/helia/src/helia.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { type BlockBroker } from '@helia/interface/blocks'
import { start, stop } from '@libp2p/interface/startable'
import { logger } from '@libp2p/logger'
import drain from 'it-drain'
Expand All @@ -19,6 +20,7 @@ const log = logger('helia')
/**
 * Init options for the Helia implementation. Extends `HeliaInit` and declares
 * `libp2p`, `blockstore`, `blockBrokers` and `datastore` as non-optional
 * members.
 */
interface HeliaImplInit<T extends Libp2p = Libp2p> extends HeliaInit<T> {
libp2p: T
blockstore: Blockstore
// typed as `BlockBroker[]` only - NOTE(review): presumably any factory
// functions have been resolved to broker instances by the caller; confirm
blockBrokers: BlockBroker[]
datastore: Datastore
}

Expand Down
20 changes: 15 additions & 5 deletions packages/helia/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import { logger } from '@libp2p/logger'
import { MemoryBlockstore } from 'blockstore-core'
import { MemoryDatastore } from 'datastore-core'
import { BitswapBlockBroker, TrustedGatewayBlockBroker } from './block-brokers/index.js'
import { BitswapBlockBroker, TrustlessGatewayBlockBroker } from './block-brokers/index.js'
import { HeliaImpl } from './helia.js'
import { defaultHashers } from './utils/default-hashers.js'
import { createLibp2p } from './utils/libp2p.js'
import { name, version } from './version.js'
import type { DefaultLibp2pServices } from './utils/libp2p-defaults.js'
import type { Helia } from '@helia/interface'
import type { Helia, BlockBrokerFactoryFunction } from '@helia/interface'
import type { BlockBroker } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
Expand Down Expand Up @@ -98,7 +98,7 @@ export interface HeliaInit<T extends Libp2p = Libp2p> {
* A list of strategies used to fetch blocks when they are not present in
* the local blockstore
*/
blockBrokers?: BlockBroker[]
blockBrokers?: Array<BlockBroker | BlockBrokerFactoryFunction>

/**
* Pass `false` to not start the Helia node
Expand Down Expand Up @@ -159,9 +159,19 @@ export async function createHelia (init: HeliaInit = {}): Promise<Helia<unknown>

const hashers = defaultHashers(init.hashers)

const blockBrokers = init.blockBrokers ?? [
const blockBrokers: BlockBroker[] = init.blockBrokers?.map((blockBroker: BlockBroker | BlockBrokerFactoryFunction): BlockBroker => {
if (typeof blockBroker !== 'function') {
return blockBroker satisfies BlockBroker
}
return blockBroker({
blockstore,
datastore,
libp2p,
hashers
}) satisfies BlockBroker
}) ?? [
new BitswapBlockBroker(libp2p, blockstore, hashers),
new TrustedGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS)
new TrustlessGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS)
]

const helia = new HeliaImpl({
Expand Down
Loading