Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend): manage webhook event lifecycle in webhook service #228

Merged
merged 15 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions docs/transaction-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,47 +87,45 @@ An expired invoice that has never received money is deleted.

Rafiki sends webhook events to notify the wallet of payment lifecycle states that require liquidity to be added or removed.

Webhook event handlers must be idempotent.
Webhook event handlers must be idempotent and return `200`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be "return 200 on success"? What happens if they don't return 200; is the webhook retried by Rafiki?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes and yes


### `EventType`

#### `invoice.expired`

Invoice has expired.

Credit `invoice.received` to the wallet balance for `invoice.accountId` and return `200`.
Credit `invoice.received` to the wallet balance for `invoice.accountId`, and call `Mutation.withdrawEventLiquidity` with the event id.

#### `invoice.paid`

Invoice has received its specified `amount`.

Credit `invoice.received` to the wallet balance for `invoice.accountId` and return `200`.
Credit `invoice.received` to the wallet balance for `invoice.accountId`, and call `Mutation.withdrawEventLiquidity` with the event id.

#### `outgoing_payment.funding`

Payment needs liquidity in order to send quoted amount.

To fund the payment, deduct `quote.maxSourceAmount` from the wallet balance for `payment.accountId` and return `200`.

To cancel the payment, return `403`.
To fund the payment, deduct `quote.maxSourceAmount` from the wallet balance for `payment.accountId` and call `Mutation.depositEventLiquidity` with the event id.

#### `outgoing_payment.cancelled`

Payment was cancelled.

Credit `payment.balance` to the wallet balance for `payment.accountId` and return `200` or `205` to retry the payment.
Credit `payment.balance` to the wallet balance for `payment.accountId`, and call `Mutation.withdrawEventLiquidity` with the event id.

#### `outgoing_payment.completed`

Payment completed sending the quoted amount.

Credit `payment.balance` to the wallet balance for `payment.accountId` and return `200`.
Credit `payment.balance` to the wallet balance for `payment.accountId`, and call `Mutation.withdrawEventLiquidity` with the event id.

### Webhook Event

