Skip to content

Commit

Permalink
fix: check for blob/accept receipts before blob/add is concluded (#1459)
Browse files Browse the repository at this point in the history
- Verifies that `blob/accept` has succeeded before concluding `blob/add`
as per
#1425 (comment). A
location commitment is returned instead of the blob multihash.

---------

Co-authored-by: Vasco Santos <santos.vasco10@gmail.com>
Co-authored-by: Alan Shaw <alan.shaw@protocol.ai>
  • Loading branch information
3 people authored Jun 4, 2024
1 parent 7944077 commit 462518c
Show file tree
Hide file tree
Showing 22 changed files with 553 additions and 93 deletions.
1 change: 1 addition & 0 deletions packages/blob-index/src/index.js
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * as ShardedDAGIndex from './sharded-dag-index.js'
export * from './digest-map.js'
export { indexShardedDAG } from './util.js'
16 changes: 16 additions & 0 deletions packages/blob-index/src/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,19 @@ export const fromShardArchives = async (content, shards) => {
}
return index
}

/**
* Indexes a sharded DAG
*
* @param {import('multiformats').Link} root
* @param {import('@web3-storage/capabilities/types').CARLink[]} shards
* @param {Array<Map<API.SliceDigest, API.Position>>} shardIndexes
*/
export async function indexShardedDAG(root, shards, shardIndexes) {
const index = create(root)
for (const [i, shard] of shards.entries()) {
const slices = shardIndexes[i]
index.shards.set(shard.multihash, slices)
}
return await index.archive()
}
2 changes: 2 additions & 0 deletions packages/upload-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"test:node": "hundreds -r html -r text mocha 'test/**/!(*.browser).test.js' -n experimental-vm-modules -n no-warnings",
"test:browser": "playwright-test 'test/**/!(*.node).test.js'",
"mock": "run-p mock:*",
"mock:receipts-server": "PORT=9201 node test/helpers/receipts-server.js",
"mock:bucket-200": "PORT=9200 STATUS=200 node test/helpers/bucket-server.js",
"mock:bucket-401": "PORT=9400 STATUS=400 node test/helpers/bucket-server.js",
"mock:bucket-500": "PORT=9500 STATUS=500 node test/helpers/bucket-server.js",
Expand Down Expand Up @@ -94,6 +95,7 @@
"@types/varint": "^6.0.1",
"@ucanto/principal": "^9.0.1",
"@ucanto/server": "^10.0.0",
"@web3-storage/content-claims": "^5.0.0",
"@web3-storage/eslint-config-w3up": "workspace:^",
"assert": "^2.0.0",
"blockstore-core": "^3.0.0",
Expand Down
26 changes: 23 additions & 3 deletions packages/upload-client/src/blob.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ import { sha256 } from 'multiformats/hashes/sha2'
import { ed25519 } from '@ucanto/principal'
import { conclude } from '@web3-storage/capabilities/ucan'
import * as UCAN from '@web3-storage/capabilities/ucan'
import { Receipt } from '@ucanto/core'
import { Delegation, Receipt } from '@ucanto/core'
import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob'
import * as BlobCapabilities from '@web3-storage/capabilities/blob'
import * as HTTPCapabilities from '@web3-storage/capabilities/http'
import { SpaceDID } from '@web3-storage/capabilities/utils'
import retry, { AbortError } from 'p-retry'
import { servicePrincipal, connection } from './service.js'
import { REQUEST_RETRIES } from './constants.js'
import { poll } from './receipts.js'

/**
* @param {string} url
Expand Down Expand Up @@ -166,7 +167,7 @@ export function createConcludeInvocation(id, serviceDid, receipt) {
* The issuer needs the `blob/add` delegated capability.
* @param {Blob|Uint8Array} data Blob data.
* @param {import('./types.js').RequestOptions} [options]
* @returns {Promise<import('multiformats').MultihashDigest>}
* @returns {Promise<import('./types.js').BlobAddOk>}
*/
export async function add(
{ issuer, with: resource, proofs, audience },
Expand Down Expand Up @@ -303,7 +304,26 @@ export async function add(
})
}

