Skip to content

Commit

Permalink
feat(telemetry): move amount collection in ILP connector
Browse files Browse the repository at this point in the history
  • Loading branch information
beniaminmunteanu committed Apr 2, 2024
1 parent 88ae1d4 commit b3ebcf3
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 31 deletions.
2 changes: 0 additions & 2 deletions packages/backend/src/accounting/psql/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { TransactionOrKnex } from 'objection'
import { v4 as uuid } from 'uuid'
import { Asset } from '../../asset/model'
import { BaseService } from '../../shared/baseService'
import { TelemetryService } from '../../telemetry/service'
import { isTransferError, TransferError } from '../errors'
import {
AccountingService,
Expand Down Expand Up @@ -36,7 +35,6 @@ import {
import { LedgerTransfer, LedgerTransferType } from './ledger-transfer/model'

export interface ServiceDependencies extends BaseService {
telemetry?: TelemetryService
knex: TransactionOrKnex
withdrawalThrottleDelay?: number
}
Expand Down
21 changes: 2 additions & 19 deletions packages/backend/src/accounting/service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { TransactionOrKnex } from 'objection'
import { BaseService } from '../shared/baseService'
import { TelemetryService } from '../telemetry/service'
import { collectTelemetryAmount } from '../telemetry/transaction-amount'
import { TransferError, isTransferError } from './errors'

export enum LiquidityAccountType {
Expand Down Expand Up @@ -93,7 +91,6 @@ export interface TransferToCreate {
}

export interface BaseAccountingServiceDependencies extends BaseService {
telemetry?: TelemetryService
withdrawalThrottleDelay?: number
}

Expand Down Expand Up @@ -121,7 +118,7 @@ export async function createAccountToAccountTransfer(
transferArgs
} = args

const { withdrawalThrottleDelay, telemetry, logger } = deps
const { withdrawalThrottleDelay } = deps

const { sourceAccount, destinationAccount, sourceAmount, destinationAmount } =
transferArgs
Expand Down Expand Up @@ -196,22 +193,8 @@ export async function createAccountToAccountTransfer(
withdrawalThrottleDelay
})
}

if (
destinationAccount.onDebit &&
telemetry &&
sourceAccount.asset.code &&
sourceAccount.asset.scale
) {
collectTelemetryAmount(telemetry, logger, {
amount: sourceAmount,
asset: {
code: sourceAccount.asset.code,
scale: sourceAccount.asset.scale
}
})
}
},

