Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: TrustlessGatewayBlockBroker prioritizes & tries all gateways #281

Merged
merged 8 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
"docs": "NODE_OPTIONS=--max_old_space_size=8192 aegir docs -- --exclude packages/interop --excludeExternals",
"docs:no-publish": "NODE_OPTIONS=--max_old_space_size=8192 aegir docs --publish false -- --exclude packages/interop"
},
"dependencies": {
"any-signal": "^4.1.1"
},
"devDependencies": {
"aegir": "^41.0.0",
"npm-run-all": "^4.1.5",
Expand Down
21 changes: 21 additions & 0 deletions packages/helia/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,22 @@
],
"type": "module",
"types": "./dist/src/index.d.ts",
"typesVersions": {
"*": {
"*": [
"*",
"dist/*",
"dist/src/*",
"dist/src/*/index"
],
"src/*": [
"*",
"dist/*",
"dist/src/*",
"dist/src/*/index"
]
}
},
"files": [
"src",
"dist",
Expand All @@ -26,6 +42,10 @@
".": {
"types": "./dist/src/index.d.ts",
"import": "./dist/src/index.js"
},
"./block-providers": {
"types": "./dist/src/block-providers/index.d.ts",
"import": "./dist/src/block-providers/index.js"
}
},
"eslintConfig": {
Expand Down Expand Up @@ -68,6 +88,7 @@
"@libp2p/webrtc": "^3.1.3",
"@libp2p/websockets": "^7.0.2",
"@libp2p/webtransport": "^3.0.3",
"any-signal": "^4.1.1",
"blockstore-core": "^4.0.0",
"cborg": "^4.0.1",
"datastore-core": "^9.0.0",
Expand Down
59 changes: 59 additions & 0 deletions packages/helia/src/block-providers/bitswap-block-provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { createBitswap } from 'ipfs-bitswap'
import type { BlockProvider } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Startable } from '@libp2p/interface/startable'
import type { Blockstore } from 'interface-blockstore'
import type { AbortOptions } from 'interface-store'
import type { Bitswap, BitswapNotifyProgressEvents, BitswapWantBlockProgressEvents } from 'ipfs-bitswap'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'
import type { ProgressOptions } from 'progress-events'

export class BitswapBlockProvider implements BlockProvider<
ProgressOptions<BitswapNotifyProgressEvents>,
ProgressOptions<BitswapWantBlockProgressEvents>
>, Startable {
private readonly bitswap: Bitswap
private started: boolean

constructor (libp2p: Libp2p, blockstore: Blockstore, hashers: MultihashHasher[]) {
this.bitswap = createBitswap(libp2p, blockstore, {
hashLoader: {
getHasher: async (codecOrName: string | number): Promise<MultihashHasher<number>> => {
const hasher = hashers.find(hasher => {
return hasher.code === codecOrName || hasher.name === codecOrName
})

if (hasher != null) {
return hasher
}

throw new Error(`Could not load hasher for code/name "${codecOrName}"`)
}
}
})
this.started = false
}

isStarted (): boolean {
return this.started
}

async start (): Promise<void> {
await this.bitswap.start()
this.started = true
}

async stop (): Promise<void> {
await this.bitswap.stop()
this.started = false
}

notify (cid: CID, block: Uint8Array, options?: ProgressOptions<BitswapNotifyProgressEvents>): void {
this.bitswap.notify(cid, block, options)
}

async get (cid: CID, options?: AbortOptions & ProgressOptions<BitswapWantBlockProgressEvents>): Promise<Uint8Array> {
return this.bitswap.want(cid, options)
}
}
2 changes: 2 additions & 0 deletions packages/helia/src/block-providers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { BitswapBlockProvider } from './bitswap-block-provider.js'
export { TrustlessGatewayBlockProvider } from './trustless-gateway-block-provider.js'
148 changes: 148 additions & 0 deletions packages/helia/src/block-providers/trustless-gateway-block-provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import { logger } from '@libp2p/logger'
import type { BlockProvider } from '@helia/interface/blocks'
import type { AbortOptions } from 'interface-store'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

const log = logger('helia:trustless-gateway-block-provider')

/**
* A BlockProvider constructs instances of `TrustlessGateway`
* keeps track of the number of attempts, errors, and successes for a given
* gateway url.
*/
class TrustlessGateway {
public readonly url: URL
/**
* The number of times this gateway has been attempted to be used to fetch a
* block. This includes successful, errored, and aborted attempts. By counting
* even aborted attempts, slow gateways that are out-raced by others will be
* considered less reliable.
*/
#attempts = 0

/**
* The number of times this gateway has errored while attempting to fetch a
* block. This includes `response.ok === false` and any other errors that
* throw while attempting to fetch a block.
*/
#errors = 0

/**
* The number of times this gateway has successfully fetched a block.
*/
#successes = 0
constructor (url: URL | string) {
this.url = url instanceof URL ? url : new URL(url)
}

/**
* Fetch a raw block from `this.url` following the specification defined at
* https://specs.ipfs.tech/http-gateways/trustless-gateway/
*/
async getRawBlock (cid: CID, signal?: AbortSignal): Promise<Uint8Array> {
const gwUrl = this.url
gwUrl.pathname = `/ipfs/${cid.toString()}`

// necessary as not every gateway supports dag-cbor, but every should support
// sending raw block as-is
gwUrl.search = '?format=raw'

if (signal?.aborted === true) {
throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted prior to fetch`)
}

try {
this.#attempts++
const res = await fetch(gwUrl.toString(), {
signal,
headers: {
// also set header, just in case ?format= is filtered out by some
// reverse proxy
Accept: 'application/vnd.ipld.raw'
},
cache: 'force-cache'
})
if (!res.ok) {
this.#errors++
throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()}`)
}
this.#successes++
return new Uint8Array(await res.arrayBuffer())
} catch (cause) {
// @ts-expect-error - TS thinks signal?.aborted can only be false now
// because it was checked for true above.
if (signal?.aborted === true) {
throw new Error(`fetching raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted`)
}
this.#errors++
throw new Error(`unable to fetch raw block for CID ${cid}`)
}
}

