diff --git a/packages/verified-fetch/package.json b/packages/verified-fetch/package.json index efaffc4..9600146 100644 --- a/packages/verified-fetch/package.json +++ b/packages/verified-fetch/package.json @@ -57,13 +57,12 @@ "release": "aegir release" }, "dependencies": { - "@helia/block-brokers": "^2.0.3", - "@helia/car": "^3.1.2", - "@helia/http": "^1.0.3", - "@helia/interface": "^4.1.0", - "@helia/ipns": "^7.2.0", - "@helia/routers": "^1.0.2", - "@helia/unixfs": "^3.0.3", + "@helia/block-brokers": "^2.1.0", + "@helia/car": "^3.1.3", + "@helia/http": "^1.0.4", + "@helia/interface": "^4.2.0", + "@helia/ipns": "^7.2.1", + "@helia/routers": "^1.0.3", "@ipld/dag-cbor": "^9.2.0", "@ipld/dag-json": "^10.2.0", "@ipld/dag-pb": "^4.1.0", @@ -85,11 +84,12 @@ "uint8arrays": "^5.0.3" }, "devDependencies": { - "@helia/car": "^3.1.2", - "@helia/dag-cbor": "^3.0.2", - "@helia/dag-json": "^3.0.2", - "@helia/json": "^3.0.2", - "@helia/utils": "^0.1.0", + "@helia/car": "^3.1.3", + "@helia/dag-cbor": "^3.0.3", + "@helia/dag-json": "^3.0.3", + "@helia/json": "^3.0.3", + "@helia/unixfs": "^3.0.4", + "@helia/utils": "^0.2.0", "@ipld/car": "^5.3.0", "@libp2p/interface-compliance-tests": "^5.3.4", "@libp2p/logger": "^4.0.9", @@ -100,10 +100,11 @@ "blockstore-core": "^4.4.1", "browser-readablestream-to-it": "^2.0.5", "datastore-core": "^9.2.9", - "helia": "^4.1.0", + "helia": "^4.1.1", "ipfs-unixfs-importer": "^15.2.5", "ipns": "^9.1.0", "it-all": "^3.0.4", + "it-drain": "^3.0.5", "it-last": "^3.0.4", "it-to-buffer": "^4.0.5", "magic-bytes.js": "^1.10.0", diff --git a/packages/verified-fetch/src/index.ts b/packages/verified-fetch/src/index.ts index f5c3c97..100a569 100644 --- a/packages/verified-fetch/src/index.ts +++ b/packages/verified-fetch/src/index.ts @@ -594,11 +594,11 @@ import { createHeliaHTTP } from '@helia/http' import { delegatedHTTPRouting } from '@helia/routers' import { dns } from '@multiformats/dns' import { VerifiedFetch as VerifiedFetchClass } from './verified-fetch.js' -import type { Helia } from '@helia/interface' +import type { GetBlockProgressEvents, Helia } from '@helia/interface' import type { ResolveDNSLinkProgressEvents } from '@helia/ipns' -import type { GetEvents } from '@helia/unixfs' import type { DNSResolvers, DNS } from '@multiformats/dns' import type { DNSResolver } from '@multiformats/dns/resolvers' +import type { ExporterProgressEvents } from 'ipfs-unixfs-exporter' import type { CID } from 'multiformats/cid' import type { ProgressEvent, ProgressOptions } from 'progress-events' @@ -674,8 +674,10 @@ export interface ContentTypeParser { } export type BubbledProgressEvents = - // unixfs - GetEvents | + // unixfs-exporter + ExporterProgressEvents | + // helia blockstore + GetBlockProgressEvents | // ipns ResolveDNSLinkProgressEvents diff --git a/packages/verified-fetch/src/verified-fetch.ts b/packages/verified-fetch/src/verified-fetch.ts index 26dc33a..8323106 100644 --- a/packages/verified-fetch/src/verified-fetch.ts +++ b/packages/verified-fetch/src/verified-fetch.ts @@ -1,6 +1,5 @@ import { car } from '@helia/car' import { ipns as heliaIpns, type IPNS } from '@helia/ipns' -import { unixfs as heliaUnixFs, type UnixFS as HeliaUnixFs } from '@helia/unixfs' import * as ipldDagCbor from '@ipld/dag-cbor' import * as ipldDagJson from '@ipld/dag-json' import { code as dagPbCode } from '@ipld/dag-pb' @@ -8,6 +7,7 @@ import { type AbortOptions, type Logger, type PeerId } from '@libp2p/interface' import { Record as DHTRecord } from '@libp2p/kad-dht' import { peerIdFromString } from '@libp2p/peer-id' import { Key } from 'interface-datastore' +import { exporter } from 'ipfs-unixfs-exporter' import toBrowserReadableStream from 'it-to-browser-readablestream' import { code as jsonCode } from 'multiformats/codecs/json' import { code as rawCode } from 'multiformats/codecs/raw' @@ -39,7 +39,6 @@ import type { CID } from 'multiformats/cid' interface VerifiedFetchComponents { helia: Helia ipns?: IPNS - unixfs?: HeliaUnixFs } /** @@ -128,15 +127,13 @@ function getOverridenRawContentType ({ headers, accept }: { headers?: HeadersIni export class VerifiedFetch { private readonly helia: Helia private readonly ipns: IPNS - private readonly unixfs: HeliaUnixFs private readonly log: Logger private readonly contentTypeParser: ContentTypeParser | undefined - constructor ({ helia, ipns, unixfs }: VerifiedFetchComponents, init?: VerifiedFetchInit) { + constructor ({ helia, ipns }: VerifiedFetchComponents, init?: VerifiedFetchInit) { this.helia = helia this.log = helia.logger.forComponent('helia:verified-fetch') this.ipns = ipns ?? heliaIpns(helia) - this.unixfs = unixfs ?? heliaUnixFs(helia) this.contentTypeParser = init?.contentTypeParser this.log.trace('created VerifiedFetch instance') } @@ -350,14 +347,15 @@ export class VerifiedFetch { const rootFilePath = 'index.html' try { this.log.trace('found directory at %c/%s, looking for index.html', cid, path) - const stat = await this.unixfs.stat(dirCid, { - path: rootFilePath, + + const entry = await exporter(`/ipfs/${dirCid}/${rootFilePath}`, this.helia.blockstore, { signal: options?.signal, onProgress: options?.onProgress }) - this.log.trace('found root file at %c/%s with cid %c', dirCid, rootFilePath, stat.cid) + + this.log.trace('found root file at %c/%s with cid %c', dirCid, rootFilePath, entry.cid) path = rootFilePath - resolvedCID = stat.cid + resolvedCID = entry.cid } catch (err: any) { options?.signal?.throwIfAborted() this.log('error loading path %c/%s', dirCid, rootFilePath, err) @@ -375,16 +373,22 @@ export class VerifiedFetch { } const offset = byteRangeContext.offset const length = byteRangeContext.length - this.log.trace('calling unixfs.cat for %c/%s with offset=%o & length=%o', resolvedCID, path, offset, length) - const asyncIter = this.unixfs.cat(resolvedCID, { - signal: options?.signal, - onProgress: options?.onProgress, - offset, - length - }) - this.log('got async iterator for %c/%s', cid, path) + this.log.trace('calling exporter for %c/%s with offset=%o & length=%o', resolvedCID, path, offset, length) try { + const entry = await exporter(resolvedCID, this.helia.blockstore, { + signal: options?.signal, + onProgress: options?.onProgress + }) + + const asyncIter = entry.content({ + signal: options?.signal, + onProgress: options?.onProgress, + offset, + length + }) + this.log('got async iterator for %c/%s', cid, path) + const { stream, firstChunk } = await getStreamFromAsyncIterable(asyncIter, path ?? '', this.helia.logger, { onProgress: options?.onProgress, signal: options?.signal @@ -400,7 +404,6 @@ export class VerifiedFetch { if (ipfsRoots != null) { response.headers.set('X-Ipfs-Roots', ipfsRoots.map(cid => cid.toV1().toString()).join(',')) // https://specs.ipfs.tech/http-gateways/path-gateway/#x-ipfs-roots-response-header } - return response } catch (err: any) { options?.signal?.throwIfAborted() diff --git a/packages/verified-fetch/test/abort-handling.spec.ts b/packages/verified-fetch/test/abort-handling.spec.ts index 1c2e9f6..cacbed1 100644 --- a/packages/verified-fetch/test/abort-handling.spec.ts +++ b/packages/verified-fetch/test/abort-handling.spec.ts @@ -1,10 +1,13 @@ import { dagCbor } from '@helia/dag-cbor' import { type DNSLinkResolveResult, type IPNS, type IPNSResolveResult } from '@helia/ipns' -import { type UnixFS, type UnixFSStats, unixfs } from '@helia/unixfs' +import { unixfs } from '@helia/unixfs' import { stop, type ComponentLogger, type Logger } from '@libp2p/interface' import { prefixLogger, logger as libp2pLogger } from '@libp2p/logger' import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { expect } from 'aegir/chai' +import browserReadableStreamToIt from 'browser-readablestream-to-it' +import { fixedSize } from 'ipfs-unixfs-importer/chunker' +import drain from 'it-drain' import { CID } from 'multiformats/cid' import pDefer, { type DeferredPromise } from 'p-defer' import Sinon from 'sinon' @@ -13,7 +16,7 @@ import { VerifiedFetch } from '../src/verified-fetch.js' import { createHelia } from './fixtures/create-offline-helia.js' import { getAbortablePromise } from './fixtures/get-abortable-promise.js' import { makeAbortedRequest } from './fixtures/make-aborted-request.js' -import type { BlockRetriever, Helia } from '@helia/interface' +import type { BlockBroker, Helia } from '@helia/interface' describe('abort-handling', function () { this.timeout(500) // these tests should all fail extremely quickly. if they don't, they're not aborting properly, or they're being ran on an extremely slow machine. @@ -24,7 +27,6 @@ describe('abort-handling', function () { const notPublishedCid = CID.parse('bafybeichqiz32cw5c3vdpvh2xtfgl42veqbsr6sw2g6c7ffz6atvh2vise') let helia: Helia let name: StubbedInstance - let fs: StubbedInstance let logger: ComponentLogger let componentLoggers: Logger[] = [] let verifiedFetch: VerifiedFetch @@ -32,11 +34,9 @@ describe('abort-handling', function () { /** * Stubbed networking components */ - let blockRetriever: StubbedInstance + let blockRetriever: StubbedInstance>> let dnsLinkResolver: Sinon.SinonStub> let peerIdResolver: Sinon.SinonStub> - let unixFsCatStub: Sinon.SinonStub> - let unixFsStatStub: Sinon.SinonStub> /** * used as promises to pass to makeAbortedRequest that will abort the request as soon as it's resolved. @@ -44,19 +44,13 @@ describe('abort-handling', function () { let blockBrokerRetrieveCalled: DeferredPromise let dnsLinkResolverCalled: DeferredPromise let peerIdResolverCalled: DeferredPromise - let unixFsStatCalled: DeferredPromise - let unixFsCatCalled: DeferredPromise beforeEach(async () => { peerIdResolver = sandbox.stub() dnsLinkResolver = sandbox.stub() - unixFsCatStub = sandbox.stub() - unixFsStatStub = sandbox.stub() peerIdResolverCalled = pDefer() dnsLinkResolverCalled = pDefer() blockBrokerRetrieveCalled = pDefer() - unixFsStatCalled = pDefer() - unixFsCatCalled = pDefer() dnsLinkResolver.withArgs('timeout-5000-example.com', Sinon.match.any).callsFake(async (_domain, options) => { dnsLinkResolverCalled.resolve() @@ -66,35 +60,12 @@ describe('abort-handling', function () { peerIdResolverCalled.resolve() return getAbortablePromise(options.signal) }) - blockRetriever = stubInterface({ + blockRetriever = stubInterface>>({ retrieve: sandbox.stub().callsFake(async (cid, options) => { blockBrokerRetrieveCalled.resolve() return getAbortablePromise(options.signal) }) }) - unixFsCatStub.callsFake((cid, options) => { - unixFsCatCalled.resolve() - return { - async * [Symbol.asyncIterator] () { - await getAbortablePromise(options.signal) - yield new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) - } - } - }) - - unixFsStatStub.callsFake(async (cid, options): Promise => { - unixFsStatCalled.resolve() - await getAbortablePromise(options.signal) - return { - cid, - type: 'file', - fileSize: BigInt(0), - dagSize: BigInt(0), - blocks: 1, - localFileSize: BigInt(0), - localDagSize: BigInt(0) - } - }) logger = prefixLogger('test:abort-handling') sandbox.stub(logger, 'forComponent').callsFake((name) => { @@ -110,14 +81,9 @@ describe('abort-handling', function () { resolveDNSLink: dnsLinkResolver, resolve: peerIdResolver }) - fs = stubInterface({ - cat: unixFsCatStub, - stat: unixFsStatStub - }) verifiedFetch = new VerifiedFetch({ helia, - ipns: name, - unixfs: fs + ipns: name }) }) @@ -159,39 +125,137 @@ describe('abort-handling', function () { expect(blockRetriever.retrieve.callCount).to.equal(1) }) - it('should abort a request during unixfs.stat call', async function () { + it('should abort a request while loading a file root', async function () { const fs = unixfs(helia) - const fileCid = await fs.addBytes(Uint8Array.from([0, 1, 2, 3])) + + // add a file with a very small chunk size - this is to ensure we end up + // with a DAG that contains a root and some leaf nodes + const fileCid = await fs.addBytes(Uint8Array.from([0, 1, 2, 3]), { + chunker: fixedSize({ chunkSize: 2 }) + }) const directoryCid = await fs.addDirectory() const cid = await fs.cp(fileCid, directoryCid, 'index.html') - await expect(makeAbortedRequest(verifiedFetch, [cid], unixFsStatCalled.promise)).to.eventually.be.rejectedWith('aborted') + const leaf1 = CID.parse('bafkreifucp2h2e7of7tmqrns5ykbv6a55bmn6twfjgsyw6lqxolgiw6i2i') + const leaf2 = CID.parse('bafkreihosbapmxbudbk6a4h7iohlb2u5lobrwkrme4h3p32zfv2qichdwm') - expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString - expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString - expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because the blockstore has the content - expect(unixFsStatStub.callCount).to.equal(1) - expect(unixFsCatStub.callCount).to.equal(0) // not called because the request was aborted during .stat call + // file root + await expect(helia.blockstore.has(fileCid)) + .to.eventually.be.true() + + // leaf nodes + await expect(helia.blockstore.has(leaf1)) + .to.eventually.be.true() + await expect(helia.blockstore.has(leaf2)) + .to.eventually.be.true() + + const fileRootGot = pDefer() + const blockstoreGetSpy = Sinon.stub(helia.blockstore, 'get') + blockstoreGetSpy.callsFake(async (cid, options) => { + if (cid.equals(fileCid)) { + fileRootGot.resolve() + } + + return blockstoreGetSpy.wrappedMethod.call(helia.blockstore, cid, options) + }) + + await expect(makeAbortedRequest(verifiedFetch, [cid], fileRootGot.promise)) + .to.eventually.be.rejectedWith('aborted') + + // not called because parseResource never passes the resource to + // parseUrlString + expect(peerIdResolver.callCount).to.equal(0) + + // not called because parseResource never passes the resource to + // parseUrlString + expect(dnsLinkResolver.callCount).to.equal(0) + + // not called because the blockstore has the content + expect(blockRetriever.retrieve.callCount).to.equal(0) + + // the file root was loaded + expect(blockstoreGetSpy.getCalls().map(call => call.args[0].toString())) + .to.include(fileCid.toString()) + + // the leaf nodes were not loaded because the request was aborted + // after the root node was loaded + expect(blockstoreGetSpy.getCalls().map(call => call.args[0].toString())) + .to.not.include(leaf1.toString()) + expect(blockstoreGetSpy.getCalls().map(call => call.args[0].toString())) + .to.not.include(leaf2.toString()) }) - it('should abort a request during unixfs.cat call', async function () { + it('should abort a request while loading file data', async function () { const fs = unixfs(helia) - const fileCid = await fs.addBytes(Uint8Array.from([0, 1, 2, 3])) + + // add a file with a very small chunk size - this is to ensure we end up + // with a DAG that contains a root and some leaf nodes + const fileCid = await fs.addBytes(Uint8Array.from([0, 1, 2, 3]), { + chunker: fixedSize({ chunkSize: 2 }) + }) const directoryCid = await fs.addDirectory() const cid = await fs.cp(fileCid, directoryCid, 'index.html') - // override the default fake set in beforeEach that would timeout. - unixFsStatStub.callsFake(async (cid, options) => { - unixFsStatCalled.resolve() - return fs.stat(cid, options) + const leaf1 = CID.parse('bafkreifucp2h2e7of7tmqrns5ykbv6a55bmn6twfjgsyw6lqxolgiw6i2i') + const leaf2 = CID.parse('bafkreihosbapmxbudbk6a4h7iohlb2u5lobrwkrme4h3p32zfv2qichdwm') + + // file root + await expect(helia.blockstore.has(fileCid)) + .to.eventually.be.true() + + // leaf nodes + await expect(helia.blockstore.has(leaf1)) + .to.eventually.be.true() + await expect(helia.blockstore.has(leaf2)) + .to.eventually.be.true() + + const leaf1Got = pDefer() + let leaf2Loaded = false + const blockstoreGetSpy = Sinon.stub(helia.blockstore, 'get') + blockstoreGetSpy.callsFake(async (cid, options) => { + if (cid.equals(leaf1)) { + leaf1Got.resolve() + } + + const b = await blockstoreGetSpy.wrappedMethod.call(helia.blockstore, cid, options) + + if (cid.equals(leaf2)) { + leaf2Loaded = true + } + + return b }) - await expect(makeAbortedRequest(verifiedFetch, [cid], unixFsCatCalled.promise)).to.eventually.be.rejectedWith('aborted') + const response = await makeAbortedRequest(verifiedFetch, [cid], leaf1Got.promise) - expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString - expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString - expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because the blockstore has the content - expect(unixFsStatStub.callCount).to.equal(1) - expect(unixFsCatStub.callCount).to.equal(1) + if (response.body == null) { + throw new Error('Body was not set') + } + + // error occurs during streaming response + await expect(drain(browserReadableStreamToIt(response.body))) + .to.eventually.be.rejectedWith('aborted') + + // not called because parseResource never passes the resource to + // parseUrlString + expect(peerIdResolver.callCount).to.equal(0) + + // not called because parseResource never passes the resource to + // parseUrlString + expect(dnsLinkResolver.callCount).to.equal(0) + + // not called because the blockstore has the content + expect(blockRetriever.retrieve.callCount).to.equal(0) + + // the file root was loaded + expect(blockstoreGetSpy.getCalls().map(call => call.args[0].toString())) + .to.include(fileCid.toString()) + + // the first leaf was loaded + expect(blockstoreGetSpy.getCalls().map(call => call.args[0].toString())) + .to.include(leaf1.toString()) + + // the signal was aborted before the second leaf was loaded + expect(leaf2Loaded).to.be.false() }) })