Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend): manage webhook event lifecycle in webhook service #228

Merged
merged 15 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 16 additions & 19 deletions docs/transaction-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,47 +87,45 @@ An expired invoice that has never received money is deleted.

Rafiki sends webhook events to notify the wallet of payment lifecycle states that require liquidity to be added or removed.

Webhook event handlers must be idempotent.
Webhook event handlers must be idempotent and return `200` on success. Rafiki will retry unsuccessful webhook requests for up to one day.

### `EventType`

#### `invoice.expired`

Invoice has expired.

Credit `invoice.received` to the wallet balance for `invoice.accountId` and return `200`.
Credit `invoice.received` to the wallet balance for `invoice.accountId`, and call `Mutation.withdrawEventLiquidity` with the event id.

#### `invoice.paid`

Invoice has received its specified `amount`.

Credit `invoice.received` to the wallet balance for `invoice.accountId` and return `200`.
Credit `invoice.received` to the wallet balance for `invoice.accountId`, and call `Mutation.withdrawEventLiquidity` with the event id.

#### `outgoing_payment.funding`

Payment needs liquidity in order to send quoted amount.

To fund the payment, deduct `quote.maxSourceAmount` from the wallet balance for `payment.accountId` and return `200`.

To cancel the payment, return `403`.
To fund the payment, deduct `quote.maxSourceAmount` from the wallet balance for `payment.accountId` and call `Mutation.depositEventLiquidity` with the event id.

#### `outgoing_payment.cancelled`

Payment was cancelled.

Credit `payment.balance` to the wallet balance for `payment.accountId` and return `200` or `205` to retry the payment.
Credit `payment.balance` to the wallet balance for `payment.accountId`, and call `Mutation.withdrawEventLiquidity` with the event id.

#### `outgoing_payment.completed`

Payment completed sending the quoted amount.

Credit `payment.balance` to the wallet balance for `payment.accountId` and return `200`.
Credit `payment.balance` to the wallet balance for `payment.accountId`, and call `Mutation.withdrawEventLiquidity` with the event id.

### Webhook Event

