Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge the devel branch into the main branch, v2.1.0 #103

Merged
merged 4 commits into from
Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
| GAS_PRICE_SPEED_TYPE | This parameter specifies the desirable transaction speed | `instant` / `fast` / `standard` / `low` |
| GAS_PRICE_FACTOR | A value that will multiply the gas price of the oracle to convert it to gwei. If the oracle API returns gas prices in gwei then this can be set to `1`. Also, it could be used to intentionally pay more gas than suggested by the oracle to guarantee the transaction verification. E.g. `1.25` or `1.5`. | integer |
| GAS_PRICE_UPDATE_INTERVAL | Interval in milliseconds used to get the updated gas price value using specified estimation type | integer |
| GAS_PRICE_SURPLUS | A surplus to be added to fetched `gasPrice` on initial transaction submission. Default `0.1`. | float |
| MIN_GAS_PRICE_BUMP_FACTOR | Minimum `gasPrice` bump factor to meet RPC node requirements. Default `0.1`. | float |
| MAX_FEE_PER_GAS_LIMIT | Max limit on `maxFeePerGas` parameter for each transaction in wei | integer |
| MAX_SENT_QUEUE_SIZE | Maximum number of jobs waiting in the `sentTxQueue` at a time. | integer |
| START_BLOCK | The block number used to start searching for events when the relayer instance is run for the first time | integer
| EVENTS_PROCESSING_BATCH_SIZE | Batch size for one `eth_getLogs` request when reprocessing old logs. Defaults to `10000` | integer
| RELAYER_LOG_LEVEL | Log level | Winston log level |
Expand Down
457 changes: 242 additions & 215 deletions yarn.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions zp-relayer/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ const config = {
gasPriceSpeedType: (process.env.GAS_PRICE_SPEED_TYPE as GasPriceKey) || 'fast',
gasPriceFactor: parseInt(process.env.GAS_PRICE_FACTOR || '1'),
gasPriceUpdateInterval: parseInt(process.env.GAS_PRICE_UPDATE_INTERVAL || '5000'),
gasPriceSurplus: parseFloat(process.env.GAS_PRICE_SURPLUS || '0.1'),
minGasPriceBumpFactor: parseFloat(process.env.MIN_GAS_PRICE_BUMP_FACTOR || '0.1'),
maxFeeLimit: process.env.MAX_FEE_PER_GAS_LIMIT ? toBN(process.env.MAX_FEE_PER_GAS_LIMIT) : null,
maxSentQueueSize: parseInt(process.env.MAX_SENT_QUEUE_SIZE || '20'),
startBlock: parseInt(process.env.START_BLOCK || '0'),
eventsProcessingBatchSize: parseInt(process.env.EVENTS_PROCESSING_BATCH_SIZE || '10000'),
logLevel: process.env.RELAYER_LOG_LEVEL || 'debug',
redisUrl: process.env.RELAYER_REDIS_URL,
redisUrl: process.env.RELAYER_REDIS_URL as string,
rpcUrl: process.env.RPC_URL as string,
sentTxDelay: parseInt(process.env.SENT_TX_DELAY || '30000'),
permitDeadlineThresholdInitial: parseInt(process.env.PERMIT_DEADLINE_THRESHOLD_INITIAL || '300'),
permitDeadlineThresholdResend: parseInt(process.env.PERMIT_DEADLINE_THRESHOLD_RESEND || '10'),
}

export default config
108 changes: 93 additions & 15 deletions zp-relayer/endpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
checkSendTransactionErrors,
checkSendTransactionsErrors,
} from './validation/validation'
import { sentTxQueue, SentTxState } from './queue/sentTxQueue'

