From 5b1372736b76780d25bc1b2b3af831ecfbe15736 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 19 Sep 2024 17:03:50 +0100 Subject: [PATCH] feat: use big file downloader --- packages/edge-gateway/src/gateway.js | 7 +++-- packages/ipfs-gateway-race/lib/index.js | 38 +++++++++++++++++------ packages/ipfs-gateway-race/lib/request.js | 17 ++++++++++ packages/ipfs-gateway-race/package.json | 1 + packages/ipfs-gateway-race/types.d.ts | 2 ++ 5 files changed, 54 insertions(+), 11 deletions(-) create mode 100644 packages/ipfs-gateway-race/lib/request.js diff --git a/packages/edge-gateway/src/gateway.js b/packages/edge-gateway/src/gateway.js index 64a38e3..a110989 100644 --- a/packages/edge-gateway/src/gateway.js +++ b/packages/edge-gateway/src/gateway.js @@ -247,9 +247,11 @@ async function getFromDotstorage (request, env, cid, options = {}) { }), ...env.cdnGateways.map(async (host) => { const gwResponse = await gatewayFetch(host, cid, pathname, { + method: request.method, timeout: env.CDN_REQUEST_TIMEOUT, headers: request.headers, - search + search, + TransformStream: IdentityTransformStream }) // @ts-ignore 'response' does not exist on type 'GatewayResponseFailure' @@ -306,7 +308,8 @@ async function getFromGatewayRacer (cid, pathname, search, headers, env, ctx) { reportRaceResults(env, gatewayResponsePromises, undefined, gatewayControllers) ) } - } + }, + TransformStream: IdentityTransformStream }) if (!layerOneIsWinner) { throw new Error('no winner in the first race') diff --git a/packages/ipfs-gateway-race/lib/index.js b/packages/ipfs-gateway-race/lib/index.js index c685fa4..1f09124 100644 --- a/packages/ipfs-gateway-race/lib/index.js +++ b/packages/ipfs-gateway-race/lib/index.js @@ -4,6 +4,7 @@ import pAny, { AggregateError } from 'p-any' import { FilterError } from 'p-some' import pSettle from 'p-settle' import fetch, { Headers } from '@web-std/fetch' +import * as UnixFSDownloader from '@storacha/unixfs-dl' import { NotFoundError, @@ -15,6 +16,7 @@ import { ABORT_CODE, DEFAULT_REQUEST_TIMEOUT } from './constants.js' +import { isAlternateFormatRequest, isRangeRequest } from './request.js' const nop = () => {} @@ -53,13 +55,15 @@ export class IpfsGatewayRacer { /** @type {GatewayResponsePromise[]} */ const gatewayResponsePromises = this.ipfsGateways.map((gwUrl) => gatewayFetch(gwUrl, cid, pathname, { + method: options.method, headers, search, timeout: this.timeout, // Combine internal race winner controller signal with custom user signal signal: gatewaySignals[gwUrl] ? anySignal([raceWinnerController.signal, gatewaySignals[gwUrl]]) - : raceWinnerController.signal + : raceWinnerController.signal, + TransformStream: options.TransformStream }) ) @@ -126,10 +130,12 @@ export function createGatewayRacer (ipfsGateways, options = {}) { * @param {string} cid * @param {string} pathname * @param {Object} [options] + * @param {string} [options.method] * @param {Headers} [options.headers] * @param {string} [options.search] * @param {number} [options.timeout] * @param {AbortSignal} [options.signal] + * @param {typeof TransformStream} [options.TransformStream] */ export async function gatewayFetch ( gwUrl, @@ -137,21 +143,35 @@ export async function gatewayFetch ( pathname, options = {} ) { - const { headers, signal } = options + const method = options.method || 'GET' + const headers = options.headers || new Headers() const timeout = options.timeout || 60000 const search = options.search || '' const timeoutController = new AbortController() const timer = setTimeout(() => timeoutController.abort(), timeout) + // Combine timeout signal with done signal + const signal = options.signal + ? anySignal([timeoutController.signal, options.signal]) + : timeoutController.signal + const url = new URL(`ipfs/${cid}${pathname}${search}`, gwUrl) let response try { - response = await fetch(new URL(`ipfs/${cid}${pathname}${search}`, gwUrl), { - // Combine timeout signal with done signal - signal: signal - ? anySignal([timeoutController.signal, signal]) - : timeoutController.signal, - headers - }) + // If this is an atypical request, i.e. it's a HEAD request, a range request + // or a request for a different format to UnixFS, then just make the request + // to the upstream as usual. + if ( + method !== 'GET' || + isRangeRequest(headers) || + isAlternateFormatRequest(headers, url.searchParams) + ) { + response = await fetch(url, { signal, headers }) + } else { + // Otherwise use the unixfs downloader to make byte range requests + // upstream allowing big files to be downloaded without exhausting + // the upstream worker's CPU budget. + response = await UnixFSDownloader.fetch(url, { signal, TransformStream: options.TransformStream }) + } } catch (error) { if (timeoutController.signal.aborted) { return { diff --git a/packages/ipfs-gateway-race/lib/request.js b/packages/ipfs-gateway-race/lib/request.js new file mode 100644 index 0000000..0acb3f9 --- /dev/null +++ b/packages/ipfs-gateway-race/lib/request.js @@ -0,0 +1,17 @@ +/** + * Determine if the request is for a specific byte range. + * @param {Headers} headers + */ +export const isRangeRequest = headers => headers.get('Range') != null + +/** + * Determine if the request is for an alternative format, like an IPLD block or + * a CAR file. + * @param {Headers} headers + * @param {URLSearchParams} searchParams + */ +export const isAlternateFormatRequest = (headers, searchParams) => { + const format = searchParams.get('format') + const accept = headers.get('Accept') + return Boolean(format || accept?.includes('application/vnd.ipld.')) +} diff --git a/packages/ipfs-gateway-race/package.json b/packages/ipfs-gateway-race/package.json index 4b3e880..7a57769 100644 --- a/packages/ipfs-gateway-race/package.json +++ b/packages/ipfs-gateway-race/package.json @@ -30,6 +30,7 @@ "gateway" ], "dependencies": { + "@storacha/unixfs-dl": "^1.3.0", "@web-std/fetch": "^4.1.0", "any-signal": "^3.0.1", "multiformats": "^9.6.4", diff --git a/packages/ipfs-gateway-race/types.d.ts b/packages/ipfs-gateway-race/types.d.ts index ac4ff13..4f24838 100644 --- a/packages/ipfs-gateway-race/types.d.ts +++ b/packages/ipfs-gateway-race/types.d.ts @@ -3,12 +3,14 @@ export interface IpfsGatewayRacerOptions { } export interface IpfsGatewayRaceGetOptions { + method?: string pathname?: string search?: string headers?: Headers noAbortRequestsOnWinner?: boolean onRaceEnd?: (gatewayResponsePromises: GatewayResponsePromise[], winnerResponse: GatewayResponse | undefined) => void gatewaySignals?: Record + TransformStream?: typeof TransformStream } // Gateway Race Responses