From b3ebcf3e020bd57cd8fec8d6d8945e533e7c6a2a Mon Sep 17 00:00:00 2001 From: beniaminmunteanu Date: Tue, 2 Apr 2024 18:09:08 +0300 Subject: [PATCH] feat(telemetry): move amount collection in ILP connector --- .../backend/src/accounting/psql/service.ts | 2 - packages/backend/src/accounting/service.ts | 21 +--- .../src/accounting/tigerbeetle/service.ts | 2 - packages/backend/src/index.ts | 14 +-- .../core/factories/rafiki-services.ts | 3 + .../connector/core/middleware/telemetry.ts | 22 ++++ .../ilp/connector/core/rafiki.ts | 5 + .../core/test/middleware/telemetry.test.ts | 111 ++++++++++++++++++ .../src/payment-method/ilp/connector/index.ts | 7 ++ 9 files changed, 156 insertions(+), 31 deletions(-) create mode 100644 packages/backend/src/payment-method/ilp/connector/core/middleware/telemetry.ts create mode 100644 packages/backend/src/payment-method/ilp/connector/core/test/middleware/telemetry.test.ts diff --git a/packages/backend/src/accounting/psql/service.ts b/packages/backend/src/accounting/psql/service.ts index 3a64820ea6..b0b491a035 100644 --- a/packages/backend/src/accounting/psql/service.ts +++ b/packages/backend/src/accounting/psql/service.ts @@ -3,7 +3,6 @@ import { TransactionOrKnex } from 'objection' import { v4 as uuid } from 'uuid' import { Asset } from '../../asset/model' import { BaseService } from '../../shared/baseService' -import { TelemetryService } from '../../telemetry/service' import { isTransferError, TransferError } from '../errors' import { AccountingService, @@ -36,7 +35,6 @@ import { import { LedgerTransfer, LedgerTransferType } from './ledger-transfer/model' export interface ServiceDependencies extends BaseService { - telemetry?: TelemetryService knex: TransactionOrKnex withdrawalThrottleDelay?: number } diff --git a/packages/backend/src/accounting/service.ts b/packages/backend/src/accounting/service.ts index 2894023dfb..214dd006a8 100644 --- a/packages/backend/src/accounting/service.ts +++ b/packages/backend/src/accounting/service.ts @@ -1,7 +1,5 @@ import { TransactionOrKnex } from 'objection' import { BaseService } from '../shared/baseService' -import { TelemetryService } from '../telemetry/service' -import { collectTelemetryAmount } from '../telemetry/transaction-amount' import { TransferError, isTransferError } from './errors' export enum LiquidityAccountType { @@ -93,7 +91,6 @@ export interface TransferToCreate { } export interface BaseAccountingServiceDependencies extends BaseService { - telemetry?: TelemetryService withdrawalThrottleDelay?: number } @@ -121,7 +118,7 @@ export async function createAccountToAccountTransfer( transferArgs } = args - const { withdrawalThrottleDelay, telemetry, logger } = deps + const { withdrawalThrottleDelay } = deps const { sourceAccount, destinationAccount, sourceAmount, destinationAmount } = transferArgs @@ -196,22 +193,8 @@ export async function createAccountToAccountTransfer( withdrawalThrottleDelay }) } - - if ( - destinationAccount.onDebit && - telemetry && - sourceAccount.asset.code && - sourceAccount.asset.scale - ) { - collectTelemetryAmount(telemetry, logger, { - amount: sourceAmount, - asset: { - code: sourceAccount.asset.code, - scale: sourceAccount.asset.scale - } - }) - } }, + void: async (): Promise => { const error = await voidTransfers(pendingTransferIdsOrError) diff --git a/packages/backend/src/accounting/tigerbeetle/service.ts b/packages/backend/src/accounting/tigerbeetle/service.ts index d06ead22bb..16e6ffa0e2 100644 --- a/packages/backend/src/accounting/tigerbeetle/service.ts +++ b/packages/backend/src/accounting/tigerbeetle/service.ts @@ -3,7 +3,6 @@ import { v4 as uuid } from 'uuid' import { BaseService } from '../../shared/baseService' import { validateId } from '../../shared/utils' -import { TelemetryService } from '../../telemetry/service' import { AccountAlreadyExistsError, BalanceTransferError, @@ -49,7 +48,6 @@ export const convertToTigerbeetleAccountCode: { } export interface ServiceDependencies extends BaseService { - telemetry?: TelemetryService tigerbeetle: Client withdrawalThrottleDelay?: number } diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts index 72aecfbffc..b700495ae3 100644 --- a/packages/backend/src/index.ts +++ b/packages/backend/src/index.ts @@ -208,11 +208,6 @@ export function initIocContainer( const knex = await deps.use('knex') const config = await deps.use('config') - let telemetry: TelemetryService | undefined - if (config.enableTelemetry && config.openTelemetryCollectors.length > 0) { - telemetry = await deps.use('telemetry') - } - if (config.useTigerbeetle) { container.singleton('tigerbeetle', async (deps) => { const config = await deps.use('config') @@ -226,7 +221,6 @@ export function initIocContainer( return createTigerbeetleAccountingService({ logger, - telemetry, knex, tigerbeetle, withdrawalThrottleDelay: config.withdrawalThrottleDelay @@ -235,7 +229,6 @@ export function initIocContainer( return createPsqlAccountingService({ logger, - telemetry, knex, withdrawalThrottleDelay: config.withdrawalThrottleDelay }) @@ -362,6 +355,10 @@ export function initIocContainer( container.singleton('connectorApp', async (deps) => { const config = await deps.use('config') + let telemetry: TelemetryService | undefined + if (config.enableTelemetry) { + telemetry = await deps.use('telemetry') + } return await createConnectorService({ logger: await deps.use('logger'), redis: await deps.use('redis'), @@ -371,7 +368,8 @@ export function initIocContainer( peerService: await deps.use('peerService'), ratesService: await deps.use('ratesService'), streamServer: await deps.use('streamServer'), - ilpAddress: config.ilpAddress + ilpAddress: config.ilpAddress, + telemetry }) }) diff --git a/packages/backend/src/payment-method/ilp/connector/core/factories/rafiki-services.ts b/packages/backend/src/payment-method/ilp/connector/core/factories/rafiki-services.ts index 12343967a9..561c2486bc 100644 --- a/packages/backend/src/payment-method/ilp/connector/core/factories/rafiki-services.ts +++ b/packages/backend/src/payment-method/ilp/connector/core/factories/rafiki-services.ts @@ -5,9 +5,11 @@ import { StreamServer } from '@interledger/stream-receiver' import { RafikiServices } from '../rafiki' import { MockAccountingService } from '../test/mocks/accounting-service' import { TestLoggerFactory } from './test-logger' +import { MockTelemetryService } from '../../../../../tests/telemetry' interface MockRafikiServices extends RafikiServices { accounting: MockAccountingService + telemetry: MockTelemetryService } export const RafikiServicesFactory = Factory.define( @@ -20,6 +22,7 @@ export const RafikiServicesFactory = Factory.define( .attr('accounting', () => { return new MockAccountingService() }) + .attr('telemetry', () => new MockTelemetryService()) .attr('logger', TestLoggerFactory.build()) .attr( 'walletAddresses', diff --git a/packages/backend/src/payment-method/ilp/connector/core/middleware/telemetry.ts b/packages/backend/src/payment-method/ilp/connector/core/middleware/telemetry.ts new file mode 100644 index 0000000000..dd3682284b --- /dev/null +++ b/packages/backend/src/payment-method/ilp/connector/core/middleware/telemetry.ts @@ -0,0 +1,22 @@ +import { collectTelemetryAmount } from '../../../../../telemetry/transaction-amount' +import { ILPContext, ILPMiddleware } from '../rafiki' + +export function createTelemetryMiddleware(): ILPMiddleware { + return async ( + { request, services, accounts, response }: ILPContext, + next: () => Promise + ): Promise => { + await next() + if ( + services.telemetry && + Number(request.prepare.amount) && + response.fulfill + ) { + const { code, scale } = accounts.outgoing.asset + collectTelemetryAmount(services.telemetry, services.logger, { + amount: BigInt(request.prepare.amount), + asset: { code: code, scale: scale } + }) + } + } +} diff --git a/packages/backend/src/payment-method/ilp/connector/core/rafiki.ts b/packages/backend/src/payment-method/ilp/connector/core/rafiki.ts index f5ed36ba9b..f0a7efb7c9 100644 --- a/packages/backend/src/payment-method/ilp/connector/core/rafiki.ts +++ b/packages/backend/src/payment-method/ilp/connector/core/rafiki.ts @@ -26,6 +26,7 @@ import { ZeroCopyIlpPrepare, createIlpPacketMiddleware } from './middleware/ilp-packet' +import { TelemetryService } from '../../../../telemetry/service' // Model classes that represent an Interledger sender, receiver, or // connector SHOULD implement this ConnectorAccount interface. @@ -71,6 +72,7 @@ export interface AccountingService { export interface RafikiServices { //router: Router accounting: AccountingService + telemetry?: TelemetryService walletAddresses: WalletAddressService logger: Logger incomingPayments: IncomingPaymentService @@ -159,6 +161,9 @@ export class Rafiki { get walletAddresses(): WalletAddressService { return config.walletAddresses }, + get telemetry(): TelemetryService | undefined { + return config.telemetry + }, logger } diff --git a/packages/backend/src/payment-method/ilp/connector/core/test/middleware/telemetry.test.ts b/packages/backend/src/payment-method/ilp/connector/core/test/middleware/telemetry.test.ts new file mode 100644 index 0000000000..05b4773747 --- /dev/null +++ b/packages/backend/src/payment-method/ilp/connector/core/test/middleware/telemetry.test.ts @@ -0,0 +1,111 @@ +import assert from 'assert' +import { IlpResponse, OutgoingAccount, ZeroCopyIlpPrepare } from '../..' +import { IncomingAccountFactory, RafikiServicesFactory } from '../../factories' +import { createTelemetryMiddleware } from '../../middleware/telemetry' +import { createILPContext } from '../../utils' + +import { IlpFulfill } from 'ilp-packet' +import * as telemetry from '../../../../../../telemetry/transaction-amount' + +const incomingAccount = IncomingAccountFactory.build({ id: 'alice' }) + +assert.ok(incomingAccount.id) +const services = RafikiServicesFactory.build({}) + +const ctx = createILPContext({ + services, + request: { + prepare: { + amount: 100n + } as unknown as ZeroCopyIlpPrepare, + rawPrepare: Buffer.from('') + }, + accounts: { + incoming: incomingAccount, + outgoing: { asset: { code: 'USD', scale: 2 } } as OutgoingAccount + }, + state: { + unfulfillable: false, + incomingAccount: { + quote: 'exists' + } + }, + response: { + fulfill: 'exists' as unknown as IlpFulfill + } as IlpResponse +}) + +jest.mock('../../../../../../telemetry/transaction-amount') +const middleware = createTelemetryMiddleware() +const next = jest.fn().mockImplementation(() => Promise.resolve()) + +beforeEach(async () => { + incomingAccount.balance = 100n + incomingAccount.asset.scale = 2 + incomingAccount.asset.code = 'USD' +}) + +describe('Telemetry Middleware', function () { + it('does not gather telemetry if telemetry is not enabled (service is undefined)', async () => { + const collectAmountSpy = jest + .spyOn(telemetry, 'collectTelemetryAmount') + .mockImplementation(() => Promise.resolve()) + + await middleware( + { ...ctx, services: { ...ctx.services, telemetry: undefined } }, + next + ) + expect(collectAmountSpy).not.toHaveBeenCalled() + expect(next).toHaveBeenCalled() + }) + + it('does not gather telemetry if response.fulfill undefined', async () => { + const collectAmountSpy = jest.spyOn(telemetry, 'collectTelemetryAmount') + + await middleware( + { ...ctx, response: { fulfill: undefined } as IlpResponse }, + next + ) + + expect(collectAmountSpy).not.toHaveBeenCalled() + expect(next).toHaveBeenCalled() + }) + + it('does not gather telemetry if amount is invalid', async () => { + const collectAmountSpy = jest.spyOn(telemetry, 'collectTelemetryAmount') + + await middleware( + { + ...ctx, + request: { + ...ctx.request, + prepare: { amount: '0' } as ZeroCopyIlpPrepare + } + }, + next + ) + + expect(collectAmountSpy).not.toHaveBeenCalled() + expect(next).toHaveBeenCalled() + }) + + it('gathers telemetry without blocking middleware chain', async () => { + let nextCalled = false + const next = jest.fn().mockImplementation(() => { + nextCalled = true + return Promise.resolve() + }) + + const collectAmountSpy = jest + .spyOn(telemetry, 'collectTelemetryAmount') + .mockImplementation(() => { + expect(nextCalled).toBe(true) + return Promise.resolve() + }) + + await middleware(ctx, next) + + expect(collectAmountSpy).toHaveBeenCalled() + expect(next).toHaveBeenCalled() + }) +}) diff --git a/packages/backend/src/payment-method/ilp/connector/index.ts b/packages/backend/src/payment-method/ilp/connector/index.ts index c46f91552f..c97f3349d2 100644 --- a/packages/backend/src/payment-method/ilp/connector/index.ts +++ b/packages/backend/src/payment-method/ilp/connector/index.ts @@ -28,10 +28,14 @@ import { createStreamController } from './core' +import { TelemetryService } from '../../../telemetry/service' +import { createTelemetryMiddleware } from './core/middleware/telemetry' + interface ServiceDependencies extends BaseService { redis: Redis ratesService: RatesService accountingService: AccountingService + telemetry?: TelemetryService walletAddressService: WalletAddressService incomingPaymentService: IncomingPaymentService peerService: PeerService @@ -44,6 +48,7 @@ export async function createConnectorService({ redis, ratesService, accountingService, + telemetry, walletAddressService, incomingPaymentService, peerService, @@ -57,6 +62,7 @@ export async function createConnectorService({ service: 'ConnectorService' }), accounting: accountingService, + telemetry, walletAddresses: walletAddressService, incomingPayments: incomingPaymentService, peers: peerService, @@ -76,6 +82,7 @@ export async function createConnectorService({ // Local pay createBalanceMiddleware(), + createTelemetryMiddleware(), // Outgoing Rules createStreamController(),