Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: aggregate offer invocation cid wrong #1063

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/filecoin-api/src/aggregator/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ export interface AggregateRecord {
* `bagy...aggregate` Piece CID of an aggregate
*/
aggregate: PieceLink
/**
* `bafy...cbor` as CID of dag-cbor block with buffer of pieces in an aggregate.
*/
buffer: Link
/**
* `bafy...cbor` as CID of dag-cbor block with list of pieces in an aggregate.
*/
Expand Down Expand Up @@ -298,6 +302,10 @@ export interface AggregateOfferMessage {
* List of pieces in an aggregate.
*/
pieces: Link
/**
* `bafy...cbor` as CID of dag-cbor block with buffer of pieces in an aggregate.
*/
buffer: Link
/**
* Grouping information for submitted piece.
*/
Expand Down
4 changes: 3 additions & 1 deletion packages/filecoin-api/src/aggregator/buffer-reducing.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export async function handleBufferReducingWithAggregate({
pieces: aggregateInfo.addedBufferedPieces,
group,
}
const piecesBlock = await CBOR.write(aggregateInfo.addedBufferedPieces.map(bf => bf.piece))
const aggregateBlock = await CBOR.write(aggregateReducedBuffer)

// Store buffered pieces for aggregate
Expand All @@ -59,7 +60,8 @@ export async function handleBufferReducingWithAggregate({
// Propagate message for aggregate offer queue
const aggregateOfferQueueAdd = await aggregateOfferQueue.add({
aggregate: aggregateInfo.aggregate.link,
pieces: aggregateBlock.cid,
buffer: aggregateBlock.cid,
pieces: piecesBlock.cid,
group,
})
if (aggregateOfferQueueAdd.error) {
Expand Down
7 changes: 4 additions & 3 deletions packages/filecoin-api/src/aggregator/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,13 @@ export const handleBufferQueueMessage = async (context, records) => {
* @param {import('./api.js').AggregateOfferMessage} message
*/
export const handleAggregateOfferMessage = async (context, message) => {
const { pieces, aggregate, group } = message
const { pieces, aggregate, buffer, group } = message

// Store aggregate information into the store. Store events MAY be used to propagate aggregate over
const putRes = await context.aggregateStore.put({
pieces,
aggregate,
buffer,
group,
insertedAt: new Date().toISOString(),
})
Expand All @@ -196,7 +197,7 @@ export const handleAggregateInsertToPieceAcceptQueue = async (
context,
record
) => {
const bufferStoreRes = await context.bufferStore.get(record.pieces)
const bufferStoreRes = await context.bufferStore.get(record.buffer)
if (bufferStoreRes.error) {
return bufferStoreRes
}
Expand Down Expand Up @@ -331,7 +332,7 @@ export const handleAggregateInsertToAggregateOffer = async (
context,
record
) => {
const bufferStoreRes = await context.bufferStore.get(record.pieces)
const bufferStoreRes = await context.bufferStore.get(record.buffer)
if (bufferStoreRes.error) {
return {
error: bufferStoreRes.error,
Expand Down
1 change: 1 addition & 0 deletions packages/filecoin-api/src/aggregator/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export const pieceAccept = async ({ capability }, context) => {
}
}

// Get buffered pieces
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line 88 .pieces now has pieces CID instead of buffer :)

const [{ aggregate, inclusion }] = getInclusionRes.ok
const getAggregateRes = await context.aggregateStore.get({ aggregate })
if (getAggregateRes.error) {
Expand Down
37 changes: 26 additions & 11 deletions packages/filecoin-api/test/events/aggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,9 @@ export const test = {
// @ts-expect-error cannot infer buffer message
const message = context.queuedMessages.get('aggregateOfferQueue')?.[0]

const bufferGet = await context.bufferStore.get(message.pieces)
const bufferGet = await context.bufferStore.get(message.buffer)
assert.ok(bufferGet.ok)
assert.ok(bufferGet.ok?.block.equals(message.pieces))
assert.ok(bufferGet.ok?.block.equals(message.buffer))
assert.equal(bufferGet.ok?.buffer.group, group)
assert.ok(message.aggregate.equals(bufferGet.ok?.buffer.aggregate))
assert.equal(bufferGet.ok?.buffer.pieces.length, totalPieces)
Expand Down Expand Up @@ -414,7 +414,7 @@ export const test = {
const bufferMessage = context.queuedMessages.get('bufferQueue')?.[0]

const aggregateBufferGet = await context.bufferStore.get(
aggregateOfferMessage.pieces
aggregateOfferMessage.buffer
)
assert.ok(aggregateBufferGet.ok)
const remainingBufferGet = await context.bufferStore.get(
Expand Down Expand Up @@ -554,11 +554,13 @@ export const test = {
group,
}
const block = await CBOR.write(buffer)
const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

/** @type {AggregateOfferMessage} */
const message = {
aggregate: aggregate.link,
pieces: block.cid,
pieces: piecesBlock.cid,
buffer: block.cid,
group,
}

Expand All @@ -573,7 +575,8 @@ export const test = {
})
assert.ok(hasStoredAggregate.ok)
assert.ok(hasStoredAggregate.ok?.aggregate.equals(aggregate.link))
assert.ok(hasStoredAggregate.ok?.pieces.equals(block.cid))
assert.ok(hasStoredAggregate.ok?.buffer.equals(block.cid))
assert.ok(hasStoredAggregate.ok?.pieces.equals(piecesBlock.cid))
assert.equal(hasStoredAggregate.ok?.group, group)
assert.ok(hasStoredAggregate.ok?.insertedAt)
},
Expand All @@ -593,11 +596,13 @@ export const test = {
group,
}
const block = await CBOR.write(buffer)
const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

/** @type {AggregateOfferMessage} */
const message = {
aggregate: aggregate.link,
pieces: block.cid,
buffer: block.cid,
pieces: piecesBlock.cid,
group,
}

Expand Down Expand Up @@ -631,6 +636,7 @@ export const test = {
group,
}
const block = await CBOR.write(buffer)
const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

// Put buffer record
const putBufferRes = await context.bufferStore.put({
Expand All @@ -641,7 +647,8 @@ export const test = {

// Put aggregate record
const aggregateRecord = {
pieces: block.cid,
buffer: block.cid,
pieces: piecesBlock.cid,
aggregate: aggregate.link,
group,
insertedAt: new Date().toISOString(),
Expand Down Expand Up @@ -697,11 +704,13 @@ export const test = {
group,
}
const block = await CBOR.write(buffer)
const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

// Put aggregate record
const aggregateRecord = {
pieces: block.cid,
buffer: block.cid,
aggregate: aggregate.link,
pieces: piecesBlock.cid,
group,
insertedAt: new Date().toISOString(),
}
Expand Down Expand Up @@ -745,6 +754,8 @@ export const test = {
group,
}
const block = await CBOR.write(buffer)
const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

// Put buffer record
const putBufferRes = await context.bufferStore.put({
buffer,
Expand All @@ -754,8 +765,9 @@ export const test = {

// Put aggregate record
const aggregateRecord = {
pieces: block.cid,
buffer: block.cid,
aggregate: aggregate.link,
pieces: piecesBlock.cid,
group,
insertedAt: new Date().toISOString(),
}
Expand Down Expand Up @@ -1114,8 +1126,9 @@ export const test = {

// Put aggregate record
const aggregateRecord = {
pieces: blockBuffer.cid,
buffer: blockBuffer.cid,
aggregate: aggregate.link,
pieces: blockPieces.cid,
group,
insertedAt: new Date().toISOString(),
}
Expand Down Expand Up @@ -1163,11 +1176,13 @@ export const test = {
group,
}
const blockBuffer = await CBOR.write(buffer)
const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

// Put aggregate record
const aggregateRecord = {
pieces: blockBuffer.cid,
buffer: blockBuffer.cid,
aggregate: aggregate.link,
pieces: piecesBlock.cid,
group,
insertedAt: new Date().toISOString(),
}
Expand Down
12 changes: 12 additions & 0 deletions packages/filecoin-api/test/services/aggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
/**
* @typedef {import('@web3-storage/data-segment').PieceLink} PieceLink
* @typedef {import('@ucanto/interface').Link} Link
* @typedef {import('../../src/aggregator/api.js').Buffer} Buffer
* @typedef {import('../../src/aggregator/api.js').PieceRecord} PieceRecord
* @typedef {import('../../src/aggregator/api.js').PieceRecordKey} PieceRecordKey
* @typedef {import('../../src/aggregator/api.js').BufferRecord} BufferRecord
Expand Down Expand Up @@ -234,13 +235,24 @@ export const test = {
const group = storefront.did()
const { pieces, aggregate } = await randomAggregate(100, 128)
const piece = pieces[0].link
/** @type {Buffer} */
const buffer = {
pieces: pieces.map((p) => ({
piece: p.link,
insertedAt: new Date().toISOString(),
policy: 0,
})),
group,
}
const block = await CBOR.write(buffer)

// Store aggregate record into store
const offer = pieces.map((p) => p.link)
const piecesBlock = await CBOR.write(offer)
const aggregatePutRes = await context.aggregateStore.put({
aggregate: aggregate.link,
pieces: piecesBlock.cid,
buffer: block.cid,
group,
insertedAt: new Date().toISOString(),
})
Expand Down
Loading