diff --git a/packages/filecoin-api/package.json b/packages/filecoin-api/package.json
index b4ae38c40..253376bab 100644
--- a/packages/filecoin-api/package.json
+++ b/packages/filecoin-api/package.json
@@ -189,6 +189,7 @@
       "project": "./tsconfig.json"
     },
     "env": {
+      "browser": true,
       "mocha": true
     },
     "ignorePatterns": [
diff --git a/packages/filecoin-api/src/aggregator/api.ts b/packages/filecoin-api/src/aggregator/api.ts
index a12fce06e..9b3b639a0 100644
--- a/packages/filecoin-api/src/aggregator/api.ts
+++ b/packages/filecoin-api/src/aggregator/api.ts
@@ -328,6 +328,13 @@ export interface AggregateConfig {
   minUtilizationFactor: number
   prependBufferedPieces?: BufferedPiece[]
   hasher?: DataSegmentAPI.SyncMultihashHasher
+  /**
+   * The maximum number of pieces per aggregate. If set, it takes precedence
+   * over `minAggregateSize` because it is typically a hard limit related to
+   * the number of hashes that can be performed within the maximum lambda
+   * execution time limit.
+   */
+  maxAggregatePieces?: number
 }
 
 // Enums
diff --git a/packages/filecoin-api/src/aggregator/buffer-reducing.js b/packages/filecoin-api/src/aggregator/buffer-reducing.js
index 5e06b54f1..ce695ebcd 100644
--- a/packages/filecoin-api/src/aggregator/buffer-reducing.js
+++ b/packages/filecoin-api/src/aggregator/buffer-reducing.js
@@ -1,6 +1,5 @@
 import { Aggregate, Piece, NODE_SIZE, Index } from '@web3-storage/data-segment'
 import { CBOR } from '@ucanto/core'
-
 import { UnexpectedState } from '../errors.js'
 
 /**
@@ -192,12 +191,16 @@ export function aggregatePieces(bufferedPieces, config) {
     addedBufferedPieces.push(bufferedPiece)
   }
 
-  for (const bufferedPiece of bufferedPieces) {
+  for (const [i, bufferedPiece] of bufferedPieces.entries()) {
     const p = Piece.fromLink(bufferedPiece.piece)
     if (builder.estimate(p).error) {
       remainingBufferedPieces.push(bufferedPiece)
       continue
     }
+    if (addedBufferedPieces.length === config.maxAggregatePieces) {
+      remainingBufferedPieces.push(...bufferedPieces.slice(i))
+      break
+    }
     builder.write(p)
     addedBufferedPieces.push(bufferedPiece)
   }
@@ -205,9 +208,16 @@
     builder.offset * BigInt(NODE_SIZE) +
     BigInt(builder.limit) * BigInt(Index.EntrySize)
 
+  console.log(`Used ${totalUsedSpace} bytes in ${addedBufferedPieces.length} pieces (min ${config.minAggregateSize} bytes)`)
+
   // If not enough space return undefined
   if (totalUsedSpace < BigInt(config.minAggregateSize)) {
-    return
+    // ...but only if the max aggregate pieces limit was not reached
+    if (addedBufferedPieces.length === config.maxAggregatePieces) {
+      console.warn(`Building aggregate: max allowed pieces reached (${config.maxAggregatePieces})`)
+    } else {
+      return console.log('Not enough data for aggregate.')
+    }
   }
 
   const aggregate = builder.build()
diff --git a/packages/filecoin-api/src/aggregator/events.js b/packages/filecoin-api/src/aggregator/events.js
index 0726bc291..11e054731 100644
--- a/packages/filecoin-api/src/aggregator/events.js
+++ b/packages/filecoin-api/src/aggregator/events.js
@@ -117,6 +117,7 @@ export const handleBufferQueueMessage = async (context, records) => {
     minUtilizationFactor: context.config.minUtilizationFactor,
     prependBufferedPieces: context.config.prependBufferedPieces,
     hasher: context.config.hasher,
+    maxAggregatePieces: context.config.maxAggregatePieces,
   })
 
   // Store buffered pieces if not enough to do aggregate and re-queue them
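Note: the capping behaviour added to `aggregatePieces` above can be summarised by the following standalone sketch (illustrative only, not part of the patch; the `capPieces` helper and `fits` predicate are hypothetical stand-ins for the `builder.estimate`/`builder.write` pair). Pieces are taken in order until one no longer fits or the configured maximum count is reached; everything left over is handed back so it can be re-queued:

    const capPieces = (pieces, maxPieces, fits) => {
      const added = []
      const remaining = []
      for (const [i, piece] of pieces.entries()) {
        if (!fits(piece)) {
          // does not fit the space left in this aggregate; retry it next round
          remaining.push(piece)
          continue
        }
        if (added.length === maxPieces) {
          // hard cap reached; everything from here on is re-queued
          remaining.push(...pieces.slice(i))
          break
        }
        added.push(piece)
      }
      return { added, remaining }
    }

    // capPieces(['a', 'b', 'c', 'd'], 2, () => true)
    // -> { added: ['a', 'b'], remaining: ['c', 'd'] }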
diff --git a/packages/filecoin-api/test/events/aggregator.js b/packages/filecoin-api/test/events/aggregator.js
index 283558a1f..f7b55f069 100644
--- a/packages/filecoin-api/test/events/aggregator.js
+++ b/packages/filecoin-api/test/events/aggregator.js
@@ -546,6 +546,83 @@ export const test = {
       totalPieces
     )
   },
+  'handles buffer queue messages successfully when max aggregate pieces is exceeded':
+    async (assert, context) => {
+      const group = context.id.did()
+      const totalBuffers = 2
+      const piecesPerBuffer = 16
+      const pieceSize = 66_666
+      const totalPieces = totalBuffers * piecesPerBuffer
+
+      const { buffers, blocks } = await getBuffers(totalBuffers, group, {
+        length: piecesPerBuffer,
+        size: pieceSize,
+      })
+
+      // Store buffers
+      for (let i = 0; i < blocks.length; i++) {
+        const putBufferRes = await context.bufferStore.put({
+          buffer: buffers[i],
+          block: blocks[i].cid,
+        })
+        assert.ok(putBufferRes.ok)
+      }
+
+      const config = {
+        minAggregateSize: 2 ** 21, // 2,097,152
+        maxAggregateSize: 2 ** 22, // 4,194,304
+        // 15 (padded) pieces of 66,666 bytes result in an aggregate of size
+        // 1,968,128, which is below the `minAggregateSize` of 2,097,152 bytes,
+        // i.e. it wouldn't normally result in an aggregate.
+        maxAggregatePieces: 15,
+        minUtilizationFactor: 10,
+      }
+
+      const handledMessageRes = await AggregatorEvents.handleBufferQueueMessage(
+        { ...context, config },
+        blocks.map((b) => ({
+          pieces: b.cid,
+          group,
+        }))
+      )
+      assert.ok(handledMessageRes.ok)
+      assert.equal(handledMessageRes.ok?.aggregatedPieces, config.maxAggregatePieces)
+
+      await pWaitFor(
+        () =>
+          // there should still be one buffer of remaining pieces in the queue
+          context.queuedMessages.get('bufferQueue')?.length === 1 &&
+          // there should be 1 new aggregate in the queue
+          context.queuedMessages.get('aggregateOfferQueue')?.length === 1
+      )
+      /** @type {AggregateOfferMessage} */
+      // @ts-expect-error cannot infer buffer message
+      const aggregateOfferMessage = context.queuedMessages.get(
+        'aggregateOfferQueue'
+      )?.[0]
+      /** @type {BufferMessage} */
+      // @ts-expect-error cannot infer buffer message
+      const bufferMessage = context.queuedMessages.get('bufferQueue')?.[0]
+
+      const aggregateBufferGet = await context.bufferStore.get(
+        aggregateOfferMessage.buffer
+      )
+      assert.ok(aggregateBufferGet.ok)
+      assert.equal(
+        aggregateBufferGet.ok?.buffer.pieces.length,
+        handledMessageRes.ok?.aggregatedPieces
+      )
+
+      const remainingBufferGet = await context.bufferStore.get(
+        bufferMessage.pieces
+      )
+      assert.ok(remainingBufferGet.ok)
+      assert.equal(
+        (aggregateBufferGet.ok?.buffer.pieces.length || 0) +
+          (remainingBufferGet.ok?.buffer.pieces.length || 0),
+        totalPieces
+      )
+    },
   'handles buffer queue message errors when fails to access buffer store':
     wichMockableContext(
       async (assert, context) => {
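Note: a back-of-the-envelope check of the sizes quoted in the test comment above (my own arithmetic; it assumes the Fr32 expansion and power-of-two piece padding used by `@web3-storage/data-segment`, plus, inferring from the quoted total, a 32-entry index at 64 bytes per entry). A 66,666-byte piece expands by 128/127 under Fr32 to roughly 67,191 bytes and is then rounded up to the next power of two, giving a 131,072-byte (2^17) padded piece:

    // Hypothetical size check, not part of the patch.
    const usedSpace = 15 * 2 ** 17 + 32 * 64
    console.log(usedSpace) // 1,968,128, i.e. below minAggregateSize = 2 ** 21 = 2,097,152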