Skip to content

Commit

Permalink
feat: configurable block brokers (#280)
Browse files Browse the repository at this point in the history
Adds configurable block providers to allow using bitswap but also other
methods such as trustless gateways and any yet-to-be-invented way of
resolving a CID to a block..
  • Loading branch information
achingbrain authored Oct 10, 2023
1 parent fdda692 commit 0749cbf
Show file tree
Hide file tree
Showing 12 changed files with 386 additions and 72 deletions.
25 changes: 25 additions & 0 deletions packages/helia/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,22 @@
],
"type": "module",
"types": "./dist/src/index.d.ts",
"typesVersions": {
"*": {
"*": [
"*",
"dist/*",
"dist/src/*",
"dist/src/*/index"
],
"src/*": [
"*",
"dist/*",
"dist/src/*",
"dist/src/*/index"
]
}
},
"files": [
"src",
"dist",
Expand All @@ -26,6 +42,14 @@
".": {
"types": "./dist/src/index.d.ts",
"import": "./dist/src/index.js"
},
"./block-brokers": {
"types": "./dist/src/block-brokers/index.d.ts",
"import": "./dist/src/block-brokers/index.js"
},
"./hashers": {
"types": "./dist/src/utils/default-hashers.d.ts",
"import": "./dist/src/utils/default-hashers.js"
}
},
"eslintConfig": {
Expand Down Expand Up @@ -68,6 +92,7 @@
"@libp2p/webrtc": "^3.1.3",
"@libp2p/websockets": "^7.0.2",
"@libp2p/webtransport": "^3.0.3",
"any-signal": "^4.1.1",
"blockstore-core": "^4.0.0",
"cborg": "^4.0.1",
"datastore-core": "^9.0.0",
Expand Down
58 changes: 58 additions & 0 deletions packages/helia/src/block-brokers/bitswap-block-broker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { createBitswap } from 'ipfs-bitswap'
import type { BlockAnnouncer, BlockRetriever } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Startable } from '@libp2p/interface/startable'
import type { Blockstore } from 'interface-blockstore'
import type { AbortOptions } from 'interface-store'
import type { Bitswap, BitswapNotifyProgressEvents, BitswapWantBlockProgressEvents } from 'ipfs-bitswap'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'
import type { ProgressOptions } from 'progress-events'

export class BitswapBlockBroker implements BlockAnnouncer<ProgressOptions<BitswapNotifyProgressEvents>>, BlockRetriever<
ProgressOptions<BitswapWantBlockProgressEvents>
>, Startable {
private readonly bitswap: Bitswap
private started: boolean

constructor (libp2p: Libp2p, blockstore: Blockstore, hashers: MultihashHasher[]) {
this.bitswap = createBitswap(libp2p, blockstore, {
hashLoader: {
getHasher: async (codecOrName: string | number): Promise<MultihashHasher<number>> => {
const hasher = hashers.find(hasher => {
return hasher.code === codecOrName || hasher.name === codecOrName
})

if (hasher != null) {
return hasher
}

throw new Error(`Could not load hasher for code/name "${codecOrName}"`)
}
}
})
this.started = false
}

isStarted (): boolean {
return this.started
}

async start (): Promise<void> {
await this.bitswap.start()
this.started = true
}

async stop (): Promise<void> {
await this.bitswap.stop()
this.started = false
}

announce (cid: CID, block: Uint8Array, options?: ProgressOptions<BitswapNotifyProgressEvents>): void {
this.bitswap.notify(cid, block, options)
}

async retrieve (cid: CID, options?: AbortOptions & ProgressOptions<BitswapWantBlockProgressEvents>): Promise<Uint8Array> {
return this.bitswap.want(cid, options)
}
}
2 changes: 2 additions & 0 deletions packages/helia/src/block-brokers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { BitswapBlockBroker } from './bitswap-block-broker.js'
export { TrustedGatewayBlockBroker } from './trustless-gateway-block-broker.js'
78 changes: 78 additions & 0 deletions packages/helia/src/block-brokers/trustless-gateway-block-broker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { logger } from '@libp2p/logger'
import type { BlockRetriever } from '@helia/interface/blocks'
import type { AbortOptions } from 'interface-store'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