async function sendTransactions(req: Request, res: Response, next: NextFunction) {
const errors = checkSendTransactionsErrors(req.body)
Expand Down Expand Up @@ -108,22 +109,99 @@ async function getTransactionsV2(req: Request, res: Response, next: NextFunction
}

async function getJob(req: Request, res: Response) {
enum JobStatus {
WAITING = 'waiting',
FAILED = 'failed',
SENT = 'sent',
REVERTED = 'reverted',
COMPLETED = 'completed',
}

interface GetJobResponse {
resolvedJobId: string
createdOn: number
failedReason: null | string
finishedOn: null | number
state: JobStatus
txHash: null | string
}

const jobId = req.params.id
const job = await poolTxQueue.getJob(jobId)
if (job) {
const state = await job.getState()
const txHash = job.returnvalue
const failedReason = job.failedReason
const createdOn = job.timestamp
const finishedOn = job.finishedOn

res.json({
state,
txHash,
failedReason,
createdOn,
finishedOn,
})

async function getPoolJobState(requestedJobId: string): Promise<GetJobResponse | null> {
const jobId = await pool.state.jobIdsMapping.get(requestedJobId)
let job = await poolTxQueue.getJob(jobId)
if (!job) return null

// Default result object
let result: GetJobResponse = {
resolvedJobId: jobId,
createdOn: job.timestamp,
failedReason: null,
finishedOn: null,
state: JobStatus.WAITING,
txHash: null,
}

const poolJobState = await job.getState()
if (poolJobState === 'completed') {
// Transaction was included in optimistic state, waiting to be mined
if (job.returnvalue === null) {
job = await poolTxQueue.getJob(jobId)
// Sanity check
if (!job || job.returnvalue === null) throw new Error('Internal job inconsistency')
}
const sentJobId = job.returnvalue[0][1]
let sentJob = await sentTxQueue.getJob(sentJobId)
// Should not happen here, but need to verify to be sure
if (!sentJob) throw new Error('Sent job not found')

const sentJobState = await sentJob.getState()
if (sentJobState === 'waiting' || sentJobState === 'active' || sentJobState === 'delayed') {
// Transaction is in re-send loop
const txHash = sentJob.data.prevAttempts.at(-1)?.[0]
result.state = JobStatus.SENT
result.txHash = txHash || null
} else if (sentJobState === 'completed') {
if (sentJob.returnvalue === null) {
sentJob = await sentTxQueue.getJob(sentJobId)
// Sanity check
if (!sentJob || sentJob.returnvalue === null) throw new Error('Internal job inconsistency')
}
const [txState, txHash] = sentJob.returnvalue
if (txState === SentTxState.MINED) {
// Transaction mined successfully
result.state = JobStatus.COMPLETED
result.txHash = txHash
result.finishedOn = sentJob.finishedOn || null
} else if (txState === SentTxState.REVERT) {
// Transaction reverted
result.state = JobStatus.REVERTED
result.txHash = txHash
result.finishedOn = sentJob.finishedOn || null
}
}
} else if (poolJobState === 'failed') {
// Either validation or tx sendind failed
if (!job.finishedOn) {
job = await poolTxQueue.getJob(jobId)
// Sanity check
if (!job || !job.finishedOn) throw new Error('Internal job inconsistency')
}
result.state = JobStatus.FAILED
result.failedReason = job.failedReason
result.finishedOn = job.finishedOn || null
}
// Other states mean that transaction is either waiting in queue
// or being processed by worker
// So, no need to update `result` object

return result
}

const jobState = await getPoolJobState(jobId)
if (jobState) {
res.json(jobState)
} else {
res.json(`Job ${jobId} not found`)
}
Expand Down
6 changes: 4 additions & 2 deletions zp-relayer/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { Mutex } from 'async-mutex'
import { createPoolTxWorker } from './workers/poolTxWorker'
import { createSentTxWorker } from './workers/sentTxWorker'
import { initializeDomain } from './utils/EIP712SaltedPermit'
import { redis } from './services/redisClient'
import { validateTx } from './validateTx'

export async function init() {
await initializeDomain(web3)
Expand All @@ -19,6 +21,6 @@ export async function init() {
})
await gasPriceService.start()
const workerMutex = new Mutex()
;(await createPoolTxWorker(gasPriceService, workerMutex)).run()
;(await createSentTxWorker(gasPriceService, workerMutex)).run()
;(await createPoolTxWorker(gasPriceService, validateTx, workerMutex, redis)).run()
;(await createSentTxWorker(gasPriceService, workerMutex, redis)).run()
}
12 changes: 8 additions & 4 deletions zp-relayer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@
"dev:worker": "ts-node poolTxWorker.ts",
"start:dev": "ts-node index.ts",
"start:prod": "node index.js",
"test": "ts-mocha --paths --timeout 1000000 test/**/*.test.ts"
"test:unit": "ts-mocha -r dotenv/config --paths --timeout 1000000 test/unit-tests/*.test.ts"
},
"dependencies": {
"@metamask/eth-sig-util": "^4.0.1",
"@mycrypto/gas-estimation": "^1.1.0",
"ajv": "8.11.0",
"async-mutex": "^0.3.2",
"bullmq": "1.83.0",
"bignumber.js": "9.1.0",
"bullmq": "3.0.0",
"cors": "^2.8.5",
"dotenv": "^10.0.0",
"express": "^4.17.1",
"express-winston": "4.2.0",
"gas-price-oracle": "0.5.1",
"ioredis": "4.27.10",
"ioredis": "5.2.4",
"libzkbob-rs-node": "0.1.27",
"node-fetch": "^2.6.1",
"promise-retry": "^2.0.1",
Expand All @@ -34,6 +35,7 @@
},
"devDependencies": {
"@types/chai": "^4.2.21",
"@types/chai-as-promised": "^7.1.5",
"@types/cors": "^2.8.12",
"@types/expect": "^24.3.0",
"@types/express": "^4.17.13",
Expand All @@ -43,8 +45,10 @@
"@types/node-fetch": "^2.5.12",
"@types/promise-retry": "^1.1.3",
"chai": "^4.3.4",
"chai-as-promised": "7.1.1",
"concurrently": "7.1.0",
"mocha": "^9.0.3",
"docker-compose": "0.23.17",
"mocha": "10.1.0",
"nodemon": "^2.0.12",
"npm-run-all": "^4.1.5",
"ts-mocha": "^8.0.0",
Expand Down
7 changes: 5 additions & 2 deletions zp-relayer/queue/poolTxQueue.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Queue } from 'bullmq'
import { redis } from '@/services/redisClient'
import { TX_QUEUE_NAME } from '@/utils/constants'
import { Proof } from 'libzkbob-rs-node'
import type { Proof } from 'libzkbob-rs-node'
import { TxType } from 'zp-memo-parser'

export interface TxPayload {
Expand All @@ -12,6 +12,9 @@ export interface TxPayload {
rawMemo: string
depositSignature: string | null
}
export const poolTxQueue = new Queue<TxPayload[], string>(TX_QUEUE_NAME, {

export type PoolTxResult = [string, string]

export const poolTxQueue = new Queue<TxPayload[], PoolTxResult[]>(TX_QUEUE_NAME, {
connection: redis,
})
24 changes: 9 additions & 15 deletions zp-relayer/queue/sentTxQueue.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,30 @@
import { Queue, QueueScheduler } from 'bullmq'
import { Queue } from 'bullmq'
import { redis } from '@/services/redisClient'
import { SENT_TX_QUEUE_NAME } from '@/utils/constants'
import type { TransactionConfig } from 'web3-core'
import { GasPriceValue } from '@/services/gas-price'
import { TxData, TxType } from 'zp-memo-parser'
import { TxPayload } from './poolTxQueue'

export type SendAttempt = [string, GasPriceValue]
export interface SentTxPayload {
txType: TxType
poolJobId: string
root: string
outCommit: string
commitIndex: number
txHash: string
prefixedMemo: string
truncatedMemo: string
txConfig: TransactionConfig
nullifier: string
gasPriceOptions: GasPriceValue
txData: TxData
txPayload: TxPayload
prevAttempts: SendAttempt[]
}

export enum SentTxState {
MINED = 'MINED',
REVERT = 'REVERT',
RESEND = 'RESEND',
FAILED = 'FAILED',
SKIPPED = 'SKIPPED',
}

export type SentTxResult = [SentTxState, string]

// Required for delayed jobs processing
const sentTxQueueScheduler = new QueueScheduler(SENT_TX_QUEUE_NAME, {
connection: redis,
})
export type SentTxResult = [SentTxState, string, string[]]

export const sentTxQueue = new Queue<SentTxPayload, SentTxResult>(SENT_TX_QUEUE_NAME, {
connection: redis,
Expand Down
Loading