Skip to content

Commit

Permalink
feat: use big file downloader
Browse files Browse the repository at this point in the history
  • Loading branch information
Alan Shaw committed Sep 19, 2024
1 parent ae464e5 commit 5b13727
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 11 deletions.
7 changes: 5 additions & 2 deletions packages/edge-gateway/src/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,11 @@ async function getFromDotstorage (request, env, cid, options = {}) {
}),
...env.cdnGateways.map(async (host) => {
const gwResponse = await gatewayFetch(host, cid, pathname, {
method: request.method,
timeout: env.CDN_REQUEST_TIMEOUT,
headers: request.headers,
search
search,
TransformStream: IdentityTransformStream
})

// @ts-ignore 'response' does not exist on type 'GatewayResponseFailure'
Expand Down Expand Up @@ -306,7 +308,8 @@ async function getFromGatewayRacer (cid, pathname, search, headers, env, ctx) {
reportRaceResults(env, gatewayResponsePromises, undefined, gatewayControllers)
)
}
}
},
TransformStream: IdentityTransformStream
})
if (!layerOneIsWinner) {
throw new Error('no winner in the first race')
Expand Down
38 changes: 29 additions & 9 deletions packages/ipfs-gateway-race/lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import pAny, { AggregateError } from 'p-any'
import { FilterError } from 'p-some'
import pSettle from 'p-settle'
import fetch, { Headers } from '@web-std/fetch'
import * as UnixFSDownloader from '@storacha/unixfs-dl'

import {
NotFoundError,
Expand All @@ -15,6 +16,7 @@ import {
ABORT_CODE,
DEFAULT_REQUEST_TIMEOUT
} from './constants.js'
import { isAlternateFormatRequest, isRangeRequest } from './request.js'

const nop = () => {}

Expand Down Expand Up @@ -53,13 +55,15 @@ export class IpfsGatewayRacer {
/** @type {GatewayResponsePromise[]} */
const gatewayResponsePromises = this.ipfsGateways.map((gwUrl) =>
gatewayFetch(gwUrl, cid, pathname, {
method: options.method,
headers,
search,
timeout: this.timeout,
// Combine internal race winner controller signal with custom user signal
signal: gatewaySignals[gwUrl]
? anySignal([raceWinnerController.signal, gatewaySignals[gwUrl]])
: raceWinnerController.signal
: raceWinnerController.signal,
TransformStream: options.TransformStream
})
)

Expand Down Expand Up @@ -126,32 +130,48 @@ export function createGatewayRacer (ipfsGateways, options = {}) {
* @param {string} cid
* @param {string} pathname
* @param {Object} [options]
* @param {string} [options.method]
* @param {Headers} [options.headers]
* @param {string} [options.search]
* @param {number} [options.timeout]
* @param {AbortSignal} [options.signal]
* @param {typeof TransformStream} [options.TransformStream]
*/
export async function gatewayFetch (
gwUrl,
cid,
pathname,
options = {}
) {
const { headers, signal } = options
const method = options.method || 'GET'
const headers = options.headers || new Headers()
const timeout = options.timeout || 60000
const search = options.search || ''
const timeoutController = new AbortController()
const timer = setTimeout(() => timeoutController.abort(), timeout)
// Combine timeout signal with done signal
const signal = options.signal
? anySignal([timeoutController.signal, options.signal])
: timeoutController.signal
const url = new URL(`ipfs/${cid}${pathname}${search}`, gwUrl)

let response
try {
response = await fetch(new URL(`ipfs/${cid}${pathname}${search}`, gwUrl), {
// Combine timeout signal with done signal
signal: signal
? anySignal([timeoutController.signal, signal])
: timeoutController.signal,
headers
})
// If this is an atypical request, i.e. it's a HEAD request, a range request
// or a request for a different format to UnixFS, then just make the request
// to the upstream as usual.
if (
method !== 'GET' ||
isRangeRequest(headers) ||
isAlternateFormatRequest(headers, url.searchParams)
) {
response = await fetch(url, { signal, headers })
} else {
// Otherwise use the unixfs downloader to make byte range requests
// upstream allowing big files to be downloaded without exhausting
// the upstream worker's CPU budget.
response = await UnixFSDownloader.fetch(url, { signal, TransformStream: options.TransformStream })
}
} catch (error) {
if (timeoutController.signal.aborted) {
return {
Expand Down
17 changes: 17 additions & 0 deletions packages/ipfs-gateway-race/lib/request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/**
* Determine if the request is for a specific byte range.
* @param {Headers} headers
*/
export const isRangeRequest = headers => headers.get('Range') != null

/**
* Determine if the request is for an alternative format, like an IPLD block or
* a CAR file.
* @param {Headers} headers
* @param {URLSearchParams} searchParams
*/
export const isAlternateFormatRequest = (headers, searchParams) => {
const format = searchParams.get('format')
const accept = headers.get('Accept')
return Boolean(format || accept?.includes('application/vnd.ipld.'))
}
1 change: 1 addition & 0 deletions packages/ipfs-gateway-race/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"gateway"
],
"dependencies": {
"@storacha/unixfs-dl": "^1.3.0",
"@web-std/fetch": "^4.1.0",
"any-signal": "^3.0.1",
"multiformats": "^9.6.4",
Expand Down
2 changes: 2 additions & 0 deletions packages/ipfs-gateway-race/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ export interface IpfsGatewayRacerOptions {
}

/**
 * Per-request options for a gateway race.
 */
export interface IpfsGatewayRaceGetOptions {
  /**
   * HTTP method of the request. Forwarded to `gatewayFetch`, which falls back
   * to `GET` when it is not set.
   */
  method?: string
  /** Path portion of the request — presumably appended after the CID; confirm against the racer. */
  pathname?: string
  /** Query string of the request — presumably including the leading `?`; confirm against callers. */
  search?: string
  /** Request headers to send to the gateways. */
  headers?: Headers
  /**
   * NOTE(review): judging by the name, disables aborting the remaining
   * gateway requests once a winner is found — confirm against the racer
   * implementation.
   */
  noAbortRequestsOnWinner?: boolean
  /**
   * Callback given every gateway response promise and the winning response
   * (`undefined` when there is none). Presumably invoked once the race has
   * finished — confirm against the racer implementation.
   */
  onRaceEnd?: (gatewayResponsePromises: GatewayResponsePromise[], winnerResponse: GatewayResponse | undefined) => void
  /**
   * Custom abort signals keyed by gateway URL. When present for a gateway, the
   * signal is combined with the racer's internal race-winner signal.
   */
  gatewaySignals?: Record<string, AbortSignal>
  /**
   * `TransformStream` constructor forwarded to `gatewayFetch`, which hands it
   * to the UnixFS downloader.
   */
  TransformStream?: typeof TransformStream
}

// Gateway Race Responses
Expand Down

0 comments on commit 5b13727

Please sign in to comment.