| Name | Optional | Type | Description |
| :----- | :------- | :------------------------------------------------------------- | :------------------------------------------------ |
| `id` | No | `ID` | Unique ID of the `data` object. |
| `id` | No | `ID` | Unique ID of the webhook event. |
| `type` | No | [`EventType`](#eventtype) | Description of the event. |
| `data` | No | [`Invoice`](#invoice) or [`OutgoingPayment`](#outgoingpayment) | Object containing data associated with the event. |

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ exports.up = function (knex) {
table.string('state').notNullable().index() // PaymentState
table.string('error').nullable()
table.integer('stateAttempts').notNullable().defaultTo(0)
table.string('webhookId').nullable()

table.string('intentPaymentPointer').nullable()
table.string('intentInvoiceUrl').nullable()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
exports.up = function (knex) {
return knex.schema.createTable('webhookEvents', function (table) {
table.uuid('id').notNullable().primary()

table.string('type').notNullable()
table.json('data').notNullable()
table.integer('attempts').notNullable().defaultTo(0)
table.string('error').nullable()

table.uuid('withdrawalAccountId').nullable()
table.uuid('withdrawalAssetId').nullable()
table.foreign('withdrawalAssetId').references('assets.id')
table.bigInteger('withdrawalAmount').nullable()

table.timestamp('processAt').notNullable().defaultTo(knex.fn.now())

table.timestamp('createdAt').defaultTo(knex.fn.now())
table.timestamp('updatedAt').defaultTo(knex.fn.now())

table.index('processAt')
})
}

exports.down = function (knex) {
return knex.schema.dropTableIfExists('webhookEvents')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
exports.up = function (knex) {
return knex.schema.createTable('invoices', function (table) {
table.uuid('id').notNullable().primary()

// Open payments account id
table.uuid('accountId').notNullable()
table.foreign('accountId').references('accounts.id')
table.boolean('active').notNullable()
table.string('description').nullable()
table.timestamp('expiresAt').notNullable()
table.bigInteger('amount').notNullable()
table.uuid('eventId').nullable()
table.foreign('eventId').references('webhookEvents.id')

table.timestamp('createdAt').defaultTo(knex.fn.now())
table.timestamp('updatedAt').defaultTo(knex.fn.now())

table.index(['accountId', 'createdAt', 'id'])

table.index('expiresAt')
/*
TODO: The latest version of knex supports "partial indexes", which would be more efficient for the deactivateInvoice use case. Unfortunately, the only version of 'objection' that supports this version of knex is still in alpha.
// This is a 'partial index' -- expiresAt is only indexed when active=true.
table.index('expiresAt', 'idx_active_expiresAt', {
// Knex partial indices are a very new feature in Knex and appear to be buggy.
//
// Use a 'raw' condition because "knex.where('active',true)" fails with:
// > migration failed with error: create index "idx_active_expiresAt" on "invoices" ("expiresAt") where "active" = $1 - there is no parameter $1
predicate: knex.whereRaw('active = TRUE')
})
*/
})
}

exports.down = function (knex) {
return knex.schema.dropTableIfExists('invoices')
}
7 changes: 4 additions & 3 deletions packages/backend/src/accounting/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { v4 as uuid } from 'uuid'
import {
AccountingService,
LiquidityAccount,
TransferAccount,
Deposit,
Withdrawal
} from './service'
Expand All @@ -24,7 +25,7 @@ import {
startTigerbeetleContainer,
TIGERBEETLE_PORT
} from '../tests/tigerbeetle'
import { AccountFactory, FactoryAccount } from '../tests/accountFactory'
import { AccountFactory } from '../tests/accountFactory'

describe('Accounting Service', (): void => {
let deps: IocContract<AppServices>
Expand Down Expand Up @@ -214,8 +215,8 @@ describe('Accounting Service', (): void => {
${true} | ${'same asset'}
${false} | ${'cross-currency'}
`('$description', ({ sameAsset }): void => {
let sourceAccount: LiquidityAccount
let destinationAccount: FactoryAccount
let sourceAccount: TransferAccount
let destinationAccount: TransferAccount
const startingSourceBalance = BigInt(10)
const startingDestinationLiquidity = BigInt(100)

Expand Down
101 changes: 67 additions & 34 deletions packages/backend/src/accounting/service.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import assert from 'assert'
import {
Client,
CreateAccountError as CreateAccountErrorCode
Expand Down Expand Up @@ -43,6 +44,8 @@ export interface LiquidityAccount {
id: string
unit: number
}
onCredit?: (balance: bigint) => Promise<LiquidityAccount>
onDebit?: (balance: bigint) => Promise<LiquidityAccount>
}

export interface Deposit {
Expand All @@ -55,9 +58,15 @@ export interface Withdrawal extends Deposit {
timeout: bigint
}

export interface TransferAccount extends LiquidityAccount {
asset: LiquidityAccount['asset'] & {
asset: LiquidityAccount['asset']
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason for the nested asset property?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needed for createTransfer to populate source/destination accounts an the asset liquidity account in order to call onCredit/onDebit with a LiquidityAccount.
https://github.com/interledger/rafiki/pull/228/files#diff-72a5530c2c89f105f75e071479a382e8f8408459d37968fb93b546cca4783b31R233-R251
(It happens to also prevent createTransfer from being called with an asset liquidity account.)
That said we're not currently calling onCredit/onDebit for asset liquidity accounts...

}
}

export interface TransferOptions {
sourceAccount: LiquidityAccount
destinationAccount: LiquidityAccount
sourceAccount: TransferAccount
destinationAccount: TransferAccount
sourceAmount: bigint
destinationAmount?: bigint
timeout: bigint // nano-seconds
Expand Down Expand Up @@ -218,38 +227,54 @@ export async function createTransfer(
return TransferError.InvalidDestinationAmount
}
const transfers: Required<CreateTransferOptions>[] = []
const sourceAccounts: LiquidityAccount[] = []
const destinationAccounts: LiquidityAccount[] = []

// Same asset
if (sourceAccount.asset.unit === destinationAccount.asset.unit) {
const addTransfer = ({
sourceAccount,
destinationAccount,
amount
}: {
sourceAccount: LiquidityAccount
destinationAccount: LiquidityAccount
amount: bigint
}) => {
transfers.push({
id: uuid(),
sourceAccountId: sourceAccount.id,
destinationAccountId: destinationAccount.id,
amount,
timeout
})
sourceAccounts.push(sourceAccount)
destinationAccounts.push(destinationAccount)
}

// Same asset
if (sourceAccount.asset.unit === destinationAccount.asset.unit) {
addTransfer({
sourceAccount,
destinationAccount,
amount:
destinationAmount && destinationAmount < sourceAmount
? destinationAmount
: sourceAmount,
timeout
: sourceAmount
})
// Same asset, different amounts
if (destinationAmount && sourceAmount !== destinationAmount) {
// Send excess source amount to liquidity account
if (destinationAmount < sourceAmount) {
transfers.push({
id: uuid(),
sourceAccountId: sourceAccount.id,
destinationAccountId: sourceAccount.asset.id,
amount: sourceAmount - destinationAmount,
timeout
addTransfer({
sourceAccount,
destinationAccount: sourceAccount.asset,
amount: sourceAmount - destinationAmount
})
// Deliver excess destination amount from liquidity account
} else {
transfers.push({
id: uuid(),
sourceAccountId: destinationAccount.asset.id,
destinationAccountId: destinationAccount.id,
amount: destinationAmount - sourceAmount,
timeout
addTransfer({
sourceAccount: destinationAccount.asset,
destinationAccount,
amount: destinationAmount - sourceAmount
})
}
}
Expand All @@ -261,22 +286,16 @@ export async function createTransfer(
}
// Send to source liquidity account
// Deliver from destination liquidity account
transfers.push(
{
id: uuid(),
sourceAccountId: sourceAccount.id,
destinationAccountId: sourceAccount.asset.id,
amount: sourceAmount,
timeout
},
{
id: uuid(),
sourceAccountId: destinationAccount.asset.id,
destinationAccountId: destinationAccount.id,
amount: destinationAmount,
timeout
}
)
addTransfer({
sourceAccount,
destinationAccount: sourceAccount.asset,
amount: sourceAmount
})
addTransfer({
sourceAccount: destinationAccount.asset,
destinationAccount,
amount: destinationAmount
})
}
const error = await createTransfers(deps, transfers)
if (error) {
Expand Down Expand Up @@ -308,6 +327,20 @@ export async function createTransfer(
if (error) {
return error.error
}
for (const account of sourceAccounts) {
if (account.onDebit) {
const balance = await getAccountBalance(deps, account.id)
assert.ok(balance !== undefined)
await account.onDebit(balance)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is onDebit implemented? I grepped the code and didn't find it. Or is it just for symmetry with onCredit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will likely be used for asset and peer liquidity webhooks, but is currently unused and could be removed.

}
}
for (const account of destinationAccounts) {
if (account.onCredit) {
const balance = await getAccountBalance(deps, account.id)
assert.ok(balance !== undefined)
await account.onCredit(balance)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if a getAccountBalance or onCredit fails?

Right now:

  • commit's promise will reject.
  • Any other onCredit functions later in the loop were never executed.

Is that safe here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

07661cf makes this only call onCredit for createTransfer's single destinationAccount.
@matdehaast mentioned elsewhere that we don't need balance-triggered liquidity webhook events for assets or peers in the short term. This should suffice for invoice and web monetization events for now.

}
}
},
rollback: async (): Promise<void | TransferError> => {
const error = await rollbackTransfers(
Expand Down
21 changes: 21 additions & 0 deletions packages/backend/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ export class App {
for (let i = 0; i < this.config.invoiceWorkers; i++) {
process.nextTick(() => this.processInvoice())
}
for (let i = 0; i < this.config.webhookWorkers; i++) {
process.nextTick(() => this.processWebhook())
}
}
}

Expand Down Expand Up @@ -288,4 +291,22 @@ export class App {
).unref()
})
}

private async processWebhook(): Promise<void> {
const webhookService = await this.container.use('webhookService')
return webhookService
.processNext()
.catch((err) => {
this.logger.warn({ error: err.message }, 'processWebhook error')
return true
})
Comment on lines +299 to +302
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a single one fails, does this whole process crash?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think returning true in the .catch causes the .then 👇 to be called

.then((hasMoreWork) => {
if (hasMoreWork) process.nextTick(() => this.processWebhook())
else
setTimeout(
() => this.processWebhook(),
this.config.webhookWorkerIdle
).unref()
})
}
}
2 changes: 2 additions & 0 deletions packages/backend/src/config/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ export const Config = {
invoiceWorkers: envInt('INVOICE_WORKERS', 1),
invoiceWorkerIdle: envInt('INVOICE_WORKER_IDLE', 200), // milliseconds

webhookWorkers: envInt('WEBHOOK_WORKERS', 1),
webhookWorkerIdle: envInt('WEBHOOK_WORKER_IDLE', 200), // milliseconds
webhookUrl: envString('WEBHOOK_URL', 'http://127.0.0.1:4001/webhook'),
webhookSecret: process.env.WEBHOOK_SECRET, // optional
webhookTimeout: envInt('WEBHOOK_TIMEOUT', 2000), // milliseconds
Expand Down
6 changes: 5 additions & 1 deletion packages/backend/src/connector/core/factories/account.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ const accountAttrs = {
id: Faker.datatype.uuid(),
code: assetCode,
scale: assetScale,
unit: Faker.datatype.number()
unit: Faker.datatype.number(),
asset: {
id: Faker.datatype.uuid(),
unit: Faker.datatype.number()
}
},
balance: 0n
}
Expand Down
Loading