From 827964341f479cbbaea229fdf988ebe01c6c8490 Mon Sep 17 00:00:00 2001 From: Felipe Forbeck Date: Fri, 18 Oct 2024 15:03:11 -0300 Subject: [PATCH] tests: egress traffic --- billing/data/egress.js | 16 ++- billing/functions/egress-traffic-handler.js | 38 ++--- billing/test/helpers/context.js | 68 +++++---- billing/test/helpers/did.js | 2 +- billing/test/helpers/egress.js | 13 +- billing/test/lib/api.ts | 4 + billing/test/lib/egress-traffic.js | 147 +++++++++++++------- 7 files changed, 180 insertions(+), 108 deletions(-) diff --git a/billing/data/egress.js b/billing/data/egress.js index 9093f3e7..d44bbd3c 100644 --- a/billing/data/egress.js +++ b/billing/data/egress.js @@ -1,3 +1,4 @@ +import { Link } from '@ucanto/server' import { DecodeFailure, EncodeFailure, Schema } from './lib.js' export const egressSchema = Schema.struct({ @@ -13,7 +14,14 @@ export const validate = input => egressSchema.read(input) /** @type {import('../lib/api').Encoder} */ export const encode = input => { try { - return { ok: JSON.stringify(input) } + return { + ok: JSON.stringify({ + customer: input.customer.toString(), + resource: input.resource.toString(), + bytes: input.bytes.toString(), + servedAt: input.servedAt.toISOString(), + }) + } } catch (/** @type {any} */ err) { return { error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err }) @@ -27,9 +35,9 @@ export const decode = input => { return { ok: { customer: Schema.did({ method: 'mailto' }).from(input.customer), - resource: Schema.link().from(input.resourceId), - bytes: Schema.bigint().from(input.bytes), - servedAt: Schema.date().from(input.servedAt), + resource: Link.parse(/** @type {string} */(input.resource)), + bytes: BigInt(input.bytes), + servedAt: new Date(input.servedAt), } } } catch (/** @type {any} */ err) { diff --git a/billing/functions/egress-traffic-handler.js b/billing/functions/egress-traffic-handler.js index 8c54c6f4..cb5d4cfa 100644 --- a/billing/functions/egress-traffic-handler.js +++ b/billing/functions/egress-traffic-handler.js @@ -1,7 +1,6 @@ import * as Sentry from '@sentry/serverless' import { expect } from './lib.js' -import { decode } from '../data/egress.js' -import { SQSClient, DeleteMessageCommand } from '@aws-sdk/client-sqs' +import { decodeStr } from '../data/egress.js' import { mustGetEnv } from '../../lib/env.js' import { createCustomerStore } from '../tables/customer.js' import Stripe from 'stripe' @@ -16,10 +15,11 @@ Sentry.AWSLambda.init({ /** * @typedef {{ * region?: 'us-west-2'|'us-east-2' - * queueUrl?: string + * egressTrafficQueueUrl?: string * customerTable?: string * billingMeterName?: string * stripeSecretKey?: string + * customerStore?: import('../lib/api').CustomerStore * }} CustomHandlerContext */ @@ -37,21 +37,23 @@ export const handler = Sentry.AWSLambda.wrapHandler( /** @type {CustomHandlerContext|undefined} */ const customContext = context?.clientContext?.Custom const region = customContext?.region ?? mustGetEnv('AWS_REGION') - const queueUrl = customContext?.queueUrl ?? mustGetEnv('EGRESS_TRAFFIC_QUEUE_URL') + // const queueUrl = customContext?.egressTrafficQueueUrl ?? mustGetEnv('EGRESS_TRAFFIC_QUEUE_URL') + // const sqsClient = new SQSClient({ region }) const customerTable = customContext?.customerTable ?? mustGetEnv('CUSTOMER_TABLE_NAME') - const sqsClient = new SQSClient({ region }) - const customerStore = createCustomerStore({ region }, { tableName: customerTable }) - const billingMeterName = customContext?.billingMeterName ?? 'gateway_egress_traffic' - const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY + const customerStore = customContext?.customerStore ?? createCustomerStore({ region }, { tableName: customerTable }) + const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY if (!stripeSecretKey) throw new Error('missing secret: STRIPE_SECRET_KEY') + + const billingMeterName = customContext?.billingMeterName ?? mustGetEnv('STRIPE_BILLING_METER_EVENT_NAME') + if (!billingMeterName) throw new Error('missing secret: STRIPE_BILLING_METER_EVENT_NAME') + const stripe = new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' }) for (const record of event.Records) { try { - const messageBody = JSON.parse(record.body) - const decoded = decode(messageBody) - const egressEvent = expect(decoded, 'Failed to decode egress message') + const decoded = decodeStr(record.body) + const egressEvent = expect(decoded, 'Failed to decode egress event') expect( await recordEgress(customerStore, stripe, billingMeterName, egressEvent), @@ -62,10 +64,10 @@ export const handler = Sentry.AWSLambda.wrapHandler( * SQS requires explicit acknowledgment that a message has been successfully processed. * This is done by deleting the message from the queue using its ReceiptHandle */ - await sqsClient.send(new DeleteMessageCommand({ - QueueUrl: queueUrl, - ReceiptHandle: record.receiptHandle - })) + // await sqsClient.send(new DeleteMessageCommand({ + // QueueUrl: queueUrl, + // ReceiptHandle: record.receiptHandle + // })) } catch (error) { console.error('Error processing egress event:', error) } @@ -83,10 +85,10 @@ export const handler = Sentry.AWSLambda.wrapHandler( * * @param {import('../lib/api.js').CustomerStore} customerStore * @param {import('stripe').Stripe} stripe - * @param {string} billingMeterName + * @param {string} billingMeterEventName * @param {import('../lib/api.js').EgressTrafficData} egressEventData */ -async function recordEgress(customerStore, stripe, billingMeterName, egressEventData) { +async function recordEgress(customerStore, stripe, billingMeterEventName, egressEventData) { const response = await customerStore.get({ customer: egressEventData.customer }) if (response.error) { return { @@ -111,7 +113,7 @@ async function recordEgress(customerStore, stripe, billingMeterName, egressEvent /** @type {import('stripe').Stripe.Billing.MeterEvent} */ const meterEvent = await stripe.billing.meterEvents.create({ - event_name: billingMeterName, + event_name: billingMeterEventName, payload: { stripe_customer_id: stripeCustomerId, value: egressEventData.bytes.toString(), diff --git a/billing/test/helpers/context.js b/billing/test/helpers/context.js index 6e061217..0ee2a2cc 100644 --- a/billing/test/helpers/context.js +++ b/billing/test/helpers/context.js @@ -1,6 +1,5 @@ import dotenv from 'dotenv' import path from 'node:path' -import Stripe from 'stripe' import { createDynamoDB, createSQS, createQueue, createTable } from './aws.js' import { createCustomerStore, customerTableProps } from '../../tables/customer.js' import { encode as encodeCustomer, validate as validateCustomer } from '../../data/customer.js' @@ -22,6 +21,7 @@ import { createUsageStore, usageTableProps } from '../../tables/usage.js' import { createQueueRemoverClient } from './queue.js' import { createEgressTrafficQueue } from '../../queues/egress-traffic.js' import { handler as createEgressTrafficHandler } from '../../functions/egress-traffic-handler.js' +import Stripe from 'stripe' dotenv.config({ path: path.resolve('../.env.local'), override: true, debug: true }) @@ -41,6 +41,26 @@ const createAWSServices = async () => { } } +/** + * @returns {{ stripe: Stripe, stripeSecretKey: string, billingMeterEventName: string, billingMeterId: string }} + */ +const createStripeService = () => { + const stripeSecretKey = process.env.STRIPE_TEST_SECRET_KEY + if (!stripeSecretKey) { + throw new Error('STRIPE_TEST_SECRET_KEY environment variable is not set') + } + const billingMeterEventName = process.env.STRIPE_BILLING_METER_EVENT_NAME + if (!billingMeterEventName) { + throw new Error('STRIPE_BILLING_METER_EVENT_NAME environment variable is not set') + } + const billingMeterId = process.env.STRIPE_BILLING_METER_ID + if (!billingMeterId) { + throw new Error('STRIPE_BILLING_METER_ID environment variable is not set') + } + const stripe = new Stripe(stripeSecretKey, { apiVersion: "2023-10-16" }) + return { stripe, stripeSecretKey, billingMeterEventName, billingMeterId } +} + export const createBillingCronTestContext = async () => { await createAWSServices() @@ -150,10 +170,6 @@ export const createUCANStreamTestContext = async () => { */ export const createEgressTrafficTestContext = async () => { await createAWSServices() - const stripeSecretKey = process.env.STRIPE_TEST_SECRET_KEY - if (!stripeSecretKey) { - throw new Error('STRIPE_TEST_SECRET_KEY environment variable is not set') - } const egressQueueURL = new URL(await createQueue(awsServices.sqs.client, 'egress-traffic-queue-')) const egressTrafficQueue = { @@ -162,37 +178,33 @@ export const createEgressTrafficTestContext = async () => { } const accountId = (await awsServices.sqs.client.config.credentials()).accountId - const region = await awsServices.sqs.client.config.region() + const region = 'us-west-2' + + const customerTable = await createTable(awsServices.dynamo.client, customerTableProps, 'customer-') + const customerStore = { + ...createCustomerStore(awsServices.dynamo.client, { tableName: customerTable }), + ...createStorePutterClient(awsServices.dynamo.client, { + tableName: customerTable, + validate: validateCustomer, // assume test data is valid + encode: encodeCustomer + }) + } + + const { stripe, stripeSecretKey, billingMeterEventName, billingMeterId } = createStripeService() + // @ts-expect-error -- Don't need to initialize the full lambda context for testing return { egressTrafficQueue, egressTrafficQueueUrl: egressQueueURL.toString(), egressTrafficHandler: createEgressTrafficHandler, accountId: accountId ?? '', region: region ?? '', + customerTable, + customerStore, + billingMeterEventName, + billingMeterId, stripeSecretKey, - stripe: new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' }), - // Add mock properties for default Context - callbackWaitsForEmptyEventLoop: false, - functionName: 'egress-traffic-handler', - functionVersion: '1', - invokedFunctionArn: `arn:aws:lambda:${region}:${accountId}:function:egress-traffic-handler`, - memoryLimitInMB: '128', - awsRequestId: 'mockRequestId', - logGroupName: 'mockLogGroup', - logStreamName: 'mockLogStream', - identity: undefined, - clientContext: undefined, - getRemainingTimeInMillis: () => 30000, // mock implementation - done: () => { - console.log('Egress traffic handler done') - }, - fail: () => { - console.log('Egress traffic handler fail') - }, - succeed: () => { - console.log('Egress traffic handler succeed') - } + stripe, } } diff --git a/billing/test/helpers/did.js b/billing/test/helpers/did.js index c58fb9e7..3f0bc759 100644 --- a/billing/test/helpers/did.js +++ b/billing/test/helpers/did.js @@ -8,7 +8,7 @@ const randomDomain = () => `${randomAlphas(randomInteger(1, 32))}.${tlds[randomInteger(0, tlds.length)]}` /** @returns {import("@ucanto/interface").DID<'mailto'>} */ -export const randomDIDMailto = () => +export const randomDIDMailto = () => `did:mailto:${randomDomain()}:${randomAlphas(randomInteger(1, 16))}` /** @returns {Promise} */ diff --git a/billing/test/helpers/egress.js b/billing/test/helpers/egress.js index 71b0c876..f2110081 100644 --- a/billing/test/helpers/egress.js +++ b/billing/test/helpers/egress.js @@ -1,14 +1,13 @@ -import { randomDIDMailto } from './did.js' import { randomLink } from './dag.js' /** - * @param {Partial} [base] - * @returns {Promise} + * @param {import('../../lib/api').Customer} customer + * @returns {import('../../lib/api').EgressTrafficData} */ -export const randomEgressEvent = async (base = {}) => ({ - customer: await randomDIDMailto(), +export const randomEgressEvent = (customer) => ({ + customer: customer.customer, resource: randomLink(), bytes: BigInt(Math.floor(Math.random() * 1000000)), - servedAt: new Date(), - ...base + // Random timestamp within the last 1 hour + servedAt: new Date(Date.now() - Math.floor(Math.random() * 60 * 60 * 1000)), }) \ No newline at end of file diff --git a/billing/test/lib/api.ts b/billing/test/lib/api.ts index fbee21d4..081e1186 100644 --- a/billing/test/lib/api.ts +++ b/billing/test/lib/api.ts @@ -58,6 +58,10 @@ export interface EgressTrafficTestContext extends Context { egressTrafficHandler: Handler accountId: string region: string + customerTable: string + customerStore: CustomerStore + billingMeterEventName: string + billingMeterId: string stripeSecretKey: string stripe: Stripe } diff --git a/billing/test/lib/egress-traffic.js b/billing/test/lib/egress-traffic.js index ceb26cfa..3d3cabab 100644 --- a/billing/test/lib/egress-traffic.js +++ b/billing/test/lib/egress-traffic.js @@ -1,62 +1,109 @@ +import { encode } from '../../data/egress.js' +import { randomCustomer } from '../helpers/customer.js' +import { randomDIDMailto } from '../helpers/did.js' import { randomEgressEvent } from '../helpers/egress.js' +import * as DidMailto from '@web3-storage/did-mailto' /** @type {import('./api').TestSuite} */ export const test = { - 'should process egress events': async (/** @type {import('entail').assert} */ assert, ctx) => { - const maxEvents = 100 - const events = await Promise.all( - Array.from({ length: maxEvents }, () => randomEgressEvent()) - ) + /** + * @param {import('entail').assert} assert + * @param {import('./api').EgressTrafficTestContext} ctx + */ + 'should process all the egress traffic events from the queue': async (assert, ctx) => { + let stripeCustomerId; + try { + // 0. Create a test customer email, add it to stripe and to the customer store + const didMailto = randomDIDMailto() + const email = DidMailto.toEmail(/** @type {`did:mailto:${string}:${string}`} */(didMailto)) + const stripeCustomer = await ctx.stripe.customers.create({ email }) + assert.ok(stripeCustomer.id, 'Error adding customer to stripe') + stripeCustomerId = stripeCustomer.id - // 1. Add egress events to the queue to simulate events from the Freeway worker - for (const e of events) { - console.log(`Adding egress event to the queue: CustomerId: ${e.customer}, ResourceId: ${e.resource}, ServedAt: ${e.servedAt.toISOString()}`) - await ctx.egressTrafficQueue.add(e) - } + const customer = randomCustomer({ + customer: didMailto, + /** @type {`stripe:${string}`} */ + account: `stripe:${stripeCustomerId}` + }) + const { error } = await ctx.customerStore.put(customer) + assert.ok(!error, 'Error adding customer') + // 1. Add egress events to the queue to simulate egress traffic from the Freeway worker + const maxEvents = 10 + /** @type {import('../../lib/api').EgressTrafficData[]} */ + const events = await Promise.all( + Array.from( + { length: maxEvents }, + () => randomEgressEvent(customer) + ) + ) - // 2. Create a SQS event batch - // @type {import('aws-lambda').SQSEvent} - const sqsEventBatch = { - Records: events.map(e => ({ - // @type {import('aws-lambda').SQSRecord} - body: JSON.stringify(e), - messageId: Math.random().toString(), - receiptHandle: Math.random().toString(), - awsRegion: ctx.region, - eventSource: 'aws:sqs', - eventSourceARN: `arn:aws:sqs:${ctx.region}:${ctx.accountId}:${ctx.egressTrafficQueueUrl}`, - awsAccountId: ctx.accountId, - md5OfBody: '', - md5OfMessageAttributes: '', - attributes: { - ApproximateReceiveCount: '1', - SentTimestamp: e.servedAt.getTime().toString(), - SenderId: ctx.accountId, - ApproximateFirstReceiveTimestamp: e.servedAt.getTime().toString(), - }, - messageAttributes: {}, - })) - } + for (const e of events) { + console.log(`Egress traffic for ${e.customer}, bytes: ${e.bytes}, servedAt: ${e.servedAt.toISOString()}, `) + const result = await ctx.egressTrafficQueue.add(e) + assert.ok(!result.error, 'Error adding egress event to the queue') + } - // 3. Process the SQS event to trigger the handler - await ctx.egressTrafficHandler(sqsEventBatch, ctx, (err, res) => { - if (err) { - assert.fail(err) + // 2. Create a SQS event batch + // @type {import('aws-lambda').SQSEvent} + const sqsEventBatch = { + Records: events.map(e => ({ + // @type {import('aws-lambda').SQSRecord} + body: encode(e).ok ?? '', + messageId: Math.random().toString(), + receiptHandle: Math.random().toString(), + awsRegion: ctx.region, + eventSource: 'aws:sqs', + eventSourceARN: `arn:aws:sqs:${ctx.region}:${ctx.accountId}:${ctx.egressTrafficQueueUrl}`, + awsAccountId: ctx.accountId, + md5OfBody: '', + md5OfMessageAttributes: '', + attributes: { + ApproximateReceiveCount: '1', + SentTimestamp: e.servedAt.getTime().toString(), + SenderId: ctx.accountId, + ApproximateFirstReceiveTimestamp: e.servedAt.getTime().toString(), + }, + messageAttributes: {}, + })) } - assert.ok(res) - assert.equal(res.statusCode, 200) - assert.equal(res.body, 'Egress events processed successfully') - }) - // 4. Ensure we got a billing meter event or each egress event in the queue - // query stripe for the billing meter events - // const billingMeterEvents = await ctx.stripe.billing.meterEvents.list({ - // limit: maxEvents, - // }) - // assert.equal(billingMeterEvents.data.length, events.length) - // FIXME (fforbeck): how to check we send the events to stripe? - // we need to mock the stripe client - // and check that the correct events are sent to stripe + // 3. Process the SQS event to trigger the handler using the custom context + const customCtx = { + clientContext: { + Custom: ctx, + }, + } + // @ts-expect-error -- Don't need to initialize the full lambda context for testing + await ctx.egressTrafficHandler(sqsEventBatch, customCtx, (err, res) => { + if (err) { + assert.fail(err) + } + assert.ok(res) + assert.equal(res.statusCode, 200) + assert.equal(res.body, 'Egress events processed successfully') + }) + + // 4. Check if the aggregated meter event exists and has a value greater than 0 + const aggregatedMeterEvent = await ctx.stripe.billing.meters.listEventSummaries( + ctx.billingMeterId, + { + customer: stripeCustomerId, + start_time: Math.floor(events[0].servedAt.getTime() / 1000), + end_time: Math.floor(Date.now() / 1000), + } + ) + assert.ok(aggregatedMeterEvent.data, 'No aggregated meter event found') + assert.equal(aggregatedMeterEvent.data.length, 1, 'Expected 1 aggregated meter event') + // We can't verify the total bytes served because the meter events are not immediately available in stripe + // and the test would fail intermittently + assert.ok(aggregatedMeterEvent.data[0].aggregated_value > 0, 'Aggregated value is 0') + } finally { + if (stripeCustomerId) { + // 5. Delete the test customer from stripe + const deletedCustomer = await ctx.stripe.customers.del(stripeCustomerId); + assert.ok(deletedCustomer.deleted, 'Error deleting customer from stripe') + } + } } } \ No newline at end of file