/**
* Encapsulate the logic for determining whether a gateway is considered
* reliable, for prioritization. This is based on the number of successful attempts made
* and the number of errors encountered.
*
* * Unused gateways have 100% reliability
* * Gateways that have never errored have 100% reliability
*/
get reliability (): number {
// if we have never tried to use this gateway, it is considered the most
// reliable until we determine otherwise
// (prioritize unused gateways)
if (this.#attempts === 0) {
return 1
}

// The gateway has > 0 attempts; If we have never encountered an error, consider it 100% reliable.
// Even if a gateway has no successes, it is still considered more reliable than a gateway with errors,
// because it may have been used and aborted, or beaten by another BlockProvider.
if (this.#errors === 0) {
return 1
}

// We have encountered errors, so we need to calculate the reliability
// based on the number of attempts, errors, and successes. Gateways that
// return a single error should drop their reliability score more than a
// success increases it.
// Play around with the below reliability function at https://www.desmos.com/calculator/d6hfhf5ukm
return this.#successes / (this.#attempts + (this.#errors * 3))
}
}

export type TrustlessGatewayGetBlockProgressEvents =
ProgressEvent<'trustless-gateway:get-block:fetch', URL>

/**
* A BlockProvider that accepts a list of trustless gateways that are queried
* for blocks. Individual gateways are randomly chosen.
*/
export class TrustlessGatewayBlockProvider implements BlockProvider<
ProgressOptions,
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
> {
private readonly gateways: TrustlessGateway[]

constructor (urls: Array<string | URL>) {
this.gateways = urls.map((url) => new TrustlessGateway(url))
}

async get (cid: CID, options: AbortOptions & ProgressOptions<TrustlessGatewayGetBlockProgressEvents> = {}): Promise<Uint8Array> {
// Loop through the gateways until we get a block or run out of gateways
for (const gateway of this.gateways.sort((a, b) => b.reliability - a.reliability)) {
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved
log('getting block for %c from %s', cid, gateway.url)
try {
const block = await gateway.getRawBlock(cid, options.signal)
log('got block for %c from %s', cid, gateway.url)

return block
} catch (err) {
log.error('failed to get block for %c from %s', cid, gateway.url, err)
}
}

throw new Error(`unable to fetch raw block for CID ${cid} from any gateway`)
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved
}
}
38 changes: 5 additions & 33 deletions packages/helia/src/helia.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { start, stop } from '@libp2p/interface/startable'
import { logger } from '@libp2p/logger'
import { type Bitswap, createBitswap } from 'ipfs-bitswap'
import drain from 'it-drain'
import { identity } from 'multiformats/hashes/identity'
import { sha256, sha512 } from 'multiformats/hashes/sha2'
import { CustomProgressEvent } from 'progress-events'
import { PinsImpl } from './pins.js'
import { BlockStorage } from './storage.js'
Expand All @@ -15,7 +13,6 @@ import type { Libp2p } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { Datastore } from 'interface-datastore'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'

const log = logger('helia')

Expand All @@ -31,34 +28,10 @@ export class HeliaImpl implements Helia {
public datastore: Datastore
public pins: Pins

#bitswap?: Bitswap

constructor (init: HeliaImplInit) {
const hashers: MultihashHasher[] = [
sha256,
sha512,
identity,
...(init.hashers ?? [])
]

this.#bitswap = createBitswap(init.libp2p, init.blockstore, {
hashLoader: {
getHasher: async (codecOrName: string | number): Promise<MultihashHasher<number>> => {
const hasher = hashers.find(hasher => {
return hasher.code === codecOrName || hasher.name === codecOrName
})

if (hasher != null) {
return hasher
}

throw new Error(`Could not load hasher for code/name "${codecOrName}"`)
}
}
})

const networkedStorage = new NetworkedStorage(init.blockstore, {
bitswap: this.#bitswap
blockProviders: init.blockProviders,
hashers: init.hashers
})

this.pins = new PinsImpl(init.datastore, networkedStorage, init.dagWalkers ?? [])
Expand All @@ -72,14 +45,13 @@ export class HeliaImpl implements Helia {

async start (): Promise<void> {
await assertDatastoreVersionIsCurrent(this.datastore)

await this.#bitswap?.start()
await start(this.blockstore)
await this.libp2p.start()
}

async stop (): Promise<void> {
await this.libp2p.stop()
await this.#bitswap?.stop()
await stop(this.blockstore)
}

async gc (options: GCOptions = {}): Promise<void> {
Expand Down
Loading
Loading