void: async (): Promise<void | TransferError> => {
const error = await voidTransfers(pendingTransferIdsOrError)

Expand Down
2 changes: 0 additions & 2 deletions packages/backend/src/accounting/tigerbeetle/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { v4 as uuid } from 'uuid'

import { BaseService } from '../../shared/baseService'
import { validateId } from '../../shared/utils'
import { TelemetryService } from '../../telemetry/service'
import {
AccountAlreadyExistsError,
BalanceTransferError,
Expand Down Expand Up @@ -49,7 +48,6 @@ export const convertToTigerbeetleAccountCode: {
}

export interface ServiceDependencies extends BaseService {
telemetry?: TelemetryService
tigerbeetle: Client
withdrawalThrottleDelay?: number
}
Expand Down
14 changes: 6 additions & 8 deletions packages/backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,6 @@ export function initIocContainer(
const knex = await deps.use('knex')
const config = await deps.use('config')

let telemetry: TelemetryService | undefined
if (config.enableTelemetry && config.openTelemetryCollectors.length > 0) {
telemetry = await deps.use('telemetry')
}

if (config.useTigerbeetle) {
container.singleton('tigerbeetle', async (deps) => {
const config = await deps.use('config')
Expand All @@ -226,7 +221,6 @@ export function initIocContainer(

return createTigerbeetleAccountingService({
logger,
telemetry,
knex,
tigerbeetle,
withdrawalThrottleDelay: config.withdrawalThrottleDelay
Expand All @@ -235,7 +229,6 @@ export function initIocContainer(

return createPsqlAccountingService({
logger,
telemetry,
knex,
withdrawalThrottleDelay: config.withdrawalThrottleDelay
})
Expand Down Expand Up @@ -362,6 +355,10 @@ export function initIocContainer(

container.singleton('connectorApp', async (deps) => {
const config = await deps.use('config')
let telemetry: TelemetryService | undefined
if (config.enableTelemetry) {
telemetry = await deps.use('telemetry')
}
return await createConnectorService({
logger: await deps.use('logger'),
redis: await deps.use('redis'),
Expand All @@ -371,7 +368,8 @@ export function initIocContainer(
peerService: await deps.use('peerService'),
ratesService: await deps.use('ratesService'),
streamServer: await deps.use('streamServer'),
ilpAddress: config.ilpAddress
ilpAddress: config.ilpAddress,
telemetry
})
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import { StreamServer } from '@interledger/stream-receiver'
import { RafikiServices } from '../rafiki'
import { MockAccountingService } from '../test/mocks/accounting-service'
import { TestLoggerFactory } from './test-logger'
import { MockTelemetryService } from '../../../../../tests/telemetry'

interface MockRafikiServices extends RafikiServices {
accounting: MockAccountingService
telemetry: MockTelemetryService
}

export const RafikiServicesFactory = Factory.define<MockRafikiServices>(
Expand All @@ -20,6 +22,7 @@ export const RafikiServicesFactory = Factory.define<MockRafikiServices>(
.attr('accounting', () => {
return new MockAccountingService()
})
.attr('telemetry', () => new MockTelemetryService())
.attr('logger', TestLoggerFactory.build())
.attr(
'walletAddresses',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { collectTelemetryAmount } from '../../../../../telemetry/transaction-amount'
import { ILPContext, ILPMiddleware } from '../rafiki'

export function createTelemetryMiddleware(): ILPMiddleware {
return async (
{ request, services, accounts, response }: ILPContext,
next: () => Promise<void>
): Promise<void> => {
await next()
if (
services.telemetry &&
Number(request.prepare.amount) &&
response.fulfill
) {
const { code, scale } = accounts.outgoing.asset
collectTelemetryAmount(services.telemetry, services.logger, {
amount: BigInt(request.prepare.amount),
asset: { code: code, scale: scale }
})
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
ZeroCopyIlpPrepare,
createIlpPacketMiddleware
} from './middleware/ilp-packet'
import { TelemetryService } from '../../../../telemetry/service'

// Model classes that represent an Interledger sender, receiver, or
// connector SHOULD implement this ConnectorAccount interface.
Expand Down Expand Up @@ -71,6 +72,7 @@ export interface AccountingService {
export interface RafikiServices {
//router: Router
accounting: AccountingService
telemetry?: TelemetryService
walletAddresses: WalletAddressService
logger: Logger
incomingPayments: IncomingPaymentService
Expand Down Expand Up @@ -159,6 +161,9 @@ export class Rafiki<T = any> {
get walletAddresses(): WalletAddressService {
return config.walletAddresses
},
get telemetry(): TelemetryService | undefined {
return config.telemetry
},

logger
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import assert from 'assert'
import { IlpResponse, OutgoingAccount, ZeroCopyIlpPrepare } from '../..'
import { IncomingAccountFactory, RafikiServicesFactory } from '../../factories'
import { createTelemetryMiddleware } from '../../middleware/telemetry'
import { createILPContext } from '../../utils'

import { IlpFulfill } from 'ilp-packet'
import * as telemetry from '../../../../../../telemetry/transaction-amount'

const incomingAccount = IncomingAccountFactory.build({ id: 'alice' })

assert.ok(incomingAccount.id)
const services = RafikiServicesFactory.build({})

const ctx = createILPContext({
services,
request: {
prepare: {
amount: 100n
} as unknown as ZeroCopyIlpPrepare,
rawPrepare: Buffer.from('')
},
accounts: {
incoming: incomingAccount,
outgoing: { asset: { code: 'USD', scale: 2 } } as OutgoingAccount
},
state: {
unfulfillable: false,
incomingAccount: {
quote: 'exists'
}
},
response: {
fulfill: 'exists' as unknown as IlpFulfill
} as IlpResponse
})

jest.mock('../../../../../../telemetry/transaction-amount')
const middleware = createTelemetryMiddleware()
const next = jest.fn().mockImplementation(() => Promise.resolve())

beforeEach(async () => {
incomingAccount.balance = 100n
incomingAccount.asset.scale = 2
incomingAccount.asset.code = 'USD'
})

describe('Telemetry Middleware', function () {
it('does not gather telemetry if telemetry is not enabled (service is undefined)', async () => {
const collectAmountSpy = jest
.spyOn(telemetry, 'collectTelemetryAmount')
.mockImplementation(() => Promise.resolve())

await middleware(
{ ...ctx, services: { ...ctx.services, telemetry: undefined } },
next
)
expect(collectAmountSpy).not.toHaveBeenCalled()
expect(next).toHaveBeenCalled()
})

it('does not gather telemetry if response.fulfill undefined', async () => {
const collectAmountSpy = jest.spyOn(telemetry, 'collectTelemetryAmount')

await middleware(
{ ...ctx, response: { fulfill: undefined } as IlpResponse },
next
)

expect(collectAmountSpy).not.toHaveBeenCalled()
expect(next).toHaveBeenCalled()
})

it('does not gather telemetry if amount is invalid', async () => {
const collectAmountSpy = jest.spyOn(telemetry, 'collectTelemetryAmount')

await middleware(
{
...ctx,
request: {
...ctx.request,
prepare: { amount: '0' } as ZeroCopyIlpPrepare
}
},
next
)

expect(collectAmountSpy).not.toHaveBeenCalled()
expect(next).toHaveBeenCalled()
})

it('gathers telemetry without blocking middleware chain', async () => {
let nextCalled = false
const next = jest.fn().mockImplementation(() => {
nextCalled = true
return Promise.resolve()
})

const collectAmountSpy = jest
.spyOn(telemetry, 'collectTelemetryAmount')
.mockImplementation(() => {
expect(nextCalled).toBe(true)
return Promise.resolve()
})

await middleware(ctx, next)

expect(collectAmountSpy).toHaveBeenCalled()
expect(next).toHaveBeenCalled()
})
})
7 changes: 7 additions & 0 deletions packages/backend/src/payment-method/ilp/connector/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@ import {
createStreamController
} from './core'

import { TelemetryService } from '../../../telemetry/service'
import { createTelemetryMiddleware } from './core/middleware/telemetry'

interface ServiceDependencies extends BaseService {
redis: Redis
ratesService: RatesService
accountingService: AccountingService
telemetry?: TelemetryService
walletAddressService: WalletAddressService
incomingPaymentService: IncomingPaymentService
peerService: PeerService
Expand All @@ -44,6 +48,7 @@ export async function createConnectorService({
redis,
ratesService,
accountingService,
telemetry,
walletAddressService,
incomingPaymentService,
peerService,
Expand All @@ -57,6 +62,7 @@ export async function createConnectorService({
service: 'ConnectorService'
}),
accounting: accountingService,
telemetry,
walletAddresses: walletAddressService,
incomingPayments: incomingPaymentService,
peers: peerService,
Expand All @@ -76,6 +82,7 @@ export async function createConnectorService({

// Local pay
createBalanceMiddleware(),
createTelemetryMiddleware(),

// Outgoing Rules
createStreamController(),
Expand Down

0 comments on commit b3ebcf3

Please sign in to comment.