Skip to content

Commit

Permalink
feat: use big file downloader (#180)
Browse files Browse the repository at this point in the history
This PR uses `@storacha/unixfs-dl` to make ranged requests for big UnixFS
files so that the upstream worker does not exhaust its CPU time budget.

Also switches to using the `Link` type for CIDs instead of `string`.
This allows us to detect if the CID is using the "raw" IPLD codec.
  • Loading branch information
Alan Shaw authored Sep 25, 2024
1 parent db444e0 commit 57832cc
Show file tree
Hide file tree
Showing 13 changed files with 290 additions and 80 deletions.
2 changes: 1 addition & 1 deletion packages/edge-gateway/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"ipfs-core-utils": "^0.15.0",
"ipfs-gateway-race": "link:../ipfs-gateway-race",
"itty-router": "^2.4.5",
"multiformats": "^9.6.4",
"multiformats": "^13.3.0",
"p-any": "^4.0.0",
"p-defer": "^4.0.0",
"p-retry": "^5.0.0",
Expand Down
46 changes: 17 additions & 29 deletions packages/edge-gateway/src/gateway.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint-env serviceworker, browser */
/* global Response caches */
/* global Response caches, IdentityTransformStream */

import pAny, { AggregateError } from 'p-any'
import pDefer from 'p-defer'
Expand All @@ -9,7 +9,8 @@ import { gatewayFetch } from 'ipfs-gateway-race'

import {
getCidFromSubdomainUrl,
toDenyListAnchor
toDenyListAnchor,
getCidFromEtag
} from './utils/cid.js'
import { getHeaders } from './utils/headers.js'
import { getCidForbiddenResponse } from './utils/verification.js'
Expand Down Expand Up @@ -133,7 +134,7 @@ export async function gatewayGet (request, env, ctx) {
} = await getFromGatewayRacer(cid, pathname, search, getHeaders(request), env, ctx)

// Validation layer - resource CID
const resourceCid = pathname !== '/' ? getCidFromEtag(winnerGwResponse.headers.get('etag') || cid) : cid
const resourceCid = pathname !== '/' ? getCidFromEtag(winnerGwResponse.headers.get('etag') || `"${cid}"`) : cid
if (winnerGwResponse && pathname !== '/' && resourceCid) {
const resourceCidForbiddenResponse = await getCidForbiddenResponse(resourceCid, env)
if (resourceCidForbiddenResponse) {
Expand Down Expand Up @@ -206,7 +207,7 @@ async function getFromCdn (request, env, cache) {
/**
* @param {Request} request
* @param {Env} env
* @param {string} cid
* @param {import('multiformats').UnknownLink} cid
* @param {{ pathname?: string, search?: string }} [options]
* @return {Promise<ProxiedCDNResponse | undefined>}
*/
Expand Down Expand Up @@ -249,9 +250,14 @@ async function getFromDotstorage (request, env, cid, options = {}) {
}),
...env.cdnGateways.map(async (host) => {
const gwResponse = await gatewayFetch(host, cid, pathname, {
method: request.method,
timeout: env.CDN_REQUEST_TIMEOUT,
headers: request.headers,
search
search,
// Cloudflare's IdentityTransformStream provides a zero copy
// passthrough alternative to TransformStream.
// https://developers.cloudflare.com/workers/runtime-apis/streams/transformstream/#identitytransformstream
IdentityTransformStream: IdentityTransformStream
})

// @ts-ignore 'response' does not exist on type 'GatewayResponseFailure'
Expand All @@ -274,7 +280,7 @@ async function getFromDotstorage (request, env, cid, options = {}) {

/**
*
* @param {string} cid
* @param {import('multiformats').UnknownLink} cid
* @param {string} pathname
* @param {string} search
* @param {Headers} headers
Expand Down Expand Up @@ -308,7 +314,11 @@ async function getFromGatewayRacer (cid, pathname, search, headers, env, ctx) {
reportRaceResults(env, gatewayResponsePromises, undefined, gatewayControllers)
)
}
}
},
// Cloudflare's IdentityTransformStream provides a zero copy
// passthrough alternative to TransformStream.
// https://developers.cloudflare.com/workers/runtime-apis/streams/transformstream/#identitytransformstream
IdentityTransformStream: IdentityTransformStream
})
if (!layerOneIsWinner) {
throw new Error('no winner in the first race')
Expand Down Expand Up @@ -443,28 +453,6 @@ function getResponseWithCustomHeaders (
return clonedResponse
}

/**
 * Derive the resource CID string from an `etag` header value, following
 * https://github.com/ipfs/specs/blob/main/http-gateways/PATH_GATEWAY.md#etag-response-header
 *
 * @param {string} etag
 * @returns {string} the etag with the weak marker and quotes removed, or its
 * last `-` separated segment when the etag mentions `DirIndex`
 */
function getCidFromEtag (etag) {
  // Drop the weak validator marker and the quotes around the value
  const unquoted = decodeURIComponent(etag)
    .replace('W/', '')
    .replaceAll('"', '')

  // Plain etag: what remains is the CID
  if (!etag.includes('DirIndex')) {
    return unquoted
  }

  // Generated directory index: the CID is the final `-` separated segment
  return unquoted.split('-').pop()
}

/**
* Async metrics for race.
*
Expand Down
31 changes: 26 additions & 5 deletions packages/edge-gateway/src/utils/cid.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Multibases } from 'ipfs-core-utils/multibases'
import { bases } from 'multiformats/basics'
import { CID } from 'multiformats/cid'
import { parse } from 'multiformats/link'
import * as uint8arrays from 'uint8arrays'
import { sha256 } from 'multiformats/hashes/sha2'

Expand Down Expand Up @@ -31,14 +31,13 @@ export async function getCidFromSubdomainUrl (url) {
}

/**
* Parse CID and return normalized b32 v1.
* Parse CID and return normalized v1.
*
* @param {string} cid
*/
// NOTE(review): this span appears to be a rendered diff hunk. The `CID.parse`
// assignment and the `toString()` return look like the removed implementation,
// and the `parse(...).toV1()` return looks like its replacement. As written,
// the second `return` is unreachable. Confirm against the real file.
export async function normalizeCid (cid) {
// Look up the decoder to use for this CID string before parsing it
const baseDecoder = await getMultibaseDecoder(cid)
const c = CID.parse(cid, baseDecoder)
// Converts to v1 and returns the string form
return c.toV1().toString()
// Converts to v1 and returns the parsed value itself, not a string
return parse(cid, baseDecoder).toV1()
}

/**
Expand All @@ -61,10 +60,32 @@ async function getMultibaseDecoder (cid) {
/**
* Get denylist anchor with badbits format.
*
* @param {string} cid
* @param {import('multiformats').UnknownLink} cid
*/
export async function toDenyListAnchor (cid) {
  // The anchor is computed over the CID's string form followed by a slash
  const anchorInput = uint8arrays.fromString(`${cid}/`)
  const { bytes } = await sha256.digest(anchorInput)

  // Skip the first two bytes (presumably the multihash code and length
  // prefix - confirm) and hex-encode what is left
  return uint8arrays.toString(bytes.subarray(2), 'hex')
}

/**
 * Derive the resource CID from an `etag` header value, following
 * https://github.com/ipfs/specs/blob/main/http-gateways/PATH_GATEWAY.md#etag-response-header
 *
 * @param {string} etag
 * @returns the result of `parse` (from `multiformats/link`) applied to the
 * CID text extracted from the etag
 */
export function getCidFromEtag (etag) {
  // Drop the weak validator marker and the quotes around the value
  const unquoted = decodeURIComponent(etag)
    .replace('W/', '')
    .replaceAll('"', '')

  // Generated directory index etags carry the CID as their final `-` segment
  const cidText = etag.includes('DirIndex')
    ? unquoted.split('-').pop()
    : unquoted

  return parse(cidText)
}
2 changes: 1 addition & 1 deletion packages/edge-gateway/src/utils/verification.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
/**
* Checks to see if denylist or cid-verifier forbid this CID from being served.
*
* @param {string} cid
* @param {import('multiformats').UnknownLink} cid
* @param {import('../env').Env} env
*/
export async function getCidForbiddenResponse (cid, env) {
Expand Down
45 changes: 33 additions & 12 deletions packages/ipfs-gateway-race/lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import pAny, { AggregateError } from 'p-any'
import { FilterError } from 'p-some'
import pSettle from 'p-settle'
import fetch, { Headers } from '@web-std/fetch'

import * as UnixFSDownloader from '@storacha/unixfs-dl'
import * as raw from 'multiformats/codecs/raw'
import {
NotFoundError,
GatewayTimeoutError,
Expand All @@ -15,6 +16,7 @@ import {
ABORT_CODE,
DEFAULT_REQUEST_TIMEOUT
} from './constants.js'
import { isAlternateFormatRequest, isRangeRequest } from './request.js'

const nop = () => {}

Expand All @@ -38,7 +40,7 @@ export class IpfsGatewayRacer {
}

/**
* @param {string} cid
* @param {import('multiformats').UnknownLink} cid
* @param {IpfsGatewayRaceGetOptions} [options]
* @return {Promise<Response>}
*/
Expand All @@ -53,13 +55,15 @@ export class IpfsGatewayRacer {
/** @type {GatewayResponsePromise[]} */
const gatewayResponsePromises = this.ipfsGateways.map((gwUrl) =>
gatewayFetch(gwUrl, cid, pathname, {
method: options.method,
headers,
search,
timeout: this.timeout,
// Combine internal race winner controller signal with custom user signal
signal: gatewaySignals[gwUrl]
? anySignal([raceControllers[gwUrl].signal, gatewaySignals[gwUrl]])
: raceControllers[gwUrl].signal
: raceControllers[gwUrl].signal,
IdentityTransformStream: options.IdentityTransformStream
})
)

Expand Down Expand Up @@ -128,35 +132,52 @@ export function createGatewayRacer (ipfsGateways, options = {}) {
* Fetches given CID from given IPFS gateway URL.
*
* @param {string} gwUrl
* @param {string} cid
* @param {import('multiformats').UnknownLink} cid
* @param {string} pathname
* @param {Object} [options]
* @param {string} [options.method]
* @param {Headers} [options.headers]
* @param {string} [options.search]
* @param {number} [options.timeout]
* @param {AbortSignal} [options.signal]
* @param {typeof TransformStream} [options.IdentityTransformStream]
*/
export async function gatewayFetch (
gwUrl,
cid,
pathname,
options = {}
) {
const { headers, signal } = options
const method = options.method || 'GET'
const headers = options.headers || new Headers()
const timeout = options.timeout || 60000
const search = options.search || ''
const timeoutController = new AbortController()
const timer = setTimeout(() => timeoutController.abort(), timeout)
// Combine timeout signal with done signal
const signal = options.signal
? anySignal([timeoutController.signal, options.signal])
: timeoutController.signal
const url = new URL(`ipfs/${cid}${pathname}${search}`, gwUrl)

let response
try {
response = await fetch(new URL(`ipfs/${cid}${pathname}${search}`, gwUrl), {
// Combine timeout signal with done signal
signal: signal
? anySignal([timeoutController.signal, signal])
: timeoutController.signal,
headers
})
// If this is an atypical request, i.e. a HEAD request, a range request, a
// request for a different format to UnixFS, or if the root CID is for a raw
// block then just make the request to the upstream as usual.
if (
method !== 'GET' ||
cid.code === raw.code ||
isRangeRequest(headers) ||
isAlternateFormatRequest(headers, url.searchParams)
) {
response = await fetch(url, { method, signal, headers })
} else {
// Otherwise use the unixfs downloader to make byte range requests
// upstream allowing big files to be downloaded without exhausting the
// upstream worker's CPU budget.
response = await UnixFSDownloader.fetch(url, { signal, headers, IdentityTransformStream: options.IdentityTransformStream })
}
} catch (error) {
if (timeoutController.signal.aborted) {
return {
Expand Down
17 changes: 17 additions & 0 deletions packages/ipfs-gateway-race/lib/request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/**
 * Tell whether the request headers ask for a specific byte range, i.e.
 * whether a `Range` header is present.
 * @param {Headers} headers
 * @returns {boolean}
 */
export const isRangeRequest = (headers) => {
  const range = headers.get('Range')
  return range !== null
}

/**
 * Determine if the request is for an alternative format, like an IPLD block or
 * a CAR file.
 *
 * A request counts as "alternate format" when it has a non-empty `format`
 * query parameter, or when its `Accept` header mentions an
 * `application/vnd.ipld.*` media type. Media types are case-insensitive, so
 * the header is lower-cased before matching.
 *
 * @param {Headers} headers
 * @param {URLSearchParams} searchParams
 * @returns {boolean}
 */
export const isAlternateFormatRequest = (headers, searchParams) => {
  // An empty `?format=` is treated the same as no format parameter
  if (searchParams.get('format')) {
    return true
  }

  const accept = headers.get('Accept')
  return Boolean(accept?.toLowerCase().includes('application/vnd.ipld.'))
}
3 changes: 2 additions & 1 deletion packages/ipfs-gateway-race/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
"gateway"
],
"dependencies": {
"@storacha/unixfs-dl": "^1.5.1",
"@web-std/fetch": "^4.1.0",
"any-signal": "^3.0.1",
"multiformats": "^9.6.4",
"multiformats": "^13.3.0",
"p-any": "^4.0.0",
"p-map": "^5.3.0",
"p-retry": "^5.0.0",
Expand Down
Loading

0 comments on commit 57832cc

Please sign in to comment.