| Name | Optional | Type | Description |
| :----- | :------- | :------------------------------------------------------------- | :------------------------------------------------ |
| `id` | No | `ID` | Unique ID of the `data` object. |
| `id` | No | `ID` | Unique ID of the webhook event. |
| `type` | No | [`EventType`](#eventtype) | Description of the event. |
| `data` | No | [`Invoice`](#invoice) or [`OutgoingPayment`](#outgoingpayment) | Object containing data associated with the event. |

Expand Down Expand Up @@ -187,13 +185,12 @@ The intent must include `invoiceUrl` xor (`paymentPointer` and `amountToSend`).

### `Invoice`

| Name | Optional | Type | Description |
| :------------ | :------- | :-------- | :----------------------------------------------------------------------------------------------------------------------------- |
| `id` | No | `ID` | Unique ID for this invoice, randomly generated by Rafiki. |
| `accountId` | No | `String` | Id of the recipient's Open Payments account. |
| `amount` | No | `UInt64` | The amount that must be paid at the time the invoice is created, in base units of the account asset. |
| `received` | No | `UInt64` | The total amount received, in base units of the account asset. |
| `active` | No | `Boolean` | If `true`, the invoice may receive funds. If `false`, the invoice is either expired or has already received `amount` of funds. |
| `description` | Yes | `String` | Human readable description of the invoice. |
| `createdAt` | No | `String` | |
| `expiresAt` | No | `String` | |
| Name | Optional | Type | Description |
| :------------ | :------- | :------- | :--------------------------------------------------------------------------------------------------- |
| `id` | No | `ID` | Unique ID for this invoice, randomly generated by Rafiki. |
| `accountId` | No | `String` | Id of the recipient's Open Payments account. |
| `amount` | No | `UInt64` | The amount that must be paid at the time the invoice is created, in base units of the account asset. |
| `received` | No | `UInt64` | The total amount received, in base units of the account asset. |
| `description` | Yes | `String` | Human readable description of the invoice. |
| `createdAt` | No | `String` | |
| `expiresAt` | No | `String` | |
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ exports.up = function (knex) {

table.timestamp('processAt').nullable()

table.integer('webhookAttempts').notNullable().defaultTo(0)

table.timestamp('createdAt').defaultTo(knex.fn.now())
table.timestamp('updatedAt').defaultTo(knex.fn.now())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ exports.up = function (knex) {
table.string('state').notNullable().index() // PaymentState
table.string('error').nullable()
table.integer('stateAttempts').notNullable().defaultTo(0)
table.string('webhookId').nullable()

table.string('intentPaymentPointer').nullable()
table.string('intentInvoiceUrl').nullable()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
exports.up = function (knex) {
return knex.schema.createTable('webhookEvents', function (table) {
table.uuid('id').notNullable().primary()

table.string('type').notNullable()
table.json('data').notNullable()
table.integer('attempts').notNullable().defaultTo(0)
table.integer('statusCode').nullable()

table.uuid('withdrawalAccountId').nullable()
table.uuid('withdrawalAssetId').nullable()
table.foreign('withdrawalAssetId').references('assets.id')
table.bigInteger('withdrawalAmount').nullable()

table.timestamp('processAt').nullable().defaultTo(knex.fn.now())

table.timestamp('createdAt').defaultTo(knex.fn.now())
table.timestamp('updatedAt').defaultTo(knex.fn.now())

table.index('processAt')
})
}

exports.down = function (knex) {
return knex.schema.dropTableIfExists('webhookEvents')
}
20 changes: 15 additions & 5 deletions packages/backend/src/accounting/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe('Accounting Service', (): void => {
let accountingService: AccountingService
let accountFactory: AccountFactory
let tigerbeetleContainer: StartedTestContainer
const timeout = BigInt(10e9) // 10 seconds
const timeout = BigInt(10_000) // 10 seconds
const messageProducer = new GraphileProducer()
const mockMessageProducer = {
send: jest.fn()
Expand Down Expand Up @@ -530,7 +530,15 @@ describe('Accounting Service', (): void => {
}
)

describe('Create', (): void => {
describe.each`
timeout | description
${undefined} | ${'single-phase'}
${timeout} | ${'two-phase'}
`('Create ($description)', ({ timeout }): void => {
beforeEach((): void => {
withdrawal.timeout = timeout
})

test('A withdrawal can be created', async (): Promise<void> => {
await expect(
accountingService.createWithdrawal(withdrawal)
Expand All @@ -540,7 +548,9 @@ describe('Accounting Service', (): void => {
).resolves.toEqual(startingBalance - withdrawal.amount)
await expect(
accountingService.getSettlementBalance(withdrawal.account.asset.unit)
).resolves.toEqual(startingBalance)
).resolves.toEqual(
timeout ? startingBalance : startingBalance - withdrawal.amount
)
})

test('Cannot create withdrawal with invalid id', async (): Promise<void> => {
Expand Down Expand Up @@ -649,7 +659,7 @@ describe('Accounting Service', (): void => {
const expiringWithdrawal = {
...withdrawal,
id: uuid(),
timeout: BigInt(1) // nano-second
timeout: BigInt(1)
}
await expect(
accountingService.createWithdrawal(expiringWithdrawal)
Expand Down Expand Up @@ -716,7 +726,7 @@ describe('Accounting Service', (): void => {
const expiringWithdrawal = {
...withdrawal,
id: uuid(),
timeout: BigInt(1) // nano-second
timeout: BigInt(1)
}
await expect(
accountingService.createWithdrawal(expiringWithdrawal)
Expand Down
73 changes: 43 additions & 30 deletions packages/backend/src/accounting/service.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import assert from 'assert'
import {
Client,
CreateAccountError as CreateAccountErrorCode
Expand Down Expand Up @@ -43,6 +44,7 @@ export interface LiquidityAccount {
id: string
unit: number
}
onCredit?: (balance: bigint) => Promise<LiquidityAccount>
}

export interface Deposit {
Expand All @@ -52,15 +54,15 @@ export interface Deposit {
}

export interface Withdrawal extends Deposit {
timeout: bigint
timeout?: bigint
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This (and other) timeout parameters should be named with the unit; e.g. timeoutNano, timeoutMilli.
JS timeouts are typically milliseconds (e.g. setTimeout) but TigerBeetle uses nanoseconds. Since we need to use both (in different contexts), I think its worth disambiguating them in the property name so that there's no confusion.

Copy link
Contributor Author

@wilsonianb wilsonianb Feb 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we kept the name timeout but keep it consistent as milliseconds and convert to nano-seconds where passing to tigerbeetle-node?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 That works.

}

export interface TransferOptions {
sourceAccount: LiquidityAccount
destinationAccount: LiquidityAccount
sourceAmount: bigint
destinationAmount?: bigint
timeout: bigint // nano-seconds
timeout: bigint
}

export interface Transaction {
Expand Down Expand Up @@ -219,37 +221,49 @@ export async function createTransfer(
}
const transfers: Required<CreateTransferOptions>[] = []

// Same asset
if (sourceAccount.asset.unit === destinationAccount.asset.unit) {
const addTransfer = ({
sourceAccountId,
destinationAccountId,
amount
}: {
sourceAccountId: string
destinationAccountId: string
amount: bigint
}) => {
transfers.push({
id: uuid(),
sourceAccountId,
destinationAccountId,
amount,
timeout
})
}

// Same asset
if (sourceAccount.asset.unit === destinationAccount.asset.unit) {
addTransfer({
sourceAccountId: sourceAccount.id,
destinationAccountId: destinationAccount.id,
amount:
destinationAmount && destinationAmount < sourceAmount
? destinationAmount
: sourceAmount,
timeout
: sourceAmount
})
// Same asset, different amounts
if (destinationAmount && sourceAmount !== destinationAmount) {
// Send excess source amount to liquidity account
if (destinationAmount < sourceAmount) {
transfers.push({
id: uuid(),
addTransfer({
sourceAccountId: sourceAccount.id,
destinationAccountId: sourceAccount.asset.id,
amount: sourceAmount - destinationAmount,
timeout
amount: sourceAmount - destinationAmount
})
// Deliver excess destination amount from liquidity account
} else {
transfers.push({
id: uuid(),
addTransfer({
sourceAccountId: destinationAccount.asset.id,
destinationAccountId: destinationAccount.id,
amount: destinationAmount - sourceAmount,
timeout
amount: destinationAmount - sourceAmount
})
}
}
Expand All @@ -261,22 +275,16 @@ export async function createTransfer(
}
// Send to source liquidity account
// Deliver from destination liquidity account
transfers.push(
{
id: uuid(),
sourceAccountId: sourceAccount.id,
destinationAccountId: sourceAccount.asset.id,
amount: sourceAmount,
timeout
},
{
id: uuid(),
sourceAccountId: destinationAccount.asset.id,
destinationAccountId: destinationAccount.id,
amount: destinationAmount,
timeout
}
)
addTransfer({
sourceAccountId: sourceAccount.id,
destinationAccountId: sourceAccount.asset.id,
amount: sourceAmount
})
addTransfer({
sourceAccountId: destinationAccount.asset.id,
destinationAccountId: destinationAccount.id,
amount: destinationAmount
})
}
const error = await createTransfers(deps, transfers)
if (error) {
Expand Down Expand Up @@ -308,6 +316,11 @@ export async function createTransfer(
if (error) {
return error.error
}
if (destinationAccount.onCredit) {
const balance = await getAccountBalance(deps, destinationAccount.id)
assert.ok(balance !== undefined)
await destinationAccount.onCredit(balance)
}
},
rollback: async (): Promise<void | TransferError> => {
const error = await rollbackTransfers(
Expand Down
4 changes: 2 additions & 2 deletions packages/backend/src/accounting/transfers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export interface CreateTransferOptions {
sourceAccountId: AccountId
destinationAccountId: AccountId
amount: bigint
timeout?: bigint // nano-seconds
timeout?: bigint
}

export async function createTransfers(
Expand Down Expand Up @@ -57,7 +57,7 @@ export async function createTransfers(
reserved: TRANSFER_RESERVED,
code: 0,
flags,
timeout: transfer.timeout || BigInt(0),
timeout: transfer.timeout ? transfer.timeout * BigInt(10e6) : BigInt(0), // ms -> ns
timestamp: BigInt(0)
})
}
Expand Down
21 changes: 21 additions & 0 deletions packages/backend/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ export class App {
for (let i = 0; i < this.config.invoiceWorkers; i++) {
process.nextTick(() => this.processInvoice())
}
for (let i = 0; i < this.config.webhookWorkers; i++) {
process.nextTick(() => this.processWebhook())
}
}
}

Expand Down Expand Up @@ -288,4 +291,22 @@ export class App {
).unref()
})
}

private async processWebhook(): Promise<void> {
const webhookService = await this.container.use('webhookService')
return webhookService
.processNext()
.catch((err) => {
this.logger.warn({ error: err.message }, 'processWebhook error')
return true
})
Comment on lines +299 to +302
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a single one fails, does this whole process crash?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think returning true in the .catch causes the .then 👇 to be called

.then((hasMoreWork) => {
if (hasMoreWork) process.nextTick(() => this.processWebhook())
else
setTimeout(
() => this.processWebhook(),
this.config.webhookWorkerIdle
).unref()
})
}
}
2 changes: 2 additions & 0 deletions packages/backend/src/config/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ export const Config = {
invoiceWorkers: envInt('INVOICE_WORKERS', 1),
invoiceWorkerIdle: envInt('INVOICE_WORKER_IDLE', 200), // milliseconds

webhookWorkers: envInt('WEBHOOK_WORKERS', 1),
webhookWorkerIdle: envInt('WEBHOOK_WORKER_IDLE', 200), // milliseconds
webhookUrl: envString('WEBHOOK_URL', 'http://127.0.0.1:4001/webhook'),
webhookSecret: process.env.WEBHOOK_SECRET, // optional
webhookTimeout: envInt('WEBHOOK_TIMEOUT', 2000), // milliseconds
Expand Down
6 changes: 5 additions & 1 deletion packages/backend/src/connector/core/factories/account.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ const accountAttrs = {
id: Faker.datatype.uuid(),
code: assetCode,
scale: assetScale,
unit: Faker.datatype.number()
unit: Faker.datatype.number(),
asset: {
id: Faker.datatype.uuid(),
unit: Faker.datatype.number()
}
},
balance: 0n
}
Expand Down
2 changes: 0 additions & 2 deletions packages/backend/src/connector/core/middleware/balance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ export function createBalanceMiddleware(): ILPMiddleware {

if (response.fulfill) {
await trxOrError.commit()
// TODO: move handlePayment inside accountingServices's trxOrError.commit()
await services.invoices.handlePayment(accounts.outgoing.id)
} else {
await trxOrError.rollback()
}
Expand Down
Loading