Skip to content

Commit

Permalink
fix: upgrade filecoin api with content store to rely on roundabout
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Apr 29, 2024
1 parent 5779161 commit 2e5bb6a
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 358 deletions.
8 changes: 4 additions & 4 deletions filecoin/functions/handle-filecoin-submit-message.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as Sentry from '@sentry/serverless'
import * as storefrontEvents from '@web3-storage/filecoin-api/storefront/events'

import { createPieceTable } from '../store/piece.js'
import { createDataStore, composeDataStoresWithOrderedStream } from '../store/data.js'
import { createContentStore, composeContentStoresWithOrderedStream } from '../store/content.js'
import { decodeMessage } from '../queue/filecoin-submit-queue.js'
import { mustGetEnv } from './utils.js'

Expand Down Expand Up @@ -46,9 +46,9 @@ async function handleFilecoinSubmitMessage (sqsEvent) {
} = getEnv()
const context = {
pieceStore: createPieceTable(AWS_REGION, pieceTableName),
dataStore: composeDataStoresWithOrderedStream(
createDataStore(AWS_REGION, s3BucketName),
createDataStore(R2_REGION, r2BucketName, {
contentStore: composeContentStoresWithOrderedStream(
createContentStore(AWS_REGION, s3BucketName),
createContentStore(R2_REGION, r2BucketName, {
endpoint: r2BucketEndpoint,
credentials: {
accessKeyId: r2BucketAccessKeyId,
Expand Down
4 changes: 2 additions & 2 deletions filecoin/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
"@ucanto/principal": "^9.0.1",
"@ucanto/transport": "^9.1.1",
"@web3-storage/data-segment": "^5.1.0",
"@web3-storage/filecoin-api": "^5.0.0",
"@web3-storage/filecoin-client": "^3.3.1",
"@web3-storage/filecoin-api": "^6.0.0",
"@web3-storage/filecoin-client": "^3.3.2",
"fr32-sha2-256-trunc254-padded-binary-tree-multihash": "^3.3.0",
"multiformats": "^13.1.0",
"p-retry": "^6.2.0",
Expand Down
66 changes: 19 additions & 47 deletions filecoin/store/data.js → filecoin/store/content.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import {
S3Client,
GetObjectCommand,
PutObjectCommand
GetObjectCommand
} from '@aws-sdk/client-s3'
import * as CAR from '@ucanto/transport/car'
import { sha256 } from 'multiformats/hashes/sha2'
import { CID } from 'multiformats/cid'
import * as raw from 'multiformats/codecs/raw'
import { CarWriter } from '@ipld/car'
import pRetry from 'p-retry'
import { StoreOperationFailed, RecordNotFound } from '@web3-storage/filecoin-api/errors'

/**
* @typedef {import('multiformats').UnknownLink} UnknownLink
* @typedef {import('@web3-storage/filecoin-api/storefront/api').ContentStore<UnknownLink, Uint8Array>} ContentStore
*/

/**
* Abstraction layer with Factory to perform operations on bucket storing
* data receipts.
Expand All @@ -19,49 +18,22 @@ import { StoreOperationFailed, RecordNotFound } from '@web3-storage/filecoin-api
* @param {string} bucketName
* @param {import('@aws-sdk/client-s3').ServiceInputTypes} [options]
*/
export function createDataStore(region, bucketName, options = {}) {
export function createContentStore(region, bucketName, options = {}) {
// TODO: Roundabout url
const s3client = new S3Client({
region,
...options,
})
return useDataStore(s3client, bucketName)
return useContentStore(s3client, bucketName)
}

/**
* @param {S3Client} s3client
* @param {string} bucketName
* @returns {import('@web3-storage/filecoin-api/storefront/api').DataStore}
* @returns {ContentStore}
*/
export const useDataStore = (s3client, bucketName) => {
export const useContentStore = (s3client, bucketName) => {
return {
// Only used for testing storing a CAR
// until we hook up claims to look for data
put: async (bytes) => {
const hash = await sha256.digest(bytes)
const root = CID.create(1, raw.code, hash)

const { writer, out } = CarWriter.create(root)
writer.put({ cid: root, bytes })
writer.close()

const chunks = []
for await (const chunk of out) {
chunks.push(chunk)
}
const blob = new Blob(chunks)
const cid = await CAR.codec.link(new Uint8Array(await blob.arrayBuffer()))

const putCmd = new PutObjectCommand({
Bucket: bucketName,
Key: `${cid.toString()}/${cid.toString()}.car`,
Body: bytes
})
await s3client.send(putCmd)

return {
ok: {}
}
},
/**
* Stream Blob bytes for a given invocation.
*/
Expand Down Expand Up @@ -111,22 +83,22 @@ export const useDataStore = (s3client, bucketName) => {
}

/**
* compose many data stores.
* compose many content stores.
* store#stream will check stores in order until 0-1 `ok` result is found.
*
* @param {import('@web3-storage/filecoin-api/storefront/api').DataStore} dataStore
* @param {Array<import('@web3-storage/filecoin-api/storefront/api').DataStore>} moreDataStores
* @returns {import('@web3-storage/filecoin-api/storefront/api').DataStore}
* @param {ContentStore} contentStore
* @param {Array<ContentStore>} moreContentStores
* @returns {ContentStore}
*/
export function composeDataStoresWithOrderedStream(dataStore, ...moreDataStores) {
export function composeContentStoresWithOrderedStream(contentStore, ...moreContentStores) {
return {
...dataStore,
stream: composeSome(dataStore.stream, ...moreDataStores.map(s => s.stream.bind(s))),
...contentStore,
stream: composeSome(contentStore.stream, ...moreContentStores.map(s => s.stream.bind(s))),
}
}

/**
* @typedef {AsyncIterable<Uint8Array>} Rec
* @typedef {ReadableStream<Uint8Array>} Rec
* @typedef {import('@web3-storage/filecoin-api/types').StoreGetError} StoreGetError
* @typedef {import('@ucanto/interface').Result<Rec, StoreGetError>} Result
*/
Expand Down
61 changes: 55 additions & 6 deletions filecoin/test/helpers/service-context.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import * as CAR from '@ucanto/transport/car'
import pRetry from 'p-retry'
import {
PutObjectCommand,
} from '@aws-sdk/client-s3'
import { PutObjectCommand } from '@aws-sdk/client-s3'
import { sha256 } from 'multiformats/hashes/sha2'
import { CID } from 'multiformats/cid'
import * as raw from 'multiformats/codecs/raw'
import { CarWriter } from '@ipld/car'

import { createBucket } from './resources.js'
import { createDynamoTable } from './tables.js'
Expand All @@ -12,7 +14,7 @@ import { encodeAgentMessage } from './ucan.js'
import { pieceTableProps } from '../../store/index.js'

// store clients
import { useDataStore as createDataStoreClient } from '../../store/data.js'
import { useContentStore as createContentStoreClient } from '../../store/content.js'
import { usePieceTable as createPieceStoreClient } from '../../store/piece.js'
import { useTaskStore as createTaskStoreClient } from '../../store/task.js'
import { useReceiptStore as createReceiptStoreClient } from '../../store/receipt.js'
Expand All @@ -21,13 +23,18 @@ import { useReceiptStore as createReceiptStoreClient } from '../../store/receipt
import { createClient as createPieceOfferQueueClient } from '../../queue/piece-offer-queue.js'
import { createClient as createFilecoinSubmitQueueClient } from '../../queue/filecoin-submit-queue.js'

/**
* @typedef {import('multiformats').UnknownLink} UnknownLink
* @typedef {import('@web3-storage/filecoin-api/test/types').TestContentStore<UnknownLink, Uint8Array>} TestContentStore
*/

/**
* @param {import('./context.js').DynamoContext & import('./context.js').S3Context} ctx
*/
export async function getStores (ctx) {
const { dynamoClient, s3Client } = ctx
const pieceStore = await createDynamoTable(dynamoClient, pieceTableProps)
const [ invocationBucketName, workflowBucketName, dataStoreBucketName ] = await Promise.all([
const [ invocationBucketName, workflowBucketName, contentStoreBucketName ] = await Promise.all([
createBucket(s3Client),
createBucket(s3Client),
createBucket(s3Client),
Expand All @@ -37,7 +44,8 @@ export async function getStores (ctx) {
pieceStore: createPieceStoreClient(dynamoClient, pieceStore),
taskStore: getTaskStoreClient(s3Client, invocationBucketName, workflowBucketName),
receiptStore: getReceiptStoreClient(s3Client, invocationBucketName, workflowBucketName),
dataStore: createDataStoreClient(s3Client, dataStoreBucketName)
contentStore: createContentStoreClient(s3Client, contentStoreBucketName),
testContentStore: getTestContentStoreClient(s3Client, contentStoreBucketName),
}
}

Expand All @@ -55,6 +63,47 @@ export function getQueues (ctx) {
}
}

/**
* @param {import('@aws-sdk/client-s3').S3Client} s3Client
* @param {string} bucketName
* @returns {TestContentStore}
*/
function getTestContentStoreClient(s3Client, bucketName) {
const contentStore = createContentStoreClient(s3Client, bucketName)

return {
...contentStore,
// Only used for testing storing a CAR
// until we hook up claims to look for data
put: async (bytes) => {
const hash = await sha256.digest(bytes)
const root = CID.create(1, raw.code, hash)

const { writer, out } = CarWriter.create(root)
writer.put({ cid: root, bytes })
writer.close()

const chunks = []
for await (const chunk of out) {
chunks.push(chunk)
}
const blob = new Blob(chunks)
const cid = await CAR.codec.link(new Uint8Array(await blob.arrayBuffer()))

const putCmd = new PutObjectCommand({
Bucket: bucketName,
Key: `${cid.toString()}/${cid.toString()}.car`,
Body: bytes
})
await s3Client.send(putCmd)

return {
ok: {}
}
},
}
}

/**
* @param {import('@aws-sdk/client-s3').S3Client} s3client
* @param {string} invocationBucketName
Expand Down
Loading

0 comments on commit 2e5bb6a

Please sign in to comment.