diff --git a/docs/transaction-api.md b/docs/transaction-api.md index 760fb2cd62..9cb5db93fd 100644 --- a/docs/transaction-api.md +++ b/docs/transaction-api.md @@ -87,7 +87,7 @@ An expired invoice that has never received money is deleted. Rafiki sends webhook events to notify the wallet of payment lifecycle states that require liquidity to be added or removed. -Webhook event handlers must be idempotent. +Webhook event handlers must be idempotent and return `200` on success. Rafiki will retry unsuccessful webhook requests for up to one day. ### `EventType` @@ -95,39 +95,37 @@ Webhook event handlers must be idempotent. Invoice has expired. -Credit `invoice.received` to the wallet balance for `invoice.accountId` and return `200`. +Credit `invoice.received` to the wallet balance for `invoice.accountId`, and call `Mutation.withdrawEventLiquidity` with the event id. #### `invoice.paid` Invoice has received its specified `amount`. -Credit `invoice.received` to the wallet balance for `invoice.accountId` and return `200`. +Credit `invoice.received` to the wallet balance for `invoice.accountId`, and call `Mutation.withdrawEventLiquidity` with the event id. #### `outgoing_payment.funding` Payment needs liquidity in order to send quoted amount. -To fund the payment, deduct `quote.maxSourceAmount` from the wallet balance for `payment.accountId` and return `200`. - -To cancel the payment, return `403`. +To fund the payment, deduct `quote.maxSourceAmount` from the wallet balance for `payment.accountId` and call `Mutation.depositEventLiquidity` with the event id. #### `outgoing_payment.cancelled` Payment was cancelled. -Credit `payment.balance` to the wallet balance for `payment.accountId` and return `200` or `205` to retry the payment. +Credit `payment.balance` to the wallet balance for `payment.accountId`, and call `Mutation.withdrawEventLiquidity` with the event id. #### `outgoing_payment.completed` Payment completed sending the quoted amount. -Credit `payment.balance` to the wallet balance for `payment.accountId` and return `200`. +Credit `payment.balance` to the wallet balance for `payment.accountId`, and call `Mutation.withdrawEventLiquidity` with the event id. ### Webhook Event | Name | Optional | Type | Description | | :----- | :------- | :------------------------------------------------------------- | :------------------------------------------------ | -| `id` | No | `ID` | Unique ID of the `data` object. | +| `id` | No | `ID` | Unique ID of the webhook event. | | `type` | No | [`EventType`](#eventtype) | Description of the event. | | `data` | No | [`Invoice`](#invoice) or [`OutgoingPayment`](#outgoingpayment) | Object containing data associated with the event. | @@ -187,13 +185,12 @@ The intent must include `invoiceUrl` xor (`paymentPointer` and `amountToSend`). ### `Invoice` -| Name | Optional | Type | Description | -| :------------ | :------- | :-------- | :----------------------------------------------------------------------------------------------------------------------------- | -| `id` | No | `ID` | Unique ID for this invoice, randomly generated by Rafiki. | -| `accountId` | No | `String` | Id of the recipient's Open Payments account. | -| `amount` | No | `UInt64` | The amount that must be paid at the time the invoice is created, in base units of the account asset. | -| `received` | No | `UInt64` | The total amount received, in base units of the account asset. | -| `active` | No | `Boolean` | If `true`, the invoice may receive funds. If `false`, the invoice is either expired or has already received `amount` of funds. | -| `description` | Yes | `String` | Human readable description of the invoice. | -| `createdAt` | No | `String` | | -| `expiresAt` | No | `String` | | +| Name | Optional | Type | Description | +| :------------ | :------- | :------- | :--------------------------------------------------------------------------------------------------- | +| `id` | No | `ID` | Unique ID for this invoice, randomly generated by Rafiki. | +| `accountId` | No | `String` | Id of the recipient's Open Payments account. | +| `amount` | No | `UInt64` | The amount that must be paid at the time the invoice is created, in base units of the account asset. | +| `received` | No | `UInt64` | The total amount received, in base units of the account asset. | +| `description` | Yes | `String` | Human readable description of the invoice. | +| `createdAt` | No | `String` | | +| `expiresAt` | No | `String` | | diff --git a/packages/backend/migrations/20211022210203_create_invoices_table.js b/packages/backend/migrations/20211022210203_create_invoices_table.js index 6b80caeb2b..013683cffc 100644 --- a/packages/backend/migrations/20211022210203_create_invoices_table.js +++ b/packages/backend/migrations/20211022210203_create_invoices_table.js @@ -12,8 +12,6 @@ exports.up = function (knex) { table.timestamp('processAt').nullable() - table.integer('webhookAttempts').notNullable().defaultTo(0) - table.timestamp('createdAt').defaultTo(knex.fn.now()) table.timestamp('updatedAt').defaultTo(knex.fn.now()) diff --git a/packages/backend/migrations/20211026122545_create_outgoing_payments_table.js b/packages/backend/migrations/20211026122545_create_outgoing_payments_table.js index 025a459fff..ec9f5cc2be 100644 --- a/packages/backend/migrations/20211026122545_create_outgoing_payments_table.js +++ b/packages/backend/migrations/20211026122545_create_outgoing_payments_table.js @@ -5,7 +5,6 @@ exports.up = function (knex) { table.string('state').notNullable().index() // PaymentState table.string('error').nullable() table.integer('stateAttempts').notNullable().defaultTo(0) - table.string('webhookId').nullable() table.string('intentPaymentPointer').nullable() table.string('intentInvoiceUrl').nullable() diff --git a/packages/backend/migrations/20220131110501_create_webhook_events_table.js b/packages/backend/migrations/20220131110501_create_webhook_events_table.js new file mode 100644 index 0000000000..54eb5b6486 --- /dev/null +++ b/packages/backend/migrations/20220131110501_create_webhook_events_table.js @@ -0,0 +1,26 @@ +exports.up = function (knex) { + return knex.schema.createTable('webhookEvents', function (table) { + table.uuid('id').notNullable().primary() + + table.string('type').notNullable() + table.json('data').notNullable() + table.integer('attempts').notNullable().defaultTo(0) + table.integer('statusCode').nullable() + + table.uuid('withdrawalAccountId').nullable() + table.uuid('withdrawalAssetId').nullable() + table.foreign('withdrawalAssetId').references('assets.id') + table.bigInteger('withdrawalAmount').nullable() + + table.timestamp('processAt').nullable().defaultTo(knex.fn.now()) + + table.timestamp('createdAt').defaultTo(knex.fn.now()) + table.timestamp('updatedAt').defaultTo(knex.fn.now()) + + table.index('processAt') + }) +} + +exports.down = function (knex) { + return knex.schema.dropTableIfExists('webhookEvents') +} diff --git a/packages/backend/src/accounting/service.test.ts b/packages/backend/src/accounting/service.test.ts index b4edeaf3bf..0a58c75bf8 100644 --- a/packages/backend/src/accounting/service.test.ts +++ b/packages/backend/src/accounting/service.test.ts @@ -34,7 +34,7 @@ describe('Accounting Service', (): void => { let accountingService: AccountingService let accountFactory: AccountFactory let tigerbeetleContainer: StartedTestContainer - const timeout = BigInt(10e9) // 10 seconds + const timeout = BigInt(10_000) // 10 seconds const messageProducer = new GraphileProducer() const mockMessageProducer = { send: jest.fn() @@ -530,7 +530,15 @@ describe('Accounting Service', (): void => { } ) - describe('Create', (): void => { + describe.each` + timeout | description + ${undefined} | ${'single-phase'} + ${timeout} | ${'two-phase'} + `('Create ($description)', ({ timeout }): void => { + beforeEach((): void => { + withdrawal.timeout = timeout + }) + test('A withdrawal can be created', async (): Promise => { await expect( accountingService.createWithdrawal(withdrawal) @@ -540,7 +548,9 @@ describe('Accounting Service', (): void => { ).resolves.toEqual(startingBalance - withdrawal.amount) await expect( accountingService.getSettlementBalance(withdrawal.account.asset.unit) - ).resolves.toEqual(startingBalance) + ).resolves.toEqual( + timeout ? startingBalance : startingBalance - withdrawal.amount + ) }) test('Cannot create withdrawal with invalid id', async (): Promise => { @@ -649,7 +659,7 @@ describe('Accounting Service', (): void => { const expiringWithdrawal = { ...withdrawal, id: uuid(), - timeout: BigInt(1) // nano-second + timeout: BigInt(1) } await expect( accountingService.createWithdrawal(expiringWithdrawal) @@ -716,7 +726,7 @@ describe('Accounting Service', (): void => { const expiringWithdrawal = { ...withdrawal, id: uuid(), - timeout: BigInt(1) // nano-second + timeout: BigInt(1) } await expect( accountingService.createWithdrawal(expiringWithdrawal) diff --git a/packages/backend/src/accounting/service.ts b/packages/backend/src/accounting/service.ts index b7439dfc86..aabf77b97e 100644 --- a/packages/backend/src/accounting/service.ts +++ b/packages/backend/src/accounting/service.ts @@ -1,3 +1,4 @@ +import assert from 'assert' import { Client, CreateAccountError as CreateAccountErrorCode @@ -43,6 +44,7 @@ export interface LiquidityAccount { id: string unit: number } + onCredit?: (balance: bigint) => Promise } export interface Deposit { @@ -52,7 +54,7 @@ export interface Deposit { } export interface Withdrawal extends Deposit { - timeout: bigint + timeout?: bigint } export interface TransferOptions { @@ -60,7 +62,7 @@ export interface TransferOptions { destinationAccount: LiquidityAccount sourceAmount: bigint destinationAmount?: bigint - timeout: bigint // nano-seconds + timeout: bigint } export interface Transaction { @@ -219,37 +221,49 @@ export async function createTransfer( } const transfers: Required[] = [] - // Same asset - if (sourceAccount.asset.unit === destinationAccount.asset.unit) { + const addTransfer = ({ + sourceAccountId, + destinationAccountId, + amount + }: { + sourceAccountId: string + destinationAccountId: string + amount: bigint + }) => { transfers.push({ id: uuid(), + sourceAccountId, + destinationAccountId, + amount, + timeout + }) + } + + // Same asset + if (sourceAccount.asset.unit === destinationAccount.asset.unit) { + addTransfer({ sourceAccountId: sourceAccount.id, destinationAccountId: destinationAccount.id, amount: destinationAmount && destinationAmount < sourceAmount ? destinationAmount - : sourceAmount, - timeout + : sourceAmount }) // Same asset, different amounts if (destinationAmount && sourceAmount !== destinationAmount) { // Send excess source amount to liquidity account if (destinationAmount < sourceAmount) { - transfers.push({ - id: uuid(), + addTransfer({ sourceAccountId: sourceAccount.id, destinationAccountId: sourceAccount.asset.id, - amount: sourceAmount - destinationAmount, - timeout + amount: sourceAmount - destinationAmount }) // Deliver excess destination amount from liquidity account } else { - transfers.push({ - id: uuid(), + addTransfer({ sourceAccountId: destinationAccount.asset.id, destinationAccountId: destinationAccount.id, - amount: destinationAmount - sourceAmount, - timeout + amount: destinationAmount - sourceAmount }) } } @@ -261,22 +275,16 @@ export async function createTransfer( } // Send to source liquidity account // Deliver from destination liquidity account - transfers.push( - { - id: uuid(), - sourceAccountId: sourceAccount.id, - destinationAccountId: sourceAccount.asset.id, - amount: sourceAmount, - timeout - }, - { - id: uuid(), - sourceAccountId: destinationAccount.asset.id, - destinationAccountId: destinationAccount.id, - amount: destinationAmount, - timeout - } - ) + addTransfer({ + sourceAccountId: sourceAccount.id, + destinationAccountId: sourceAccount.asset.id, + amount: sourceAmount + }) + addTransfer({ + sourceAccountId: destinationAccount.asset.id, + destinationAccountId: destinationAccount.id, + amount: destinationAmount + }) } const error = await createTransfers(deps, transfers) if (error) { @@ -308,6 +316,11 @@ export async function createTransfer( if (error) { return error.error } + if (destinationAccount.onCredit) { + const balance = await getAccountBalance(deps, destinationAccount.id) + assert.ok(balance !== undefined) + await destinationAccount.onCredit(balance) + } }, rollback: async (): Promise => { const error = await rollbackTransfers( diff --git a/packages/backend/src/accounting/transfers.ts b/packages/backend/src/accounting/transfers.ts index 006b6e23e0..0d50381cff 100644 --- a/packages/backend/src/accounting/transfers.ts +++ b/packages/backend/src/accounting/transfers.ts @@ -28,7 +28,7 @@ export interface CreateTransferOptions { sourceAccountId: AccountId destinationAccountId: AccountId amount: bigint - timeout?: bigint // nano-seconds + timeout?: bigint } export async function createTransfers( @@ -57,7 +57,7 @@ export async function createTransfers( reserved: TRANSFER_RESERVED, code: 0, flags, - timeout: transfer.timeout || BigInt(0), + timeout: transfer.timeout ? transfer.timeout * BigInt(10e6) : BigInt(0), // ms -> ns timestamp: BigInt(0) }) } diff --git a/packages/backend/src/app.ts b/packages/backend/src/app.ts index 272215b813..d7cedf9deb 100644 --- a/packages/backend/src/app.ts +++ b/packages/backend/src/app.ts @@ -143,6 +143,9 @@ export class App { for (let i = 0; i < this.config.invoiceWorkers; i++) { process.nextTick(() => this.processInvoice()) } + for (let i = 0; i < this.config.webhookWorkers; i++) { + process.nextTick(() => this.processWebhook()) + } } } @@ -288,4 +291,22 @@ export class App { ).unref() }) } + + private async processWebhook(): Promise { + const webhookService = await this.container.use('webhookService') + return webhookService + .processNext() + .catch((err) => { + this.logger.warn({ error: err.message }, 'processWebhook error') + return true + }) + .then((hasMoreWork) => { + if (hasMoreWork) process.nextTick(() => this.processWebhook()) + else + setTimeout( + () => this.processWebhook(), + this.config.webhookWorkerIdle + ).unref() + }) + } } diff --git a/packages/backend/src/config/app.ts b/packages/backend/src/config/app.ts index 41da1d1554..6eaac02db0 100644 --- a/packages/backend/src/config/app.ts +++ b/packages/backend/src/config/app.ts @@ -64,6 +64,8 @@ export const Config = { invoiceWorkers: envInt('INVOICE_WORKERS', 1), invoiceWorkerIdle: envInt('INVOICE_WORKER_IDLE', 200), // milliseconds + webhookWorkers: envInt('WEBHOOK_WORKERS', 1), + webhookWorkerIdle: envInt('WEBHOOK_WORKER_IDLE', 200), // milliseconds webhookUrl: envString('WEBHOOK_URL', 'http://127.0.0.1:4001/webhook'), webhookSecret: process.env.WEBHOOK_SECRET, // optional webhookTimeout: envInt('WEBHOOK_TIMEOUT', 2000), // milliseconds diff --git a/packages/backend/src/connector/core/factories/account.ts b/packages/backend/src/connector/core/factories/account.ts index b98d9bfa0b..1211ad8e32 100644 --- a/packages/backend/src/connector/core/factories/account.ts +++ b/packages/backend/src/connector/core/factories/account.ts @@ -14,7 +14,11 @@ const accountAttrs = { id: Faker.datatype.uuid(), code: assetCode, scale: assetScale, - unit: Faker.datatype.number() + unit: Faker.datatype.number(), + asset: { + id: Faker.datatype.uuid(), + unit: Faker.datatype.number() + } }, balance: 0n } diff --git a/packages/backend/src/connector/core/middleware/balance.ts b/packages/backend/src/connector/core/middleware/balance.ts index 0fee5bbda0..f5c7fc1715 100644 --- a/packages/backend/src/connector/core/middleware/balance.ts +++ b/packages/backend/src/connector/core/middleware/balance.ts @@ -66,8 +66,6 @@ export function createBalanceMiddleware(): ILPMiddleware { if (response.fulfill) { await trxOrError.commit() - // TODO: move handlePayment inside accountingServices's trxOrError.commit() - await services.invoices.handlePayment(accounts.outgoing.id) } else { await trxOrError.rollback() } diff --git a/packages/backend/src/connector/core/rafiki.ts b/packages/backend/src/connector/core/rafiki.ts index 5ff08b9dc2..718ebab7bc 100644 --- a/packages/backend/src/connector/core/rafiki.ts +++ b/packages/backend/src/connector/core/rafiki.ts @@ -50,7 +50,7 @@ export interface TransferOptions { destinationAccount: OutgoingAccount sourceAmount: bigint destinationAmount?: bigint - timeout: bigint // nano-seconds + timeout: bigint } export interface AccountingService { diff --git a/packages/backend/src/connector/core/test/middleware/balance-middleware.test.ts b/packages/backend/src/connector/core/test/middleware/balance-middleware.test.ts index 06da2dc5a6..96c3f6ad7c 100644 --- a/packages/backend/src/connector/core/test/middleware/balance-middleware.test.ts +++ b/packages/backend/src/connector/core/test/middleware/balance-middleware.test.ts @@ -30,7 +30,7 @@ const ctx = createILPContext({ }, services }) -const { accounting, invoices, rates } = services +const { accounting, rates } = services beforeEach(async () => { ctx.response.fulfill = undefined @@ -53,12 +53,10 @@ describe('Balance Middleware', function () { const next = jest.fn().mockImplementation(() => { ctx.response.fulfill = fulfill }) - const handlePaymentSpy = jest.spyOn(invoices, 'handlePayment') await expect(middleware(ctx, next)).resolves.toBeUndefined() expect(next).toHaveBeenCalledTimes(1) - expect(handlePaymentSpy).toHaveBeenCalledTimes(1) const aliceBalance = await accounting.getBalance(aliceAccount.id) expect(aliceBalance).toEqual(BigInt(0)) @@ -102,7 +100,6 @@ describe('Balance Middleware', function () { }) const createTransferSpy = jest.spyOn(accounting, 'createTransfer') - const handlePaymentSpy = jest.spyOn(invoices, 'handlePayment') if (error) { await expect(middleware(ctx, next)).rejects.toBeInstanceOf(error) @@ -113,7 +110,6 @@ describe('Balance Middleware', function () { } expect(createTransferSpy).toHaveBeenCalledTimes(createTransfer ? 1 : 0) - expect(handlePaymentSpy).not.toHaveBeenCalled() const aliceBalance = await accounting.getBalance(aliceAccount.id) expect(aliceBalance).toEqual(BigInt(100)) diff --git a/packages/backend/src/graphql/generated/graphql.schema.json b/packages/backend/src/graphql/generated/graphql.schema.json index 3951527a5f..35cd1107a1 100644 --- a/packages/backend/src/graphql/generated/graphql.schema.json +++ b/packages/backend/src/graphql/generated/graphql.schema.json @@ -2427,6 +2427,64 @@ "isDeprecated": false, "deprecationReason": null }, + { + "name": "depositEventLiquidity", + "description": "Deposit webhook event liquidity", + "args": [ + { + "name": "eventId", + "description": null, + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "String", + "ofType": null + } + }, + "defaultValue": null, + "isDeprecated": false, + "deprecationReason": null + } + ], + "type": { + "kind": "OBJECT", + "name": "LiquidityMutationResponse", + "ofType": null + }, + "isDeprecated": false, + "deprecationReason": null + }, + { + "name": "withdrawEventLiquidity", + "description": "Withdraw webhook event liquidity", + "args": [ + { + "name": "eventId", + "description": null, + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "String", + "ofType": null + } + }, + "defaultValue": null, + "isDeprecated": false, + "deprecationReason": null + } + ], + "type": { + "kind": "OBJECT", + "name": "LiquidityMutationResponse", + "ofType": null + }, + "isDeprecated": false, + "deprecationReason": null + }, { "name": "createApiKey", "description": "Create API Key", diff --git a/packages/backend/src/graphql/generated/graphql.ts b/packages/backend/src/graphql/generated/graphql.ts index e6d7d16456..28af5a91fe 100644 --- a/packages/backend/src/graphql/generated/graphql.ts +++ b/packages/backend/src/graphql/generated/graphql.ts @@ -282,6 +282,10 @@ export type Mutation = { finalizeLiquidityWithdrawal?: Maybe; /** Rollback liquidity withdrawal */ rollbackLiquidityWithdrawal?: Maybe; + /** Deposit webhook event liquidity */ + depositEventLiquidity?: Maybe; + /** Withdraw webhook event liquidity */ + withdrawEventLiquidity?: Maybe; /** Create API Key */ createApiKey?: Maybe; /** Delete all API Keys */ @@ -365,6 +369,16 @@ export type MutationRollbackLiquidityWithdrawalArgs = { }; +export type MutationDepositEventLiquidityArgs = { + eventId: Scalars['String']; +}; + + +export type MutationWithdrawEventLiquidityArgs = { + eventId: Scalars['String']; +}; + + export type MutationCreateApiKeyArgs = { input: CreateApiKeyInput; }; @@ -963,6 +977,8 @@ export type MutationResolvers, ParentType, ContextType, RequireFields>; finalizeLiquidityWithdrawal?: Resolver, ParentType, ContextType, RequireFields>; rollbackLiquidityWithdrawal?: Resolver, ParentType, ContextType, RequireFields>; + depositEventLiquidity?: Resolver, ParentType, ContextType, RequireFields>; + withdrawEventLiquidity?: Resolver, ParentType, ContextType, RequireFields>; createApiKey?: Resolver, ParentType, ContextType, RequireFields>; deleteAllApiKeys?: Resolver, ParentType, ContextType, RequireFields>; redeemApiKey?: Resolver, ParentType, ContextType, RequireFields>; diff --git a/packages/backend/src/graphql/resolvers/index.ts b/packages/backend/src/graphql/resolvers/index.ts index 38f75d8db3..0ac2afe14f 100644 --- a/packages/backend/src/graphql/resolvers/index.ts +++ b/packages/backend/src/graphql/resolvers/index.ts @@ -24,7 +24,9 @@ import { createPeerLiquidityWithdrawal, createAccountWithdrawal, finalizeLiquidityWithdrawal, - rollbackLiquidityWithdrawal + rollbackLiquidityWithdrawal, + depositEventLiquidity, + withdrawEventLiquidity } from './liquidity' import { GraphQLBigInt } from '../scalars' import { refreshSession, revokeSession } from './session' @@ -70,6 +72,8 @@ export const resolvers: Resolvers = { createPeerLiquidityWithdrawal: createPeerLiquidityWithdrawal, createAccountWithdrawal, finalizeLiquidityWithdrawal: finalizeLiquidityWithdrawal, - rollbackLiquidityWithdrawal: rollbackLiquidityWithdrawal + rollbackLiquidityWithdrawal: rollbackLiquidityWithdrawal, + depositEventLiquidity, + withdrawEventLiquidity } } diff --git a/packages/backend/src/graphql/resolvers/liquidity.test.ts b/packages/backend/src/graphql/resolvers/liquidity.test.ts index 167b77ea2b..e8e5052de3 100644 --- a/packages/backend/src/graphql/resolvers/liquidity.test.ts +++ b/packages/backend/src/graphql/resolvers/liquidity.test.ts @@ -1,34 +1,50 @@ +import assert from 'assert' import { gql } from 'apollo-server-koa' import Knex from 'knex' import { v4 as uuid } from 'uuid' +import * as Pay from '@interledger/pay' +import { DepositEventType } from './liquidity' import { createTestApp, TestContainer } from '../../tests/app' import { IocContract } from '@adonisjs/fold' import { AppServices } from '../../app' import { initIocContainer } from '../..' import { Config } from '../../config/app' -import { AccountingService } from '../../accounting/service' +import { + AccountingService, + LiquidityAccount, + Withdrawal +} from '../../accounting/service' import { Asset } from '../../asset/model' import { AssetService } from '../../asset/service' import { Account } from '../../open_payments/account/model' +import { Invoice, InvoiceEventType } from '../../open_payments/invoice/model' +import { + OutgoingPayment, + PaymentState, + PaymentEvent, + PaymentWithdrawType, + isPaymentEventType +} from '../../outgoing_payment/model' import { Peer } from '../../peer/model' import { randomAsset } from '../../tests/asset' import { PeerFactory } from '../../tests/peerFactory' import { truncateTables } from '../../tests/tableManager' +import { WebhookEvent } from '../../webhook/model' import { LiquidityError, LiquidityMutationResponse, AccountWithdrawalMutationResponse } from '../generated/graphql' -describe('Withdrawal Resolvers', (): void => { +describe('Liquidity Resolvers', (): void => { let deps: IocContract let appContainer: TestContainer let accountingService: AccountingService let assetService: AssetService let peerFactory: PeerFactory let knex: Knex - const timeout = BigInt(10e9) // 10 seconds + const timeout = BigInt(10_000) // 10 seconds beforeAll( async (): Promise => { @@ -1561,4 +1577,351 @@ describe('Withdrawal Resolvers', (): void => { }) } ) + + { + let invoice: Invoice + let payment: OutgoingPayment + + beforeEach( + async (): Promise => { + const accountService = await deps.use('accountService') + const { id: accountId } = await accountService.create({ + asset: randomAsset() + }) + const invoiceService = await deps.use('invoiceService') + invoice = await invoiceService.create({ + accountId, + amount: BigInt(56), + expiresAt: new Date(Date.now() + 60 * 1000), + description: 'description!' + }) + const outgoingPaymentService = await deps.use('outgoingPaymentService') + const config = await deps.use('config') + const invoiceUrl = `${config.publicHost}/invoices/${invoice.id}` + // create and then patch quote + payment = await outgoingPaymentService.create({ + accountId, + invoiceUrl, + autoApprove: false + }) + await payment.$query(knex).patch({ + state: PaymentState.Funding, + quote: { + timestamp: new Date(), + activationDeadline: new Date(Date.now() + 1000), + targetType: Pay.PaymentType.FixedSend, + minDeliveryAmount: BigInt(123), + maxSourceAmount: BigInt(456), + maxPacketAmount: BigInt(789), + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + minExchangeRate: Pay.Ratio.from(1.23)!, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + lowExchangeRateEstimate: Pay.Ratio.from(1.2)!, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + highExchangeRateEstimate: Pay.Ratio.from(2.3)!, + amountSent: BigInt(0) + } + }) + await expect(accountingService.getBalance(payment.id)).resolves.toEqual( + BigInt(0) + ) + } + ) + + describe('depositEventLiquidity', (): void => { + describe.each(Object.values(DepositEventType).map((type) => [type]))( + '%s', + (type): void => { + let eventId: string + + beforeEach( + async (): Promise => { + eventId = uuid() + await PaymentEvent.query(knex).insertAndFetch({ + id: eventId, + type, + data: payment.toData({ + amountSent: BigInt(0), + balance: BigInt(0) + }) + }) + } + ) + + test('Can deposit account liquidity', async (): Promise => { + const depositSpy = jest.spyOn(accountingService, 'createDeposit') + const response = await appContainer.apolloClient + .mutate({ + mutation: gql` + mutation DepositLiquidity($eventId: String!) { + depositEventLiquidity(eventId: $eventId) { + code + success + message + error + } + } + `, + variables: { + eventId + } + }) + .then( + (query): LiquidityMutationResponse => { + if (query.data) { + return query.data.depositEventLiquidity + } else { + throw new Error('Data was empty') + } + } + ) + + expect(response.success).toBe(true) + expect(response.code).toEqual('200') + expect(response.error).toBeNull() + assert.ok(payment.quote) + await expect(depositSpy).toHaveBeenCalledWith({ + id: eventId, + account: expect.any(OutgoingPayment), + amount: payment.quote.maxSourceAmount + }) + await expect( + accountingService.getBalance(payment.id) + ).resolves.toEqual(payment.quote.maxSourceAmount) + }) + + test("Can't deposit for non-existent webhook event id", async (): Promise => { + const response = await appContainer.apolloClient + .mutate({ + mutation: gql` + mutation DepositLiquidity($eventId: String!) { + depositEventLiquidity(eventId: $eventId) { + code + success + message + error + } + } + `, + variables: { + eventId: uuid() + } + }) + .then( + (query): LiquidityMutationResponse => { + if (query.data) { + return query.data.depositEventLiquidity + } else { + throw new Error('Data was empty') + } + } + ) + + expect(response.success).toBe(false) + expect(response.code).toEqual('400') + expect(response.message).toEqual('Invalid id') + expect(response.error).toEqual(LiquidityError.InvalidId) + }) + + test('Returns an error for existing transfer', async (): Promise => { + await expect( + accountingService.createDeposit({ + id: eventId, + account: invoice, + amount: BigInt(100) + }) + ).resolves.toBeUndefined() + const response = await appContainer.apolloClient + .mutate({ + mutation: gql` + mutation DepositLiquidity($eventId: String!) { + depositEventLiquidity(eventId: $eventId) { + code + success + message + error + } + } + `, + variables: { + eventId + } + }) + .then( + (query): LiquidityMutationResponse => { + if (query.data) { + return query.data.depositEventLiquidity + } else { + throw new Error('Data was empty') + } + } + ) + + expect(response.success).toBe(false) + expect(response.code).toEqual('409') + expect(response.message).toEqual('Transfer exists') + expect(response.error).toEqual(LiquidityError.TransferExists) + }) + } + ) + }) + + const WithdrawEventType = { ...InvoiceEventType, ...PaymentWithdrawType } + type WithdrawEventType = InvoiceEventType | PaymentWithdrawType + + describe('withdrawEventLiquidity', (): void => { + describe.each(Object.values(WithdrawEventType).map((type) => [type]))( + '%s', + (type): void => { + let eventId: string + let withdrawal: Withdrawal + + beforeEach( + async (): Promise => { + eventId = uuid() + const amount = BigInt(10) + let account: LiquidityAccount + let data: Record + if (isPaymentEventType(type)) { + account = payment + data = payment.toData({ + amountSent: BigInt(0), + balance: amount + }) + } else { + account = invoice + data = invoice.toData(amount) + } + await WebhookEvent.query(knex).insertAndFetch({ + id: eventId, + type, + data, + withdrawal: { + accountId: account.id, + assetId: account.asset.id, + amount + } + }) + await expect( + accountingService.createDeposit({ + id: uuid(), + account, + amount + }) + ).resolves.toBeUndefined() + await expect( + accountingService.getBalance(account.id) + ).resolves.toEqual(amount) + withdrawal = { + id: eventId, + account, + amount + } + } + ) + + test('Can withdraw account liquidity', async (): Promise => { + const response = await appContainer.apolloClient + .mutate({ + mutation: gql` + mutation WithdrawLiquidity($eventId: String!) { + withdrawEventLiquidity(eventId: $eventId) { + code + success + message + error + } + } + `, + variables: { + eventId + } + }) + .then( + (query): LiquidityMutationResponse => { + if (query.data) { + return query.data.withdrawEventLiquidity + } else { + throw new Error('Data was empty') + } + } + ) + + expect(response.success).toBe(true) + expect(response.code).toEqual('200') + expect(response.error).toBeNull() + }) + + test('Returns error for non-existent webhook event id', async (): Promise => { + const response = await appContainer.apolloClient + .mutate({ + mutation: gql` + mutation WithdrawLiquidity($eventId: String!) { + withdrawEventLiquidity(eventId: $eventId) { + code + success + message + error + } + } + `, + variables: { + eventId: uuid() + } + }) + .then( + (query): LiquidityMutationResponse => { + if (query.data) { + return query.data.withdrawEventLiquidity + } else { + throw new Error('Data was empty') + } + } + ) + + expect(response.success).toBe(false) + expect(response.code).toEqual('400') + expect(response.message).toEqual('Invalid id') + expect(response.error).toEqual(LiquidityError.InvalidId) + }) + + test('Returns error for already completed withdrawal', async (): Promise => { + await expect( + accountingService.createWithdrawal(withdrawal) + ).resolves.toBeUndefined() + const response = await appContainer.apolloClient + .mutate({ + mutation: gql` + mutation WithdrawLiquidity($eventId: String!) { + withdrawEventLiquidity(eventId: $eventId) { + code + success + message + error + } + } + `, + variables: { + eventId + } + }) + .then( + (query): LiquidityMutationResponse => { + if (query.data) { + return query.data.withdrawEventLiquidity + } else { + throw new Error('Data was empty') + } + } + ) + + expect(response.success).toBe(false) + expect(response.code).toEqual('403') + expect(response.message).toEqual('Insufficient balance') + expect(response.error).toEqual(LiquidityError.InsufficientBalance) + }) + } + ) + }) + } }) diff --git a/packages/backend/src/graphql/resolvers/liquidity.ts b/packages/backend/src/graphql/resolvers/liquidity.ts index 78821b12d3..ec0f888845 100644 --- a/packages/backend/src/graphql/resolvers/liquidity.ts +++ b/packages/backend/src/graphql/resolvers/liquidity.ts @@ -1,3 +1,4 @@ +import assert from 'assert' import { ResolversTypes, MutationResolvers, @@ -5,8 +6,12 @@ import { LiquidityMutationResponse, AccountWithdrawalMutationResponse } from '../generated/graphql' -import { TransferError } from '../../accounting/errors' import { ApolloContext } from '../../app' +import { FundingError, isFundingError } from '../../outgoing_payment/errors' +import { + isPaymentEvent, + PaymentDepositType +} from '../../outgoing_payment/model' export const addPeerLiquidity: MutationResolvers['addPeerLiquidity'] = async ( parent, @@ -278,11 +283,110 @@ export const rollbackLiquidityWithdrawal: MutationResolvers['roll } } +export const DepositEventType = PaymentDepositType +export type DepositEventType = PaymentDepositType + +// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/explicit-module-boundary-types +const isDepositEventType = (o: any): o is DepositEventType => + Object.values(DepositEventType).includes(o) + +export const depositEventLiquidity: MutationResolvers['depositEventLiquidity'] = async ( + parent, + args, + ctx +): ResolversTypes['LiquidityMutationResponse'] => { + try { + const webhookService = await ctx.container.use('webhookService') + const event = await webhookService.getEvent(args.eventId) + if (!event || !isPaymentEvent(event) || !isDepositEventType(event.type)) { + return responses[LiquidityError.InvalidId] + } + assert.ok(event.data.payment?.quote?.maxSourceAmount) + const outgoingPaymentService = await ctx.container.use( + 'outgoingPaymentService' + ) + const paymentOrErr = await outgoingPaymentService.fund({ + id: event.data.payment.id, + amount: BigInt(event.data.payment.quote.maxSourceAmount), + transferId: event.id + }) + if (isFundingError(paymentOrErr)) { + return errorToResponse(paymentOrErr) + } + return { + code: '200', + success: true, + message: 'Deposited liquidity' + } + } catch (error) { + ctx.logger.error( + { + eventId: args.eventId, + error + }, + 'error depositing liquidity' + ) + return { + code: '400', + message: 'Error trying to deposit liquidity', + success: false + } + } +} + +export const withdrawEventLiquidity: MutationResolvers['withdrawEventLiquidity'] = async ( + parent, + args, + ctx +): ResolversTypes['LiquidityMutationResponse'] => { + try { + const webhookService = await ctx.container.use('webhookService') + const event = await webhookService.getEvent(args.eventId) + if (!event || !event.withdrawal) { + return responses[LiquidityError.InvalidId] + } + const assetService = await ctx.container.use('assetService') + const asset = await assetService.getById(event.withdrawal.assetId) + assert.ok(asset) + const accountingService = await ctx.container.use('accountingService') + const error = await accountingService.createWithdrawal({ + id: event.id, + account: { + id: event.withdrawal.accountId, + asset + }, + amount: event.withdrawal.amount + }) + if (error) { + return errorToResponse(error) + } + // TODO: check for and handle leftover invoice or payment balance + return { + code: '200', + success: true, + message: 'Withdrew liquidity' + } + } catch (error) { + ctx.logger.error( + { + eventId: args.eventId, + error + }, + 'error withdrawing liquidity' + ) + return { + code: '400', + message: 'Error trying to withdraw liquidity', + success: false + } + } +} + // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/explicit-module-boundary-types const isLiquidityError = (o: any): o is LiquidityError => Object.values(LiquidityError).includes(o) -const errorToResponse = (error: TransferError): LiquidityMutationResponse => { +const errorToResponse = (error: FundingError): LiquidityMutationResponse => { if (!isLiquidityError(error)) { throw new Error(error) } diff --git a/packages/backend/src/graphql/schema.graphql b/packages/backend/src/graphql/schema.graphql index 6f557fe573..93d2e128a7 100644 --- a/packages/backend/src/graphql/schema.graphql +++ b/packages/backend/src/graphql/schema.graphql @@ -77,6 +77,12 @@ type Mutation { withdrawalId: String! ): LiquidityMutationResponse + "Deposit webhook event liquidity" + depositEventLiquidity(eventId: String!): LiquidityMutationResponse + + "Withdraw webhook event liquidity" + withdrawEventLiquidity(eventId: String!): LiquidityMutationResponse + "Create API Key" createApiKey(input: CreateApiKeyInput!): CreateApiKeyMutationResponse diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts index d17ab4be14..7b3813b4ff 100644 --- a/packages/backend/src/index.ts +++ b/packages/backend/src/index.ts @@ -177,6 +177,7 @@ export function initIocContainer( container.singleton('webhookService', async (deps) => { return createWebhookService({ config: await deps.use('config'), + knex: await deps.use('knex'), logger: await deps.use('logger') }) }) @@ -184,8 +185,7 @@ export function initIocContainer( return await createInvoiceService({ logger: await deps.use('logger'), knex: await deps.use('knex'), - accountingService: await deps.use('accountingService'), - webhookService: await deps.use('webhookService') + accountingService: await deps.use('accountingService') }) }) container.singleton('invoiceRoutes', async (deps) => { @@ -237,8 +237,7 @@ export function initIocContainer( accountingService: await deps.use('accountingService'), makeIlpPlugin: await deps.use('makeIlpPlugin'), accountService: await deps.use('accountService'), - ratesService: await deps.use('ratesService'), - webhookService: await deps.use('webhookService') + ratesService: await deps.use('ratesService') }) }) diff --git a/packages/backend/src/open_payments/invoice/model.ts b/packages/backend/src/open_payments/invoice/model.ts index 7c31a2b33d..5225bdc7d2 100644 --- a/packages/backend/src/open_payments/invoice/model.ts +++ b/packages/backend/src/open_payments/invoice/model.ts @@ -4,6 +4,30 @@ import { Asset } from '../../asset/model' import { LiquidityAccount } from '../../accounting/service' import { ConnectorAccount } from '../../connector/core/rafiki' import { BaseModel } from '../../shared/baseModel' +import { WebhookEvent } from '../../webhook/model' + +export enum InvoiceEventType { + InvoiceExpired = 'invoice.expired', + InvoicePaid = 'invoice.paid' +} + +export type InvoiceData = { + invoice: { + id: string + accountId: string + description?: string + createdAt: string + expiresAt: string + amount: string + received: string + } + payment?: never +} + +export class InvoiceEvent extends WebhookEvent { + public type!: InvoiceEventType + public data!: InvoiceData +} export class Invoice extends BaseModel @@ -33,9 +57,40 @@ export class Invoice public processAt!: Date | null - public webhookAttempts!: number - public get asset(): Asset { return this.account.asset } + + public async onCredit(balance: bigint): Promise { + if (this.amount <= balance) { + const invoice = await this.$query() + .patchAndFetch({ + active: false, + // Add 30 seconds to allow a prepared (but not yet fulfilled/rejected) packet to finish before sending webhook event. + processAt: new Date(Date.now() + 30_000) + }) + .where({ + id: this.id, + active: true + }) + if (invoice) { + return invoice + } + } + return this + } + + public toData(amountReceived: bigint): InvoiceData { + return { + invoice: { + id: this.id, + accountId: this.accountId, + amount: this.amount.toString(), + description: this.description, + expiresAt: this.expiresAt.toISOString(), + createdAt: new Date(+this.createdAt).toISOString(), + received: amountReceived.toString() + } + } + } } diff --git a/packages/backend/src/open_payments/invoice/service.test.ts b/packages/backend/src/open_payments/invoice/service.test.ts index 83a5693dd7..b57b3dffc4 100644 --- a/packages/backend/src/open_payments/invoice/service.test.ts +++ b/packages/backend/src/open_payments/invoice/service.test.ts @@ -1,14 +1,12 @@ import assert from 'assert' import Knex from 'knex' import { WorkerUtils, makeWorkerUtils } from 'graphile-worker' -import nock from 'nock' -import { URL } from 'url' import { v4 as uuid } from 'uuid' import { InvoiceService } from './service' import { AccountingService } from '../../accounting/service' import { createTestApp, TestContainer } from '../../tests/app' -import { Invoice } from './model' +import { Invoice, InvoiceEvent, InvoiceEventType } from './model' import { resetGraphileDb } from '../../tests/graphileDb' import { GraphileProducer } from '../../messaging/graphileProducer' import { Config } from '../../config/app' @@ -17,7 +15,6 @@ import { initIocContainer } from '../../' import { AppServices } from '../../app' import { randomAsset } from '../../tests/asset' import { truncateTables } from '../../tests/tableManager' -import { EventType } from '../../webhook/service' describe('Invoice Service', (): void => { let deps: IocContract @@ -31,22 +28,6 @@ describe('Invoice Service', (): void => { const mockMessageProducer = { send: jest.fn() } - const webhookUrl = new URL(Config.webhookUrl) - - function mockWebhookServer( - invoiceId: string, - type: EventType, - status = 200 - ): nock.Scope { - return nock(webhookUrl.origin) - .post(webhookUrl.pathname, (body): boolean => { - expect(body.type).toEqual(type) - expect(body.data.invoice.id).toEqual(invoiceId) - expect(body.data.invoice.active).toEqual(false) - return true - }) - .reply(status) - } beforeAll( async (): Promise => { @@ -98,7 +79,7 @@ describe('Invoice Service', (): void => { expect(invoice).toMatchObject({ id: invoice.id, account: await accountService.get(accountId), - processAt: new Date(invoice.expiresAt.getTime() + 30_000) + processAt: new Date(invoice.expiresAt.getTime()) }) const retrievedInvoice = await invoiceService.get(invoice.id) if (!retrievedInvoice) throw new Error('invoice not found') @@ -133,7 +114,7 @@ describe('Invoice Service', (): void => { }) }) - describe('handlePayment', (): void => { + describe('onCredit', (): void => { let invoice: Invoice beforeEach( @@ -149,35 +130,26 @@ describe('Invoice Service', (): void => { test('Does not deactivate a partially paid invoice', async (): Promise => { await expect( - accountingService.createDeposit({ - id: uuid(), - account: invoice, - amount: invoice.amount - BigInt(1) - }) - ).resolves.toBeUndefined() - - await invoiceService.handlePayment(invoice.id) + invoice.onCredit(invoice.amount - BigInt(1)) + ).resolves.toEqual(invoice) await expect(invoiceService.get(invoice.id)).resolves.toMatchObject({ - active: true + active: true, + processAt: new Date(invoice.expiresAt.getTime()) }) }) test('Deactivates fully paid invoice', async (): Promise => { - await expect( - accountingService.createDeposit({ - id: uuid(), - account: invoice, - amount: invoice.amount - }) - ).resolves.toBeUndefined() - const now = new Date() jest.useFakeTimers('modern') jest.setSystemTime(now) - await invoiceService.handlePayment(invoice.id) + await expect(invoice.onCredit(invoice.amount)).resolves.toMatchObject({ + id: invoice.id, + active: false, + processAt: new Date(now.getTime() + 30_000) + }) await expect(invoiceService.get(invoice.id)).resolves.toMatchObject({ active: false, - processAt: now + processAt: new Date(now.getTime() + 30_000) }) }) }) @@ -218,7 +190,7 @@ describe('Invoice Service', (): void => { await expect(invoiceService.processNext()).resolves.toBe(invoice.id) await expect(invoiceService.get(invoice.id)).resolves.toMatchObject({ active: false, - processAt: now + processAt: new Date(now.getTime() + 30_000) }) }) @@ -235,12 +207,12 @@ describe('Invoice Service', (): void => { }) describe.each` - event | expiresAt | amountReceived - ${EventType.InvoiceExpired} | ${-40_000} | ${BigInt(1)} - ${EventType.InvoicePaid} | ${30_000} | ${BigInt(123)} + eventType | expiresAt | amountReceived + ${InvoiceEventType.InvoiceExpired} | ${-40_000} | ${BigInt(1)} + ${InvoiceEventType.InvoicePaid} | ${30_000} | ${BigInt(123)} `( - 'handleDeactivated ($event)', - ({ event, expiresAt, amountReceived }): void => { + 'handleDeactivated ($eventType)', + ({ eventType, expiresAt, amountReceived }): void => { let invoice: Invoice beforeEach( @@ -258,12 +230,12 @@ describe('Invoice Service', (): void => { amount: amountReceived }) ).resolves.toBeUndefined() - if (event === EventType.InvoiceExpired) { + if (eventType === InvoiceEventType.InvoiceExpired) { await expect(invoiceService.processNext()).resolves.toBe( invoice.id ) } else { - await invoiceService.handlePayment(invoice.id) + await invoice.onCredit(invoice.amount) } invoice = (await invoiceService.get(invoice.id)) as Invoice expect(invoice.active).toBe(false) @@ -277,48 +249,26 @@ describe('Invoice Service', (): void => { } ) - test('Withdraws invoice liquidity', async (): Promise => { - const scope = mockWebhookServer(invoice.id, event) - await expect(invoiceService.processNext()).resolves.toBe(invoice.id) - expect(scope.isDone()).toBe(true) - await expect(invoiceService.get(invoice.id)).resolves.toMatchObject({ - processAt: null, - webhookAttempts: 0 - }) + test('Creates webhook event', async (): Promise => { await expect( - accountingService.getBalance(invoice.id) - ).resolves.toEqual(BigInt(0)) - }) - - test("Doesn't withdraw on webhook error", async (): Promise => { + InvoiceEvent.query(knex).where({ + type: eventType + }) + ).resolves.toHaveLength(0) assert.ok(invoice.processAt) - const scope = mockWebhookServer(invoice.id, event, 504) + jest.useFakeTimers('modern') + jest.setSystemTime(invoice.processAt) await expect(invoiceService.processNext()).resolves.toBe(invoice.id) - expect(scope.isDone()).toBe(true) - await expect(invoiceService.get(invoice.id)).resolves.toMatchObject({ - processAt: new Date(invoice.processAt.getTime() + 10_000), - webhookAttempts: 1 - }) await expect( - accountingService.getBalance(invoice.id) - ).resolves.toEqual(amountReceived) - }) - - test("Doesn't withdraw on webhook timeout", async (): Promise => { - assert.ok(invoice.processAt) - const scope = nock(webhookUrl.origin) - .post(webhookUrl.pathname) - .delayConnection(Config.webhookTimeout + 1) - .reply(200) - await expect(invoiceService.processNext()).resolves.toBe(invoice.id) - expect(scope.isDone()).toBe(true) + InvoiceEvent.query(knex).where({ + type: eventType, + withdrawalAccountId: invoice.id, + withdrawalAmount: amountReceived + }) + ).resolves.toHaveLength(1) await expect(invoiceService.get(invoice.id)).resolves.toMatchObject({ - processAt: new Date(invoice.processAt.getTime() + 10_000), - webhookAttempts: 1 + processAt: null }) - await expect( - accountingService.getBalance(invoice.id) - ).resolves.toEqual(amountReceived) }) } ) diff --git a/packages/backend/src/open_payments/invoice/service.ts b/packages/backend/src/open_payments/invoice/service.ts index 5edd26d098..2b2c82f186 100644 --- a/packages/backend/src/open_payments/invoice/service.ts +++ b/packages/backend/src/open_payments/invoice/service.ts @@ -1,8 +1,7 @@ -import { Invoice } from './model' +import { Invoice, InvoiceEvent, InvoiceEventType } from './model' import { AccountingService } from '../../accounting/service' import { BaseService } from '../../shared/baseService' import { Pagination } from '../../shared/pagination' -import { EventType, WebhookService } from '../../webhook/service' import assert from 'assert' import { Transaction } from 'knex' import { ForeignKeyViolationError, TransactionOrKnex } from 'objection' @@ -27,14 +26,12 @@ export interface InvoiceService { accountId: string, pagination?: Pagination ): Promise - handlePayment(invoiceId: string): Promise processNext(): Promise } interface ServiceDependencies extends BaseService { knex: TransactionOrKnex accountingService: AccountingService - webhookService: WebhookService } export async function createInvoiceService( @@ -52,7 +49,6 @@ export async function createInvoiceService( create: (options, trx) => createInvoice(deps, options, trx), getAccountInvoicesPage: (accountId, pagination) => getAccountInvoicesPage(deps, accountId, pagination), - handlePayment: (invoiceId) => handleInvoicePayment(deps, invoiceId), processNext: () => processNextInvoice(deps) } } @@ -79,8 +75,7 @@ async function createInvoice( expiresAt, amount, active: true, - // Add 30 seconds to allow a prepared (but not yet fulfilled/rejected) packet to finish before being deactivated. - processAt: new Date(expiresAt.getTime() + 30_000) + processAt: new Date(expiresAt.getTime()) }) .withGraphFetched('account.asset') @@ -103,25 +98,6 @@ async function createInvoice( } } -async function handleInvoicePayment( - deps: ServiceDependencies, - invoiceId: string -): Promise { - const amountReceived = await deps.accountingService.getTotalReceived( - invoiceId - ) - if (!amountReceived) { - return - } - await Invoice.query(deps.knex) - .patch({ - active: false, - processAt: new Date() - }) - .where('id', invoiceId) - .andWhere('amount', '<=', amountReceived.toString()) -} - // Fetch (and lock) an invoice for work. // Returns the id of the processed invoice (if any). async function processNextInvoice( @@ -135,7 +111,7 @@ async function processNextInvoice( .forUpdate() // If an invoice is locked, don't wait — just come back for it later. .skipLocked() - .where('processAt', '<', now) + .where('processAt', '<=', now) .withGraphFetched('account.asset') const invoice = invoices[0] @@ -170,7 +146,8 @@ async function handleExpired( deps.logger.trace({ amountReceived }, 'deactivating expired invoice') await invoice.$query(deps.knex).patch({ active: false, - processAt: new Date() + // Add 30 seconds to allow a prepared (but not yet fulfilled/rejected) packet to finish before sending webhook event. + processAt: new Date(Date.now() + 30_000) }) } else { deps.logger.debug({ amountReceived }, 'deleting expired invoice') @@ -178,7 +155,7 @@ async function handleExpired( } } -// Withdraw deactivated invoices' liquidity. +// Create webhook event to withdraw deactivated invoices' liquidity. async function handleDeactivated( deps: ServiceDependencies, invoice: Invoice @@ -191,58 +168,33 @@ async function handleDeactivated( if (!amountReceived) { deps.logger.warn( { amountReceived }, - 'invoice with processAt and empty balance' + 'deactivated invoice and empty balance' ) await invoice.$query(deps.knex).patch({ processAt: null }) return } - deps.logger.trace( - { amountReceived }, - 'withdrawing deactivated invoice balance' - ) - const error = await deps.accountingService.createWithdrawal({ - id: invoice.id, - account: invoice, - amount: amountReceived, - timeout: BigInt(deps.webhookService.timeout) * BigInt(1e6) // ms -> ns - }) - if (error) throw error + const type = + amountReceived < invoice.amount + ? InvoiceEventType.InvoiceExpired + : InvoiceEventType.InvoicePaid + deps.logger.trace({ type }, 'creating invoice webhook event') - const { status } = await deps.webhookService.send({ - id: invoice.id, - type: - amountReceived < invoice.amount - ? EventType.InvoiceExpired - : EventType.InvoicePaid, - invoice, - amountReceived - }) - if (status === 200 || status === 205) { - const error = await deps.accountingService.commitWithdrawal(invoice.id) - if (error) throw error - if (status === 200) { - await invoice.$query(deps.knex).patch({ - processAt: null - }) + await InvoiceEvent.query(deps.knex).insertAndFetch({ + type, + data: invoice.toData(amountReceived), + withdrawal: { + accountId: invoice.id, + assetId: invoice.account.assetId, + amount: amountReceived } - } - } catch (error) { - const webhookAttempts = invoice.webhookAttempts + 1 - deps.logger.warn( - { error, webhookAttempts }, - 'webhook attempt failed; retrying' - ) - await deps.accountingService.rollbackWithdrawal(invoice.id) + }) - const processAt = new Date( - invoice.processAt.getTime() + - Math.min(webhookAttempts, 6) * RETRY_BACKOFF_MS - ) await invoice.$query(deps.knex).patch({ - processAt, - webhookAttempts + processAt: null }) + } catch (error) { + deps.logger.warn({ error }, 'webhook event creation failed; retrying') } } diff --git a/packages/backend/src/outgoing_payment/errors.ts b/packages/backend/src/outgoing_payment/errors.ts index eb4d1ca7c5..6e719cbb0e 100644 --- a/packages/backend/src/outgoing_payment/errors.ts +++ b/packages/backend/src/outgoing_payment/errors.ts @@ -1,13 +1,26 @@ import * as Pay from '@interledger/pay' +import { TransferError } from '../accounting/errors' + +enum OutgoingPaymentError { + UnknownPayment = 'UnknownPayment', + WrongState = 'WrongState', + InvalidAmount = 'InvalidAmount' +} + +export const FundingError = { ...OutgoingPaymentError, ...TransferError } +export type FundingError = OutgoingPaymentError | TransferError + +// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/explicit-module-boundary-types +export const isFundingError = (o: any): o is FundingError => + Object.values(FundingError).includes(o) + export type PaymentError = LifecycleError | Pay.PaymentError export enum LifecycleError { QuoteExpired = 'QuoteExpired', // Rate fetch failed. PricesUnavailable = 'PricesUnavailable', - // Payment aborted via outgoing_payment.funding webhook response. - CancelledByWebhook = 'CancelledByWebhook', // Edge error due to retries, partial payment, and database write errors. BadState = 'BadState', @@ -15,7 +28,6 @@ export enum LifecycleError { MissingBalance = 'MissingBalance', MissingQuote = 'MissingQuote', MissingInvoice = 'MissingInvoice', - MissingWebhook = 'MissingWebhook', InvalidRatio = 'InvalidRatio' } diff --git a/packages/backend/src/outgoing_payment/lifecycle.ts b/packages/backend/src/outgoing_payment/lifecycle.ts index 7490927c53..16ae77411a 100644 --- a/packages/backend/src/outgoing_payment/lifecycle.ts +++ b/packages/backend/src/outgoing_payment/lifecycle.ts @@ -1,10 +1,14 @@ import * as Pay from '@interledger/pay' import { LifecycleError } from './errors' -import { OutgoingPayment, PaymentState } from './model' +import { + OutgoingPayment, + PaymentState, + PaymentEvent, + PaymentEventType +} from './model' import { ServiceDependencies } from './service' import { IlpPlugin } from './ilp_plugin' -import { EventType } from '../webhook/service' const MAX_INT64 = BigInt('9223372036854775807') @@ -60,9 +64,7 @@ export async function handleQuoting( }, 'quote amountToSend bounds error' ) - await payment.$query(deps.knex).patch({ - state: PaymentState.Completed - }) + await handleCompleted(deps, payment) return } } @@ -96,9 +98,7 @@ export async function handleQuoting( // InvoiceAlreadyPaid: the invoice was already paid, either by this payment (which retried due to a failed SENDING→COMPLETED transition commit) or another payment entirely. if (quote === null) { deps.logger.warn('quote invoice already paid') - await payment.$query(deps.knex).patch({ - state: PaymentState.Completed - }) + await handleCompleted(deps, payment) return } @@ -129,6 +129,10 @@ export async function handleQuoting( amountSent } }) + + if (state === PaymentState.Funding) { + await sendWebhookEvent(deps, payment, PaymentEventType.PaymentFunding) + } } // "payment" is locked by the "deps.knex" transaction. @@ -142,44 +146,13 @@ export async function handleFunding( throw LifecycleError.QuoteExpired } - if (!payment.webhookId) throw LifecycleError.MissingWebhook - - const amountSent = await deps.accountingService.getTotalSent(payment.id) - const balance = await deps.accountingService.getBalance(payment.id) - if (amountSent === undefined || balance === undefined) { - throw LifecycleError.MissingBalance - } - - try { - const { status } = await deps.webhookService.send({ - id: payment.webhookId, - type: EventType.PaymentFunding, - payment, - amountSent, - balance - }) - - if (status === 200) { - const error = await deps.accountingService.createDeposit({ - id: payment.webhookId, - account: payment, - amount: payment.quote.maxSourceAmount - }) - if (error) { - throw new Error('Unable to fund payment. error=' + error) - } - await payment.$query(deps.knex).patch({ state: PaymentState.Sending }) - } - } catch (err) { - if (err.isAxiosError && err.response.status === 403) { - await payment.$query(deps.knex).patch({ - state: PaymentState.Cancelled, - error: LifecycleError.CancelledByWebhook - }) - } else { - throw err - } - } + deps.logger.error( + { + activationDeadline: payment.quote.activationDeadline.getTime(), + now: now.getTime() + }, + "handleFunding for payment quote that isn't expired" + ) } // "payment" is locked by the "deps.knex" transaction. @@ -260,9 +233,7 @@ export async function handleSending( }, 'handleSending payment was already paid' ) - await payment.$query(deps.knex).patch({ - state: PaymentState.Completed - }) + await handleCompleted(deps, payment) return } else if ( newMaxSourceAmount <= BigInt(0) || @@ -335,65 +306,52 @@ export async function handleSending( if (receipt.error) throw receipt.error + await handleCompleted(deps, payment) +} + +export async function handleCancelled( + deps: ServiceDependencies, + payment: OutgoingPayment, + error: string +): Promise { await payment.$query(deps.knex).patch({ - state: PaymentState.Completed + state: PaymentState.Cancelled, + error }) + await sendWebhookEvent(deps, payment, PaymentEventType.PaymentCancelled) } -export async function handleCancelledOrCompleted( +const handleCompleted = async ( deps: ServiceDependencies, payment: OutgoingPayment -): Promise { +): Promise => { + await payment.$query(deps.knex).patch({ + state: PaymentState.Completed + }) + await sendWebhookEvent(deps, payment, PaymentEventType.PaymentCompleted) +} + +const sendWebhookEvent = async ( + deps: ServiceDependencies, + payment: OutgoingPayment, + type: PaymentEventType +): Promise => { const amountSent = await deps.accountingService.getTotalSent(payment.id) const balance = await deps.accountingService.getBalance(payment.id) if (amountSent === undefined || balance === undefined) { throw LifecycleError.MissingBalance } - if (!payment.webhookId) throw LifecycleError.MissingWebhook - - if (balance) { - const error = await deps.accountingService.createWithdrawal({ - id: payment.webhookId, - account: payment, - amount: balance, - timeout: BigInt(deps.webhookService.timeout) * BigInt(1e6) // ms -> ns - }) - if (error) throw error - } - - try { - const { status } = await deps.webhookService.send({ - id: payment.webhookId, - type: - payment.state === PaymentState.Cancelled - ? EventType.PaymentCancelled - : EventType.PaymentCompleted, - payment, - amountSent, - balance - }) - if (status === 200 || status === 205) { - if (balance) { - const error = await deps.accountingService.commitWithdrawal( - payment.webhookId - ) - if (error) throw error - } - if (status === 200) { - await payment.$query(deps.knex).patch({ - webhookId: null - }) - } else if (payment.state === PaymentState.Cancelled) { - await payment.$query(deps.knex).patch({ - state: PaymentState.Quoting - }) + const withdrawal = balance + ? { + accountId: payment.id, + assetId: payment.account.assetId, + amount: balance } - } - } catch (error) { - if (balance) { - await deps.accountingService.rollbackWithdrawal(payment.webhookId) - } - throw error - } + : undefined + await PaymentEvent.query(deps.knex).insertAndFetch({ + type, + data: payment.toData({ amountSent, balance }), + withdrawal + }) } diff --git a/packages/backend/src/outgoing_payment/model.ts b/packages/backend/src/outgoing_payment/model.ts index baefeb825e..9e16fe7092 100644 --- a/packages/backend/src/outgoing_payment/model.ts +++ b/packages/backend/src/outgoing_payment/model.ts @@ -1,12 +1,12 @@ import { Pojo, Model, ModelOptions, QueryContext } from 'objection' import * as Pay from '@interledger/pay' -import { v4 as uuid } from 'uuid' import { LiquidityAccount } from '../accounting/service' import { Asset } from '../asset/model' import { ConnectorAccount } from '../connector/core/rafiki' import { Account } from '../open_payments/account/model' import { BaseModel } from '../shared/baseModel' +import { WebhookEvent } from '../webhook/model' const fieldPrefixes = ['intent', 'quote', 'destinationAccount', 'outcome'] @@ -32,8 +32,6 @@ export class OutgoingPayment // The "| null" is necessary so that `$beforeUpdate` can modify a patch to remove the error. If `$beforeUpdate` set `error = undefined`, the patch would ignore the modification. public error?: string | null public stateAttempts!: number - // The "| null" is necessary so that `$beforeUpdate` can modify a patch to remove the webhookId. If `$beforeUpdate` set `webhookId = undefined`, the patch would ignore the modification. - public webhookId?: string | null public intent!: PaymentIntent @@ -84,15 +82,6 @@ export class OutgoingPayment } if (opts.old['state'] !== this.state) { this.stateAttempts = 0 - switch (this.state) { - case PaymentState.Funding: - case PaymentState.Cancelled: - case PaymentState.Completed: - this.webhookId = uuid() - break - default: - this.webhookId = null - } } } } @@ -144,6 +133,59 @@ export class OutgoingPayment } return json } + + public toData({ + amountSent, + balance + }: { + amountSent: bigint + balance: bigint + }): PaymentData { + const data: PaymentData = { + payment: { + id: this.id, + accountId: this.accountId, + state: this.state, + stateAttempts: this.stateAttempts, + intent: { + autoApprove: this.intent.autoApprove + }, + destinationAccount: this.destinationAccount, + createdAt: new Date(+this.createdAt).toISOString(), + outcome: { + amountSent: amountSent.toString() + }, + balance: balance.toString() + } + } + if (this.intent.paymentPointer) { + data.payment.intent.paymentPointer = this.intent.paymentPointer + } + if (this.intent.invoiceUrl) { + data.payment.intent.invoiceUrl = this.intent.invoiceUrl + } + if (this.intent.amountToSend) { + data.payment.intent.amountToSend = this.intent.amountToSend.toString() + } + if (this.error) { + data.payment.error = this.error + } + if (this.quote) { + data.payment.quote = { + ...this.quote, + timestamp: this.quote.timestamp.toISOString(), + activationDeadline: this.quote.activationDeadline.toISOString(), + minDeliveryAmount: this.quote.minDeliveryAmount.toString(), + maxSourceAmount: this.quote.maxSourceAmount.toString(), + maxPacketAmount: this.quote.maxPacketAmount.toString(), + minExchangeRate: this.quote.minExchangeRate.valueOf(), + lowExchangeRateEstimate: this.quote.lowExchangeRateEstimate.valueOf(), + highExchangeRateEstimate: this.quote.highExchangeRateEstimate.valueOf(), + amountSent: this.quote.amountSent.toString() + } + } + return data + } } export enum PaymentState { @@ -164,3 +206,70 @@ export enum PaymentState { // Successful completion. Completed = 'COMPLETED' } + +export enum PaymentDepositType { + PaymentFunding = 'outgoing_payment.funding' +} + +export enum PaymentWithdrawType { + PaymentCancelled = 'outgoing_payment.cancelled', + PaymentCompleted = 'outgoing_payment.completed' +} + +export const PaymentEventType = { + ...PaymentDepositType, + ...PaymentWithdrawType +} +export type PaymentEventType = PaymentDepositType | PaymentWithdrawType + +export type PaymentData = { + invoice?: never + payment: { + id: string + accountId: string + createdAt: string + state: PaymentState + error?: string + stateAttempts: number + intent: { + paymentPointer?: string + invoiceUrl?: string + amountToSend?: string + autoApprove: boolean + } + quote?: { + timestamp: string + activationDeadline: string + targetType: Pay.PaymentType + minDeliveryAmount: string + maxSourceAmount: string + maxPacketAmount: string + minExchangeRate: number + lowExchangeRateEstimate: number + highExchangeRateEstimate: number + amountSent: string + } + destinationAccount: { + scale: number + code: string + url?: string + } + outcome: { + amountSent: string + } + balance: string + } +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/explicit-module-boundary-types +export const isPaymentEventType = (o: any): o is PaymentEventType => + Object.values(PaymentEventType).includes(o) + +// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/explicit-module-boundary-types +export const isPaymentEvent = (o: any): o is PaymentEvent => + o instanceof WebhookEvent && isPaymentEventType(o.type) + +export class PaymentEvent extends WebhookEvent { + public type!: PaymentEventType + public data!: PaymentData +} diff --git a/packages/backend/src/outgoing_payment/service.test.ts b/packages/backend/src/outgoing_payment/service.test.ts index 67423aca58..1e465fb6af 100644 --- a/packages/backend/src/outgoing_payment/service.test.ts +++ b/packages/backend/src/outgoing_payment/service.test.ts @@ -2,9 +2,9 @@ import assert from 'assert' import nock from 'nock' import Knex from 'knex' import * as Pay from '@interledger/pay' -import { URL } from 'url' import { v4 as uuid } from 'uuid' +import { FundingError } from './errors' import { OutgoingPaymentService } from './service' import { createTestApp, TestContainer } from '../tests/app' import { IAppConfig, Config } from '../config/app' @@ -12,7 +12,13 @@ import { IocContract } from '@adonisjs/fold' import { initIocContainer } from '../' import { AppServices } from '../app' import { truncateTable, truncateTables } from '../tests/tableManager' -import { OutgoingPayment, PaymentIntent, PaymentState } from './model' +import { + OutgoingPayment, + PaymentIntent, + PaymentState, + PaymentEvent, + PaymentEventType +} from './model' import { LifecycleError } from './errors' import { RETRY_BACKOFF_SECONDS } from './worker' import { isTransferError } from '../accounting/errors' @@ -20,7 +26,6 @@ import { AccountingService, TransferOptions } from '../accounting/service' import { AssetOptions } from '../asset/service' import { Invoice } from '../open_payments/invoice/model' import { RatesService } from '../rates/service' -import { EventType } from '../webhook/service' describe('OutgoingPaymentService', (): void => { let deps: IocContract @@ -38,39 +43,14 @@ describe('OutgoingPaymentService', (): void => { let amtDelivered: bigint let config: IAppConfig - const webhookUrl = new URL(Config.webhookUrl) - - enum WebhookState { - Funding = PaymentState.Funding, - Cancelled = PaymentState.Cancelled, - Completed = PaymentState.Completed - } - - const isWebhookState = (state: PaymentState): boolean => - Object.values(WebhookState).includes(state) - const webhookTypes: { - [key in WebhookState]: EventType + [key in PaymentState]: PaymentEventType | undefined } = { - [WebhookState.Funding]: EventType.PaymentFunding, - [WebhookState.Cancelled]: EventType.PaymentCancelled, - [WebhookState.Completed]: EventType.PaymentCompleted - } - - function mockWebhookServer( - paymentId: string, - state: PaymentState, - status = 200 - ): nock.Scope { - assert.ok(isWebhookState(state)) - return nock(webhookUrl.origin) - .post(webhookUrl.pathname, (body): boolean => { - expect(body.type).toEqual(webhookTypes[state]) - expect(body.data.payment.id).toEqual(paymentId) - expect(body.data.payment.state).toEqual(state) - return true - }) - .reply(status) + [PaymentState.Quoting]: undefined, + [PaymentState.Funding]: PaymentEventType.PaymentFunding, + [PaymentState.Sending]: undefined, + [PaymentState.Cancelled]: PaymentEventType.PaymentCancelled, + [PaymentState.Completed]: PaymentEventType.PaymentCompleted } async function processNext( @@ -83,6 +63,14 @@ describe('OutgoingPaymentService', (): void => { if (!payment) throw 'no payment' if (expectState) expect(payment.state).toBe(expectState) expect(payment.error).toEqual(expectedError || null) + const type = webhookTypes[payment.state] + if (type) { + await expect( + PaymentEvent.query(knex).where({ + type + }) + ).resolves.not.toHaveLength(0) + } return payment } @@ -144,12 +132,14 @@ describe('OutgoingPaymentService', (): void => { amountSent, amountDelivered, accountBalance, - invoiceReceived + invoiceReceived, + withdrawAmount }: { amountSent?: bigint amountDelivered?: bigint accountBalance?: bigint invoiceReceived?: bigint + withdrawAmount?: bigint } ) { if (amountSent !== undefined) { @@ -170,6 +160,14 @@ describe('OutgoingPaymentService', (): void => { accountingService.getTotalReceived(invoice.id) ).resolves.toEqual(invoiceReceived) } + if (withdrawAmount !== undefined) { + await expect( + PaymentEvent.query(knex).where({ + withdrawalAccountId: payment.id, + withdrawalAmount: withdrawAmount + }) + ).resolves.toHaveLength(1) + } } beforeAll( @@ -522,7 +520,6 @@ describe('OutgoingPaymentService', (): void => { autoApprove: true }) payment = await processNext(paymentId, PaymentState.Funding) - expect(payment.webhookId).not.toBeNull() } ) @@ -544,55 +541,9 @@ describe('OutgoingPaymentService', (): void => { LifecycleError.QuoteExpired ) }) - - it('CANCELLED (wallet cancelled)', async (): Promise => { - const scope = mockWebhookServer(payment.id, PaymentState.Funding, 403) - await processNext( - payment.id, - PaymentState.Cancelled, - LifecycleError.CancelledByWebhook - ) - expect(scope.isDone()).toBe(true) - }) - - it('SENDING (payment funded)', async (): Promise => { - const scope = mockWebhookServer(payment.id, PaymentState.Funding) - await processNext(payment.id, PaymentState.Sending) - expect(scope.isDone()).toBe(true) - await expectOutcome(payment, { - accountBalance: BigInt(123), - amountSent: BigInt(0), - amountDelivered: BigInt(0) - }) - }) - - it('FUNDING (webhook error)', async (): Promise => { - const scope = mockWebhookServer(payment.id, PaymentState.Funding, 504) - await processNext(payment.id, PaymentState.Funding) - expect(scope.isDone()).toBe(true) - }) - - it('FUNDING (webhook timeout)', async (): Promise => { - const scope = nock(webhookUrl.origin) - .post(webhookUrl.pathname) - .delayConnection(Config.webhookTimeout + 1) - .reply(200) - await processNext(payment.id, PaymentState.Funding) - expect(scope.isDone()).toBe(true) - }) }) describe('SENDING→', (): void => { - beforeEach( - async (): Promise => { - // Don't send invoice.paid webhook events - const invoiceService = await deps.use('invoiceService') - jest - .spyOn(invoiceService, 'handlePayment') - .mockImplementation(() => Promise.resolve()) - } - ) - async function setup( opts: Pick< PaymentIntent, @@ -607,10 +558,17 @@ describe('OutgoingPaymentService', (): void => { trackAmountDelivered(paymentId) - await processNext(paymentId, PaymentState.Funding) - const scope = mockWebhookServer(paymentId, PaymentState.Funding) - await processNext(paymentId, PaymentState.Sending) - expect(scope.isDone()).toBe(true) + const payment = await processNext(paymentId, PaymentState.Funding) + assert.ok(payment.quote) + await expect( + outgoingPaymentService.fund({ + id: paymentId, + amount: payment.quote.maxSourceAmount, + transferId: uuid() + }) + ).resolves.toMatchObject({ + state: PaymentState.Sending + }) return paymentId } @@ -641,7 +599,8 @@ describe('OutgoingPaymentService', (): void => { accountBalance: payment.quote.maxSourceAmount - amountSent, amountSent, amountDelivered: invoice.amount, - invoiceReceived: invoice.amount + invoiceReceived: invoice.amount, + withdrawAmount: payment.quote.maxSourceAmount - amountSent }) }) @@ -659,7 +618,8 @@ describe('OutgoingPaymentService', (): void => { accountBalance: payment.quote.maxSourceAmount - amountSent, amountSent, amountDelivered: invoice.amount - amountAlreadyDelivered, - invoiceReceived: invoice.amount + invoiceReceived: invoice.amount, + withdrawAmount: payment.quote.maxSourceAmount - amountSent }) }) @@ -698,7 +658,8 @@ describe('OutgoingPaymentService', (): void => { await expectOutcome(payment, { accountBalance: BigInt(123 - 10 * 5), amountSent: BigInt(10 * 5), - amountDelivered: BigInt(5 * 5) + amountDelivered: BigInt(5 * 5), + withdrawAmount: BigInt(123 - 10 * 5) }) }) @@ -723,7 +684,8 @@ describe('OutgoingPaymentService', (): void => { await expectOutcome(payment, { accountBalance: BigInt(123 - 10), amountSent: BigInt(10), - amountDelivered: BigInt(5) + amountDelivered: BigInt(5), + withdrawAmount: BigInt(123 - 10) }) }) @@ -793,7 +755,8 @@ describe('OutgoingPaymentService', (): void => { accountBalance: payment.quote.maxSourceAmount, amountSent: BigInt(0), amountDelivered: BigInt(0), - invoiceReceived: invoice.amount + invoiceReceived: invoice.amount, + withdrawAmount: payment.quote.maxSourceAmount }) }) @@ -819,128 +782,81 @@ describe('OutgoingPaymentService', (): void => { ) }) }) + }) - describe.each` - state | error - ${PaymentState.Cancelled} | ${Pay.PaymentError.ReceiverProtocolViolation} - ${PaymentState.Completed} | ${undefined} - `('$state→', ({ state, error }): void => { - let paymentId: string - let accountBalance: bigint - - beforeEach( - async (): Promise => { - // Don't send invoice.paid webhook events - const invoiceService = await deps.use('invoiceService') - jest - .spyOn(invoiceService, 'handlePayment') - .mockImplementation(() => Promise.resolve()) - - paymentId = ( - await outgoingPaymentService.create({ - accountId, - invoiceUrl, - autoApprove: true - }) - ).id - - trackAmountDelivered(paymentId) - - await processNext(paymentId, PaymentState.Funding) - const scope = mockWebhookServer(paymentId, PaymentState.Funding) - await processNext(paymentId, PaymentState.Sending) - expect(scope.isDone()).toBe(true) - - if (error) { - jest.spyOn(Pay, 'pay').mockRejectedValueOnce(error) - } - const payment = await processNext(paymentId, state, error) - if (!payment.quote) throw 'no quote' - expect(payment.webhookId).not.toBeNull() - - if (state === PaymentState.Cancelled) { - accountBalance = payment.quote?.maxSourceAmount - await expectOutcome(payment, { - accountBalance, - amountSent: BigInt(0), - amountDelivered: BigInt(0) - }) - } else { - const amountSent = invoice.amount * BigInt(2) - accountBalance = payment.quote?.maxSourceAmount - amountSent - await expectOutcome(payment, { - accountBalance, - amountSent, - amountDelivered: invoice.amount - }) - } - } - ) + describe('fund', (): void => { + let payment: OutgoingPayment + let quoteAmount: bigint - it(`${state} (liquidity withdrawal)`, async (): Promise => { - const scope = mockWebhookServer(paymentId, state) - const payment = await processNext(paymentId, state, error) - expect(scope.isDone()).toBe(true) - expect(payment.webhookId).toBeNull() - await expect(accountingService.getBalance(paymentId)).resolves.toEqual( - BigInt(0) - ) - // Payment is done being processed - await expect( - outgoingPaymentService.processNext() - ).resolves.toBeUndefined() + beforeEach(async (): Promise => { + const { id: paymentId } = await outgoingPaymentService.create({ + accountId, + paymentPointer, + amountToSend: BigInt(123), + autoApprove: false }) + payment = await processNext(paymentId, PaymentState.Funding) + assert.ok(payment.quote) + quoteAmount = payment.quote.maxSourceAmount + await expectOutcome(payment, { accountBalance: BigInt(0) }) + }, 10_000) - it(`${state} (webhook with empty balance)`, async (): Promise => { - jest - .spyOn(accountingService, 'getBalance') - .mockResolvedValueOnce(BigInt(0)) - const withdrawSpy = jest.spyOn(accountingService, 'createWithdrawal') - const scope = mockWebhookServer(paymentId, state) - const payment = await processNext(paymentId, state, error) - expect(scope.isDone()).toBe(true) - expect(withdrawSpy).not.toHaveBeenCalled() - expect(payment.webhookId).toBeNull() - - // Payment is done being processed - await expect( - outgoingPaymentService.processNext() - ).resolves.toBeUndefined() - }) + it('fails when no payment exists', async (): Promise => { + await expect( + outgoingPaymentService.fund({ + id: uuid(), + amount: quoteAmount, + transferId: uuid() + }) + ).resolves.toEqual(FundingError.UnknownPayment) + }) - it(`${state} (webhook error)`, async (): Promise => { - const scope = mockWebhookServer(paymentId, state, 504) - const payment = await processNext(paymentId, state, error) - expect(scope.isDone()).toBe(true) - expect(payment.webhookId).not.toBeNull() - await expect(accountingService.getBalance(paymentId)).resolves.toEqual( - accountBalance - ) + it('transitions a Funding payment to Sending state', async (): Promise => { + await expect( + outgoingPaymentService.fund({ + id: payment.id, + amount: quoteAmount, + transferId: uuid() + }) + ).resolves.toMatchObject({ + id: payment.id, + state: PaymentState.Sending }) - it(`${state} (webhook timeout)`, async (): Promise => { - const scope = nock(webhookUrl.origin) - .post(webhookUrl.pathname) - .delayConnection(Config.webhookTimeout + 1) - .reply(200) - const payment = await processNext(paymentId, state, error) - expect(scope.isDone()).toBe(true) - expect(payment.webhookId).not.toBeNull() - await expect(accountingService.getBalance(paymentId)).resolves.toEqual( - accountBalance - ) - }) + const after = await outgoingPaymentService.get(payment.id) + expect(after?.state).toBe(PaymentState.Sending) + await expectOutcome(payment, { accountBalance: quoteAmount }) + }) - if (state === PaymentState.Cancelled) { - it('QUOTING (withdraw + requote)', async (): Promise => { - const scope = mockWebhookServer(paymentId, state, 205) - await processNext(paymentId, PaymentState.Quoting) - expect(scope.isDone()).toBe(true) - await expect( - accountingService.getBalance(paymentId) - ).resolves.toEqual(BigInt(0)) + it('fails for invalid funding amount', async (): Promise => { + await expect( + outgoingPaymentService.fund({ + id: payment.id, + amount: quoteAmount - BigInt(1), + transferId: uuid() }) - } + ).resolves.toEqual(FundingError.InvalidAmount) + + const after = await outgoingPaymentService.get(payment.id) + expect(after?.state).toBe(PaymentState.Funding) + await expectOutcome(payment, { accountBalance: BigInt(0) }) + }) + + Object.values(PaymentState).forEach((startState) => { + if (startState === PaymentState.Funding) return + it(`does not fund a ${startState} payment`, async (): Promise => { + await payment.$query().patch({ state: startState }) + await expect( + outgoingPaymentService.fund({ + id: payment.id, + amount: quoteAmount, + transferId: uuid() + }) + ).resolves.toEqual(FundingError.WrongState) + + const after = await outgoingPaymentService.get(payment.id) + expect(after?.state).toBe(startState) + }) }) }) diff --git a/packages/backend/src/outgoing_payment/service.ts b/packages/backend/src/outgoing_payment/service.ts index 3a558d5911..912b7bc7e5 100644 --- a/packages/backend/src/outgoing_payment/service.ts +++ b/packages/backend/src/outgoing_payment/service.ts @@ -2,17 +2,20 @@ import { ForeignKeyViolationError, TransactionOrKnex } from 'objection' import * as Pay from '@interledger/pay' import { BaseService } from '../shared/baseService' +import { FundingError, LifecycleError } from './errors' import { OutgoingPayment, PaymentIntent, PaymentState } from './model' import { AccountingService } from '../accounting/service' import { AccountService } from '../open_payments/account/service' import { RatesService } from '../rates/service' -import { WebhookService } from '../webhook/service' import { IlpPlugin, IlpPluginOptions } from './ilp_plugin' import * as worker from './worker' export interface OutgoingPaymentService { get(id: string): Promise create(options: CreateOutgoingPaymentOptions): Promise + fund( + options: FundOutgoingPaymentOptions + ): Promise processNext(): Promise getAccountPage( accountId: string, @@ -32,7 +35,6 @@ export interface ServiceDependencies extends BaseService { accountingService: AccountingService accountService: AccountService ratesService: RatesService - webhookService: WebhookService makeIlpPlugin: (options: IlpPluginOptions) => IlpPlugin } @@ -47,6 +49,7 @@ export async function createOutgoingPaymentService( get: (id) => getOutgoingPayment(deps, id), create: (options: CreateOutgoingPaymentOptions) => createOutgoingPayment(deps, options), + fund: (options) => fundPayment(deps, options), processNext: () => worker.processPendingPayment(deps), getAccountPage: (accountId, pagination) => getAccountPage(deps, accountId, pagination) @@ -140,6 +143,41 @@ async function createOutgoingPayment( } } +export interface FundOutgoingPaymentOptions { + id: string + amount: bigint + transferId: string +} + +async function fundPayment( + deps: ServiceDependencies, + { id, amount, transferId }: FundOutgoingPaymentOptions +): Promise { + return deps.knex.transaction(async (trx) => { + const payment = await OutgoingPayment.query(trx) + .findById(id) + .forUpdate() + .withGraphFetched('account.asset') + if (!payment) return FundingError.UnknownPayment + if (payment.state !== PaymentState.Funding) { + return FundingError.WrongState + } + if (!payment.quote) throw LifecycleError.MissingQuote + if (amount !== payment.quote.maxSourceAmount) + return FundingError.InvalidAmount + const error = await deps.accountingService.createDeposit({ + id: transferId, + account: payment, + amount + }) + if (error) { + return error + } + await payment.$query(trx).patch({ state: PaymentState.Sending }) + return payment + }) +} + interface Pagination { after?: string // Forward pagination: cursor. before?: string // Backward pagination: cursor. diff --git a/packages/backend/src/outgoing_payment/worker.ts b/packages/backend/src/outgoing_payment/worker.ts index 9acb953f1a..df17ef1487 100644 --- a/packages/backend/src/outgoing_payment/worker.ts +++ b/packages/backend/src/outgoing_payment/worker.ts @@ -52,11 +52,7 @@ export async function getPendingPayment( .forUpdate() // Don't wait for a payment that is already being processed. .skipLocked() - .where((builder: knex.QueryBuilder) => { - builder - .whereIn('state', [PaymentState.Quoting, PaymentState.Sending]) - .orWhereNotNull('webhookId') - }) + .whereIn('state', [PaymentState.Quoting, PaymentState.Sending]) // Back off between retries. .andWhere((builder: knex.QueryBuilder) => { builder @@ -100,10 +96,7 @@ export async function handlePaymentLifecycle( { state: payment.state, error, stateAttempts }, 'payment lifecycle failed; cancelling' ) - await payment.$query(deps.knex).patch({ - state: PaymentState.Cancelled, - error - }) + await lifecycle.handleCancelled(deps, payment, error) } } @@ -145,9 +138,6 @@ export async function handlePaymentLifecycle( ) }) }) - case PaymentState.Cancelled: - case PaymentState.Completed: - return lifecycle.handleCancelledOrCompleted(deps, payment).catch(onError) default: deps.logger.warn('unexpected payment in lifecycle') break diff --git a/packages/backend/src/webhook/model.ts b/packages/backend/src/webhook/model.ts new file mode 100644 index 0000000000..51b18e23e9 --- /dev/null +++ b/packages/backend/src/webhook/model.ts @@ -0,0 +1,63 @@ +import { Model, Pojo } from 'objection' + +import { Asset } from '../asset/model' +import { BaseModel } from '../shared/baseModel' + +const fieldPrefixes = ['withdrawal'] + +export class WebhookEvent extends BaseModel { + public static get tableName(): string { + return 'webhookEvents' + } + + public type!: string + public data!: Record + public attempts!: number + public statusCode?: number + public processAt!: Date | null + + public withdrawal?: { + accountId: string + assetId: string + amount: bigint + } + + static relationMappings = { + withdrawalAsset: { + relation: Model.HasOneRelation, + modelClass: Asset, + join: { + from: 'webhookEvents.withdrawalAssetId', + to: 'assets.id' + } + } + } + + $formatDatabaseJson(json: Pojo): Pojo { + for (const prefix of fieldPrefixes) { + if (!json[prefix]) continue + for (const key in json[prefix]) { + json[prefix + key.charAt(0).toUpperCase() + key.slice(1)] = + json[prefix][key] + } + delete json[prefix] + } + return super.$formatDatabaseJson(json) + } + + $parseDatabaseJson(json: Pojo): Pojo { + json = super.$parseDatabaseJson(json) + for (const key in json) { + const prefix = fieldPrefixes.find((prefix) => key.startsWith(prefix)) + if (!prefix) continue + if (json[key] !== null) { + if (!json[prefix]) json[prefix] = {} + json[prefix][ + key.charAt(prefix.length).toLowerCase() + key.slice(prefix.length + 1) + ] = json[key] + } + delete json[key] + } + return json + } +} diff --git a/packages/backend/src/webhook/service.test.ts b/packages/backend/src/webhook/service.test.ts index ecb38e3200..ce5d6da35d 100644 --- a/packages/backend/src/webhook/service.test.ts +++ b/packages/backend/src/webhook/service.test.ts @@ -4,37 +4,50 @@ import { URL } from 'url' import Knex from 'knex' import { v4 as uuid } from 'uuid' +import { WebhookEvent } from './model' import { - EventType, - isPaymentEventType, WebhookService, generateWebhookSignature, - invoiceToData, - paymentToData + RETRY_BACKOFF_MS } from './service' +import { AccountingService } from '../accounting/service' import { createTestApp, TestContainer } from '../tests/app' +import { AccountFactory } from '../tests/accountFactory' import { randomAsset } from '../tests/asset' import { truncateTables } from '../tests/tableManager' import { Config } from '../config/app' import { IocContract } from '@adonisjs/fold' import { initIocContainer } from '../' import { AppServices } from '../app' -import { Invoice } from '../open_payments/invoice/model' -import { OutgoingPayment } from '../outgoing_payment/model' describe('Webhook Service', (): void => { let deps: IocContract let appContainer: TestContainer let webhookService: WebhookService + let accountingService: AccountingService let knex: Knex - let invoice: Invoice - let payment: OutgoingPayment - let amountReceived: bigint - let amountSent: bigint - let balance: bigint let webhookUrl: URL + let event: WebhookEvent const WEBHOOK_SECRET = 'test secret' + async function makeWithdrawalEvent(event: WebhookEvent): Promise { + const assetService = await deps.use('assetService') + const accountFactory = new AccountFactory(accountingService) + const asset = await assetService.getOrCreate(randomAsset()) + const amount = BigInt(10) + const account = await accountFactory.build({ + asset, + balance: amount + }) + await event.$query(knex).patch({ + withdrawal: { + accountId: account.id, + assetId: account.asset.id, + amount + } + }) + } + beforeAll( async (): Promise => { Config.webhookSecret = WEBHOOK_SECRET @@ -42,100 +55,130 @@ describe('Webhook Service', (): void => { appContainer = await createTestApp(deps) knex = await deps.use('knex') webhookService = await deps.use('webhookService') - const accountService = await deps.use('accountService') - const { id: accountId } = await accountService.create({ - asset: randomAsset() - }) - const invoiceService = await deps.use('invoiceService') - invoice = await invoiceService.create({ - accountId, - amount: BigInt(56), - expiresAt: new Date(Date.now() + 60 * 1000), - description: 'description!' - }) - const outgoingPaymentService = await deps.use('outgoingPaymentService') - const config = await deps.use('config') - const invoiceUrl = `${config.publicHost}/invoices/${invoice.id}` - payment = await outgoingPaymentService.create({ - accountId, - invoiceUrl, - autoApprove: false + accountingService = await deps.use('accountingService') + webhookUrl = new URL(Config.webhookUrl) + } + ) + + beforeEach( + async (): Promise => { + event = await WebhookEvent.query(knex).insertAndFetch({ + id: uuid(), + type: 'account.test_event', + data: { + account: { + id: uuid() + } + } }) - amountReceived = BigInt(5) - amountSent = BigInt(10) - balance = BigInt(0) - webhookUrl = new URL(config.webhookUrl) + } + ) + + afterEach( + async (): Promise => { + jest.useRealTimers() + await truncateTables(knex) } ) afterAll( async (): Promise => { await appContainer.shutdown() - await truncateTables(knex) } ) - describe('Send Event', (): void => { - it.each(Object.values(EventType).map((type) => [type]))( - '%s', - async (type): Promise => { - const id = uuid() - nock(webhookUrl.origin) - .post(webhookUrl.pathname, function (this: Definition, body) { - assert.ok(this.headers) - const signature = this.headers['rafiki-signature'] - expect( - generateWebhookSignature( - body, - WEBHOOK_SECRET, - Config.signatureVersion - ) - ).toEqual(signature) - expect(body.id).toEqual(id) - expect(body.type).toEqual(type) - if (isPaymentEventType(type)) { - expect(body.data).toEqual( - paymentToData(payment, amountSent, balance) - ) - } else { - expect(body.data).toEqual(invoiceToData(invoice, amountReceived)) - } - return true - }) - .reply(200) + describe('Get Webhook Event', (): void => { + test('A webhook event can be fetched', async (): Promise => { + await expect(webhookService.getEvent(event.id)).resolves.toEqual(event) + }) - if (isPaymentEventType(type)) { - await webhookService.send({ - id, - type, - payment, - amountSent, - balance - }) - } else { - await webhookService.send({ - id, - type, - invoice, - amountReceived + test('A withdrawal webhook event can be fetched', async (): Promise => { + await makeWithdrawalEvent(event) + assert.ok(event.withdrawal) + await expect(webhookService.getEvent(event.id)).resolves.toEqual(event) + }) + + test('Cannot fetch a bogus webhook event', async (): Promise => { + await expect(webhookService.getEvent(uuid())).resolves.toBeUndefined() + }) + }) + + describe('processNext', (): void => { + function mockWebhookServer(status = 200): nock.Scope { + return nock(webhookUrl.origin) + .post(webhookUrl.pathname, function (this: Definition, body) { + assert.ok(this.headers) + const signature = this.headers['rafiki-signature'] + expect( + generateWebhookSignature( + body, + WEBHOOK_SECRET, + Config.signatureVersion + ) + ).toEqual(signature) + expect(body).toMatchObject({ + id: event.id, + type: event.type, + data: event.data }) - } - } - ) + return true + }) + .reply(status) + } - it('throws for failed request', async (): Promise => { - const scope = nock(webhookUrl.origin).post(webhookUrl.pathname).reply(500) + test('Does not process events not scheduled to be sent', async (): Promise => { + await event.$query(knex).patch({ + processAt: new Date(Date.now() + 30_000) + }) + await expect(webhookService.getEvent(event.id)).resolves.toEqual(event) + await expect(webhookService.processNext()).resolves.toBeUndefined() + }) - await expect( - webhookService.send({ - id: uuid(), - type: EventType.InvoicePaid, - invoice, - amountReceived + test('Sends webhook event', async (): Promise => { + const scope = mockWebhookServer() + await expect(webhookService.processNext()).resolves.toEqual(event.id) + expect(scope.isDone()).toBe(true) + await expect(webhookService.getEvent(event.id)).resolves.toMatchObject({ + attempts: 1, + statusCode: 200, + processAt: null + }) + }) + + test.each([[201], [400], [504]])( + 'Schedules retry if request fails (%i)', + async (status): Promise => { + const scope = mockWebhookServer(status) + await expect(webhookService.processNext()).resolves.toEqual(event.id) + expect(scope.isDone()).toBe(true) + const updatedEvent = await webhookService.getEvent(event.id) + assert.ok(updatedEvent?.processAt) + expect(updatedEvent).toMatchObject({ + attempts: 1, + statusCode: status }) - ).rejects.toThrowError('Request failed with status code 500') + expect(updatedEvent.processAt.getTime()).toBeGreaterThanOrEqual( + event.createdAt.getTime() + RETRY_BACKOFF_MS + ) + } + ) + test('Schedules retry if request times out', async (): Promise => { + const scope = nock(webhookUrl.origin) + .post(webhookUrl.pathname) + .delayConnection(Config.webhookTimeout + 1) + .reply(200) + await expect(webhookService.processNext()).resolves.toEqual(event.id) expect(scope.isDone()).toBe(true) + const updatedEvent = await webhookService.getEvent(event.id) + assert.ok(updatedEvent?.processAt) + expect(updatedEvent).toMatchObject({ + attempts: 1, + statusCode: null + }) + expect(updatedEvent.processAt.getTime()).toBeGreaterThanOrEqual( + event.createdAt.getTime() + RETRY_BACKOFF_MS + ) }) }) }) diff --git a/packages/backend/src/webhook/service.ts b/packages/backend/src/webhook/service.ts index ca15c50a08..cef93c1251 100644 --- a/packages/backend/src/webhook/service.ts +++ b/packages/backend/src/webhook/service.ts @@ -1,123 +1,23 @@ +import assert from 'assert' +import axios from 'axios' import { createHmac } from 'crypto' -import axios, { AxiosResponse } from 'axios' -import { PaymentType } from '@interledger/pay' -import { Logger } from 'pino' +import { WebhookEvent } from './model' import { IAppConfig } from '../config/app' -import { Invoice } from '../open_payments/invoice/model' -import { OutgoingPayment, PaymentState } from '../outgoing_payment/model' +import { BaseService } from '../shared/baseService' -enum InvoiceEventType { - InvoiceExpired = 'invoice.expired', - InvoicePaid = 'invoice.paid' -} - -enum PaymentEventType { - PaymentFunding = 'outgoing_payment.funding', - PaymentCancelled = 'outgoing_payment.cancelled', - PaymentCompleted = 'outgoing_payment.completed' -} - -export const EventType = { ...InvoiceEventType, ...PaymentEventType } -export type EventType = InvoiceEventType | PaymentEventType - -// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/explicit-module-boundary-types -export const isPaymentEventType = (type: any): type is PaymentEventType => - Object.values(PaymentEventType).includes(type) - -interface InvoiceEvent { - id: string - type: InvoiceEventType - invoice: Invoice - payment?: never - amountReceived: bigint - amountSent?: never - balance?: never -} - -interface PaymentEvent { - id: string - type: PaymentEventType - invoice?: never - payment: OutgoingPayment - amountReceived?: never - amountSent: bigint - balance: bigint -} - -export type EventOptions = InvoiceEvent | PaymentEvent - -interface InvoiceData { - invoice: { - id: string - accountId: string - active: boolean - description?: string - createdAt: string - expiresAt: string - amount: string - received: string - } - payment?: never -} - -interface PaymentData { - invoice?: never - payment: { - id: string - accountId: string - createdAt: string - state: PaymentState - error?: string - stateAttempts: number - intent: { - paymentPointer?: string - invoiceUrl?: string - amountToSend?: string - autoApprove: boolean - } - - quote?: { - timestamp: string - activationDeadline: string - targetType: PaymentType - minDeliveryAmount: string - maxSourceAmount: string - maxPacketAmount: string - minExchangeRate: number - lowExchangeRateEstimate: number - highExchangeRateEstimate: number - } - destinationAccount: { - scale: number - code: string - url?: string - } - outcome: { - amountSent: string - } - balance: string - } -} - -interface WebhookEvent { - id: string - type: EventType - data: InvoiceData | PaymentData -} - -// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/explicit-module-boundary-types -export const isPaymentEvent = (event: any): event is PaymentEvent => - Object.values(PaymentEventType).includes(event.type) +// First retry waits 10 seconds +// Second retry waits 20 (more) seconds +// Third retry waits 30 (more) seconds, etc. up to 60 seconds +export const RETRY_BACKOFF_MS = 10_000 export interface WebhookService { - send(options: EventOptions): Promise - readonly timeout: number + getEvent(id: string): Promise + processNext(): Promise } -interface ServiceDependencies { +interface ServiceDependencies extends BaseService { config: IAppConfig - logger: Logger } export async function createWebhookService( @@ -128,41 +28,104 @@ export async function createWebhookService( }) const deps = { ...deps_, logger } return { - send: (options) => sendWebhook(deps, options), - timeout: deps.config.webhookTimeout + getEvent: (id) => getWebhookEvent(deps, id), + processNext: () => processNextWebhookEvent(deps) } } -async function sendWebhook( +async function getWebhookEvent( deps: ServiceDependencies, - options: EventOptions -): Promise { - const event = { - id: options.id, - type: options.type, - data: isPaymentEvent(options) - ? paymentToData(options.payment, options.amountSent, options.balance) - : invoiceToData(options.invoice, options.amountReceived) - } + id: string +): Promise { + return WebhookEvent.query(deps.knex).findById(id) +} - const requestHeaders = { - 'Content-Type': 'application/json' - } +// Fetch (and lock) a webhook event for work. +// Returns the id of the processed event (if any). +async function processNextWebhookEvent( + deps_: ServiceDependencies +): Promise { + assert.ok(deps_.knex, 'Knex undefined') + return deps_.knex.transaction(async (trx) => { + const now = Date.now() + const events = await WebhookEvent.query(trx) + .limit(1) + // Ensure the webhook event cannot be processed concurrently by multiple workers. + .forUpdate() + // If a webhook event is locked, don't wait — just come back for it later. + .skipLocked() + .where('processAt', '<=', new Date(now).toISOString()) + + const event = events[0] + if (!event) return + + const deps = { + ...deps_, + knex: trx, + logger: deps_.logger.child({ + event: event.id + }) + } - if (deps.config.webhookSecret) { - requestHeaders['Rafiki-Signature'] = generateWebhookSignature( - event, - deps.config.webhookSecret, - deps.config.signatureVersion - ) - } + await sendWebhookEvent(deps, event) - return await axios.post(deps.config.webhookUrl, event, { - timeout: deps.config.webhookTimeout, - headers: requestHeaders + return event.id }) } +async function sendWebhookEvent( + deps: ServiceDependencies, + event: WebhookEvent +): Promise { + try { + const requestHeaders = { + 'Content-Type': 'application/json' + } + + if (deps.config.webhookSecret) { + requestHeaders['Rafiki-Signature'] = generateWebhookSignature( + event, + deps.config.webhookSecret, + deps.config.signatureVersion + ) + } + + const body = { + id: event.id, + type: event.type, + data: event.data + } + + await axios.post(deps.config.webhookUrl, body, { + timeout: deps.config.webhookTimeout, + headers: requestHeaders, + validateStatus: (status) => status === 200 + }) + + await event.$query(deps.knex).patch({ + attempts: event.attempts + 1, + statusCode: 200, + processAt: null + }) + } catch (err) { + const attempts = event.attempts + 1 + const error = err.message + deps.logger.warn( + { + attempts, + error + }, + 'webhook request failed' + ) + + await event.$query(deps.knex).patch({ + attempts, + statusCode: err.isAxiosError && err.response?.status, + processAt: new Date(Date.now() + Math.min(attempts, 6) * RETRY_BACKOFF_MS) + }) + } +} + export function generateWebhookSignature( event: WebhookEvent, secret: string, @@ -177,58 +140,3 @@ export function generateWebhookSignature( return `t=${timestamp}, v${version}=${digest}` } - -export function invoiceToData( - invoice: Invoice, - amountReceived: bigint -): InvoiceData { - return { - invoice: { - id: invoice.id, - accountId: invoice.accountId, - active: invoice.active, - amount: invoice.amount.toString(), - description: invoice.description, - expiresAt: invoice.expiresAt.toISOString(), - createdAt: new Date(+invoice.createdAt).toISOString(), - received: amountReceived.toString() - } - } -} - -export function paymentToData( - payment: OutgoingPayment, - amountSent: bigint, - balance: bigint -): PaymentData { - return { - payment: { - id: payment.id, - accountId: payment.accountId, - state: payment.state, - error: payment.error || undefined, - stateAttempts: payment.stateAttempts, - intent: { - ...payment.intent, - amountToSend: payment.intent.amountToSend?.toString() - }, - quote: payment.quote && { - ...payment.quote, - timestamp: payment.quote.timestamp.toISOString(), - activationDeadline: payment.quote.activationDeadline.toISOString(), - minDeliveryAmount: payment.quote.minDeliveryAmount.toString(), - maxSourceAmount: payment.quote.maxSourceAmount.toString(), - maxPacketAmount: payment.quote.maxPacketAmount.toString(), - minExchangeRate: payment.quote.minExchangeRate.valueOf(), - lowExchangeRateEstimate: payment.quote.lowExchangeRateEstimate.valueOf(), - highExchangeRateEstimate: payment.quote.highExchangeRateEstimate.valueOf() - }, - destinationAccount: payment.destinationAccount, - createdAt: new Date(+payment.createdAt).toISOString(), - outcome: { - amountSent: amountSent.toString() - }, - balance: balance.toString() - } - } -}