Skip to content

Commit

Permalink
tests: egress traffic
Browse files Browse the repository at this point in the history
  • Loading branch information
fforbeck committed Oct 18, 2024
1 parent 8ed550e commit 8279643
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 108 deletions.
16 changes: 12 additions & 4 deletions billing/data/egress.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Link } from '@ucanto/server'
import { DecodeFailure, EncodeFailure, Schema } from './lib.js'

export const egressSchema = Schema.struct({
Expand All @@ -13,7 +14,14 @@ export const validate = input => egressSchema.read(input)
/** @type {import('../lib/api').Encoder<import('../lib/api').EgressTrafficData, string>} */
export const encode = input => {
try {
return { ok: JSON.stringify(input) }
return {
ok: JSON.stringify({
customer: input.customer.toString(),
resource: input.resource.toString(),
bytes: input.bytes.toString(),
servedAt: input.servedAt.toISOString(),
})
}
} catch (/** @type {any} */ err) {
return {
error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err })
Expand All @@ -27,9 +35,9 @@ export const decode = input => {
return {
ok: {
customer: Schema.did({ method: 'mailto' }).from(input.customer),
resource: Schema.link().from(input.resourceId),
bytes: Schema.bigint().from(input.bytes),
servedAt: Schema.date().from(input.servedAt),
resource: Link.parse(/** @type {string} */(input.resource)),
bytes: BigInt(input.bytes),
servedAt: new Date(input.servedAt),
}
}
} catch (/** @type {any} */ err) {
Expand Down
38 changes: 20 additions & 18 deletions billing/functions/egress-traffic-handler.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import * as Sentry from '@sentry/serverless'
import { expect } from './lib.js'
import { decode } from '../data/egress.js'
import { SQSClient, DeleteMessageCommand } from '@aws-sdk/client-sqs'
import { decodeStr } from '../data/egress.js'
import { mustGetEnv } from '../../lib/env.js'
import { createCustomerStore } from '../tables/customer.js'
import Stripe from 'stripe'
Expand All @@ -16,10 +15,11 @@ Sentry.AWSLambda.init({
/**
* @typedef {{
* region?: 'us-west-2'|'us-east-2'
* queueUrl?: string
* egressTrafficQueueUrl?: string
* customerTable?: string
* billingMeterName?: string
* stripeSecretKey?: string
* customerStore?: import('../lib/api').CustomerStore
* }} CustomHandlerContext
*/

Expand All @@ -37,21 +37,23 @@ export const handler = Sentry.AWSLambda.wrapHandler(
/** @type {CustomHandlerContext|undefined} */
const customContext = context?.clientContext?.Custom
const region = customContext?.region ?? mustGetEnv('AWS_REGION')
const queueUrl = customContext?.queueUrl ?? mustGetEnv('EGRESS_TRAFFIC_QUEUE_URL')
// const queueUrl = customContext?.egressTrafficQueueUrl ?? mustGetEnv('EGRESS_TRAFFIC_QUEUE_URL')
// const sqsClient = new SQSClient({ region })
const customerTable = customContext?.customerTable ?? mustGetEnv('CUSTOMER_TABLE_NAME')
const sqsClient = new SQSClient({ region })
const customerStore = createCustomerStore({ region }, { tableName: customerTable })
const billingMeterName = customContext?.billingMeterName ?? 'gateway_egress_traffic'
const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY
const customerStore = customContext?.customerStore ?? createCustomerStore({ region }, { tableName: customerTable })

const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY
if (!stripeSecretKey) throw new Error('missing secret: STRIPE_SECRET_KEY')

const billingMeterName = customContext?.billingMeterName ?? mustGetEnv('STRIPE_BILLING_METER_EVENT_NAME')
if (!billingMeterName) throw new Error('missing secret: STRIPE_BILLING_METER_EVENT_NAME')

const stripe = new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' })

for (const record of event.Records) {
try {
const messageBody = JSON.parse(record.body)
const decoded = decode(messageBody)
const egressEvent = expect(decoded, 'Failed to decode egress message')
const decoded = decodeStr(record.body)
const egressEvent = expect(decoded, 'Failed to decode egress event')

expect(
await recordEgress(customerStore, stripe, billingMeterName, egressEvent),
Expand All @@ -62,10 +64,10 @@ export const handler = Sentry.AWSLambda.wrapHandler(
* SQS requires explicit acknowledgment that a message has been successfully processed.
* This is done by deleting the message from the queue using its ReceiptHandle
*/
await sqsClient.send(new DeleteMessageCommand({
QueueUrl: queueUrl,
ReceiptHandle: record.receiptHandle
}))
// await sqsClient.send(new DeleteMessageCommand({
// QueueUrl: queueUrl,
// ReceiptHandle: record.receiptHandle
// }))
} catch (error) {
console.error('Error processing egress event:', error)
}
Expand All @@ -83,10 +85,10 @@ export const handler = Sentry.AWSLambda.wrapHandler(
*
* @param {import('../lib/api.js').CustomerStore} customerStore
* @param {import('stripe').Stripe} stripe
* @param {string} billingMeterName
* @param {string} billingMeterEventName
* @param {import('../lib/api.js').EgressTrafficData} egressEventData
*/
async function recordEgress(customerStore, stripe, billingMeterName, egressEventData) {
async function recordEgress(customerStore, stripe, billingMeterEventName, egressEventData) {
const response = await customerStore.get({ customer: egressEventData.customer })
if (response.error) {
return {
Expand All @@ -111,7 +113,7 @@ async function recordEgress(customerStore, stripe, billingMeterName, egressEvent

/** @type {import('stripe').Stripe.Billing.MeterEvent} */
const meterEvent = await stripe.billing.meterEvents.create({
event_name: billingMeterName,
event_name: billingMeterEventName,
payload: {
stripe_customer_id: stripeCustomerId,
value: egressEventData.bytes.toString(),
Expand Down
68 changes: 40 additions & 28 deletions billing/test/helpers/context.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import dotenv from 'dotenv'
import path from 'node:path'
import Stripe from 'stripe'
import { createDynamoDB, createSQS, createQueue, createTable } from './aws.js'
import { createCustomerStore, customerTableProps } from '../../tables/customer.js'
import { encode as encodeCustomer, validate as validateCustomer } from '../../data/customer.js'
Expand All @@ -22,6 +21,7 @@ import { createUsageStore, usageTableProps } from '../../tables/usage.js'
import { createQueueRemoverClient } from './queue.js'
import { createEgressTrafficQueue } from '../../queues/egress-traffic.js'
import { handler as createEgressTrafficHandler } from '../../functions/egress-traffic-handler.js'
import Stripe from 'stripe'

dotenv.config({ path: path.resolve('../.env.local'), override: true, debug: true })

Expand All @@ -41,6 +41,26 @@ const createAWSServices = async () => {
}
}

/**
* @returns {{ stripe: Stripe, stripeSecretKey: string, billingMeterEventName: string, billingMeterId: string }}
*/
const createStripeService = () => {
const stripeSecretKey = process.env.STRIPE_TEST_SECRET_KEY
if (!stripeSecretKey) {
throw new Error('STRIPE_TEST_SECRET_KEY environment variable is not set')
}
const billingMeterEventName = process.env.STRIPE_BILLING_METER_EVENT_NAME
if (!billingMeterEventName) {
throw new Error('STRIPE_BILLING_METER_EVENT_NAME environment variable is not set')
}
const billingMeterId = process.env.STRIPE_BILLING_METER_ID
if (!billingMeterId) {
throw new Error('STRIPE_BILLING_METER_ID environment variable is not set')
}
const stripe = new Stripe(stripeSecretKey, { apiVersion: "2023-10-16" })
return { stripe, stripeSecretKey, billingMeterEventName, billingMeterId }
}

export const createBillingCronTestContext = async () => {
await createAWSServices()

Expand Down Expand Up @@ -150,10 +170,6 @@ export const createUCANStreamTestContext = async () => {
*/
export const createEgressTrafficTestContext = async () => {
await createAWSServices()
const stripeSecretKey = process.env.STRIPE_TEST_SECRET_KEY
if (!stripeSecretKey) {
throw new Error('STRIPE_TEST_SECRET_KEY environment variable is not set')
}

const egressQueueURL = new URL(await createQueue(awsServices.sqs.client, 'egress-traffic-queue-'))
const egressTrafficQueue = {
Expand All @@ -162,37 +178,33 @@ export const createEgressTrafficTestContext = async () => {
}

const accountId = (await awsServices.sqs.client.config.credentials()).accountId
const region = await awsServices.sqs.client.config.region()
const region = 'us-west-2'

const customerTable = await createTable(awsServices.dynamo.client, customerTableProps, 'customer-')
const customerStore = {
...createCustomerStore(awsServices.dynamo.client, { tableName: customerTable }),
...createStorePutterClient(awsServices.dynamo.client, {
tableName: customerTable,
validate: validateCustomer, // assume test data is valid
encode: encodeCustomer
})
}

const { stripe, stripeSecretKey, billingMeterEventName, billingMeterId } = createStripeService()

// @ts-expect-error -- Don't need to initialize the full lambda context for testing
return {
egressTrafficQueue,
egressTrafficQueueUrl: egressQueueURL.toString(),
egressTrafficHandler: createEgressTrafficHandler,
accountId: accountId ?? '',
region: region ?? '',
customerTable,
customerStore,
billingMeterEventName,
billingMeterId,
stripeSecretKey,
stripe: new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' }),
// Add mock properties for default Context
callbackWaitsForEmptyEventLoop: false,
functionName: 'egress-traffic-handler',
functionVersion: '1',
invokedFunctionArn: `arn:aws:lambda:${region}:${accountId}:function:egress-traffic-handler`,
memoryLimitInMB: '128',
awsRequestId: 'mockRequestId',
logGroupName: 'mockLogGroup',
logStreamName: 'mockLogStream',
identity: undefined,
clientContext: undefined,
getRemainingTimeInMillis: () => 30000, // mock implementation
done: () => {
console.log('Egress traffic handler done')
},
fail: () => {
console.log('Egress traffic handler fail')
},
succeed: () => {
console.log('Egress traffic handler succeed')
}
stripe,
}
}

Expand Down
2 changes: 1 addition & 1 deletion billing/test/helpers/did.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const randomDomain = () =>
`${randomAlphas(randomInteger(1, 32))}.${tlds[randomInteger(0, tlds.length)]}`

/** @returns {import("@ucanto/interface").DID<'mailto'>} */
export const randomDIDMailto = () =>
export const randomDIDMailto = () =>
`did:mailto:${randomDomain()}:${randomAlphas(randomInteger(1, 16))}`

/** @returns {Promise<import("@ucanto/interface").DID>} */
Expand Down
13 changes: 6 additions & 7 deletions billing/test/helpers/egress.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import { randomDIDMailto } from './did.js'
import { randomLink } from './dag.js'

/**
* @param {Partial<import('../../lib/api').EgressTrafficData>} [base]
* @returns {Promise<import('../../lib/api').EgressTrafficData>}
* @param {import('../../lib/api').Customer} customer
* @returns {import('../../lib/api').EgressTrafficData}
*/
export const randomEgressEvent = async (base = {}) => ({
customer: await randomDIDMailto(),
export const randomEgressEvent = (customer) => ({
customer: customer.customer,
resource: randomLink(),
bytes: BigInt(Math.floor(Math.random() * 1000000)),
servedAt: new Date(),
...base
// Random timestamp within the last 1 hour
servedAt: new Date(Date.now() - Math.floor(Math.random() * 60 * 60 * 1000)),
})
4 changes: 4 additions & 0 deletions billing/test/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ export interface EgressTrafficTestContext extends Context {
egressTrafficHandler: Handler<SQSEvent, { statusCode: number, body: string }>
accountId: string
region: string
customerTable: string
customerStore: CustomerStore
billingMeterEventName: string
billingMeterId: string
stripeSecretKey: string
stripe: Stripe
}
Expand Down
Loading

0 comments on commit 8279643

Please sign in to comment.