From 338885f20277a25277ba9192d8e15cca95e640e4 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Sat, 20 Apr 2024 09:07:43 -0700 Subject: [PATCH] fix: prevent duplicate trustless-gateway reqs (#503) Fixes https://github.com/ipfs-shipyard/service-worker-gateway/issues/104 This PR fixes issues brought up in service-worker-gateway where sub-resources end up causing multiple requests to a trustless gateway for the root CID. --- packages/block-brokers/.aegir.js | 11 +++ packages/block-brokers/package.json | 1 + .../trustless-gateway/trustless-gateway.ts | 81 ++++++++++++++----- .../test/trustless-gateway.spec.ts | 31 ++++++- 4 files changed, 105 insertions(+), 19 deletions(-) diff --git a/packages/block-brokers/.aegir.js b/packages/block-brokers/.aegir.js index 28f61b801..35d044d85 100644 --- a/packages/block-brokers/.aegir.js +++ b/packages/block-brokers/.aegir.js @@ -17,6 +17,17 @@ const options = { }) res.end(Uint8Array.from([0, 1, 2, 0])) }) + server.all('/ipfs/bafkqabtimvwgy3yk', async (req, res) => { + // delay the response + await new Promise((resolve) => setTimeout(resolve, 500)) + + res.writeHead(200, { + 'content-type': 'application/octet-stream', + 'content-length': 5 + }) + // "hello" + res.end(Uint8Array.from([104, 101, 108, 108, 111])) + }) await server.listen() const { port } = server.server.address() diff --git a/packages/block-brokers/package.json b/packages/block-brokers/package.json index 45f38b39c..e0e1a42f4 100644 --- a/packages/block-brokers/package.json +++ b/packages/block-brokers/package.json @@ -70,6 +70,7 @@ "@libp2p/logger": "^4.0.7", "@libp2p/peer-id-factory": "^4.0.7", "@multiformats/uri-to-multiaddr": "^8.0.0", + "@types/polka": "^0.5.7", "@types/sinon": "^17.0.3", "aegir": "^42.2.5", "cors": "^2.8.5", diff --git a/packages/block-brokers/src/trustless-gateway/trustless-gateway.ts b/packages/block-brokers/src/trustless-gateway/trustless-gateway.ts index a5ee1ef7e..499ffd521 100644 --- 
a/packages/block-brokers/src/trustless-gateway/trustless-gateway.ts +++ b/packages/block-brokers/src/trustless-gateway/trustless-gateway.ts @@ -1,6 +1,15 @@ +import { base64 } from 'multiformats/bases/base64' import type { ComponentLogger, Logger } from '@libp2p/interface' import type { CID } from 'multiformats/cid' +export interface TrustlessGatewayStats { + attempts: number + errors: number + invalidBlocks: number + successes: number + pendingResponses?: number +} + /** * A `TrustlessGateway` keeps track of the number of attempts, errors, and * successes for a given gateway url so that we can prioritize gateways that @@ -37,6 +46,13 @@ export class TrustlessGateway { */ #successes = 0 + /** + * A map of pending responses for this gateway. This is used to ensure that + * only one request per CID is made to a given gateway at a time, and that we + * don't make multiple in-flight requests for the same CID to the same gateway. + */ + #pendingResponses = new Map<string, Promise<Uint8Array>>() + private readonly log: Logger constructor (url: URL | string, logger: ComponentLogger) { @@ -44,6 +60,20 @@ export class TrustlessGateway { this.log = logger.forComponent(`helia:trustless-gateway-block-broker:${this.url.hostname}`) } + /** + * This function returns a unique string for the multihash.bytes of the CID. 
+ * + * Some useful resources for why this is needed can be found using the links below: + * + * - https://github.com/ipfs/helia/pull/503#discussion_r1572451331 + * - https://github.com/ipfs/kubo/issues/6815 + * - https://www.notion.so/pl-strflt/Handling-ambiguity-around-CIDs-9d5e14f6516f438980b01ef188efe15d#d9d45cd1ed8b4d349b96285de4aed5ab + */ + #uniqueBlockId (cid: CID): string { + const multihashBytes = cid.multihash.bytes + return base64.encode(multihashBytes) + } + /** * Fetch a raw block from `this.url` following the specification defined at * https://specs.ipfs.tech/http-gateways/trustless-gateway/ @@ -60,26 +90,29 @@ export class TrustlessGateway { throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${this.url} was aborted prior to fetch`) } + const blockId = this.#uniqueBlockId(cid) try { - this.#attempts++ - const res = await fetch(gwUrl.toString(), { - signal, - headers: { - // also set header, just in case ?format= is filtered out by some - // reverse proxy - Accept: 'application/vnd.ipld.raw' - }, - cache: 'force-cache' - }) - - this.log('GET %s %d', gwUrl, res.status) - - if (!res.ok) { - this.#errors++ - throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${this.url}`) + let pendingResponse: Promise<Uint8Array> | undefined = this.#pendingResponses.get(blockId) + if (pendingResponse == null) { + this.#attempts++ + pendingResponse = fetch(gwUrl.toString(), { + signal, + headers: { + Accept: 'application/vnd.ipld.raw' + }, + cache: 'force-cache' + }).then(async (res) => { + this.log('GET %s %d', gwUrl, res.status) + if (!res.ok) { + this.#errors++ + throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${this.url}`) + } + this.#successes++ + return new Uint8Array(await res.arrayBuffer()) + }) + this.#pendingResponses.set(blockId, pendingResponse) } - this.#successes++ - return new Uint8Array(await res.arrayBuffer()) + return await pendingResponse } catch (cause) { // @ts-expect-error - TS thinks 
signal?.aborted can only be false now // because it was checked for true above. @@ -88,6 +121,8 @@ export class TrustlessGateway { } this.#errors++ throw new Error(`unable to fetch raw block for CID ${cid}`) + } finally { + this.#pendingResponses.delete(blockId) } } @@ -130,4 +165,14 @@ export class TrustlessGateway { incrementInvalidBlocks (): void { this.#invalidBlocks++ } + + getStats (): TrustlessGatewayStats { + return { + attempts: this.#attempts, + errors: this.#errors, + invalidBlocks: this.#invalidBlocks, + successes: this.#successes, + pendingResponses: this.#pendingResponses.size + } + } } diff --git a/packages/block-brokers/test/trustless-gateway.spec.ts b/packages/block-brokers/test/trustless-gateway.spec.ts index a932ccea6..b36cff030 100644 --- a/packages/block-brokers/test/trustless-gateway.spec.ts +++ b/packages/block-brokers/test/trustless-gateway.spec.ts @@ -5,6 +5,7 @@ import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { multiaddr } from '@multiformats/multiaddr' import { uriToMultiaddr } from '@multiformats/uri-to-multiaddr' import { expect } from 'aegir/chai' +import { CID } from 'multiformats/cid' import * as raw from 'multiformats/codecs/raw' import Sinon from 'sinon' import { type StubbedInstance, stubConstructor, stubInterface } from 'sinon-ts' @@ -12,7 +13,6 @@ import { TrustlessGatewayBlockBroker } from '../src/trustless-gateway/broker.js' import { TrustlessGateway } from '../src/trustless-gateway/trustless-gateway.js' import { createBlock } from './fixtures/create-block.js' import type { Routing } from '@helia/interface' -import type { CID } from 'multiformats/cid' describe('trustless-gateway-block-broker', () => { let blocks: Array<{ cid: CID, block: Uint8Array }> @@ -190,4 +190,33 @@ describe('trustless-gateway-block-broker', () => { await expect(sessionBlockstore?.retrieve?.(blocks[0].cid)).to.eventually.deep.equal(blocks[0].block) }) + + it('does not trigger new network requests if the same cid request is 
in-flight', async function () { + // from .aegir.js polka server + const cid = CID.parse('bafkqabtimvwgy3yk') + if (process.env.TRUSTLESS_GATEWAY == null) { + return this.skip() + } + const trustlessGateway = new TrustlessGateway(process.env.TRUSTLESS_GATEWAY, defaultLogger()) + + // Call getRawBlock multiple times with the same CID + const promises = Array.from({ length: 10 }, async () => trustlessGateway.getRawBlock(cid)) + + // Wait for all promises to resolve + const [block1, ...blocks] = await Promise.all(promises) + + // Assert that all calls to getRawBlock returned the same block + for (const block of blocks) { + expect(block).to.deep.equal(block1) + } + + expect(trustlessGateway.getStats()).to.deep.equal({ + // attempt is only incremented when a new request is made + attempts: 1, + errors: 0, + invalidBlocks: 0, + successes: 1, + pendingResponses: 0 // the queue is empty + }) + }) })