const log = logger('helia:trustless-gateway-block-provider')

export type TrustlessGatewayGetBlockProgressEvents =
ProgressEvent<'trustless-gateway:get-block:fetch', URL>

/**
* A class that accepts a list of trustless gateways that are queried
* for blocks.
*/
export class TrustedGatewayBlockBroker implements BlockRetriever<
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
> {
private readonly gateways: URL[]

constructor (urls: Array<string | URL>) {
this.gateways = urls.map(url => new URL(url.toString()))
}

async retrieve (cid: CID, options: AbortOptions & ProgressOptions<TrustlessGatewayGetBlockProgressEvents> = {}): Promise<Uint8Array> {
// choose a gateway
const url = this.gateways[Math.floor(Math.random() * this.gateways.length)]

log('getting block for %c from %s', cid, url)

try {
const block = await getRawBlockFromGateway(url, cid, options.signal)
log('got block for %c from %s', cid, url)

return block
} catch (err: any) {
log.error('failed to get block for %c from %s', cid, url, err)

throw err
}
}
}

async function getRawBlockFromGateway (url: URL, cid: CID, signal?: AbortSignal): Promise<Uint8Array> {
const gwUrl = new URL(url)
gwUrl.pathname = `/ipfs/${cid.toString()}`

// necessary as not every gateway supports dag-cbor, but every should support
// sending raw block as-is
gwUrl.search = '?format=raw'

if (signal?.aborted === true) {
throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted prior to fetch`)
}

try {
const res = await fetch(gwUrl.toString(), {
signal,
headers: {
// also set header, just in case ?format= is filtered out by some
// reverse proxy
Accept: 'application/vnd.ipld.raw'
},
cache: 'force-cache'
})
if (!res.ok) {
throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()}`)
}
return new Uint8Array(await res.arrayBuffer())
} catch (cause) {
// @ts-expect-error - TS thinks signal?.aborted can only be false now
// because it was checked for true above.
if (signal?.aborted === true) {
throw new Error(`fetching raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted`)
}
throw new Error(`unable to fetch raw block for CID ${cid}`)
}
}
38 changes: 5 additions & 33 deletions packages/helia/src/helia.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { start, stop } from '@libp2p/interface/startable'
import { logger } from '@libp2p/logger'
import { type Bitswap, createBitswap } from 'ipfs-bitswap'
import drain from 'it-drain'
import { identity } from 'multiformats/hashes/identity'
import { sha256, sha512 } from 'multiformats/hashes/sha2'
import { CustomProgressEvent } from 'progress-events'
import { PinsImpl } from './pins.js'
import { BlockStorage } from './storage.js'
Expand All @@ -15,7 +13,6 @@ import type { Libp2p } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { Datastore } from 'interface-datastore'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'

const log = logger('helia')

Expand All @@ -31,34 +28,10 @@ export class HeliaImpl implements Helia {
public datastore: Datastore
public pins: Pins

#bitswap?: Bitswap

constructor (init: HeliaImplInit) {
const hashers: MultihashHasher[] = [
sha256,
sha512,
identity,
...(init.hashers ?? [])
]

this.#bitswap = createBitswap(init.libp2p, init.blockstore, {
hashLoader: {
getHasher: async (codecOrName: string | number): Promise<MultihashHasher<number>> => {
const hasher = hashers.find(hasher => {
return hasher.code === codecOrName || hasher.name === codecOrName
})

if (hasher != null) {
return hasher
}

throw new Error(`Could not load hasher for code/name "${codecOrName}"`)
}
}
})

const networkedStorage = new NetworkedStorage(init.blockstore, {
bitswap: this.#bitswap
blockBrokers: init.blockBrokers,
hashers: init.hashers
})