return multihash
// Ensure the blob has been accepted
const acceptReceipt = await poll(nextTasks.accept.task.link(), options)

const blocks = new Map(
[...acceptReceipt.iterateIPLDBlocks()].map((block) => [
`${block.cid}`,
block,
])
)
const site = Delegation.view({
root: /** @type {import('@ucanto/interface').UCANLink} */ (
acceptReceipt.out.ok.site
),
blocks,
})

return {
multihash,
site,
}
}

/**
Expand Down
20 changes: 8 additions & 12 deletions packages/upload-client/src/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import * as PieceHasher from '@web3-storage/data-segment/multihash'
import { Storefront } from '@web3-storage/filecoin-client'
import { ShardedDAGIndex } from '@web3-storage/blob-index'
import * as Link from 'multiformats/link'
import * as raw from 'multiformats/codecs/raw'
import * as Store from './store.js'
Expand All @@ -10,10 +9,12 @@ import * as Upload from './upload.js'
import * as UnixFS from './unixfs.js'
import * as CAR from './car.js'
import { ShardingStream, defaultFileComparator } from './sharding.js'
import { codec as carCodec } from '@ucanto/transport/car'
import { indexShardedDAG } from '@web3-storage/blob-index'

export { Blob, Index, Store, Upload, UnixFS, CAR }
export * from './sharding.js'
export { receiptsEndpoint } from './service.js'
export * as Receipt from './receipts.js'

/**
* Uploads a file to the service and returns the root data CID for the
Expand Down Expand Up @@ -144,9 +145,9 @@ async function uploadBlockStream(
async transform(car, controller) {
const bytes = new Uint8Array(await car.arrayBuffer())
// Invoke blob/add and write bytes to write target
const multihash = await Blob.add(conf, bytes, options)
const { multihash } = await Blob.add(conf, bytes, options)
// Should this be raw instead?
const cid = Link.create(carCodec.code, multihash)
const cid = Link.create(CAR.code, multihash)
let piece
if (pieceHasher) {
const multihashDigest = await pieceHasher.digest(bytes)
Expand Down Expand Up @@ -199,20 +200,15 @@ async function uploadBlockStream(
/* c8 ignore next */
if (!root) throw new Error('missing root CID')

const index = ShardedDAGIndex.create(root)
for (const [i, shard] of shards.entries()) {
const slices = shardIndexes[i]
index.shards.set(shard.multihash, slices)
}
const indexBytes = await index.archive()
const indexBytes = await indexShardedDAG(root, shards, shardIndexes)
/* c8 ignore next 3 */
if (!indexBytes.ok) {
throw new Error('failed to archive DAG index', { cause: indexBytes.error })
}

// Store the index in the space
const indexDigest = await Blob.add(conf, indexBytes.ok, options)
const indexLink = Link.create(carCodec.code, indexDigest)
const { multihash } = await Blob.add(conf, indexBytes.ok, options)
const indexLink = Link.create(CAR.code, multihash)

// Register the index with the service
await Index.add(conf, indexLink, options)
Expand Down
119 changes: 119 additions & 0 deletions packages/upload-client/src/receipts.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import retry, { AbortError } from 'p-retry'
import { CAR } from '@ucanto/transport'
import { receiptsEndpoint } from './service.js'
import { REQUEST_RETRIES } from './constants.js'

export class ReceiptNotFound extends Error {
/**
* @param {import('multiformats').UnknownLink} taskCid
*/
constructor(taskCid) {
super()
this.taskCid = taskCid
}

/* c8 ignore start */
get reason() {
return `receipt not found for task ${this.taskCid} in the indexed workflow`
}
/* c8 ignore end */

get name() {
return 'ReceiptNotFound'
}
}

