diff --git a/CONFIGURATION.md b/CONFIGURATION.md index f734f33f..4ed48671 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -13,6 +13,7 @@ | TREE_UPDATE_PARAMS_PATH | Local path to tree update circuit parameters | string | | TRANSFER_PARAMS_PATH | Local path to transfer circuit parameters | string | | TX_VK_PATH | Local path to transaction circuit verification key | string | +| RELAYER_REQUEST_LOG_PATH | Path to a file where all HTTP request logs will be saved. Default `./zp.log`. | string | | STATE_DIR_PATH | Path to persistent state files related to tree and transactions storage. Default: `./POOL_STATE` | string | | GAS_PRICE_FALLBACK | Default fallback gas price | integer | | GAS_PRICE_ESTIMATION_TYPE | Gas price estimation type | `web3` / `gas-price-oracle` / `eip1559-gas-estimation` / `polygon-gasstation-v2` | @@ -32,7 +33,12 @@ | RELAYER_RPC_SYNC_STATE_CHECK_INTERVAL | Interval in milliseconds for checking JSON RPC sync state, by requesting the latest block number. Relayer will switch to the fallback JSON RPC in case sync process is stuck. If this variable is `0` sync state check is disabled. Defaults to `0` | integer | | INSUFFICIENT_BALANCE_CHECK_TIMEOUT | Interval in milliseconds to check for relayer balance update if transaction send failed with insufficient balance error. Default `60000` | integer | | SENT_TX_DELAY | Delay in milliseconds for sentTxWorker to verify submitted transactions | integer | +| SENT_TX_ERROR_THRESHOLD | Maximum number of re-sends which is considered to be normal. After this threshold each re-send will log a corresponding error (but re-send loop will continue). Defaults to `3`. | integer | | PERMIT_DEADLINE_THRESHOLD_INITIAL | Minimum time threshold in seconds for permit signature deadline to be valid (before initial transaction submission) | integer | | PERMIT_DEADLINE_THRESHOLD_RESEND | Minimum time threshold in seconds for permit signature deadline to be valid (for re-send attempts) | integer | | RELAYER_REQUIRE_TRACE_ID | If set to `true`, then requests to relayer (except `/info`, `/version`, `/params/hash/tree`, `/params/hash/tx`) without `zkbob-support-id` header will be rejected. | boolean | -| RELAYER_REQUIRE_HTTPS | If set to `true`, then RPC URL(s) must be in HTTPS format. HTTP RPC URL(s) should be used in test environment only. | boolean | \ No newline at end of file +| RELAYER_REQUIRE_HTTPS | If set to `true`, then RPC URL(s) must be in HTTPS format. HTTP RPC URL(s) should be used in test environment only. | boolean | +| RELAYER_LOG_IGNORE_ROUTES | List of space separated relayer endpoints for which request logging will be suppressed. E.g. `/fee /version` | string(s) | +| RELAYER_LOG_HEADER_BLACKLIST | List of space separated HTTP headers which will be suppressed in request logs. E.g. `content-length content-type` | string(s) | +| RELAYER_SCREENER_URL | Screener service URL | URL | +| RELAYER_SCREENER_TOKEN | Authorization token for screener service | string | diff --git a/README.md b/README.md index 5265470a..98faaa64 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,11 @@ You can use a pre-built [image](https://github.com/zkBob/zeropool-relayer/pkgs/c cd zp-relayer && yarn test:unit ``` +**Worker tests** +To run worker tests you need Docker and docker-compose installed locally. +```bash +cd zp-relayer && yarn test:worker +``` ## Workflow diff --git a/zp-relayer/config.ts b/zp-relayer/config.ts index e2016814..b8aace22 100644 --- a/zp-relayer/config.ts +++ b/zp-relayer/config.ts @@ -7,6 +7,9 @@ const relayerAddress = new Web3().eth.accounts.privateKeyToAccount( process.env.RELAYER_ADDRESS_PRIVATE_KEY as string ).address +const defaultHeaderBlacklist = + 'accept accept-language accept-encoding connection content-length content-type postman-token referer upgrade-insecure-requests' + const config = { relayerRef: process.env.RELAYER_REF || null, relayerSHA: process.env.RELAYER_SHA || null, @@ -21,6 +24,7 @@ const config = { treeUpdateParamsPath: process.env.TREE_UPDATE_PARAMS_PATH || './params/tree_params.bin', transferParamsPath: process.env.TRANSFER_PARAMS_PATH || './params/transfer_params.bin', txVKPath: process.env.TX_VK_PATH || './params/transfer_verification_key.json', + requestLogPath: process.env.RELAYER_REQUEST_LOG_PATH || './zp.log', stateDirPath: process.env.STATE_DIR_PATH || './POOL_STATE', gasPriceFallback: process.env.GAS_PRICE_FALLBACK as string, gasPriceEstimationType: (process.env.GAS_PRICE_ESTIMATION_TYPE as EstimationType) || 'web3', @@ -38,15 +42,23 @@ const config = { rpcUrls: (process.env.RPC_URL as string).split(' ').filter(url => url.length > 0), relayerTxRedundancy: process.env.RELAYER_TX_REDUNDANCY === 'true', sentTxDelay: parseInt(process.env.SENT_TX_DELAY || '30000'), + sentTxLogErrorThreshold: parseInt(process.env.SENT_TX_ERROR_THRESHOLD || '3'), rpcRequestTimeout: parseInt(process.env.RPC_REQUEST_TIMEOUT || '1000'), insufficientBalanceCheckTimeout: parseInt(process.env.INSUFFICIENT_BALANCE_CHECK_TIMEOUT || '60000'), rpcSyncCheckInterval: parseInt(process.env.RELAYER_RPC_SYNC_STATE_CHECK_INTERVAL || '0'), permitDeadlineThresholdInitial: parseInt(process.env.PERMIT_DEADLINE_THRESHOLD_INITIAL || '300'), - relayerJsonRpcErrorCodes: (process.env.RELAYER_JSONRPC_ERROR_CODES || '-32603,-32002,-32005') - .split(',') + relayerJsonRpcErrorCodes: (process.env.RELAYER_JSONRPC_ERROR_CODES || '-32603 -32002 -32005') + .split(' ') + .filter(s => s.length > 0) .map(s => parseInt(s, 10)), requireTraceId: process.env.RELAYER_REQUIRE_TRACE_ID === 'true', requireHTTPS: process.env.RELAYER_REQUIRE_HTTPS === 'true', + logIgnoreRoutes: (process.env.RELAYER_LOG_IGNORE_ROUTES || '').split(' ').filter(r => r.length > 0), + logHeaderBlacklist: (process.env.RELAYER_LOG_HEADER_BLACKLIST || defaultHeaderBlacklist) + .split(' ') + .filter(r => r.length > 0), + screenerUrl: process.env.RELAYER_SCREENER_URL || null, + screenerToken: process.env.RELAYER_SCREENER_TOKEN || null, } export default config diff --git a/zp-relayer/index.ts b/zp-relayer/index.ts index 3cffa0b8..4f3ef5e4 100644 --- a/zp-relayer/index.ts +++ b/zp-relayer/index.ts @@ -2,13 +2,14 @@ import './env' import express from 'express' import router from './router' import { logger } from './services/appLogger' -import { createLoggerMiddleware } from './services/loggerMiddleware' +import { createConsoleLoggerMiddleware, createPersistentLoggerMiddleware } from './services/loggerMiddleware' import config from './config' import { init } from './init' const app = express() -app.use(createLoggerMiddleware('zp.log')) +app.use(createPersistentLoggerMiddleware(config.requestLogPath)) +app.use(createConsoleLoggerMiddleware()) app.use(router) diff --git a/zp-relayer/pool.ts b/zp-relayer/pool.ts index f6c345dd..25ca8603 100644 --- a/zp-relayer/pool.ts +++ b/zp-relayer/pool.ts @@ -4,7 +4,7 @@ import crypto from 'crypto' import BN from 'bn.js' import PoolAbi from './abi/pool-abi.json' import { AbiItem, toBN } from 'web3-utils' -import { Contract } from 'web3-eth-contract' +import type { Contract } from 'web3-eth-contract' import config from './config' import { web3 } from './services/web3' import { logger } from './services/appLogger' @@ -14,8 +14,8 @@ import { getBlockNumber, getEvents, getTransaction } from './utils/web3' import { Helpers, Params, Proof, SnarkProof, VK } from 'libzkbob-rs-node' import { PoolState } from './state/PoolState' -import { TxType } from 'zp-memo-parser' -import { numToHex, toTxType, truncateHexPrefix, truncateMemoTxPrefix } from './utils/helpers' +import type { TxType } from 'zp-memo-parser' +import { contractCallRetry, numToHex, toTxType, truncateHexPrefix, truncateMemoTxPrefix } from './utils/helpers' import { PoolCalldataParser } from './utils/PoolCalldataParser' import { OUTPLUSONE } from './utils/constants' @@ -73,6 +73,7 @@ class Pool { public state: PoolState public optimisticState: PoolState public denominator: BN = toBN(1) + public poolId: BN = toBN(0) public isInitialized = false constructor() { @@ -105,6 +106,8 @@ class Pool { if (this.isInitialized) return this.denominator = toBN(await this.PoolInstance.methods.denominator().call()) + this.poolId = toBN(await this.PoolInstance.methods.pool_id().call()) + await this.syncState(config.startBlock) this.isInitialized = true } @@ -216,7 +219,7 @@ class Pool { } async getContractIndex() { - const poolIndex = await this.PoolInstance.methods.pool_index().call() + const poolIndex = await contractCallRetry(this.PoolInstance, 'pool_index') return Number(poolIndex) } @@ -225,12 +228,12 @@ class Pool { index = await this.getContractIndex() logger.info('CONTRACT INDEX %d', index) } - const root = await this.PoolInstance.methods.roots(index).call() + const root = await contractCallRetry(this.PoolInstance, 'roots', [index]) return root.toString() } async getLimitsFor(address: string): Promise { - const limits = await this.PoolInstance.methods.getLimitsFor(address).call() + const limits = await contractCallRetry(this.PoolInstance, 'getLimitsFor', [address]) return { tvlCap: toBN(limits.tvlCap), tvl: toBN(limits.tvl), diff --git a/zp-relayer/router.ts b/zp-relayer/router.ts index 51a74343..c7d91764 100644 --- a/zp-relayer/router.ts +++ b/zp-relayer/router.ts @@ -52,14 +52,14 @@ router.get('/params/hash/tree', wrapErr(endpoints.getParamsHash('tree'))) router.get('/params/hash/tx', wrapErr(endpoints.getParamsHash('transfer'))) // Error handler middleware -router.use((err: any, req: Request, res: Response, next: NextFunction) => { - if (err instanceof ValidationError) { - const validationErrors = err.validationErrors - logger.info('Request errors: %o', validationErrors, { path: req.path }) +router.use((error: any, req: Request, res: Response) => { + if (error instanceof ValidationError) { + const validationErrors = error.validationErrors + logger.warn('Validation errors', { errors: validationErrors, path: req.path }) res.status(400).json(validationErrors) - next() } else { - next(err) + logger.error('Internal error', { error, path: req.path }) + res.status(500).send('Internal server error') } }) diff --git a/zp-relayer/services/loggerMiddleware.ts b/zp-relayer/services/loggerMiddleware.ts index 3969fb90..de268ef5 100644 --- a/zp-relayer/services/loggerMiddleware.ts +++ b/zp-relayer/services/loggerMiddleware.ts @@ -1,9 +1,21 @@ -import winston from 'winston' +import { format, transports } from 'winston' import expressWinston from 'express-winston' +import config from '@/config' +import { logger } from './appLogger' -export function createLoggerMiddleware(filename: string = 'zp.log') { +export function createPersistentLoggerMiddleware(filename: string = 'zp.log') { return expressWinston.logger({ - transports: [new winston.transports.File({ filename })], - format: winston.format.combine(winston.format.json()), + transports: [new transports.File({ filename })], + format: format.combine(format.json()), + }) +} + +export function createConsoleLoggerMiddleware() { + return expressWinston.logger({ + winstonInstance: logger, + level: 'debug', + ignoredRoutes: config.logIgnoreRoutes, + headerBlacklist: config.logHeaderBlacklist, + requestWhitelist: ['headers', 'httpVersion'], }) } diff --git a/zp-relayer/services/providers/HttpListProvider.ts b/zp-relayer/services/providers/HttpListProvider.ts index c3723d49..b94d306e 100644 --- a/zp-relayer/services/providers/HttpListProvider.ts +++ b/zp-relayer/services/providers/HttpListProvider.ts @@ -109,7 +109,7 @@ export default class HttpListProvider extends BaseHttpProvider { } private async trySend(payload: any, initialIndex: number) { - const errors: any = [] + const errors: Error[] = [] for (let count = 0; count < this.urls.length; count++) { const index = (initialIndex + count) % this.urls.length @@ -122,9 +122,12 @@ export default class HttpListProvider extends BaseHttpProvider { const url = this.urls[index] try { const result = await this._send(url, payload, this.options) + if (errors.length > 0) { + logger.warn('RPCs request errors', { errors: errors.map(e => e.message) }) + } return [result, index] } catch (e) { - errors.push(e) + errors.push(e as Error) } } diff --git a/zp-relayer/test/worker-tests/poolWorker.test.ts b/zp-relayer/test/worker-tests/poolWorker.test.ts index 952f4944..f8f32bcf 100644 --- a/zp-relayer/test/worker-tests/poolWorker.test.ts +++ b/zp-relayer/test/worker-tests/poolWorker.test.ts @@ -99,7 +99,7 @@ describe('poolWorker', () => { gasPriceService.stop() }) - async function expectJobFinished(job: Job) { + async function expectJobFinished(job: Job) { const [[initialHash, sentId]] = await job.waitUntilFinished(poolQueueEvents) expect(initialHash.length).eq(66) diff --git a/zp-relayer/utils/helpers.ts b/zp-relayer/utils/helpers.ts index d0cc8f45..9953d6a4 100644 --- a/zp-relayer/utils/helpers.ts +++ b/zp-relayer/utils/helpers.ts @@ -1,4 +1,5 @@ import type Web3 from 'web3' +import type { Contract } from 'web3-eth-contract' import type BN from 'bn.js' import { padLeft, toBN } from 'web3-utils' import { logger } from '@/services/appLogger' @@ -6,6 +7,7 @@ import type { SnarkProof } from 'libzkbob-rs-node' import { TxType } from 'zp-memo-parser' import type { Mutex } from 'async-mutex' import promiseRetry from 'promise-retry' +import { isContractCallError } from './web3Errors' const S_MASK = toBN('0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff') const S_MAX = toBN('0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF5D576E7357A4501DDFE92F46681B20A0') @@ -103,11 +105,14 @@ export async function setIntervalAndRun(f: () => Promise | void, interval: return handler } -export function withMutex(mutex: Mutex, f: () => Promise): () => Promise { - return async () => { +export function withMutex any>( + mutex: Mutex, + f: F +): (...args: Parameters) => Promise>> { + return async (...args) => { const release = await mutex.acquire() try { - return await f() + return await f(...args) } finally { release() } @@ -136,12 +141,16 @@ function sleep(ms: number) { return new Promise(resolve => setTimeout(resolve, ms)) } -export function withLoop(f: () => Promise, timeout: number, suppressedErrors: string[] = []): () => Promise { - // @ts-ignore +export function withLoop any>( + f: F, + timeout: number, + suppressedErrors: string[] = [] +): () => Promise>> { return async () => { + let i = 1 while (1) { try { - return await f() + return await f(i++) } catch (e) { const err = e as Error let isSuppressed = false @@ -169,7 +178,7 @@ export function waitForFunds( address: string, cb: (balance: BN) => void, minimumBalance: BN, - timeout: number, + timeout: number ) { return promiseRetry( async retry => { @@ -204,3 +213,25 @@ export function checkHTTPS(isRequired: boolean) { } } } + +export function contractCallRetry(contract: Contract, method: string, args: any[] = []) { + return promiseRetry( + async retry => { + try { + return await contract.methods[method](...args).call() + } catch (e) { + if (isContractCallError(e as Error)) { + logger.warn('Retrying failed contract call', { method, args }) + retry(e) + } else { + throw e + } + } + }, + { + retries: 2, + minTimeout: 500, + maxTimeout: 500, + } + ) +} diff --git a/zp-relayer/utils/web3Errors.ts b/zp-relayer/utils/web3Errors.ts index b4b0bc42..a74a822e 100644 --- a/zp-relayer/utils/web3Errors.ts +++ b/zp-relayer/utils/web3Errors.ts @@ -28,3 +28,8 @@ export function isInsufficientBalanceError(e: Error) { const message = e.message.toLowerCase() return message.includes('insufficient funds') } + +export function isContractCallError(e: Error) { + const message = e.message.toLowerCase() + return message.includes('did it run out of gas') +} diff --git a/zp-relayer/validateTx.ts b/zp-relayer/validateTx.ts index 7911d723..2194b26e 100644 --- a/zp-relayer/validateTx.ts +++ b/zp-relayer/validateTx.ts @@ -8,9 +8,9 @@ import type { Limits, Pool } from './pool' import type { NullifierSet } from './state/nullifierSet' import TokenAbi from './abi/token-abi.json' import { web3 } from './services/web3' -import { numToHex, unpackSignature } from './utils/helpers' +import { contractCallRetry, numToHex, unpackSignature } from './utils/helpers' import { recoverSaltedPermit } from './utils/EIP712SaltedPermit' -import { ZERO_ADDRESS } from './utils/constants' +import { ZERO_ADDRESS, TRACE_ID } from './utils/constants' import { TxPayload } from './queue/poolTxQueue' import { getTxProofField, parseDelta } from './utils/proofInputs' import type { PoolState } from './state/PoolState' @@ -39,7 +39,7 @@ export function checkSize(data: string, size: number) { } export async function checkBalance(address: string, minBalance: string) { - const balance = await tokenContract.methods.balanceOf(address).call() + const balance = await contractCallRetry(tokenContract, 'balanceOf', [address]) const res = toBN(balance).gte(toBN(minBalance)) if (!res) { return new TxValidationError('Not enough balance for deposit') @@ -175,7 +175,7 @@ async function getRecoveredAddress( const { deadline, holder } = txData as PermittableDepositTxData const owner = web3.utils.toChecksumAddress(web3.utils.bytesToHex(Array.from(holder))) const spender = web3.utils.toChecksumAddress(config.poolAddress as string) - const nonce = await tokenContract.methods.nonces(owner).call() + const nonce = await contractCallRetry(tokenContract, 'nonces', [owner]) const message = { owner, @@ -196,7 +196,7 @@ async function getRecoveredAddress( return recoveredAddress } -async function checkRoot(proofIndex: BN, proofRoot: string, state: PoolState) { +function checkRoot(proofIndex: BN, proofRoot: string, state: PoolState) { const index = proofIndex.toNumber() const stateRoot = state.getMerkleRootAt(index) @@ -207,7 +207,50 @@ async function checkRoot(proofIndex: BN, proofRoot: string, state: PoolState) { return null } -export async function validateTx({ txType, rawMemo, txProof, depositSignature }: TxPayload, pool: Pool) { +function checkPoolId(deltaPoolId: BN, contractPoolId: BN) { + if (deltaPoolId.eq(contractPoolId)) { + return null + } + return new TxValidationError(`Incorrect poolId: given ${deltaPoolId}, expected ${contractPoolId}`) +} + +async function checkScreener(address: string, traceId?: string) { + if (config.screenerUrl === null || config.screenerToken === null) { + return null + } + + const ACC_VALIDATION_FAILED = 'Internal account validation failed' + + const headers: Record = { + 'Content-type': 'application/json', + 'Authorization': `Bearer ${config.screenerToken}`, + } + + if (traceId) headers[TRACE_ID] = traceId + + try { + const rawResponse = await fetch(config.screenerUrl, { + method: 'POST', + headers, + body: JSON.stringify({ address }), + }) + const response = await rawResponse.json() + if (response.result === true) { + return new TxValidationError(ACC_VALIDATION_FAILED) + } + } catch (e) { + logger.error('Request to screener failed', { error: (e as Error).message }) + return new TxValidationError(ACC_VALIDATION_FAILED) + } + + return null +} + +export async function validateTx( + { txType, rawMemo, txProof, depositSignature }: TxPayload, + pool: Pool, + traceId?: string +) { const buf = Buffer.from(rawMemo, 'hex') const txData = getTxData(buf, txType) @@ -223,44 +266,42 @@ export async function validateTx({ txType, rawMemo, txProof, depositSignature }: fee.toString(10) ) - // prettier-ignore - await checkAssertion(() => checkRoot( - delta.transferIndex, - root, - pool.optimisticState, - )) + await checkAssertion(() => checkPoolId(delta.poolId, pool.poolId)) + await checkAssertion(() => checkRoot(delta.transferIndex, root, pool.optimisticState)) await checkAssertion(() => checkNullifier(nullifier, pool.state.nullifiers)) await checkAssertion(() => checkNullifier(nullifier, pool.optimisticState.nullifiers)) await checkAssertion(() => checkTransferIndex(toBN(pool.optimisticState.getNextIndex()), delta.transferIndex)) - await checkAssertion(() => checkFee(fee)) - - if (txType === TxType.WITHDRAWAL) { - const { nativeAmount, receiver } = txData as WithdrawTxData - const receiverAddress = web3.utils.bytesToHex(Array.from(receiver)) - logger.info('Withdraw address: %s', receiverAddress) - await checkAssertion(() => checkNonZeroWithdrawAddress(receiverAddress)) - await checkAssertion(() => checkNativeAmount(toBN(nativeAmount))) - } - await checkAssertion(() => checkProof(txProof, (p, i) => pool.verifyProof(p, i))) const tokenAmountWithFee = delta.tokenAmount.add(fee) await checkAssertion(() => checkTxSpecificFields(txType, tokenAmountWithFee, delta.energyAmount)) - const requiredTokenAmount = tokenAmountWithFee.mul(pool.denominator) let userAddress = ZERO_ADDRESS - if (txType === TxType.DEPOSIT || txType === TxType.PERMITTABLE_DEPOSIT) { + + if (txType === TxType.WITHDRAWAL) { + const { nativeAmount, receiver } = txData as WithdrawTxData + userAddress = web3.utils.bytesToHex(Array.from(receiver)) + logger.info('Withdraw address: %s', userAddress) + await checkAssertion(() => checkNonZeroWithdrawAddress(userAddress)) + await checkAssertion(() => checkNativeAmount(toBN(nativeAmount))) + } else if (txType === TxType.DEPOSIT || txType === TxType.PERMITTABLE_DEPOSIT) { + const requiredTokenAmount = tokenAmountWithFee.mul(pool.denominator) userAddress = await getRecoveredAddress(txType, nullifier, txData, requiredTokenAmount, depositSignature) logger.info('Deposit address: %s', userAddress) await checkAssertion(() => checkDepositEnoughBalance(userAddress, requiredTokenAmount)) } + + const limits = await pool.getLimitsFor(userAddress) + await checkAssertion(() => checkLimits(limits, delta.tokenAmount)) + if (txType === TxType.PERMITTABLE_DEPOSIT) { const { deadline } = txData as PermittableDepositTxData logger.info('Deadline: %s', deadline) await checkAssertion(() => checkDeadline(toBN(deadline), config.permitDeadlineThresholdInitial)) } - const limits = await pool.getLimitsFor(userAddress) - await checkAssertion(() => checkLimits(limits, delta.tokenAmount)) + if (txType === TxType.DEPOSIT || txType === TxType.PERMITTABLE_DEPOSIT || txType === TxType.WITHDRAWAL) { + await checkAssertion(() => checkScreener(userAddress, traceId)) + } } diff --git a/zp-relayer/workers/poolTxWorker.ts b/zp-relayer/workers/poolTxWorker.ts index 0553c1dd..39ae4cf1 100644 --- a/zp-relayer/workers/poolTxWorker.ts +++ b/zp-relayer/workers/poolTxWorker.ts @@ -21,7 +21,7 @@ import { TxValidationError } from '@/validateTx' export async function createPoolTxWorker( gasPrice: GasPrice, - validateTx: (tx: TxPayload, pool: Pool) => Promise, + validateTx: (tx: TxPayload, pool: Pool, traceId?: string) => Promise, mutex: Mutex, redis: Redis ) { @@ -53,7 +53,7 @@ export async function createPoolTxWorker( for (const tx of txs) { const { gas, amount, rawMemo, txType, txProof } = tx - await validateTx(tx, pool) + await validateTx(tx, pool, traceId) const { data, commitIndex, rootAfter } = await processTx(tx) @@ -138,10 +138,11 @@ export async function createPoolTxWorker( const poolTxWorker = new Worker( TX_QUEUE_NAME, - job => withErrorLog( - withMutex(mutex, () => poolTxWorkerProcessor(job)), - [TxValidationError] - ), + job => + withErrorLog( + withMutex(mutex, () => poolTxWorkerProcessor(job)), + [TxValidationError] + ), WORKER_OPTIONS ) diff --git a/zp-relayer/workers/sentTxWorker.ts b/zp-relayer/workers/sentTxWorker.ts index 6d5439fd..42bd80b5 100644 --- a/zp-relayer/workers/sentTxWorker.ts +++ b/zp-relayer/workers/sentTxWorker.ts @@ -233,8 +233,8 @@ export async function createSentTxWorker(gasPrice: Gas return [null, true] } - const sentTxWorkerProcessor = async (job: Job) => { - const jobLogger = workerLogger.child({ jobId: job.id, traceId: job.data.traceId }) + const sentTxWorkerProcessor = async (job: Job, resendNum: number = 1) => { + const jobLogger = workerLogger.child({ jobId: job.id, traceId: job.data.traceId, resendNum }) jobLogger.info('Verifying job %s', job.data.poolJobId) const { prevAttempts, txConfig } = job.data @@ -245,12 +245,17 @@ export async function createSentTxWorker(gasPrice: Gas if (shouldReprocess) { // TODO: handle this case later + jobLogger.warn('Ambiguity detected: nonce increased but no respond that transaction was mined') // Error should be caught by `withLoop` to re-run job - throw new Error('Ambiguity detected: nonce increased but no respond that transaction was mined') + throw new Error(RECHECK_ERROR) } if (!tx) { // Resend with updated gas price + if (resendNum > config.sentTxLogErrorThreshold) { + jobLogger.error('Too many unsuccessful re-sends') + } + await handleResend(txConfig, gasPrice, job, jobLogger) // Tx re-send successful @@ -271,7 +276,7 @@ export async function createSentTxWorker(gasPrice: Gas job => withErrorLog( withLoop( - withMutex(mutex, () => sentTxWorkerProcessor(job)), + withMutex(mutex, (i: number) => sentTxWorkerProcessor(job, i)), config.sentTxDelay, [RECHECK_ERROR] )