Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use batching for billing UCAN stream handler #410

Merged
merged 5 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions billing/functions/ucan-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { LRUCache } from 'lru-cache'
import { createSpaceDiffStore } from '../tables/space-diff.js'
import { createConsumerStore } from '../tables/consumer.js'
import { expect } from './lib.js'
import { findSpaceUsageDeltas, storeSpaceUsageDelta } from '../lib/ucan-stream.js'
import { findSpaceUsageDeltas, storeSpaceUsageDeltas } from '../lib/ucan-stream.js'
import { mustGetEnv } from '../../lib/env.js'

Sentry.AWSLambda.init({
Expand Down Expand Up @@ -35,24 +35,17 @@ export const handler = Sentry.AWSLambda.wrapHandler(
const region = customContext?.region ?? mustGetEnv('AWS_REGION')

const messages = parseUcanStreamEvent(event)
if (!messages || messages.length > 1) {
throw new Error(`invalid batch size, expected: 1, actual: ${messages.length}`)
}

const deltas = findSpaceUsageDeltas(messages)
if (!deltas.length) {
console.log("No messages found that contain space usage deltas", "capabilities", messages[0].value.att.map((att) => att.can), "resources", messages[0].value.att.map((att) => att.with) )
return
}
console.log("Storing space usage delta", deltas[0])
console.log(`Storing ${deltas.length} space usage deltas`)

const consumerStore = createConsumerStore({ region }, { tableName: consumerTable })
const spaceDiffStore = createSpaceDiffStore({ region }, { tableName: spaceDiffTable })
const ctx = { spaceDiffStore, consumerStore: withConsumerListCache(consumerStore) }
expect(
await storeSpaceUsageDelta(deltas[0], ctx),
`storing space usage delta for: ${deltas[0].resource}, cause: ${deltas[0].cause}`
)
expect(await storeSpaceUsageDeltas(deltas, ctx), 'storing space usage deltas')
}
)

Expand Down
21 changes: 20 additions & 1 deletion billing/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export interface SpaceDiffListKey {
}

export type SpaceDiffStore =
& StorePutter<SpaceDiff>
& StoreBatchPutter<SpaceDiff>
& StoreLister<SpaceDiffListKey, SpaceDiff>

/** Captures size of a space at a given point in time. */
Expand Down Expand Up @@ -294,12 +294,31 @@ export interface RecordNotFound<K> extends Failure {
key: K
}

/** Not enough records were provided for the operation. */
export interface InsufficientRecords extends Failure {
name: 'InsufficientRecords'
}

/** StorePutter allows a single item to be put in the store by it's key. */
export interface StorePutter<T> {
/** Puts a single item into the store by it's key */
put: (rec: T) => Promise<Result<Unit, EncodeFailure|StoreOperationFailure|Failure>>
}

/**
* StoreBatchPutter allows multiple items to be put in the store. Note: this is
* not transactional. A failure may mean 1 or more records succeeded to
* be written.
*/
export interface StoreBatchPutter<T> {
/**
* Puts multiple items into the store by their key. Note: this is
* not transactional. A failure may mean 1 or more records succeeded to
* be written.
*/
batchPut: (rec: Iterable<T>) => Promise<Result<Unit, InsufficientRecords|EncodeFailure|StoreOperationFailure|Failure>>
}

/** StoreGetter allows a single item to be retrieved by it's key. */
export interface StoreGetter<K extends {}, V> {
/** Gets a single item by it's key. */
Expand Down
46 changes: 27 additions & 19 deletions billing/lib/ucan-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,33 +62,41 @@ export const findSpaceUsageDeltas = messages => {
* multiple calls to this function with the same data must not add _another_
* record to the store.
*
* @param {import('./api.js').UsageDelta} delta
* @param {import('./api.js').UsageDelta[]} deltas
* @param {{
* spaceDiffStore: import('./api').SpaceDiffStore
* consumerStore: import('./api').ConsumerStore
* }} ctx
* @returns {Promise<import('@ucanto/interface').Result<import('@ucanto/interface').Unit>>}
*/
export const storeSpaceUsageDelta = async (delta, ctx) => {
const consumerList = await ctx.consumerStore.list({ consumer: delta.resource })
if (consumerList.error) return consumerList
export const storeSpaceUsageDeltas = async (deltas, ctx) => {
const spaceDiffResults = await Promise.all(deltas.map(async delta => {
const consumerList = await ctx.consumerStore.list({ consumer: delta.resource })
if (consumerList.error) return consumerList

// There should only be one subscription per provider, but in theory you
// could have multiple providers for the same consumer (space).
for (const consumer of consumerList.ok.results) {
const spaceDiffPut = await ctx.spaceDiffStore.put({
provider: consumer.provider,
subscription: consumer.subscription,
space: delta.resource,
cause: delta.cause,
delta: delta.delta,
receiptAt: delta.receiptAt,
insertedAt: new Date()
})
if (spaceDiffPut.error) return spaceDiffPut
const diffs = []
// There should only be one subscription per provider, but in theory you
// could have multiple providers for the same consumer (space).
for (const consumer of consumerList.ok.results) {
diffs.push({
provider: consumer.provider,
subscription: consumer.subscription,
space: delta.resource,
cause: delta.cause,
delta: delta.delta,
receiptAt: delta.receiptAt,
insertedAt: new Date()
})
}
return { ok: diffs, error: undefined }
}))

const spaceDiffs = []
for (const res of spaceDiffResults) {
if (res.error) return res
spaceDiffs.push(...res.ok)
}

return { ok: {} }
return ctx.spaceDiffStore.batchPut(spaceDiffs)
}

/**
Expand Down
51 changes: 49 additions & 2 deletions billing/tables/client.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { DynamoDBClient, GetItemCommand, PutItemCommand, QueryCommand, ScanCommand } from '@aws-sdk/client-dynamodb'
import { BatchWriteItemCommand, DynamoDBClient, GetItemCommand, PutItemCommand, QueryCommand, ScanCommand } from '@aws-sdk/client-dynamodb'
import { marshall, unmarshall, convertToAttr } from '@aws-sdk/util-dynamodb'
import retry from 'p-retry'
import { RecordNotFound, StoreOperationFailure } from './lib.js'
import { InsufficientRecords, RecordNotFound, StoreOperationFailure } from './lib.js'
import { getDynamoClient } from '../../lib/aws/dynamo.js'

/** @param {{ region: string } | DynamoDBClient} target */
Expand Down Expand Up @@ -54,6 +54,53 @@ export const createStorePutterClient = (conf, context) => {
}
}

/**
* @template T
* @param {{ region: string } | import('@aws-sdk/client-dynamodb').DynamoDBClient} conf
* @param {object} context
* @param {string} context.tableName
* @param {import('../lib/api').Validator<T>} context.validate
* @param {import('../lib/api').Encoder<T, import('../types').StoreRecord>} context.encode
* @returns {import('../lib/api').StoreBatchPutter<T>}
*/
export const createStoreBatchPutterClient = (conf, context) => {
const client = connectTable(conf)
return {
batchPut: async (records) => {
/** @type {import('@aws-sdk/client-dynamodb').WriteRequest[]} */
const writeRequests = []
for (const record of records) {
const validation = context.validate(record)
if (validation.error) return validation

const encoding = context.encode(record)
if (encoding.error) return encoding
writeRequests.push(({ PutRequest: { Item: marshall(encoding.ok, { removeUndefinedValues: true }) } }))
}

if (!writeRequests.length) {
return { error: new InsufficientRecords('records must have length greater than or equal to 1') }
}

try {
let requestItems = { [context.tableName]: writeRequests }
await retry(async () => {
const cmd = new BatchWriteItemCommand({ RequestItems: requestItems })
const res = await client.send(cmd)
if (res.UnprocessedItems && Object.keys(res.UnprocessedItems).length) {
requestItems = res.UnprocessedItems
throw new Error('unprocessed items')
}
}, { onFailedAttempt: console.warn })
return { ok: {} }
} catch (/** @type {any} */ err) {
console.error(err)
return { error: new StoreOperationFailure(err.message, { cause: err }) }
}
}
}
}

/**
* @template {object} K
* @template V
Expand Down
16 changes: 16 additions & 0 deletions billing/tables/lib.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,19 @@ export class RecordNotFound extends Failure {
return { ...super.toJSON(), key: this.key }
}
}

export class InsufficientRecords extends Failure {
/**
* @param {string} [message] Context for the message.
* @param {ErrorOptions} [options]
*/
constructor (message, options) {
super(undefined, options)
this.name = /** @type {const} */ ('InsufficientRecords')
this.detail = message
}

describe () {
return this.detail ?? 'insufficient records were provided for the operation'
}
}
4 changes: 2 additions & 2 deletions billing/tables/space-diff.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { createStoreListerClient, createStorePutterClient } from './client.js'
import { createStoreBatchPutterClient, createStoreListerClient } from './client.js'
import { validate, encode, lister, decode } from '../data/space-diff.js'

/**
Expand Down Expand Up @@ -36,6 +36,6 @@ export const spaceDiffTableProps = {
* @returns {import('../lib/api').SpaceDiffStore}
*/
export const createSpaceDiffStore = (conf, { tableName }) => ({
...createStorePutterClient(conf, { tableName, validate, encode }),
...createStoreBatchPutterClient(conf, { tableName, validate, encode }),
...createStoreListerClient(conf, { tableName, encodeKey: lister.encodeKey, decode })
})
53 changes: 27 additions & 26 deletions billing/test/lib/space-billing-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ export const test = {
const to = startOfMonth(now)
const delta = 1024 * 1024 * 1024 // 1GiB

await ctx.spaceDiffStore.put({
await ctx.spaceDiffStore.batchPut([{
provider: consumer.provider,
space: consumer.consumer,
subscription: consumer.subscription,
cause: randomLink(),
delta,
receiptAt: from,
insertedAt: new Date()
})
}])

/** @type {import('../../lib/api.js').SpaceBillingInstruction} */
const instruction = {
Expand Down Expand Up @@ -75,28 +75,29 @@ export const test = {
insertedAt: new Date()
})

// add 1GiB
await ctx.spaceDiffStore.put({
provider: consumer.provider,
space: consumer.consumer,
subscription: consumer.subscription,
cause: randomLink(),
delta,
receiptAt: from,
insertedAt: new Date()
})

// remove 1GiB
await ctx.spaceDiffStore.put({
provider: consumer.provider,
space: consumer.consumer,
subscription: consumer.subscription,
cause: randomLink(),
delta: -delta,
// removed exactly half way through the month
receiptAt: new Date(from.getTime() + ((to.getTime() - from.getTime()) / 2)),
insertedAt: new Date()
})
await ctx.spaceDiffStore.batchPut([
// add 1GiB
{
provider: consumer.provider,
space: consumer.consumer,
subscription: consumer.subscription,
cause: randomLink(),
delta,
receiptAt: from,
insertedAt: new Date()
},
// remove 1GiB
{
provider: consumer.provider,
space: consumer.consumer,
subscription: consumer.subscription,
cause: randomLink(),
delta: -delta,
// removed exactly half way through the month
receiptAt: new Date(from.getTime() + ((to.getTime() - from.getTime()) / 2)),
insertedAt: new Date()
}
])

/** @type {import('../../lib/api.js').SpaceBillingInstruction} */
const instruction = {
Expand Down Expand Up @@ -157,7 +158,7 @@ export const test = {
return yest
}

await ctx.spaceDiffStore.put({
await ctx.spaceDiffStore.batchPut([{
provider: consumer.provider,
space: consumer.consumer,
subscription: consumer.subscription,
Expand All @@ -166,7 +167,7 @@ export const test = {
// store/add 24h prior to end of billing
receiptAt: yesterday(to),
insertedAt: new Date()
})
}])

/** @type {import('../../lib/api.js').SpaceBillingInstruction} */
const instruction = {
Expand Down
16 changes: 5 additions & 11 deletions billing/test/lib/ucan-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Schema } from '@ucanto/core'
import * as ServiceBlobCaps from '@web3-storage/capabilities/web3.storage/blob'
import * as BlobCaps from '@web3-storage/capabilities/blob'
import * as StoreCaps from '@web3-storage/capabilities/store'
import { findSpaceUsageDeltas, storeSpaceUsageDelta } from '../../lib/ucan-stream.js'
import { findSpaceUsageDeltas, storeSpaceUsageDeltas } from '../../lib/ucan-stream.js'
import { randomConsumer } from '../helpers/consumer.js'
import { randomLink } from '../helpers/dag.js'
import { randomDID, randomDIDKey } from '../helpers/did.js'
Expand Down Expand Up @@ -174,11 +174,8 @@ export const test = {
}]

const deltas = findSpaceUsageDeltas(receipts)

for (const d of deltas) {
const res = await storeSpaceUsageDelta(d, ctx)
assert.ok(res.ok)
}
const storeDeltasRes = await storeSpaceUsageDeltas(deltas, ctx)
assert.ok(storeDeltasRes.ok)

const res = await ctx.spaceDiffStore.list({
provider: consumer.provider,
Expand Down Expand Up @@ -230,11 +227,8 @@ export const test = {
}]

const deltas = findSpaceUsageDeltas(receipts)

for (const d of deltas) {
const res = await storeSpaceUsageDelta(d, ctx)
assert.ok(res.ok)
}
const storeDeltasRes = await storeSpaceUsageDeltas(deltas, ctx)
assert.equal(storeDeltasRes.error?.name, 'InsufficientRecords')

const res = await ctx.spaceDiffStore.list({
provider: consumer.provider,
Expand Down
3 changes: 2 additions & 1 deletion stacks/billing-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ export function BillingStack ({ stack, app }) {
function: ucanStreamHandler,
cdk: {
eventSource: {
batchSize: 1,
batchSize: 25, // max dynamo BatchWriteItems size
bisectBatchOnError: true,
startingPosition: StartingPosition.LATEST,
filters: [
FilterCriteria.filter({
Expand Down
Loading