From e47695c9611f756a70aa630b847d83030dd34462 Mon Sep 17 00:00:00 2001 From: Brandon Wilson Date: Wed, 9 Feb 2022 14:29:20 -0600 Subject: [PATCH] chore(backend): specify webhook event withdrawals If invoice event withdrawal hasn't been created, update the amount. Otherwise, create new invoice event for subsequent received amounts. --- ...20131110501_create_webhook_events_table.js | 4 +- ...> 20220209103122_create_invoices_table.js} | 2 + packages/backend/src/accounting/service.ts | 4 +- .../src/graphql/resolvers/liquidity.test.ts | 9 +- .../src/open_payments/invoice/model.ts | 106 ++++++++++++------ .../src/open_payments/invoice/service.test.ts | 84 +++++++++++--- .../backend/src/outgoing_payment/lifecycle.ts | 24 ++-- .../src/outgoing_payment/service.test.ts | 40 +++---- packages/backend/src/webhook/service.test.ts | 5 +- packages/backend/src/webhook/service.ts | 1 + 10 files changed, 170 insertions(+), 109 deletions(-) rename packages/backend/migrations/{20211022210203_create_invoices_table.js => 20220209103122_create_invoices_table.js} (93%) diff --git a/packages/backend/migrations/20220131110501_create_webhook_events_table.js b/packages/backend/migrations/20220131110501_create_webhook_events_table.js index 2b5254baf7..7fda4c60fe 100644 --- a/packages/backend/migrations/20220131110501_create_webhook_events_table.js +++ b/packages/backend/migrations/20220131110501_create_webhook_events_table.js @@ -3,7 +3,7 @@ exports.up = function (knex) { table.uuid('id').notNullable().primary() table.string('type').notNullable() - table.jsonb('data').notNullable() + table.json('data').notNullable() table.integer('attempts').notNullable().defaultTo(0) table.string('error').nullable() @@ -12,7 +12,7 @@ exports.up = function (knex) { table.foreign('withdrawalAssetId').references('assets.id') table.bigInteger('withdrawalAmount').nullable() - table.timestamp('processAt').notNullable() + table.timestamp('processAt').notNullable().defaultTo(knex.fn.now()) table.timestamp('createdAt').defaultTo(knex.fn.now()) table.timestamp('updatedAt').defaultTo(knex.fn.now()) diff --git a/packages/backend/migrations/20211022210203_create_invoices_table.js b/packages/backend/migrations/20220209103122_create_invoices_table.js similarity index 93% rename from packages/backend/migrations/20211022210203_create_invoices_table.js rename to packages/backend/migrations/20220209103122_create_invoices_table.js index b8d4c932d8..b8a8454d5a 100644 --- a/packages/backend/migrations/20211022210203_create_invoices_table.js +++ b/packages/backend/migrations/20220209103122_create_invoices_table.js @@ -9,6 +9,8 @@ exports.up = function (knex) { table.string('description').nullable() table.timestamp('expiresAt').notNullable() table.bigInteger('amount').notNullable() + table.uuid('eventId').nullable() + table.foreign('eventId').references('webhookEvents.id') table.timestamp('createdAt').defaultTo(knex.fn.now()) table.timestamp('updatedAt').defaultTo(knex.fn.now()) diff --git a/packages/backend/src/accounting/service.ts b/packages/backend/src/accounting/service.ts index 3a4b10a549..5424fdfc59 100644 --- a/packages/backend/src/accounting/service.ts +++ b/packages/backend/src/accounting/service.ts @@ -44,8 +44,8 @@ export interface LiquidityAccount { id: string unit: number } - onCredit?: (balance: bigint) => Promise - onDebit?: (balance: bigint) => Promise + onCredit?: (balance: bigint) => Promise + onDebit?: (balance: bigint) => Promise } export interface Deposit { diff --git a/packages/backend/src/graphql/resolvers/liquidity.test.ts b/packages/backend/src/graphql/resolvers/liquidity.test.ts index 0f9d47d322..260db18fa4 100644 --- a/packages/backend/src/graphql/resolvers/liquidity.test.ts +++ b/packages/backend/src/graphql/resolvers/liquidity.test.ts @@ -1635,8 +1635,7 @@ describe('Liquidity Resolvers', (): void => { data: payment.toData({ amountSent: BigInt(0), balance: BigInt(0) - }), - processAt: new Date() + }) }) } ) @@ -1778,7 +1777,6 @@ describe('Liquidity Resolvers', (): void => { id: eventId, type, data, - processAt: new Date(), withdrawal: { accountId: account.id, assetId: account.asset.id, @@ -1873,13 +1871,12 @@ describe('Liquidity Resolvers', (): void => { test('Returns error for non-existent webhook event withdrawal', async (): Promise => { const webhookService = await deps.use('webhookService') - const { type, data, processAt } = (await webhookService.getEvent( + const { type, data } = (await webhookService.getEvent( eventId )) as WebhookEvent const event = await WebhookEvent.query(knex).insertAndFetch({ type, - data, - processAt + data }) const response = await appContainer.apolloClient .mutate({ diff --git a/packages/backend/src/open_payments/invoice/model.ts b/packages/backend/src/open_payments/invoice/model.ts index 570590e8a5..d9295a0ceb 100644 --- a/packages/backend/src/open_payments/invoice/model.ts +++ b/packages/backend/src/open_payments/invoice/model.ts @@ -6,6 +6,29 @@ import { ConnectorAccount } from '../../connector/core/rafiki' import { BaseModel } from '../../shared/baseModel' import { WebhookEvent } from '../../webhook/model' +export enum InvoiceEventType { + InvoiceExpired = 'invoice.expired', + InvoicePaid = 'invoice.paid' +} + +export type InvoiceData = { + invoice: { + id: string + accountId: string + description?: string + createdAt: string + expiresAt: string + amount: string + received: string + } + payment?: never +} + +export class InvoiceEvent extends WebhookEvent { + public type!: InvoiceEventType + public data!: InvoiceData +} + export class Invoice extends BaseModel implements ConnectorAccount, LiquidityAccount { @@ -21,6 +44,14 @@ export class Invoice from: 'invoices.accountId', to: 'accounts.id' } + }, + event: { + relation: Model.HasOneRelation, + modelClass: InvoiceEvent, + join: { + from: 'invoices.eventId', + to: 'webhookEvents.id' + } } } @@ -31,6 +62,8 @@ export class Invoice public description?: string public expiresAt!: Date public readonly amount!: bigint + public eventId?: string + public event?: InvoiceEvent public processAt!: Date | null @@ -38,22 +71,46 @@ export class Invoice return this.account.asset } - public async onCredit(balance: bigint): Promise { - if (balance >= this.amount) { - return await Invoice.transaction(async (trx) => { + public async onCredit(balance: bigint): Promise { + if (this.active && balance < this.amount) { + return this + } + return await Invoice.transaction(async (trx) => { + this.event = await this.$relatedQuery('event', trx) + // Ensure the event cannot be processed concurrently. + .forUpdate() + .first() + if (!this.event || this.event.attempts) { + this.event = await InvoiceEvent.query(trx).insertAndFetch({ + type: InvoiceEventType.InvoicePaid, + data: this.toData(balance), + // Add 30 seconds to allow a prepared (but not yet fulfilled/rejected) packet to finish before creating withdrawal. + processAt: new Date(Date.now() + 30_000), + withdrawal: { + accountId: this.id, + assetId: this.account.assetId, + amount: balance + } + }) await this.$query(trx).patch({ - active: false + active: false, + eventId: this.event.id }) - await InvoiceEvent.query(trx).insertAndFetch({ - type: InvoiceEventType.InvoicePaid, + } else { + // Update the event withdrawal amount if the withdrawal hasn't been created (event.attempts === 0). + await this.event.$query(trx).patchAndFetch({ data: this.toData(balance), - // TODO: - // Add 30 seconds to allow a prepared (but not yet fulfilled/rejected) packet to finish before being deactivated. - // But balance is fixed in the webhook event data. - processAt: new Date() + // Add 30 seconds to allow additional prepared packets to finish before creating withdrawal. + processAt: new Date(Date.now() + 30_000), + withdrawal: { + accountId: this.id, + assetId: this.account.assetId, + amount: balance + } }) - }) - } + } + return this + }) } public toData(amountReceived: bigint): InvoiceData { @@ -61,7 +118,6 @@ export class Invoice invoice: { id: this.id, accountId: this.accountId, - active: this.active, amount: this.amount.toString(), description: this.description, expiresAt: this.expiresAt.toISOString(), @@ -71,27 +127,3 @@ export class Invoice } } } - -export enum InvoiceEventType { - InvoiceExpired = 'invoice.expired', - InvoicePaid = 'invoice.paid' -} - -export type InvoiceData = { - invoice: { - id: string - accountId: string - active: boolean - description?: string - createdAt: string - expiresAt: string - amount: string - received: string - } - payment?: never -} - -export class InvoiceEvent extends WebhookEvent { - public type!: InvoiceEventType - public data!: InvoiceData -} diff --git a/packages/backend/src/open_payments/invoice/service.test.ts b/packages/backend/src/open_payments/invoice/service.test.ts index 1083594a18..6e89370b17 100644 --- a/packages/backend/src/open_payments/invoice/service.test.ts +++ b/packages/backend/src/open_payments/invoice/service.test.ts @@ -1,3 +1,4 @@ +import assert from 'assert' import Knex from 'knex' import { WorkerUtils, makeWorkerUtils } from 'graphile-worker' import { v4 as uuid } from 'uuid' @@ -127,35 +128,84 @@ describe('Invoice Service', (): void => { ) test('Does not deactivate a partially paid invoice', async (): Promise => { - await invoice.onCredit(invoice.amount - BigInt(1)) + await expect( + invoice.onCredit(invoice.amount - BigInt(1)) + ).resolves.toEqual(invoice) await expect(invoiceService.get(invoice.id)).resolves.toMatchObject({ active: true }) }) test('Deactivates fully paid invoice', async (): Promise => { - await invoice.onCredit(invoice.amount) + await expect(invoice.onCredit(invoice.amount)).resolves.toMatchObject({ + id: invoice.id, + active: false + }) await expect(invoiceService.get(invoice.id)).resolves.toMatchObject({ active: false }) }) test('Creates invoice.paid webhook event', async (): Promise => { - await expect( - InvoiceEvent.query(knex).where({ - type: InvoiceEventType.InvoicePaid - }) - ).resolves.toHaveLength(0) - await invoice.onCredit(invoice.amount) - await expect( - InvoiceEvent.query(knex) - .whereJsonSupersetOf('data:invoice', { - id: invoice.id - }) - .where({ - type: InvoiceEventType.InvoicePaid - }) - ).resolves.toHaveLength(1) + jest.useFakeTimers('modern') + const now = Date.now() + jest.setSystemTime(new Date(now)) + expect(invoice.eventId).toBeNull() + await expect(invoice.onCredit(invoice.amount)).resolves.toMatchObject({ + event: { + type: InvoiceEventType.InvoicePaid, + data: invoice.toData(invoice.amount), + processAt: new Date(now + 30_000), + withdrawal: { + accountId: invoice.id, + assetId: invoice.account.assetId, + amount: invoice.amount + } + } + }) + }) + + test('Updates invoice.paid webhook event withdrawal amount', async (): Promise => { + const { eventId } = await invoice.onCredit(invoice.amount) + const amount = invoice.amount + BigInt(2) + jest.useFakeTimers('modern') + const now = Date.now() + jest.setSystemTime(new Date(now)) + await expect(invoice.onCredit(amount)).resolves.toMatchObject({ + event: { + id: eventId, + type: InvoiceEventType.InvoicePaid, + data: invoice.toData(amount), + processAt: new Date(now + 30_000), + withdrawal: { + accountId: invoice.id, + assetId: invoice.account.assetId, + amount + } + } + }) + }) + + test('Creates subsequent invoice.paid webhook event for leftover amount', async (): Promise => { + invoice = await invoice.onCredit(invoice.amount) + assert.ok(invoice.event) + await invoice.event.$query(knex).patch({ attempts: 1 }) + const amount = BigInt(1) + jest.useFakeTimers('modern') + const now = Date.now() + jest.setSystemTime(new Date(now)) + await expect(invoice.onCredit(amount)).resolves.toMatchObject({ + event: { + type: InvoiceEventType.InvoicePaid, + data: invoice.toData(amount), + processAt: new Date(now + 30_000), + withdrawal: { + accountId: invoice.id, + assetId: invoice.account.assetId, + amount + } + } + }) }) }) diff --git a/packages/backend/src/outgoing_payment/lifecycle.ts b/packages/backend/src/outgoing_payment/lifecycle.ts index fffba8916a..16ae77411a 100644 --- a/packages/backend/src/outgoing_payment/lifecycle.ts +++ b/packages/backend/src/outgoing_payment/lifecycle.ts @@ -1,5 +1,4 @@ import * as Pay from '@interledger/pay' -import assert from 'assert' import { LifecycleError } from './errors' import { @@ -10,7 +9,6 @@ import { } from './model' import { ServiceDependencies } from './service' import { IlpPlugin } from './ilp_plugin' -import { RETRY_LIMIT_MS } from '../webhook/service' const MAX_INT64 = BigInt('9223372036854775807') @@ -344,20 +342,16 @@ const sendWebhookEvent = async ( throw LifecycleError.MissingBalance } - const event = await PaymentEvent.query(deps.knex).insertAndFetch({ + const withdrawal = balance + ? { + accountId: payment.id, + assetId: payment.account.assetId, + amount: balance + } + : undefined + await PaymentEvent.query(deps.knex).insertAndFetch({ type, data: payment.toData({ amountSent, balance }), - processAt: new Date() + withdrawal }) - - if (balance) { - assert.ok(type !== PaymentEventType.PaymentFunding) - const error = await deps.accountingService.createWithdrawal({ - id: event.id, - account: payment, - amount: balance, - timeout: BigInt(RETRY_LIMIT_MS) * BigInt(1e6) // ms -> ns - }) - if (error) throw new Error(error) - } } diff --git a/packages/backend/src/outgoing_payment/service.test.ts b/packages/backend/src/outgoing_payment/service.test.ts index aafb06e221..1e465fb6af 100644 --- a/packages/backend/src/outgoing_payment/service.test.ts +++ b/packages/backend/src/outgoing_payment/service.test.ts @@ -23,7 +23,6 @@ import { LifecycleError } from './errors' import { RETRY_BACKOFF_SECONDS } from './worker' import { isTransferError } from '../accounting/errors' import { AccountingService, TransferOptions } from '../accounting/service' -import { uuidToBigInt } from '../accounting/utils' import { AssetOptions } from '../asset/service' import { Invoice } from '../open_payments/invoice/model' import { RatesService } from '../rates/service' @@ -67,11 +66,9 @@ describe('OutgoingPaymentService', (): void => { const type = webhookTypes[payment.state] if (type) { await expect( - PaymentEvent.query(knex) - .whereJsonSupersetOf('data:payment', { - id: payment.id - }) - .andWhere({ type }) + PaymentEvent.query(knex).where({ + type + }) ).resolves.not.toHaveLength(0) } return payment @@ -164,14 +161,12 @@ describe('OutgoingPaymentService', (): void => { ).resolves.toEqual(invoiceReceived) } if (withdrawAmount !== undefined) { - const tigerbeetle = await deps.use('tigerbeetle') await expect( - tigerbeetle.lookupAccounts([uuidToBigInt(payment.id)]) - ).resolves.toMatchObject([ - { - debits_reserved: withdrawAmount - } - ]) + PaymentEvent.query(knex).where({ + withdrawalAccountId: payment.id, + withdrawalAmount: withdrawAmount + }) + ).resolves.toHaveLength(1) } } @@ -549,15 +544,6 @@ describe('OutgoingPaymentService', (): void => { }) describe('SENDING→', (): void => { - beforeEach( - async (): Promise => { - // Don't send invoice.paid webhook events - jest - .spyOn(invoice, 'onCredit') - .mockImplementation(() => Promise.resolve()) - } - ) - async function setup( opts: Pick< PaymentIntent, @@ -610,7 +596,7 @@ describe('OutgoingPaymentService', (): void => { if (!payment.quote) throw 'no quote' const amountSent = invoice.amount * BigInt(2) await expectOutcome(payment, { - accountBalance: BigInt(0), + accountBalance: payment.quote.maxSourceAmount - amountSent, amountSent, amountDelivered: invoice.amount, invoiceReceived: invoice.amount, @@ -629,7 +615,7 @@ describe('OutgoingPaymentService', (): void => { if (!payment.quote) throw 'no quote' const amountSent = (invoice.amount - amountAlreadyDelivered) * BigInt(2) await expectOutcome(payment, { - accountBalance: BigInt(0), + accountBalance: payment.quote.maxSourceAmount - amountSent, amountSent, amountDelivered: invoice.amount - amountAlreadyDelivered, invoiceReceived: invoice.amount, @@ -670,7 +656,7 @@ describe('OutgoingPaymentService', (): void => { expect(payment.stateAttempts).toBe(0) // "mockPay" allows a small amount of money to be paid every attempt. await expectOutcome(payment, { - accountBalance: BigInt(0), + accountBalance: BigInt(123 - 10 * 5), amountSent: BigInt(10 * 5), amountDelivered: BigInt(5 * 5), withdrawAmount: BigInt(123 - 10 * 5) @@ -696,7 +682,7 @@ describe('OutgoingPaymentService', (): void => { Pay.PaymentError.ReceiverProtocolViolation ) await expectOutcome(payment, { - accountBalance: BigInt(0), + accountBalance: BigInt(123 - 10), amountSent: BigInt(10), amountDelivered: BigInt(5), withdrawAmount: BigInt(123 - 10) @@ -766,7 +752,7 @@ describe('OutgoingPaymentService', (): void => { const payment = await processNext(paymentId, PaymentState.Completed) if (!payment.quote) throw 'no quote' await expectOutcome(payment, { - accountBalance: BigInt(0), + accountBalance: payment.quote.maxSourceAmount, amountSent: BigInt(0), amountDelivered: BigInt(0), invoiceReceived: invoice.amount, diff --git a/packages/backend/src/webhook/service.test.ts b/packages/backend/src/webhook/service.test.ts index d740363752..5010c7f8af 100644 --- a/packages/backend/src/webhook/service.test.ts +++ b/packages/backend/src/webhook/service.test.ts @@ -72,8 +72,7 @@ describe('Webhook Service', (): void => { account: { id: uuid() } - }, - processAt: new Date() + } }) } ) @@ -165,7 +164,7 @@ describe('Webhook Service', (): void => { await expect(webhookService.processNext()).resolves.toEqual(event.id) expect(scope.isDone()).toBe(true) await expect(webhookService.getEvent(event.id)).resolves.toMatchObject({ - attempts: 0, + attempts: 1, error: null, processAt: new Date(event.createdAt.getTime() + RETENTION_LIMIT_MS) }) diff --git a/packages/backend/src/webhook/service.ts b/packages/backend/src/webhook/service.ts index 805f4d8d0b..10b28c5381 100644 --- a/packages/backend/src/webhook/service.ts +++ b/packages/backend/src/webhook/service.ts @@ -132,6 +132,7 @@ async function sendWebhookEvent( }) await event.$query(deps.knex).patch({ + attempts: event.attempts + 1, error: null, processAt: new Date(event.createdAt.getTime() + RETENTION_LIMIT_MS) })