this.pins = new PinsImpl(init.datastore, networkedStorage, init.dagWalkers ?? [])
Expand All @@ -72,14 +45,13 @@ export class HeliaImpl implements Helia {

async start (): Promise<void> {
await assertDatastoreVersionIsCurrent(this.datastore)

await this.#bitswap?.start()
await start(this.blockstore)
await this.libp2p.start()
}

async stop (): Promise<void> {
await this.libp2p.stop()
await this.#bitswap?.stop()
await stop(this.blockstore)
}

async gc (options: GCOptions = {}): Promise<void> {
Expand Down
37 changes: 36 additions & 1 deletion packages/helia/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import { logger } from '@libp2p/logger'
import { MemoryBlockstore } from 'blockstore-core'
import { MemoryDatastore } from 'datastore-core'
import { BitswapBlockBroker, TrustedGatewayBlockBroker } from './block-brokers/index.js'
import { HeliaImpl } from './helia.js'
import { defaultHashers } from './utils/default-hashers.js'
import { createLibp2p } from './utils/libp2p.js'
import { name, version } from './version.js'
import type { DefaultLibp2pServices } from './utils/libp2p-defaults.js'
import type { Helia } from '@helia/interface'
import type { BlockBroker } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { Datastore } from 'interface-datastore'
Expand Down Expand Up @@ -91,6 +94,12 @@ export interface HeliaInit<T extends Libp2p = Libp2p> {
*/
dagWalkers?: DAGWalker[]

/**
* A list of strategies used to fetch blocks when they are not present in
* the local blockstore
*/
blockBrokers?: BlockBroker[]

/**
* Pass `false` to not start the Helia node
*/
Expand All @@ -114,6 +123,23 @@ export interface HeliaInit<T extends Libp2p = Libp2p> {
holdGcLock?: boolean
}

const DEFAULT_TRUSTLESS_GATEWAYS = [
// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://dweb.link',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://cf-ipfs.com',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://4everland.io',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://w3s.link',

// 2023-10-03: IPNS, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://cloudflare-ipfs.com'
]

/**
* Create and return a Helia node
*/
Expand All @@ -131,11 +157,20 @@ export async function createHelia (init: HeliaInit = {}): Promise<Helia<unknown>
libp2p = await createLibp2p(datastore, init.libp2p)
}

const hashers = defaultHashers(init.hashers)

const blockBrokers = init.blockBrokers ?? [
new BitswapBlockBroker(libp2p, blockstore, hashers),
new TrustedGatewayBlockBroker(DEFAULT_TRUSTLESS_GATEWAYS)
]

const helia = new HeliaImpl({
...init,
datastore,
blockstore,
libp2p
libp2p,
blockBrokers,
hashers
})

if (init.start !== false) {
Expand Down
19 changes: 18 additions & 1 deletion packages/helia/src/storage.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { start, stop, type Startable } from '@libp2p/interface/startable'
import createMortice from 'mortice'
import type { Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions } from '@helia/interface/blocks'
import type { Pins } from '@helia/interface/pins'
Expand All @@ -21,10 +22,11 @@ export interface GetOptions extends AbortOptions {
* blockstore (that may be on disk, s3, or something else). If the blocks are
* not present Bitswap will be used to fetch them from network peers.
*/
export class BlockStorage implements Blocks {
export class BlockStorage implements Blocks, Startable {
public lock: Mortice
private readonly child: Blockstore
private readonly pins: Pins
private started: boolean

/**
* Create a new BlockStorage
Expand All @@ -35,6 +37,21 @@ export class BlockStorage implements Blocks {
this.lock = createMortice({
singleProcess: options.holdGcLock
})
this.started = false
}

isStarted (): boolean {
return this.started
}

async start (): Promise<void> {
await start(this.child)
this.started = true
}

async stop (): Promise<void> {
await stop(this.child)
this.started = false
}

unwrap (): Blockstore {
Expand Down
12 changes: 12 additions & 0 deletions packages/helia/src/utils/default-hashers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { identity } from 'multiformats/hashes/identity'
import { sha256, sha512 } from 'multiformats/hashes/sha2'
import type { MultihashHasher } from 'multiformats/hashes/interface'

export function defaultHashers (hashers: MultihashHasher[] = []): MultihashHasher[] {
return [
sha256,
sha512,
identity,
...hashers
]
}
Loading

0 comments on commit 0749cbf

Please sign in to comment.