diff --git a/billing/functions/ucan-stream.js b/billing/functions/ucan-stream.js index 5704de09..2c3eb0b9 100644 --- a/billing/functions/ucan-stream.js +++ b/billing/functions/ucan-stream.js @@ -5,7 +5,7 @@ import { LRUCache } from 'lru-cache' import { createSpaceDiffStore } from '../tables/space-diff.js' import { createConsumerStore } from '../tables/consumer.js' import { expect } from './lib.js' -import { findSpaceUsageDeltas, storeSpaceUsageDelta } from '../lib/ucan-stream.js' +import { findSpaceUsageDeltas, storeSpaceUsageDeltas } from '../lib/ucan-stream.js' import { mustGetEnv } from '../../lib/env.js' Sentry.AWSLambda.init({ @@ -35,24 +35,17 @@ export const handler = Sentry.AWSLambda.wrapHandler( const region = customContext?.region ?? mustGetEnv('AWS_REGION') const messages = parseUcanStreamEvent(event) - if (!messages || messages.length > 1) { - throw new Error(`invalid batch size, expected: 1, actual: ${messages.length}`) - } - const deltas = findSpaceUsageDeltas(messages) if (!deltas.length) { console.log("No messages found that contain space usage deltas", "capabilities", messages[0].value.att.map((att) => att.can), "resources", messages[0].value.att.map((att) => att.with) ) return } - console.log("Storing space usage delta", deltas[0]) - + console.log(`Storing ${deltas.length} space usage deltas`) + const consumerStore = createConsumerStore({ region }, { tableName: consumerTable }) const spaceDiffStore = createSpaceDiffStore({ region }, { tableName: spaceDiffTable }) const ctx = { spaceDiffStore, consumerStore: withConsumerListCache(consumerStore) } - expect( - await storeSpaceUsageDelta(deltas[0], ctx), - `storing space usage delta for: ${deltas[0].resource}, cause: ${deltas[0].cause}` - ) + expect(await storeSpaceUsageDeltas(deltas, ctx), 'storing space usage deltas') } ) diff --git a/billing/lib/api.ts b/billing/lib/api.ts index f15e0332..22df39de 100644 --- a/billing/lib/api.ts +++ b/billing/lib/api.ts @@ -43,7 +43,7 @@ export interface SpaceDiffListKey { } export type SpaceDiffStore = - & StorePutter + & StoreBatchPutter & StoreLister /** Captures size of a space at a given point in time. */ @@ -294,12 +294,31 @@ export interface RecordNotFound extends Failure { key: K } +/** Not enough records were provided for the operation. */ +export interface InsufficientRecords extends Failure { + name: 'InsufficientRecords' +} + /** StorePutter allows a single item to be put in the store by it's key. */ export interface StorePutter { /** Puts a single item into the store by it's key */ put: (rec: T) => Promise> } +/** + * StoreBatchPutter allows multiple items to be put in the store. Note: this is + * not transactional. A failure may mean 1 or more records succeeded to + * be written. + */ +export interface StoreBatchPutter { + /** + * Puts multiple items into the store by their key. Note: this is + * not transactional. A failure may mean 1 or more records succeeded to + * be written. + */ + batchPut: (rec: Iterable) => Promise> +} + /** StoreGetter allows a single item to be retrieved by it's key. */ export interface StoreGetter { /** Gets a single item by it's key. */ diff --git a/billing/lib/ucan-stream.js b/billing/lib/ucan-stream.js index a2e1643c..da1d552e 100644 --- a/billing/lib/ucan-stream.js +++ b/billing/lib/ucan-stream.js @@ -62,33 +62,41 @@ export const findSpaceUsageDeltas = messages => { * multiple calls to this function with the same data must not add _another_ * record to the store. * - * @param {import('./api.js').UsageDelta} delta + * @param {import('./api.js').UsageDelta[]} deltas * @param {{ * spaceDiffStore: import('./api').SpaceDiffStore * consumerStore: import('./api').ConsumerStore * }} ctx - * @returns {Promise>} */ -export const storeSpaceUsageDelta = async (delta, ctx) => { - const consumerList = await ctx.consumerStore.list({ consumer: delta.resource }) - if (consumerList.error) return consumerList +export const storeSpaceUsageDeltas = async (deltas, ctx) => { + const spaceDiffResults = await Promise.all(deltas.map(async delta => { + const consumerList = await ctx.consumerStore.list({ consumer: delta.resource }) + if (consumerList.error) return consumerList - // There should only be one subscription per provider, but in theory you - // could have multiple providers for the same consumer (space). - for (const consumer of consumerList.ok.results) { - const spaceDiffPut = await ctx.spaceDiffStore.put({ - provider: consumer.provider, - subscription: consumer.subscription, - space: delta.resource, - cause: delta.cause, - delta: delta.delta, - receiptAt: delta.receiptAt, - insertedAt: new Date() - }) - if (spaceDiffPut.error) return spaceDiffPut + const diffs = [] + // There should only be one subscription per provider, but in theory you + // could have multiple providers for the same consumer (space). + for (const consumer of consumerList.ok.results) { + diffs.push({ + provider: consumer.provider, + subscription: consumer.subscription, + space: delta.resource, + cause: delta.cause, + delta: delta.delta, + receiptAt: delta.receiptAt, + insertedAt: new Date() + }) + } + return { ok: diffs, error: undefined } + })) + + const spaceDiffs = [] + for (const res of spaceDiffResults) { + if (res.error) return res + spaceDiffs.push(...res.ok) } - return { ok: {} } + return ctx.spaceDiffStore.batchPut(spaceDiffs) } /** diff --git a/billing/tables/client.js b/billing/tables/client.js index 79b9131f..15706639 100644 --- a/billing/tables/client.js +++ b/billing/tables/client.js @@ -1,7 +1,7 @@ -import { DynamoDBClient, GetItemCommand, PutItemCommand, QueryCommand, ScanCommand } from '@aws-sdk/client-dynamodb' +import { BatchWriteItemCommand, DynamoDBClient, GetItemCommand, PutItemCommand, QueryCommand, ScanCommand } from '@aws-sdk/client-dynamodb' import { marshall, unmarshall, convertToAttr } from '@aws-sdk/util-dynamodb' import retry from 'p-retry' -import { RecordNotFound, StoreOperationFailure } from './lib.js' +import { InsufficientRecords, RecordNotFound, StoreOperationFailure } from './lib.js' import { getDynamoClient } from '../../lib/aws/dynamo.js' /** @param {{ region: string } | DynamoDBClient} target */ @@ -54,6 +54,53 @@ export const createStorePutterClient = (conf, context) => { } } +/** + * @template T + * @param {{ region: string } | import('@aws-sdk/client-dynamodb').DynamoDBClient} conf + * @param {object} context + * @param {string} context.tableName + * @param {import('../lib/api').Validator} context.validate + * @param {import('../lib/api').Encoder} context.encode + * @returns {import('../lib/api').StoreBatchPutter} + */ +export const createStoreBatchPutterClient = (conf, context) => { + const client = connectTable(conf) + return { + batchPut: async (records) => { + /** @type {import('@aws-sdk/client-dynamodb').WriteRequest[]} */ + const writeRequests = [] + for (const record of records) { + const validation = context.validate(record) + if (validation.error) return validation + + const encoding = context.encode(record) + if (encoding.error) return encoding + writeRequests.push(({ PutRequest: { Item: marshall(encoding.ok, { removeUndefinedValues: true }) } })) + } + + if (!writeRequests.length) { + return { error: new InsufficientRecords('records must have length greater than or equal to 1') } + } + + try { + let requestItems = { [context.tableName]: writeRequests } + await retry(async () => { + const cmd = new BatchWriteItemCommand({ RequestItems: requestItems }) + const res = await client.send(cmd) + if (res.UnprocessedItems && Object.keys(res.UnprocessedItems).length) { + requestItems = res.UnprocessedItems + throw new Error('unprocessed items') + } + }, { onFailedAttempt: console.warn }) + return { ok: {} } + } catch (/** @type {any} */ err) { + console.error(err) + return { error: new StoreOperationFailure(err.message, { cause: err }) } + } + } + } +} + /** * @template {object} K * @template V diff --git a/billing/tables/lib.js b/billing/tables/lib.js index 6bbe9d36..6cb0f772 100644 --- a/billing/tables/lib.js +++ b/billing/tables/lib.js @@ -35,3 +35,19 @@ export class RecordNotFound extends Failure { return { ...super.toJSON(), key: this.key } } } + +export class InsufficientRecords extends Failure { + /** + * @param {string} [message] Context for the message. + * @param {ErrorOptions} [options] + */ + constructor (message, options) { + super(undefined, options) + this.name = /** @type {const} */ ('InsufficientRecords') + this.detail = message + } + + describe () { + return this.detail ?? 'insufficient records were provided for the operation' + } +} diff --git a/billing/tables/space-diff.js b/billing/tables/space-diff.js index 644daf59..247b8090 100644 --- a/billing/tables/space-diff.js +++ b/billing/tables/space-diff.js @@ -1,4 +1,4 @@ -import { createStoreListerClient, createStorePutterClient } from './client.js' +import { createStoreBatchPutterClient, createStoreListerClient } from './client.js' import { validate, encode, lister, decode } from '../data/space-diff.js' /** @@ -36,6 +36,6 @@ export const spaceDiffTableProps = { * @returns {import('../lib/api').SpaceDiffStore} */ export const createSpaceDiffStore = (conf, { tableName }) => ({ - ...createStorePutterClient(conf, { tableName, validate, encode }), + ...createStoreBatchPutterClient(conf, { tableName, validate, encode }), ...createStoreListerClient(conf, { tableName, encodeKey: lister.encodeKey, decode }) }) diff --git a/billing/test/lib/space-billing-queue.js b/billing/test/lib/space-billing-queue.js index 72cd6831..c336c415 100644 --- a/billing/test/lib/space-billing-queue.js +++ b/billing/test/lib/space-billing-queue.js @@ -14,7 +14,7 @@ export const test = { const to = startOfMonth(now) const delta = 1024 * 1024 * 1024 // 1GiB - await ctx.spaceDiffStore.put({ + await ctx.spaceDiffStore.batchPut([{ provider: consumer.provider, space: consumer.consumer, subscription: consumer.subscription, @@ -22,7 +22,7 @@ export const test = { delta, receiptAt: from, insertedAt: new Date() - }) + }]) /** @type {import('../../lib/api.js').SpaceBillingInstruction} */ const instruction = { @@ -75,28 +75,29 @@ export const test = { insertedAt: new Date() }) - // add 1GiB - await ctx.spaceDiffStore.put({ - provider: consumer.provider, - space: consumer.consumer, - subscription: consumer.subscription, - cause: randomLink(), - delta, - receiptAt: from, - insertedAt: new Date() - }) - - // remove 1GiB - await ctx.spaceDiffStore.put({ - provider: consumer.provider, - space: consumer.consumer, - subscription: consumer.subscription, - cause: randomLink(), - delta: -delta, - // removed exactly half way through the month - receiptAt: new Date(from.getTime() + ((to.getTime() - from.getTime()) / 2)), - insertedAt: new Date() - }) + await ctx.spaceDiffStore.batchPut([ + // add 1GiB + { + provider: consumer.provider, + space: consumer.consumer, + subscription: consumer.subscription, + cause: randomLink(), + delta, + receiptAt: from, + insertedAt: new Date() + }, + // remove 1GiB + { + provider: consumer.provider, + space: consumer.consumer, + subscription: consumer.subscription, + cause: randomLink(), + delta: -delta, + // removed exactly half way through the month + receiptAt: new Date(from.getTime() + ((to.getTime() - from.getTime()) / 2)), + insertedAt: new Date() + } + ]) /** @type {import('../../lib/api.js').SpaceBillingInstruction} */ const instruction = { @@ -157,7 +158,7 @@ export const test = { return yest } - await ctx.spaceDiffStore.put({ + await ctx.spaceDiffStore.batchPut([{ provider: consumer.provider, space: consumer.consumer, subscription: consumer.subscription, @@ -166,7 +167,7 @@ export const test = { // store/add 24h prior to end of billing receiptAt: yesterday(to), insertedAt: new Date() - }) + }]) /** @type {import('../../lib/api.js').SpaceBillingInstruction} */ const instruction = { diff --git a/billing/test/lib/ucan-stream.js b/billing/test/lib/ucan-stream.js index 90d1ff14..483427b7 100644 --- a/billing/test/lib/ucan-stream.js +++ b/billing/test/lib/ucan-stream.js @@ -2,7 +2,7 @@ import { Schema } from '@ucanto/core' import * as ServiceBlobCaps from '@web3-storage/capabilities/web3.storage/blob' import * as BlobCaps from '@web3-storage/capabilities/blob' import * as StoreCaps from '@web3-storage/capabilities/store' -import { findSpaceUsageDeltas, storeSpaceUsageDelta } from '../../lib/ucan-stream.js' +import { findSpaceUsageDeltas, storeSpaceUsageDeltas } from '../../lib/ucan-stream.js' import { randomConsumer } from '../helpers/consumer.js' import { randomLink } from '../helpers/dag.js' import { randomDID, randomDIDKey } from '../helpers/did.js' @@ -174,11 +174,8 @@ export const test = { }] const deltas = findSpaceUsageDeltas(receipts) - - for (const d of deltas) { - const res = await storeSpaceUsageDelta(d, ctx) - assert.ok(res.ok) - } + const storeDeltasRes = await storeSpaceUsageDeltas(deltas, ctx) + assert.ok(storeDeltasRes.ok) const res = await ctx.spaceDiffStore.list({ provider: consumer.provider, @@ -230,11 +227,8 @@ export const test = { }] const deltas = findSpaceUsageDeltas(receipts) - - for (const d of deltas) { - const res = await storeSpaceUsageDelta(d, ctx) - assert.ok(res.ok) - } + const storeDeltasRes = await storeSpaceUsageDeltas(deltas, ctx) + assert.equal(storeDeltasRes.error?.name, 'InsufficientRecords') const res = await ctx.spaceDiffStore.list({ provider: consumer.provider, diff --git a/stacks/billing-stack.js b/stacks/billing-stack.js index 96c2fd7f..6cdbdcbd 100644 --- a/stacks/billing-stack.js +++ b/stacks/billing-stack.js @@ -107,7 +107,8 @@ export function BillingStack ({ stack, app }) { function: ucanStreamHandler, cdk: { eventSource: { - batchSize: 1, + batchSize: 25, // max dynamo BatchWriteItems size + bisectBatchOnError: true, startingPosition: StartingPosition.LATEST, filters: [ FilterCriteria.filter({