diff --git a/packages/filecoin-api/src/aggregator/api.ts b/packages/filecoin-api/src/aggregator/api.ts
index ee79b630d..3c3a587bb 100644
--- a/packages/filecoin-api/src/aggregator/api.ts
+++ b/packages/filecoin-api/src/aggregator/api.ts
@@ -167,6 +167,10 @@ export interface AggregateRecord {
    * `bagy...aggregate` Piece CID of an aggregate
    */
   aggregate: PieceLink
+  /**
+   * `bafy...cbor` as CID of dag-cbor block with buffer of pieces in an aggregate.
+   */
+  buffer: Link
   /**
    * `bafy...cbor` as CID of dag-cbor block with list of pieces in an aggregate.
    */
@@ -298,6 +302,10 @@ export interface AggregateOfferMessage {
    * List of pieces in an aggregate.
    */
   pieces: Link
+  /**
+   * `bafy...cbor` as CID of dag-cbor block with buffer of pieces in an aggregate.
+   */
+  buffer: Link
   /**
    * Grouping information for submitted piece.
    */
diff --git a/packages/filecoin-api/src/aggregator/buffer-reducing.js b/packages/filecoin-api/src/aggregator/buffer-reducing.js
index 03f640e0e..7bb792fe9 100644
--- a/packages/filecoin-api/src/aggregator/buffer-reducing.js
+++ b/packages/filecoin-api/src/aggregator/buffer-reducing.js
@@ -45,6 +45,7 @@ export async function handleBufferReducingWithAggregate({
     pieces: aggregateInfo.addedBufferedPieces,
     group,
   }
+  const piecesBlock = await CBOR.write(aggregateInfo.addedBufferedPieces.map(bf => bf.piece))
   const aggregateBlock = await CBOR.write(aggregateReducedBuffer)

   // Store buffered pieces for aggregate
@@ -59,7 +60,8 @@
   // Propagate message for aggregate offer queue
   const aggregateOfferQueueAdd = await aggregateOfferQueue.add({
     aggregate: aggregateInfo.aggregate.link,
-    pieces: aggregateBlock.cid,
+    buffer: aggregateBlock.cid,
+    pieces: piecesBlock.cid,
     group,
   })
   if (aggregateOfferQueueAdd.error) {
diff --git a/packages/filecoin-api/src/aggregator/events.js b/packages/filecoin-api/src/aggregator/events.js
index ad76fe69d..d8b27c96d 100644
--- a/packages/filecoin-api/src/aggregator/events.js
+++ b/packages/filecoin-api/src/aggregator/events.js
@@ -168,12 +168,13 @@ export const handleBufferQueueMessage = async (context, records) => {
  * @param {import('./api.js').AggregateOfferMessage} message
  */
 export const handleAggregateOfferMessage = async (context, message) => {
-  const { pieces, aggregate, group } = message
+  const { pieces, aggregate, buffer, group } = message

   // Store aggregate information into the store. Store events MAY be used to propagate aggregate over
   const putRes = await context.aggregateStore.put({
     pieces,
     aggregate,
+    buffer,
     group,
     insertedAt: new Date().toISOString(),
   })
@@ -196,7 +197,7 @@ export const handleAggregateInsertToPieceAcceptQueue = async (
   context,
   record
 ) => {
-  const bufferStoreRes = await context.bufferStore.get(record.pieces)
+  const bufferStoreRes = await context.bufferStore.get(record.buffer)
   if (bufferStoreRes.error) {
     return bufferStoreRes
   }
@@ -331,7 +332,7 @@ export const handleAggregateInsertToAggregateOffer = async (
   context,
   record
 ) => {
-  const bufferStoreRes = await context.bufferStore.get(record.pieces)
+  const bufferStoreRes = await context.bufferStore.get(record.buffer)
   if (bufferStoreRes.error) {
     return {
       error: bufferStoreRes.error,
diff --git a/packages/filecoin-api/src/aggregator/service.js b/packages/filecoin-api/src/aggregator/service.js
index b1e4d08d0..78a4e3112 100644
--- a/packages/filecoin-api/src/aggregator/service.js
+++ b/packages/filecoin-api/src/aggregator/service.js
@@ -77,6 +77,7 @@ export const pieceAccept = async ({ capability }, context) => {
     }
   }

+  // Get buffered pieces
   const [{ aggregate, inclusion }] = getInclusionRes.ok
   const getAggregateRes = await context.aggregateStore.get({ aggregate })
   if (getAggregateRes.error) {
diff --git a/packages/filecoin-api/test/events/aggregator.js b/packages/filecoin-api/test/events/aggregator.js
index 2742357f7..bba0a694d 100644
--- a/packages/filecoin-api/test/events/aggregator.js
+++ b/packages/filecoin-api/test/events/aggregator.js
@@ -353,9 +353,9 @@ export const test = {

     // @ts-expect-error cannot infer buffer message
     const message = context.queuedMessages.get('aggregateOfferQueue')?.[0]
-    const bufferGet = await context.bufferStore.get(message.pieces)
+    const bufferGet = await context.bufferStore.get(message.buffer)
     assert.ok(bufferGet.ok)
-    assert.ok(bufferGet.ok?.block.equals(message.pieces))
+    assert.ok(bufferGet.ok?.block.equals(message.buffer))
     assert.equal(bufferGet.ok?.buffer.group, group)
     assert.ok(message.aggregate.equals(bufferGet.ok?.buffer.aggregate))
     assert.equal(bufferGet.ok?.buffer.pieces.length, totalPieces)
@@ -414,7 +414,7 @@

     const bufferMessage = context.queuedMessages.get('bufferQueue')?.[0]
     const aggregateBufferGet = await context.bufferStore.get(
-      aggregateOfferMessage.pieces
+      aggregateOfferMessage.buffer
     )
     assert.ok(aggregateBufferGet.ok)
     const remainingBufferGet = await context.bufferStore.get(
@@ -554,11 +554,13 @@
       group,
     }
     const block = await CBOR.write(buffer)
+    const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

     /** @type {AggregateOfferMessage} */
     const message = {
       aggregate: aggregate.link,
-      pieces: block.cid,
+      pieces: piecesBlock.cid,
+      buffer: block.cid,
       group,
     }

@@ -573,7 +575,8 @@
     })
     assert.ok(hasStoredAggregate.ok)
     assert.ok(hasStoredAggregate.ok?.aggregate.equals(aggregate.link))
-    assert.ok(hasStoredAggregate.ok?.pieces.equals(block.cid))
+    assert.ok(hasStoredAggregate.ok?.buffer.equals(block.cid))
+    assert.ok(hasStoredAggregate.ok?.pieces.equals(piecesBlock.cid))
     assert.equal(hasStoredAggregate.ok?.group, group)
     assert.ok(hasStoredAggregate.ok?.insertedAt)
   },
@@ -593,11 +596,13 @@
       group,
     }
     const block = await CBOR.write(buffer)
+    const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

     /** @type {AggregateOfferMessage} */
     const message = {
       aggregate: aggregate.link,
-      pieces: block.cid,
+      buffer: block.cid,
+      pieces: piecesBlock.cid,
       group,
     }

@@ -631,6 +636,7 @@
       group,
     }
     const block = await CBOR.write(buffer)
+    const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

     // Put buffer record
     const putBufferRes = await context.bufferStore.put({
@@ -641,7 +647,8 @@
     // Put aggregate record
     const aggregateRecord = {
-      pieces: block.cid,
+      buffer: block.cid,
+      pieces: piecesBlock.cid,
       aggregate: aggregate.link,
       group,
       insertedAt: new Date().toISOString(),
     }
@@ -697,11 +704,13 @@
       group,
     }
     const block = await CBOR.write(buffer)
+    const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

     // Put aggregate record
     const aggregateRecord = {
-      pieces: block.cid,
+      buffer: block.cid,
       aggregate: aggregate.link,
+      pieces: piecesBlock.cid,
       group,
       insertedAt: new Date().toISOString(),
     }
@@ -745,6 +754,8 @@
       group,
     }
     const block = await CBOR.write(buffer)
+    const piecesBlock = await CBOR.write(pieces.map((p) => p.link))
+
     // Put buffer record
     const putBufferRes = await context.bufferStore.put({
       buffer,
@@ -754,8 +765,9 @@

     // Put aggregate record
     const aggregateRecord = {
-      pieces: block.cid,
+      buffer: block.cid,
       aggregate: aggregate.link,
+      pieces: piecesBlock.cid,
       group,
       insertedAt: new Date().toISOString(),
     }
@@ -1114,8 +1126,9 @@

     // Put aggregate record
     const aggregateRecord = {
-      pieces: blockBuffer.cid,
+      buffer: blockBuffer.cid,
       aggregate: aggregate.link,
+      pieces: blockPieces.cid,
       group,
       insertedAt: new Date().toISOString(),
     }
@@ -1163,11 +1176,13 @@
       group,
     }
     const blockBuffer = await CBOR.write(buffer)
+    const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

     // Put aggregate record
     const aggregateRecord = {
-      pieces: blockBuffer.cid,
+      buffer: blockBuffer.cid,
       aggregate: aggregate.link,
+      pieces: piecesBlock.cid,
       group,
       insertedAt: new Date().toISOString(),
     }
diff --git a/packages/filecoin-api/test/services/aggregator.js b/packages/filecoin-api/test/services/aggregator.js
index f8695f794..b15e4cf6a 100644
--- a/packages/filecoin-api/test/services/aggregator.js
+++ b/packages/filecoin-api/test/services/aggregator.js
@@ -20,6 +20,7 @@ import {
 /**
  * @typedef {import('@web3-storage/data-segment').PieceLink} PieceLink
  * @typedef {import('@ucanto/interface').Link} Link
+ * @typedef {import('../../src/aggregator/api.js').Buffer} Buffer
  * @typedef {import('../../src/aggregator/api.js').PieceRecord} PieceRecord
  * @typedef {import('../../src/aggregator/api.js').PieceRecordKey} PieceRecordKey
  * @typedef {import('../../src/aggregator/api.js').BufferRecord} BufferRecord
@@ -234,6 +235,16 @@
     const group = storefront.did()
     const { pieces, aggregate } = await randomAggregate(100, 128)
     const piece = pieces[0].link
+    /** @type {Buffer} */
+    const buffer = {
+      pieces: pieces.map((p) => ({
+        piece: p.link,
+        insertedAt: new Date().toISOString(),
+        policy: 0,
+      })),
+      group,
+    }
+    const block = await CBOR.write(buffer)

     // Store aggregate record into store
     const offer = pieces.map((p) => p.link)
@@ -241,6 +252,7 @@
     const aggregatePutRes = await context.aggregateStore.put({
       aggregate: aggregate.link,
       pieces: piecesBlock.cid,
+      buffer: block.cid,
       group,
       insertedAt: new Date().toISOString(),
     })
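In short, an aggregate record now carries two dag-cbor links: `buffer`, pointing at the full buffer of pieces (piece link plus insertion time and policy), and `pieces`, pointing at just the ordered list of piece CIDs. The sketch below, which is not part of the patch, shows how such a record is derived; it assumes `CBOR` is the same dag-cbor codec the patch calls via `CBOR.write` (in this repo that is most likely the `CBOR` export of `@ucanto/core`, but treat the import path as an assumption), and `encodeAggregateRecord`/`bufferedPieces` are illustrative names.

// Illustration only; not part of the patch. Assumes `CBOR` is the dag-cbor
// codec used by the code above (`CBOR.write` resolving to a block with `.cid`).
import { CBOR } from '@ucanto/core'

/**
 * Hypothetical helper showing how the two CIDs on an aggregate record relate.
 *
 * @param {import('@web3-storage/data-segment').PieceLink} aggregate
 * @param {Array<{ piece: import('@web3-storage/data-segment').PieceLink, insertedAt: string, policy: number }>} bufferedPieces
 * @param {string} group
 */
export const encodeAggregateRecord = async (aggregate, bufferedPieces, group) => {
  // `buffer`: the full buffer of pieces, including insertion metadata and policy.
  const bufferBlock = await CBOR.write({ pieces: bufferedPieces, group })
  // `pieces`: only the ordered list of piece CIDs in the aggregate.
  const piecesBlock = await CBOR.write(bufferedPieces.map((bp) => bp.piece))

  return {
    aggregate,
    buffer: bufferBlock.cid,
    pieces: piecesBlock.cid,
    group,
    insertedAt: new Date().toISOString(),
  }
}

On the read side, handlers that need piece-level metadata (`handleAggregateInsertToPieceAcceptQueue`, `handleAggregateInsertToAggregateOffer`) now resolve `record.buffer` from the buffer store, while `record.pieces` remains a lighter reference to the piece list alone.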