Skip to content

Commit

Permalink
chore(backend): specify webhook event withdrawals
Browse files Browse the repository at this point in the history
If invoice event withdrawal hasn't been created, update the amount.
Otherwise, create new invoice event for subsequent received amounts.
  • Loading branch information
wilsonianb committed Feb 9, 2022
1 parent 1786e7d commit e47695c
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ exports.up = function (knex) {
table.uuid('id').notNullable().primary()

table.string('type').notNullable()
table.jsonb('data').notNullable()
table.json('data').notNullable()
table.integer('attempts').notNullable().defaultTo(0)
table.string('error').nullable()

Expand All @@ -12,7 +12,7 @@ exports.up = function (knex) {
table.foreign('withdrawalAssetId').references('assets.id')
table.bigInteger('withdrawalAmount').nullable()

table.timestamp('processAt').notNullable()
table.timestamp('processAt').notNullable().defaultTo(knex.fn.now())

table.timestamp('createdAt').defaultTo(knex.fn.now())
table.timestamp('updatedAt').defaultTo(knex.fn.now())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ exports.up = function (knex) {
table.string('description').nullable()
table.timestamp('expiresAt').notNullable()
table.bigInteger('amount').notNullable()
table.uuid('eventId').nullable()
table.foreign('eventId').references('webhookEvents.id')

table.timestamp('createdAt').defaultTo(knex.fn.now())
table.timestamp('updatedAt').defaultTo(knex.fn.now())
Expand Down
4 changes: 2 additions & 2 deletions packages/backend/src/accounting/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ export interface LiquidityAccount {
id: string
unit: number
}
onCredit?: (balance: bigint) => Promise<void>
onDebit?: (balance: bigint) => Promise<void>
onCredit?: (balance: bigint) => Promise<LiquidityAccount>
onDebit?: (balance: bigint) => Promise<LiquidityAccount>
}

export interface Deposit {
Expand Down
9 changes: 3 additions & 6 deletions packages/backend/src/graphql/resolvers/liquidity.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1635,8 +1635,7 @@ describe('Liquidity Resolvers', (): void => {
data: payment.toData({
amountSent: BigInt(0),
balance: BigInt(0)
}),
processAt: new Date()
})
})
}
)
Expand Down Expand Up @@ -1778,7 +1777,6 @@ describe('Liquidity Resolvers', (): void => {
id: eventId,
type,
data,
processAt: new Date(),
withdrawal: {
accountId: account.id,
assetId: account.asset.id,
Expand Down Expand Up @@ -1873,13 +1871,12 @@ describe('Liquidity Resolvers', (): void => {

test('Returns error for non-existent webhook event withdrawal', async (): Promise<void> => {
const webhookService = await deps.use('webhookService')
const { type, data, processAt } = (await webhookService.getEvent(
const { type, data } = (await webhookService.getEvent(
eventId
)) as WebhookEvent
const event = await WebhookEvent.query(knex).insertAndFetch({
type,
data,
processAt
data
})
const response = await appContainer.apolloClient
.mutate({
Expand Down
106 changes: 69 additions & 37 deletions packages/backend/src/open_payments/invoice/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,29 @@ import { ConnectorAccount } from '../../connector/core/rafiki'
import { BaseModel } from '../../shared/baseModel'
import { WebhookEvent } from '../../webhook/model'

export enum InvoiceEventType {
InvoiceExpired = 'invoice.expired',
InvoicePaid = 'invoice.paid'
}

export type InvoiceData = {
invoice: {
id: string
accountId: string
description?: string
createdAt: string
expiresAt: string
amount: string
received: string
}
payment?: never
}

export class InvoiceEvent extends WebhookEvent {
public type!: InvoiceEventType
public data!: InvoiceData
}

export class Invoice
extends BaseModel
implements ConnectorAccount, LiquidityAccount {
Expand All @@ -21,6 +44,14 @@ export class Invoice
from: 'invoices.accountId',
to: 'accounts.id'
}
},
event: {
relation: Model.HasOneRelation,
modelClass: InvoiceEvent,
join: {
from: 'invoices.eventId',
to: 'webhookEvents.id'
}
}
}

Expand All @@ -31,37 +62,62 @@ export class Invoice
public description?: string
public expiresAt!: Date
public readonly amount!: bigint
public eventId?: string
public event?: InvoiceEvent

public processAt!: Date | null

public get asset(): Asset {
return this.account.asset
}

public async onCredit(balance: bigint): Promise<void> {
if (balance >= this.amount) {
return await Invoice.transaction(async (trx) => {
public async onCredit(balance: bigint): Promise<Invoice> {
if (this.active && balance < this.amount) {
return this
}
return await Invoice.transaction(async (trx) => {
this.event = await this.$relatedQuery('event', trx)
// Ensure the event cannot be processed concurrently.
.forUpdate()
.first()
if (!this.event || this.event.attempts) {
this.event = await InvoiceEvent.query(trx).insertAndFetch({
type: InvoiceEventType.InvoicePaid,
data: this.toData(balance),
// Add 30 seconds to allow a prepared (but not yet fulfilled/rejected) packet to finish before creating withdrawal.
processAt: new Date(Date.now() + 30_000),
withdrawal: {
accountId: this.id,
assetId: this.account.assetId,
amount: balance
}
})
await this.$query(trx).patch({
active: false
active: false,
eventId: this.event.id
})
await InvoiceEvent.query(trx).insertAndFetch({
type: InvoiceEventType.InvoicePaid,
} else {
// Update the event withdrawal amount if the withdrawal hasn't been created (event.attempts === 0).
await this.event.$query(trx).patchAndFetch({
data: this.toData(balance),
// TODO:
// Add 30 seconds to allow a prepared (but not yet fulfilled/rejected) packet to finish before being deactivated.
// But balance is fixed in the webhook event data.
processAt: new Date()
// Add 30 seconds to allow additional prepared packets to finish before creating withdrawal.
processAt: new Date(Date.now() + 30_000),
withdrawal: {
accountId: this.id,
assetId: this.account.assetId,
amount: balance
}
})
})
}
}
return this
})
}

public toData(amountReceived: bigint): InvoiceData {
return {
invoice: {
id: this.id,
accountId: this.accountId,
active: this.active,
amount: this.amount.toString(),
description: this.description,
expiresAt: this.expiresAt.toISOString(),
Expand All @@ -71,27 +127,3 @@ export class Invoice
}
}
}

export enum InvoiceEventType {
InvoiceExpired = 'invoice.expired',
InvoicePaid = 'invoice.paid'
}

export type InvoiceData = {
invoice: {
id: string
accountId: string
active: boolean
description?: string
createdAt: string
expiresAt: string
amount: string
received: string
}
payment?: never
}

export class InvoiceEvent extends WebhookEvent {
public type!: InvoiceEventType
public data!: InvoiceData
}
84 changes: 67 additions & 17 deletions packages/backend/src/open_payments/invoice/service.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import assert from 'assert'
import Knex from 'knex'
import { WorkerUtils, makeWorkerUtils } from 'graphile-worker'
import { v4 as uuid } from 'uuid'
Expand Down Expand Up @@ -127,35 +128,84 @@ describe('Invoice Service', (): void => {
)

test('Does not deactivate a partially paid invoice', async (): Promise<void> => {
await invoice.onCredit(invoice.amount - BigInt(1))
await expect(
invoice.onCredit(invoice.amount - BigInt(1))
).resolves.toEqual(invoice)
await expect(invoiceService.get(invoice.id)).resolves.toMatchObject({
active: true
})
})

test('Deactivates fully paid invoice', async (): Promise<void> => {
await invoice.onCredit(invoice.amount)
await expect(invoice.onCredit(invoice.amount)).resolves.toMatchObject({
id: invoice.id,
active: false
})
await expect(invoiceService.get(invoice.id)).resolves.toMatchObject({
active: false
})
})

test('Creates invoice.paid webhook event', async (): Promise<void> => {
await expect(
InvoiceEvent.query(knex).where({
type: InvoiceEventType.InvoicePaid
})
).resolves.toHaveLength(0)
await invoice.onCredit(invoice.amount)
await expect(
InvoiceEvent.query(knex)
.whereJsonSupersetOf('data:invoice', {
id: invoice.id
})
.where({
type: InvoiceEventType.InvoicePaid
})
).resolves.toHaveLength(1)
jest.useFakeTimers('modern')
const now = Date.now()
jest.setSystemTime(new Date(now))
expect(invoice.eventId).toBeNull()
await expect(invoice.onCredit(invoice.amount)).resolves.toMatchObject({
event: {
type: InvoiceEventType.InvoicePaid,
data: invoice.toData(invoice.amount),
processAt: new Date(now + 30_000),
withdrawal: {
accountId: invoice.id,
assetId: invoice.account.assetId,
amount: invoice.amount
}
}
})
})

test('Updates invoice.paid webhook event withdrawal amount', async (): Promise<void> => {
const { eventId } = await invoice.onCredit(invoice.amount)
const amount = invoice.amount + BigInt(2)
jest.useFakeTimers('modern')
const now = Date.now()
jest.setSystemTime(new Date(now))
await expect(invoice.onCredit(amount)).resolves.toMatchObject({
event: {
id: eventId,
type: InvoiceEventType.InvoicePaid,
data: invoice.toData(amount),
processAt: new Date(now + 30_000),
withdrawal: {
accountId: invoice.id,
assetId: invoice.account.assetId,
amount
}
}
})
})

test('Creates subsequent invoice.paid webhook event for leftover amount', async (): Promise<void> => {
invoice = await invoice.onCredit(invoice.amount)
assert.ok(invoice.event)
await invoice.event.$query(knex).patch({ attempts: 1 })
const amount = BigInt(1)
jest.useFakeTimers('modern')
const now = Date.now()
jest.setSystemTime(new Date(now))
await expect(invoice.onCredit(amount)).resolves.toMatchObject({
event: {
type: InvoiceEventType.InvoicePaid,
data: invoice.toData(amount),
processAt: new Date(now + 30_000),
withdrawal: {
accountId: invoice.id,
assetId: invoice.account.assetId,
amount
}
}
})
})
})

Expand Down
24 changes: 9 additions & 15 deletions packages/backend/src/outgoing_payment/lifecycle.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import * as Pay from '@interledger/pay'
import assert from 'assert'

import { LifecycleError } from './errors'
import {
Expand All @@ -10,7 +9,6 @@ import {
} from './model'
import { ServiceDependencies } from './service'
import { IlpPlugin } from './ilp_plugin'
import { RETRY_LIMIT_MS } from '../webhook/service'

const MAX_INT64 = BigInt('9223372036854775807')

Expand Down Expand Up @@ -344,20 +342,16 @@ const sendWebhookEvent = async (
throw LifecycleError.MissingBalance
}

const event = await PaymentEvent.query(deps.knex).insertAndFetch({
const withdrawal = balance
? {
accountId: payment.id,
assetId: payment.account.assetId,
amount: balance
}
: undefined
await PaymentEvent.query(deps.knex).insertAndFetch({
type,
data: payment.toData({ amountSent, balance }),
processAt: new Date()
withdrawal
})

if (balance) {
assert.ok(type !== PaymentEventType.PaymentFunding)
const error = await deps.accountingService.createWithdrawal({
id: event.id,
account: payment,
amount: balance,
timeout: BigInt(RETRY_LIMIT_MS) * BigInt(1e6) // ms -> ns
})
if (error) throw new Error(error)
}
}
Loading

0 comments on commit e47695c

Please sign in to comment.