export class ReceiptMissing extends Error {
/**
* @param {import('multiformats').UnknownLink} taskCid
*/
constructor(taskCid) {
super()
this.taskCid = taskCid
}

/* c8 ignore start */
get reason() {
return `receipt missing for task ${this.taskCid}`
}
/* c8 ignore end */

get name() {
return 'ReceiptMissing'
}
}

/**
* Polls for a receipt for an executed task by its CID.
*
* @param {import('multiformats').UnknownLink} taskCid
* @param {import('./types.js').RequestOptions} [options]
* @returns {Promise<import('@ucanto/interface').Receipt>}
*/
export async function poll(taskCid, options = {}) {
return await retry(
async () => {
const res = await get(taskCid, options)
if (res.error) {
// @ts-ignore
if (res.error.name === 'ReceiptNotFound') {
// throw an error that will cause `p-retry` to retry with
throw res.error
} else {
throw new AbortError(
new Error('failed to fetch blob/accept receipt', {
cause: res.error,
})
)
}
}
return res.ok
},
{
onFailedAttempt: console.warn,
/* c8 ignore next */
retries: options.retries ?? REQUEST_RETRIES,
}
)
}

/**
* Get a receipt for an executed task by its CID.
*
* @param {import('multiformats').UnknownLink} taskCid
* @param {import('./types.js').RequestOptions} [options]
* @returns {Promise<import('@ucanto/client').Result<import('@ucanto/interface').Receipt, Error>>}
*/
async function get(taskCid, options = {}) {
// Fetch receipt from endpoint
const url = new URL(
taskCid.toString(),
options.receiptsEndpoint ?? receiptsEndpoint
)
const fetchReceipt = options.fetch ?? globalThis.fetch.bind(globalThis)
const workflowResponse = await fetchReceipt(url)
/* c8 ignore start */
if (workflowResponse.status === 404) {
return {
error: new ReceiptNotFound(taskCid),
}
}
/* c8 ignore stop */
// Get receipt from Message Archive
const agentMessageBytes = new Uint8Array(await workflowResponse.arrayBuffer())
// Decode message
const agentMessage = await CAR.request.decode({
body: agentMessageBytes,
headers: {},
})
// Get receipt from the potential multiple receipts in the message
const receipt = agentMessage.receipts.get(taskCid.toString())
if (!receipt) {
return {
error: new ReceiptMissing(taskCid),
}
}
return {
ok: receipt,
}
}
1 change: 1 addition & 0 deletions packages/upload-client/src/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as DID from '@ipld/dag-ucan/did'

export const serviceURL = new URL('https://up.web3.storage')
export const servicePrincipal = DID.parse('did:web:web3.storage')
export const receiptsEndpoint = 'https://up.web3.storage/receipt/'

/** @type {import('@ucanto/interface').ConnectionView<import('./types.js').Service>} */
export const connection = connect({
Expand Down
16 changes: 15 additions & 1 deletion packages/upload-client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ import type {
FetchOptions as IpfsUtilsFetchOptions,
ProgressStatus as XHRProgressStatus,
} from 'ipfs-utils/src/types.js'
import { Link, UnknownLink, Version, MultihashHasher } from 'multiformats'
import {
MultihashDigest,
Link,
UnknownLink,
Version,
MultihashHasher,
} from 'multiformats'
import { Block } from '@ipld/unixfs'
import {
ServiceMethod,
Expand All @@ -12,6 +18,8 @@ import {
DID,
Principal,
Failure,
Delegation,
Capabilities,
} from '@ucanto/interface'
import {
UCANConclude,
Expand Down Expand Up @@ -307,6 +315,7 @@ export interface RequestOptions
UploadProgressTrackable {
fetch?: typeof globalThis.fetch
nonce?: string
receiptsEndpoint?: string
}

export interface ListRequestOptions extends RequestOptions, Pageable {}
Expand Down Expand Up @@ -374,3 +383,8 @@ export interface FileLike extends BlobLike {
*/
name: string
}

export interface BlobAddOk {
multihash: MultihashDigest
site: Delegation<Capabilities>
}
Loading

0 comments on commit 462518c

Please sign in to comment.