From c0befcee8b5f98f513b338fe4dfa2f2f2a65cf0b Mon Sep 17 00:00:00 2001 From: Leonid Tyurin Date: Mon, 2 Jan 2023 12:38:30 +0400 Subject: [PATCH 01/10] Change RootSet approach to getRootAt (#119) --- zp-relayer/pool.ts | 11 --------- zp-relayer/state/PoolState.ts | 14 ++++++++--- zp-relayer/state/rootSet.ts | 23 ------------------ zp-relayer/validateTx.ts | 39 ++++++------------------------ zp-relayer/workers/poolTxWorker.ts | 5 ---- zp-relayer/workers/sentTxWorker.ts | 9 ------- 6 files changed, 18 insertions(+), 83 deletions(-) delete mode 100644 zp-relayer/state/rootSet.ts diff --git a/zp-relayer/pool.ts b/zp-relayer/pool.ts index 3d639970..20389dcf 100644 --- a/zp-relayer/pool.ts +++ b/zp-relayer/pool.ts @@ -147,11 +147,6 @@ class Pool { return } - // Set initial root - await this.state.roots.add({ - 0: INIT_ROOT, - }) - const numTxs = Math.floor((contractIndex - localIndex) / OUTPLUSONE) const missedIndices = Array(numTxs) for (let i = 0; i < numTxs; i++) { @@ -206,12 +201,6 @@ class Pool { // Save nullifier in confirmed state const nullifier = parser.getField('nullifier') await this.state.nullifiers.add([web3.utils.hexToNumberString(nullifier)]) - - // Save root in confirmed state - const root = this.state.getMerkleRoot() - await this.state.roots.add({ - [newPoolIndex]: root, - }) } } diff --git a/zp-relayer/state/PoolState.ts b/zp-relayer/state/PoolState.ts index 90542312..5b025a55 100644 --- a/zp-relayer/state/PoolState.ts +++ b/zp-relayer/state/PoolState.ts @@ -3,21 +3,18 @@ import { logger } from '@/services/appLogger' import { OUTPLUSONE } from '@/utils/constants' import { MerkleTree, TxStorage, MerkleProof, Constants, Helpers } from 'libzkbob-rs-node' import { NullifierSet } from './nullifierSet' -import { RootSet } from './rootSet' import { JobIdsMapping } from './jobIdsMapping' export class PoolState { private tree: MerkleTree private txs: TxStorage public nullifiers: NullifierSet - public roots: RootSet public jobIdsMapping: JobIdsMapping constructor(private name: string, redis: Redis, path: string) { this.tree = new MerkleTree(`${path}/${name}Tree.db`) this.txs = new TxStorage(`${path}/${name}Txs.db`) this.nullifiers = new NullifierSet(`${name}-nullifiers`, redis) - this.roots = new RootSet(`${name}-roots`, redis) // This structure can be shared among different pool states // So, use constant name this.jobIdsMapping = new JobIdsMapping('job-id-mapping', redis) @@ -87,7 +84,16 @@ export class PoolState { return [outCommit, memo] } - getMerkleRoot(): string { + getMerkleRootAt(index: number): string | null { + try { + return this.tree.getRootAt(index) + } catch (e) { + logger.error(`Error getting Merkle root`, { index, error: (e as Error).message }) + return null + } + } + + getMerkleRoot() { return this.tree.getRoot() } diff --git a/zp-relayer/state/rootSet.ts b/zp-relayer/state/rootSet.ts deleted file mode 100644 index 3636aa9b..00000000 --- a/zp-relayer/state/rootSet.ts +++ /dev/null @@ -1,23 +0,0 @@ -import type { Redis } from 'ioredis' - -export class RootSet { - constructor(public name: string, private redis: Redis) {} - - async add(roots: Record) { - if (Object.keys(roots).length === 0) return - await this.redis.hset(this.name, roots) - } - - async remove(indices: string[]) { - if (indices.length === 0) return - await this.redis.hdel(this.name, ...indices) - } - - async get(index: string) { - return this.redis.hget(this.name, index) - } - - async clear() { - await this.redis.del(this.name) - } -} diff --git a/zp-relayer/validateTx.ts b/zp-relayer/validateTx.ts index 11cd499a..2f16e4e2 100644 --- a/zp-relayer/validateTx.ts +++ b/zp-relayer/validateTx.ts @@ -5,7 +5,7 @@ import { Proof, SnarkProof } from 'libzkbob-rs-node' import { logger } from './services/appLogger' import config from './config' import type { Limits, Pool } from './pool' -import { NullifierSet } from './state/nullifierSet' +import type { NullifierSet } from './state/nullifierSet' import TokenAbi from './abi/token-abi.json' import { web3 } from './services/web3' import { numToHex, unpackSignature } from './utils/helpers' @@ -13,7 +13,7 @@ import { recoverSaltedPermit } from './utils/EIP712SaltedPermit' import { ZERO_ADDRESS } from './utils/constants' import { TxPayload } from './queue/poolTxQueue' import { getTxProofField, parseDelta } from './utils/proofInputs' -import { RootSet } from './state/rootSet' +import type { PoolState } from './state/PoolState' const tokenContract = new web3.eth.Contract(TokenAbi as AbiItem[], config.tokenAddress) @@ -195,33 +195,12 @@ async function getRecoveredAddress( return recoveredAddress } -async function checkRoot( - proofIndex: BN, - proofRoot: string, - poolSet: RootSet, - optimisticSet: RootSet, - contractFallback: (i: string) => Promise -) { - const indexStr = proofIndex.toString(10) - - // Lookup root in cache - let isPresent = true - let root = (await poolSet.get(indexStr)) || (await optimisticSet.get(indexStr)) - - // Get root from contract if not found in cache - if (root === null) { - logger.info('Getting root from contract...') - root = await contractFallback(indexStr) - isPresent = false - } - - if (root !== proofRoot) { - return new TxValidationError(`Incorrect root at index ${indexStr}: given ${proofRoot}, expected ${root}`) - } +async function checkRoot(proofIndex: BN, proofRoot: string, state: PoolState) { + const index = proofIndex.toNumber() - // If received correct root from contract update cache (only confirmed state) - if (!isPresent) { - await poolSet.add({ [proofIndex.toNumber()]: root }) + const stateRoot = state.getMerkleRootAt(index) + if (stateRoot !== proofRoot) { + return new TxValidationError(`Incorrect root at index ${index}: given ${proofRoot}, expected ${stateRoot}`) } return null @@ -247,9 +226,7 @@ export async function validateTx({ txType, rawMemo, txProof, depositSignature }: await checkAssertion(() => checkRoot( delta.transferIndex, root, - pool.state.roots, - pool.optimisticState.roots, - i => pool.getContractMerkleRoot(i) + pool.optimisticState, )) await checkAssertion(() => checkNullifier(nullifier, pool.state.nullifiers)) await checkAssertion(() => checkNullifier(nullifier, pool.optimisticState.nullifiers)) diff --git a/zp-relayer/workers/poolTxWorker.ts b/zp-relayer/workers/poolTxWorker.ts index a50cc137..0340a2fd 100644 --- a/zp-relayer/workers/poolTxWorker.ts +++ b/zp-relayer/workers/poolTxWorker.ts @@ -91,11 +91,6 @@ export async function createPoolTxWorker( pool.optimisticState.updateState(commitIndex, outCommit, prefixedMemo) logger.debug('Adding nullifier %s to OS', nullifier) await pool.optimisticState.nullifiers.add([nullifier]) - const poolIndex = (commitIndex + 1) * OUTPLUSONE - logger.debug('Adding root %s at %s to OS', rootAfter, poolIndex) - await pool.optimisticState.roots.add({ - [poolIndex]: rootAfter, - }) const sentJob = await sentTxQueue.add( txHash, diff --git a/zp-relayer/workers/sentTxWorker.ts b/zp-relayer/workers/sentTxWorker.ts index 3975d214..00591fbf 100644 --- a/zp-relayer/workers/sentTxWorker.ts +++ b/zp-relayer/workers/sentTxWorker.ts @@ -33,8 +33,6 @@ async function clearOptimisticState() { pool.optimisticState.rollbackTo(pool.state) logger.info('Clearing optimistic nullifiers...') await pool.optimisticState.nullifiers.clear() - logger.info('Clearing optimistic roots...') - await pool.optimisticState.roots.clear() const root1 = pool.state.getMerkleRoot() const root2 = pool.optimisticState.getMerkleRoot() @@ -112,13 +110,6 @@ export async function createSentTxWorker(gasPrice: Gas logger.info('Removing nullifier %s from OS', nullifier) await pool.optimisticState.nullifiers.remove([nullifier]) - // Add root to confirmed state and remove from optimistic one - const poolIndex = ((commitIndex + 1) * OUTPLUSONE).toString(10) - logger.info('Adding root %s %s to PS', poolIndex, root) - await pool.state.roots.add({ [poolIndex]: root }) - logger.info('Removing root %s %s from OS', poolIndex, root) - await pool.optimisticState.roots.remove([poolIndex]) - const node1 = pool.state.getCommitment(commitIndex) const node2 = pool.optimisticState.getCommitment(commitIndex) logger.info(`Assert commitments are equal: ${node1}, ${node2}`) From b5289bb10559968c09c8b56bf96fd3a44b61ad1b Mon Sep 17 00:00:00 2001 From: Leonid Tyurin Date: Mon, 2 Jan 2023 12:44:02 +0400 Subject: [PATCH 02/10] New endpoint to get the relayer version (#122) --- .github/workflows/main.yml | 3 +++ README.md | 10 ++++++++++ docker/Dockerfile.relayer | 5 +++++ zp-relayer/config.ts | 2 ++ zp-relayer/endpoints.ts | 8 ++++++++ zp-relayer/router.ts | 1 + 6 files changed, 29 insertions(+) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 56619527..0e2838da 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -34,5 +34,8 @@ jobs: file: ./docker/Dockerfile.relayer tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} + build-args: | + RELAYER_REF=${{ github.ref_name }} + RELAYER_SHA=${{ github.sha }} cache-from: type=gha cache-to: type=gha diff --git a/README.md b/README.md index aacaa9c2..4e939da4 100644 --- a/README.md +++ b/README.md @@ -143,6 +143,16 @@ For a detailed description of each method's payload you can refer to [`zp-relaye } ``` +- `/version` - currently used relayer version + + **Response** + ``` + { + ref // Branch or tag + commitHash // Commit hash + } + ``` + - `/params/hash/tree` - hash of pool tree proving parameters. - `/params/hash/tx` - hash of pool transaction proving parameters. diff --git a/docker/Dockerfile.relayer b/docker/Dockerfile.relayer index 3c3e843b..d8c5ab5a 100644 --- a/docker/Dockerfile.relayer +++ b/docker/Dockerfile.relayer @@ -31,6 +31,11 @@ RUN yarn install --frozen-lockfile --production FROM node:18 +ARG RELAYER_REF +ARG RELAYER_SHA +ENV RELAYER_REF=${RELAYER_REF} +ENV RELAYER_SHA=${RELAYER_SHA} + WORKDIR /app COPY --from=build /app/zp-relayer/build ./zp-relayer diff --git a/zp-relayer/config.ts b/zp-relayer/config.ts index f0dd96c5..1c2a2cff 100644 --- a/zp-relayer/config.ts +++ b/zp-relayer/config.ts @@ -8,6 +8,8 @@ const relayerAddress = new Web3().eth.accounts.privateKeyToAccount( ).address const config = { + relayerRef: process.env.RELAYER_REF || null, + relayerSHA: process.env.RELAYER_SHA || null, port: parseInt(process.env.PORT || '8000'), relayerAddress, relayerPrivateKey: process.env.RELAYER_ADDRESS_PRIVATE_KEY as string, diff --git a/zp-relayer/endpoints.ts b/zp-relayer/endpoints.ts index 368748bf..8780958a 100644 --- a/zp-relayer/endpoints.ts +++ b/zp-relayer/endpoints.ts @@ -278,6 +278,13 @@ function getParamsHash(type: 'tree' | 'transfer') { } } +function relayerVersion(req: Request, res: Response) { + res.json({ + ref: config.relayerRef, + commitHash: config.relayerSHA, + }) +} + function root(req: Request, res: Response) { return res.sendStatus(200) } @@ -294,5 +301,6 @@ export default { getLimits, getSiblings, getParamsHash, + relayerVersion, root, } diff --git a/zp-relayer/router.ts b/zp-relayer/router.ts index 20177c0f..f52cb298 100644 --- a/zp-relayer/router.ts +++ b/zp-relayer/router.ts @@ -29,6 +29,7 @@ router.use((err: any, req: Request, res: Response, next: NextFunction) => { }) router.get('/', endpoints.root) +router.get('/version', endpoints.relayerVersion) router.post('/sendTransaction', wrapErr(endpoints.sendTransaction)) router.post('/sendTransactions', wrapErr(endpoints.sendTransactions)) router.get('/transactions', wrapErr(endpoints.getTransactions)) From c263c4131f8cf7115638b3b31854e569aff11d8f Mon Sep 17 00:00:00 2001 From: Leonid Tyurin Date: Mon, 2 Jan 2023 12:52:07 +0400 Subject: [PATCH 03/10] Add HTTPS URL validation (#124) --- zp-relayer/config.ts | 1 + .../services/providers/HttpListProvider.ts | 1 + zp-relayer/services/web3.ts | 2 ++ zp-relayer/utils/helpers.ts | 30 ++++++++++++++----- zp-relayer/utils/web3.ts | 2 +- zp-relayer/validateTx.ts | 1 + zp-relayer/workers/poolTxWorker.ts | 6 +++- 7 files changed, 34 insertions(+), 9 deletions(-) diff --git a/zp-relayer/config.ts b/zp-relayer/config.ts index 1c2a2cff..b10c07dc 100644 --- a/zp-relayer/config.ts +++ b/zp-relayer/config.ts @@ -43,6 +43,7 @@ const config = { relayerJsonRpcErrorCodes: (process.env.RELAYER_JSONRPC_ERROR_CODES || '-32603,-32002,-32005') .split(',') .map(s => parseInt(s, 10)), + requireHTTPS: process.env.RELAYER_REQUIRE_HTTPS === 'true', } export default config diff --git a/zp-relayer/services/providers/HttpListProvider.ts b/zp-relayer/services/providers/HttpListProvider.ts index 686a7696..a003424f 100644 --- a/zp-relayer/services/providers/HttpListProvider.ts +++ b/zp-relayer/services/providers/HttpListProvider.ts @@ -6,6 +6,7 @@ import { logger } from '../appLogger' import BaseHttpProvider, { ProviderOptions } from './BaseHttpProvider' export class HttpListProviderError extends Error { + name = 'HttpListProviderError' errors: Error[] constructor(message: string, errors: Error[]) { super(message) diff --git a/zp-relayer/services/web3.ts b/zp-relayer/services/web3.ts index 2a81919a..8db89e96 100644 --- a/zp-relayer/services/web3.ts +++ b/zp-relayer/services/web3.ts @@ -4,12 +4,14 @@ import type { HttpProvider } from 'web3-core' import { RETRY_CONFIG } from '@/utils/constants' import HttpListProvider from './providers/HttpListProvider' import RedundantHttpListProvider from './providers/RedundantHttpListProvider' +import { checkHTTPS } from '@/utils/helpers' const providerOptions = { requestTimeout: config.rpcRequestTimeout, retry: RETRY_CONFIG, } +config.rpcUrls.forEach(checkHTTPS(config.requireHTTPS)) const provider = new HttpListProvider(config.rpcUrls, providerOptions) const web3 = new Web3(provider as HttpProvider) diff --git a/zp-relayer/utils/helpers.ts b/zp-relayer/utils/helpers.ts index 2efe1716..c119807f 100644 --- a/zp-relayer/utils/helpers.ts +++ b/zp-relayer/utils/helpers.ts @@ -1,10 +1,9 @@ -import BN from 'bn.js' +import type BN from 'bn.js' import { padLeft, toBN } from 'web3-utils' import { logger } from '@/services/appLogger' -import { SnarkProof } from 'libzkbob-rs-node' +import type { SnarkProof } from 'libzkbob-rs-node' import { TxType } from 'zp-memo-parser' import type { Mutex } from 'async-mutex' -import { TxValidationError } from '@/validateTx' const S_MASK = toBN('0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff') const S_MAX = toBN('0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF5D576E7357A4501DDFE92F46681B20A0') @@ -113,14 +112,19 @@ export function withMutex(mutex: Mutex, f: () => Promise): () => Promise(f: () => Promise): Promise { +export async function withErrorLog( + f: () => Promise, + WarnErrors: (new (...args: any[]) => Error)[] = [] +): Promise { try { return await f() } catch (e) { - if (e instanceof TxValidationError) { - logger.warn('Validation error: %s', (e as Error).message) + const err = e as Error + const isWarn = WarnErrors.some(WarnError => err instanceof WarnError) + if (isWarn) { + logger.warn('%s: %s', err.name, err.message) } else { - logger.error('Found error: %s', (e as Error).message) + logger.error('Found error: %s', err.message) } throw e } @@ -157,3 +161,15 @@ export function withLoop(f: () => Promise, timeout: number, supressedError } } } + +export function checkHTTPS(isRequired: boolean) { + return (url: string) => { + if (!/^https.*/.test(url)) { + if (isRequired) { + throw new Error(`http is not allowed: ${url}`) + } else { + logger.warn('HTTP RPC URL is not recommended for production usage') + } + } + } +} diff --git a/zp-relayer/utils/web3.ts b/zp-relayer/utils/web3.ts index 7c02277e..35a9fb9f 100644 --- a/zp-relayer/utils/web3.ts +++ b/zp-relayer/utils/web3.ts @@ -1,4 +1,4 @@ -import Web3 from 'web3' +import type Web3 from 'web3' import { Contract, PastEventOptions } from 'web3-eth-contract' import { logger } from '@/services/appLogger' diff --git a/zp-relayer/validateTx.ts b/zp-relayer/validateTx.ts index 2f16e4e2..7911d723 100644 --- a/zp-relayer/validateTx.ts +++ b/zp-relayer/validateTx.ts @@ -20,6 +20,7 @@ const tokenContract = new web3.eth.Contract(TokenAbi as AbiItem[], config.tokenA const ZERO = toBN(0) export class TxValidationError extends Error { + name = 'TxValidationError' constructor(message: string) { super(message) } diff --git a/zp-relayer/workers/poolTxWorker.ts b/zp-relayer/workers/poolTxWorker.ts index 0340a2fd..81fcbcfe 100644 --- a/zp-relayer/workers/poolTxWorker.ts +++ b/zp-relayer/workers/poolTxWorker.ts @@ -16,6 +16,7 @@ import type { Mutex } from 'async-mutex' import { getChainId } from '@/utils/web3' import { getTxProofField } from '@/utils/proofInputs' import type { Redis } from 'ioredis' +import { TxValidationError } from '@/validateTx' export async function createPoolTxWorker( gasPrice: GasPrice, @@ -123,7 +124,10 @@ export async function createPoolTxWorker( const poolTxWorker = new Worker( TX_QUEUE_NAME, - job => withErrorLog(withMutex(mutex, () => poolTxWorkerProcessor(job))), + job => withErrorLog( + withMutex(mutex, () => poolTxWorkerProcessor(job)), + [TxValidationError] + ), WORKER_OPTIONS ) From db031e2a0d5fe1867b74105e1ca0439ef007d2e2 Mon Sep 17 00:00:00 2001 From: Leonid Tyurin Date: Mon, 2 Jan 2023 12:53:50 +0400 Subject: [PATCH 04/10] RPC sync check introduction to switch to fallback RPCs (#128) --- CONFIGURATION.md | 1 + zp-relayer/config.ts | 1 + .../services/providers/HttpListProvider.ts | 54 ++++++++++++++++--- zp-relayer/services/web3.ts | 1 + 4 files changed, 50 insertions(+), 7 deletions(-) diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 412b1c3b..1cf31e0c 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -29,6 +29,7 @@ | RELAYER_REDIS_URL | Url to redis instance | URL | | RPC_URL | The HTTPS URL(s) used to communicate to the RPC nodes. Several URLs can be specified, delimited by spaces. If the connection to one of these nodes is lost the next URL is used for connection. | URL | | RELAYER_TX_REDUNDANCY | If set to `true`, instructs relayer to send `eth_sendRawTransaction` requests through all available RPC urls defined in `RPC_URL` variables instead of using first available one. Defaults to `false` | boolean | +| RELAYER_RPC_SYNC_STATE_CHECK_INTERVAL | Interval for checking JSON RPC sync state, by requesting the latest block number. Relayer will switch to the fallback JSON RPC in case sync process is stuck. If this variable is `0` sync state check is disabled. Defaults to `0` | integer | | SENT_TX_DELAY | Delay in milliseconds for sentTxWorker to verify submitted transactions | integer | | PERMIT_DEADLINE_THRESHOLD_INITIAL | Minimum time threshold in seconds for permit signature deadline to be valid (before initial transaction submission) | integer | | PERMIT_DEADLINE_THRESHOLD_RESEND | Minimum time threshold in seconds for permit signature deadline to be valid (for re-send attempts) | integer | diff --git a/zp-relayer/config.ts b/zp-relayer/config.ts index b10c07dc..560f52f3 100644 --- a/zp-relayer/config.ts +++ b/zp-relayer/config.ts @@ -39,6 +39,7 @@ const config = { relayerTxRedundancy: process.env.RELAYER_TX_REDUNDANCY === 'true', sentTxDelay: parseInt(process.env.SENT_TX_DELAY || '30000'), rpcRequestTimeout: parseInt(process.env.RPC_REQUEST_TIMEOUT || '1000'), + rpcSyncCheckInterval: parseInt(process.env.RELAYER_RPC_SYNC_STATE_CHECK_INTERVAL || '0'), permitDeadlineThresholdInitial: parseInt(process.env.PERMIT_DEADLINE_THRESHOLD_INITIAL || '300'), relayerJsonRpcErrorCodes: (process.env.RELAYER_JSONRPC_ERROR_CODES || '-32603,-32002,-32005') .split(',') diff --git a/zp-relayer/services/providers/HttpListProvider.ts b/zp-relayer/services/providers/HttpListProvider.ts index a003424f..c3723d49 100644 --- a/zp-relayer/services/providers/HttpListProvider.ts +++ b/zp-relayer/services/providers/HttpListProvider.ts @@ -1,5 +1,6 @@ // Reference implementation: // https://github.com/omni/tokenbridge/blob/master/oracle/src/services/HttpListProvider.js +import { hexToNumber } from 'web3-utils' import promiseRetry from 'promise-retry' import { FALLBACK_RPC_URL_SWITCH_TIMEOUT } from '@/utils/constants' import { logger } from '../appLogger' @@ -18,6 +19,8 @@ export default class HttpListProvider extends BaseHttpProvider { urls: string[] currentIndex: number lastTimeUsedPrimary: number + latestBlock: number + syncStateCheckerIntervalId?: NodeJS.Timer constructor(urls: string[], options: Partial = {}) { if (!urls || !urls.length) { @@ -27,11 +30,54 @@ export default class HttpListProvider extends BaseHttpProvider { super(urls[0], options) this.currentIndex = 0 this.lastTimeUsedPrimary = 0 + this.latestBlock = 0 this.urls = urls } - private updateUrlIndex(index: number) { + startSyncStateChecker(syncCheckInterval: number) { + if (this.urls.length > 1 && syncCheckInterval > 0 && !this.syncStateCheckerIntervalId) { + this.syncStateCheckerIntervalId = setInterval(() => this.checkLatestBlock(), syncCheckInterval) + } + } + + checkLatestBlock() { + const payload = { jsonrpc: '2.0', id: 1, method: 'eth_blockNumber', params: [] } + this.send(payload, (error: any, result: any) => { + if (error) { + logger.warn('Failed to request latest block from all RPC urls', { oldBlock: this.latestBlock }) + } else if (result.error) { + logger.warn('Failed to make eth_blockNumber request due to unknown error', { + oldBlock: this.latestBlock, + error: result.error.message, + }) + this.updateUrlIndex() + } else { + const blockNumber = hexToNumber(result.result) + const blocksLog = { oldBlock: this.latestBlock, newBlock: blockNumber } + if (blockNumber > this.latestBlock) { + logger.debug('Updating latest block number', blocksLog) + this.latestBlock = blockNumber + } else { + logger.warn('Latest block on the node was not updated since last request', blocksLog) + this.updateUrlIndex() + } + } + }) + } + + private updateUrlIndex(index?: number) { + const prevIndex = this.currentIndex + if (!index) { + index = (prevIndex + 1) % this.urls.length + } + + if (prevIndex === index) { + return + } + + logger.info('Switching JSON-RPC URL: %s -> %s; Index: %d', this.urls[this.currentIndex], this.urls[index], index) + this.currentIndex = index this.host = this.urls[this.currentIndex] } @@ -54,12 +100,6 @@ export default class HttpListProvider extends BaseHttpProvider { // if some of URLs failed to respond, current URL index is updated to the first URL that responded if (currentIndex !== index) { - logger.info( - 'Switching to fallback JSON-RPC URL: %s -> %s; Index: %d', - this.urls[this.currentIndex], - this.urls[index], - index - ) this.updateUrlIndex(index) } callback(null, result) diff --git a/zp-relayer/services/web3.ts b/zp-relayer/services/web3.ts index 8db89e96..07a73289 100644 --- a/zp-relayer/services/web3.ts +++ b/zp-relayer/services/web3.ts @@ -13,6 +13,7 @@ const providerOptions = { config.rpcUrls.forEach(checkHTTPS(config.requireHTTPS)) const provider = new HttpListProvider(config.rpcUrls, providerOptions) +provider.startSyncStateChecker(config.rpcSyncCheckInterval) const web3 = new Web3(provider as HttpProvider) let web3Redundant = web3 From 36f00809c6130ba80e62cea180addac1d5ceadd5 Mon Sep 17 00:00:00 2001 From: Leonid Tyurin Date: Mon, 2 Jan 2023 13:22:27 +0400 Subject: [PATCH 05/10] Pause workers on insufficient balance error (#123) --- zp-relayer/config.ts | 1 + zp-relayer/services/gas-price/GasPrice.ts | 6 + zp-relayer/test.env | 1 + .../test/worker-tests/poolWorker.test.ts | 75 ++++++------ zp-relayer/test/worker-tests/utils.ts | 4 + zp-relayer/utils/helpers.ts | 43 ++++++- zp-relayer/utils/web3Errors.ts | 5 + zp-relayer/workers/poolTxWorker.ts | 115 ++++++++++-------- zp-relayer/workers/sentTxWorker.ts | 13 +- 9 files changed, 165 insertions(+), 98 deletions(-) diff --git a/zp-relayer/config.ts b/zp-relayer/config.ts index 560f52f3..a846ece2 100644 --- a/zp-relayer/config.ts +++ b/zp-relayer/config.ts @@ -39,6 +39,7 @@ const config = { relayerTxRedundancy: process.env.RELAYER_TX_REDUNDANCY === 'true', sentTxDelay: parseInt(process.env.SENT_TX_DELAY || '30000'), rpcRequestTimeout: parseInt(process.env.RPC_REQUEST_TIMEOUT || '1000'), + insufficientBalanceCheckTimeout: parseInt(process.env.INSUFFICIENT_BALANCE_CHECK_TIMEOUT || '60000'), rpcSyncCheckInterval: parseInt(process.env.RELAYER_RPC_SYNC_STATE_CHECK_INTERVAL || '0'), permitDeadlineThresholdInitial: parseInt(process.env.PERMIT_DEADLINE_THRESHOLD_INITIAL || '300'), relayerJsonRpcErrorCodes: (process.env.RELAYER_JSONRPC_ERROR_CODES || '-32603,-32002,-32005') diff --git a/zp-relayer/services/gas-price/GasPrice.ts b/zp-relayer/services/gas-price/GasPrice.ts index b4ee2959..448eff5a 100644 --- a/zp-relayer/services/gas-price/GasPrice.ts +++ b/zp-relayer/services/gas-price/GasPrice.ts @@ -38,6 +38,12 @@ function isEIP1559GasPrice(gp: GasPriceValue): gp is EIP1559GasPrice { return 'maxFeePerGas' in gp && 'maxPriorityFeePerGas' in gp } +export function getMaxRequiredGasPrice(gp: GasPriceValue): string { + if (isLegacyGasPrice(gp)) return gp.gasPrice + if (isEIP1559GasPrice(gp)) return gp.maxFeePerGas + throw new Error('Unknown gas price type') +} + export function chooseGasPriceOptions(a: GasPriceValue, b: GasPriceValue): GasPriceValue { if (isLegacyGasPrice(a) && isLegacyGasPrice(b)) { return { gasPrice: BN.max(toBN(a.gasPrice), toBN(b.gasPrice)).toString(10) } diff --git a/zp-relayer/test.env b/zp-relayer/test.env index 7af0a735..713a3514 100644 --- a/zp-relayer/test.env +++ b/zp-relayer/test.env @@ -11,6 +11,7 @@ TREE_UPDATE_PARAMS_PATH="./params/tree_params.bin" TX_VK_PATH="./params/transfer_verification_key.json" STATE_DIR_PATH="./test/STATE_DIR/" +INSUFFICIENT_BALANCE_CHECK_TIMEOUT=500 SENT_TX_DELAY=2000 GAS_PRICE_FALLBACK= GAS_PRICE_UPDATE_INTERVAL=100000 diff --git a/zp-relayer/test/worker-tests/poolWorker.test.ts b/zp-relayer/test/worker-tests/poolWorker.test.ts index e9421193..8eb2bfe9 100644 --- a/zp-relayer/test/worker-tests/poolWorker.test.ts +++ b/zp-relayer/test/worker-tests/poolWorker.test.ts @@ -16,7 +16,7 @@ import { GasPrice } from '../../services/gas-price' import { redis } from '../../services/redisClient' import { initializeDomain } from '../../utils/EIP712SaltedPermit' import { FlowOutputItem } from '../../../test-flow-generator/src/types' -import { disableMining, enableMining, evmRevert, evmSnapshot, mintTokens, newConnection } from './utils' +import { disableMining, enableMining, evmRevert, evmSnapshot, mintTokens, newConnection, setBalance } from './utils' import { validateTx } from '../../validateTx' import flow from '../flows/flow_independent_deposits_5.json' @@ -96,47 +96,31 @@ describe('poolWorker', () => { gasPriceService.stop() }) - it('executes a job', async () => { - const deposit = flow[0] - await mintTokens(deposit.txTypeData.from as string, parseInt(deposit.txTypeData.amount)) - - // @ts-ignore - const job = await submitJob(deposit) - - const [[txHash, sentId]] = await job.waitUntilFinished(poolQueueEvents) - expect(txHash.length).eq(66) + async function expectJobFinished(job: Job) { + const [[initialHash, sentId]] = await job.waitUntilFinished(poolQueueEvents) + expect(initialHash.length).eq(66) const sentJob = (await sentTxQueue.getJob(sentId)) as Job const [status, sentHash] = await sentJob.waitUntilFinished(sentQueueEvents) expect(status).eq(SentTxState.MINED) - expect(txHash).eq(sentHash) const r = await web3.eth.getTransactionReceipt(sentHash) expect(r.status).eq(true) - }) - it('should re-send tx', async () => { + return { + initialHash, + sentHash, + } + } + + it('executes a job', async () => { const deposit = flow[0] await mintTokens(deposit.txTypeData.from as string, parseInt(deposit.txTypeData.amount)) - await disableMining() - - sentWorker.on('progress', async () => { - await enableMining() - }) // @ts-ignore const job = await submitJob(deposit) - - const [[txHash, sentId]] = await job.waitUntilFinished(poolQueueEvents) - expect(txHash.length).eq(66) - - const sentJob = (await sentTxQueue.getJob(sentId)) as Job - const [status, sentHash] = await sentJob.waitUntilFinished(sentQueueEvents) - expect(status).eq(SentTxState.MINED) - expect(txHash).not.eq(sentHash) - - const r = await web3.eth.getTransactionReceipt(sentHash) - expect(r.status).eq(true) + const { initialHash, sentHash } = await expectJobFinished(job) + expect(initialHash).eq(sentHash) }) it('should re-submit optimistic txs after revert', async () => { @@ -227,7 +211,6 @@ describe('poolWorker', () => { const job = await submitJob(deposit) const [[txHash, sentId]] = await job.waitUntilFinished(poolQueueEvents) - expect(txHash.length).eq(66) const txBefore = await web3.eth.getTransaction(txHash) const gasPriceBefore = Number(txBefore.gasPrice) @@ -243,7 +226,6 @@ describe('poolWorker', () => { const txAfter = await web3.eth.getTransaction(sentHash) const gasPriceAfter = Number(txAfter.gasPrice) - console.log(gasPriceBefore + ' < ' + gasPriceAfter) expect(gasPriceBefore).lt(gasPriceAfter) }) @@ -255,16 +237,33 @@ describe('poolWorker', () => { // @ts-ignore const job = await submitJob(deposit) - const [[, sentId]] = await job.waitUntilFinished(poolQueueEvents) - - const sentJob = (await sentTxQueue.getJob(sentId)) as Job - const [, sentHash] = await sentJob.waitUntilFinished(sentQueueEvents) - - const r = await web3.eth.getTransactionReceipt(sentHash) - expect(r.status).eq(true) + await expectJobFinished(job) // @ts-ignore const withdrawJob = await submitJob(withdraw) await expect(withdrawJob.waitUntilFinished(poolQueueEvents)).rejectedWith('Withdraw address cannot be zero') }) + + it('should pause queues when relayer has insufficient funds', async () => { + let deposit = flow[0] + await mintTokens(deposit.txTypeData.from as string, parseInt(deposit.txTypeData.amount)) + const oldBalance = await web3.eth.getBalance(config.relayerAddress) + + await setBalance(config.relayerAddress, '0x0') + + // @ts-ignore + const failJob = await submitJob(deposit) + await expect(failJob.waitUntilFinished(poolQueueEvents)).rejectedWith('Insufficient funds for gas * price + value') + + // @ts-ignore + const job = await submitJob(deposit) + + expect(await poolTxQueue.count()).eq(1) + expect(await poolTxQueue.isPaused()).eq(true) + expect(await sentTxQueue.isPaused()).eq(true) + + await setBalance(config.relayerAddress, oldBalance) + + await expectJobFinished(job) + }) }) diff --git a/zp-relayer/test/worker-tests/utils.ts b/zp-relayer/test/worker-tests/utils.ts index 5384a2bd..b8774d14 100644 --- a/zp-relayer/test/worker-tests/utils.ts +++ b/zp-relayer/test/worker-tests/utils.ts @@ -56,6 +56,10 @@ export function dropTransaction(hash: string) { return callRpcMethod('anvil_dropTransaction', [hash]) } +export function setBalance(address: string, amount: string) { + return callRpcMethod('anvil_setBalance', [address, amount]) +} + export function evmSnapshot() { return callRpcMethod('evm_snapshot') as Promise } diff --git a/zp-relayer/utils/helpers.ts b/zp-relayer/utils/helpers.ts index c119807f..d0cc8f45 100644 --- a/zp-relayer/utils/helpers.ts +++ b/zp-relayer/utils/helpers.ts @@ -1,9 +1,11 @@ +import type Web3 from 'web3' import type BN from 'bn.js' import { padLeft, toBN } from 'web3-utils' import { logger } from '@/services/appLogger' import type { SnarkProof } from 'libzkbob-rs-node' import { TxType } from 'zp-memo-parser' import type { Mutex } from 'async-mutex' +import promiseRetry from 'promise-retry' const S_MASK = toBN('0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff') const S_MAX = toBN('0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF5D576E7357A4501DDFE92F46681B20A0') @@ -134,7 +136,7 @@ function sleep(ms: number) { return new Promise(resolve => setTimeout(resolve, ms)) } -export function withLoop(f: () => Promise, timeout: number, supressedErrors: string[] = []): () => Promise { +export function withLoop(f: () => Promise, timeout: number, suppressedErrors: string[] = []): () => Promise { // @ts-ignore return async () => { while (1) { @@ -142,15 +144,15 @@ export function withLoop(f: () => Promise, timeout: number, supressedError return await f() } catch (e) { const err = e as Error - let isSupressed = false - for (let supressedError of supressedErrors) { - if (err.message.includes(supressedError)) { - isSupressed = true + let isSuppressed = false + for (let suppressedError of suppressedErrors) { + if (err.message.includes(suppressedError)) { + isSuppressed = true break } } - if (isSupressed) { + if (isSuppressed) { logger.warn('%s', err.message) } else { logger.error('Found error %s', err.message) @@ -162,6 +164,35 @@ export function withLoop(f: () => Promise, timeout: number, supressedError } } +export function waitForFunds( + web3: Web3, + address: string, + cb: (balance: BN) => void, + minimumBalance: BN, + timeout: number, +) { + return promiseRetry( + async retry => { + logger.debug('Getting relayer balance') + const newBalance = toBN(await web3.eth.getBalance(address)) + const balanceLog = { balance: newBalance.toString(10), minimumBalance: minimumBalance.toString(10) } + if (newBalance.gte(minimumBalance)) { + logger.info('Relayer has minimum necessary balance', balanceLog) + cb(newBalance) + } else { + logger.warn('Relayer balance is still less than the minimum', balanceLog) + retry(new Error('Not enough balance')) + } + }, + { + forever: true, + factor: 1, + maxTimeout: timeout, + minTimeout: timeout, + } + ) +} + export function checkHTTPS(isRequired: boolean) { return (url: string) => { if (!/^https.*/.test(url)) { diff --git a/zp-relayer/utils/web3Errors.ts b/zp-relayer/utils/web3Errors.ts index 25cec2e4..b4b0bc42 100644 --- a/zp-relayer/utils/web3Errors.ts +++ b/zp-relayer/utils/web3Errors.ts @@ -23,3 +23,8 @@ export function isNonceError(e: Error) { message.includes(`the tx doesn't have the correct nonce`) ) } + +export function isInsufficientBalanceError(e: Error) { + const message = e.message.toLowerCase() + return message.includes('insufficient funds') +} diff --git a/zp-relayer/workers/poolTxWorker.ts b/zp-relayer/workers/poolTxWorker.ts index 81fcbcfe..cfd43e01 100644 --- a/zp-relayer/workers/poolTxWorker.ts +++ b/zp-relayer/workers/poolTxWorker.ts @@ -2,20 +2,21 @@ import { toBN, toWei } from 'web3-utils' import { Job, Worker } from 'bullmq' import { web3, web3Redundant } from '@/services/web3' import { logger } from '@/services/appLogger' -import { PoolTxResult, TxPayload } from '@/queue/poolTxQueue' +import { poolTxQueue, PoolTxResult, TxPayload } from '@/queue/poolTxQueue' import { TX_QUEUE_NAME, OUTPLUSONE } from '@/utils/constants' import { readNonce, updateField, RelayerKeys, updateNonce } from '@/utils/redisFields' -import { buildPrefixedMemo, truncateMemoTxPrefix, withErrorLog, withMutex } from '@/utils/helpers' +import { buildPrefixedMemo, truncateMemoTxPrefix, waitForFunds, withErrorLog, withMutex } from '@/utils/helpers' import { signTransaction, sendTransaction } from '@/tx/signAndSend' import { Pool, pool } from '@/pool' import { sentTxQueue } from '@/queue/sentTxQueue' import { processTx } from '@/txProcessor' import config from '@/config' -import { addExtraGasPrice, EstimationType, GasPrice } from '@/services/gas-price' +import { addExtraGasPrice, EstimationType, GasPrice, getMaxRequiredGasPrice } from '@/services/gas-price' import type { Mutex } from 'async-mutex' import { getChainId } from '@/utils/web3' import { getTxProofField } from '@/utils/proofInputs' import type { Redis } from 'ioredis' +import { isInsufficientBalanceError } from '@/utils/web3Errors' import { TxValidationError } from '@/validateTx' export async function createPoolTxWorker( @@ -64,59 +65,71 @@ export async function createPoolTxWorker( to: config.poolAddress, chainId: CHAIN_ID, } + const gasPriceValue = await gasPrice.fetchOnce() + const gasPriceWithExtra = addExtraGasPrice(gasPriceValue, config.gasPriceSurplus) + const [txHash, rawTransaction] = await signTransaction( + web3, + { + ...txConfig, + ...gasPriceWithExtra, + }, + config.relayerPrivateKey + ) try { - const gasPriceValue = await gasPrice.fetchOnce() - const gasPriceWithExtra = addExtraGasPrice(gasPriceValue, config.gasPriceSurplus) - const [txHash, rawTransaction] = await signTransaction( - web3, - { - ...txConfig, - ...gasPriceWithExtra, - }, - config.relayerPrivateKey - ) await sendTransaction(web3Redundant, rawTransaction) - - await updateNonce(++nonce) - - logger.info(`${logPrefix} TX hash ${txHash}`) - - await updateField(RelayerKeys.TRANSFER_NUM, commitIndex * OUTPLUSONE) - - const nullifier = getTxProofField(txProof, 'nullifier') - const outCommit = getTxProofField(txProof, 'out_commit') - - const truncatedMemo = truncateMemoTxPrefix(rawMemo, txType) - const prefixedMemo = buildPrefixedMemo(outCommit, txHash, truncatedMemo) - - pool.optimisticState.updateState(commitIndex, outCommit, prefixedMemo) - logger.debug('Adding nullifier %s to OS', nullifier) - await pool.optimisticState.nullifiers.add([nullifier]) - - const sentJob = await sentTxQueue.add( - txHash, - { - poolJobId: job.id as string, - root: rootAfter, - outCommit, - commitIndex, - truncatedMemo, - nullifier, - txConfig, - txPayload: tx, - prevAttempts: [[txHash, gasPriceWithExtra]], - }, - { - delay: config.sentTxDelay, - priority: nonce, - } - ) - - txHashes.push([txHash, sentJob.id as string]) } catch (e) { - logger.error(`${logPrefix} Send TX failed: ${e}`) + const err = e as Error + if (isInsufficientBalanceError(err)) { + const minimumBalance = toBN(gas).mul(toBN(getMaxRequiredGasPrice(gasPriceWithExtra))) + logger.error('Insufficient balance, waiting for funds', { minimumBalance: minimumBalance.toString(10) }) + await Promise.all([poolTxQueue.pause(), sentTxQueue.pause()]) + waitForFunds( + web3, + config.relayerAddress, + () => Promise.all([poolTxQueue.resume(), sentTxQueue.resume()]), + minimumBalance, + config.insufficientBalanceCheckTimeout + ) + } throw e } + + await updateNonce(++nonce) + + logger.info(`${logPrefix} TX hash ${txHash}`) + + await updateField(RelayerKeys.TRANSFER_NUM, commitIndex * OUTPLUSONE) + + const nullifier = getTxProofField(txProof, 'nullifier') + const outCommit = getTxProofField(txProof, 'out_commit') + + const truncatedMemo = truncateMemoTxPrefix(rawMemo, txType) + const prefixedMemo = buildPrefixedMemo(outCommit, txHash, truncatedMemo) + + pool.optimisticState.updateState(commitIndex, outCommit, prefixedMemo) + logger.debug('Adding nullifier %s to OS', nullifier) + await pool.optimisticState.nullifiers.add([nullifier]) + + const sentJob = await sentTxQueue.add( + txHash, + { + poolJobId: job.id as string, + root: rootAfter, + outCommit, + commitIndex, + truncatedMemo, + nullifier, + txConfig, + txPayload: tx, + prevAttempts: [[txHash, gasPriceWithExtra]], + }, + { + delay: config.sentTxDelay, + priority: nonce, + } + ) + + txHashes.push([txHash, sentJob.id as string]) } return txHashes diff --git a/zp-relayer/workers/sentTxWorker.ts b/zp-relayer/workers/sentTxWorker.ts index 00591fbf..39a493b0 100644 --- a/zp-relayer/workers/sentTxWorker.ts +++ b/zp-relayer/workers/sentTxWorker.ts @@ -1,15 +1,16 @@ import type Redis from 'ioredis' import type { Mutex } from 'async-mutex' +import { toBN } from 'web3-utils' import type { TransactionReceipt } from 'web3-core' import { Job, Worker } from 'bullmq' import config from '@/config' import { pool } from '@/pool' import { web3, web3Redundant } from '@/services/web3' import { logger } from '@/services/appLogger' -import { GasPrice, EstimationType, chooseGasPriceOptions, addExtraGasPrice } from '@/services/gas-price' -import { buildPrefixedMemo, withErrorLog, withLoop, withMutex } from '@/utils/helpers' +import { GasPrice, EstimationType, chooseGasPriceOptions, addExtraGasPrice, getMaxRequiredGasPrice } from '@/services/gas-price' +import { buildPrefixedMemo, waitForFunds, withErrorLog, withLoop, withMutex } from '@/utils/helpers' import { OUTPLUSONE, SENT_TX_QUEUE_NAME } from '@/utils/constants' -import { isGasPriceError, isSameTransactionError } from '@/utils/web3Errors' +import { isGasPriceError, isInsufficientBalanceError, isSameTransactionError } from '@/utils/web3Errors' import { SendAttempt, SentTxPayload, sentTxQueue, SentTxResult, SentTxState } from '@/queue/sentTxQueue' import { sendTransaction, signTransaction } from '@/tx/signAndSend' import { poolTxQueue } from '@/queue/poolTxQueue' @@ -194,6 +195,12 @@ export async function createSentTxWorker(gasPrice: Gas await job.update({ ...job.data, }) + } else if (isInsufficientBalanceError(err)) { + // We don't want to take into account last gasPrice increase + job.data.prevAttempts.at(-1)![1] = lastGasPrice + + const minimumBalance = toBN(txConfig.gas!).mul(toBN(getMaxRequiredGasPrice(newGasPrice))) + logger.error('Insufficient balance, waiting for funds', { minimumBalance: minimumBalance.toString(10) }) } // Error should be caught by `withLoop` to re-run job throw e From 4ed80d776215bb3b1754ba0165b1018d4676d093 Mon Sep 17 00:00:00 2001 From: Leonid Tyurin Date: Mon, 2 Jan 2023 13:49:23 +0400 Subject: [PATCH 06/10] Introduction of support id (#121) --- README.md | 11 +- zp-relayer/config.ts | 1 + zp-relayer/endpoints.ts | 109 ++++++------------ zp-relayer/pool.ts | 18 +-- zp-relayer/queue/poolTxQueue.ts | 7 +- zp-relayer/queue/sentTxQueue.ts | 1 + zp-relayer/router.ts | 24 +++- .../test/worker-tests/poolWorker.test.ts | 27 +++-- zp-relayer/txProcessor.ts | 8 +- zp-relayer/utils/constants.ts | 1 + zp-relayer/validation/validation.ts | 56 +++++---- zp-relayer/workers/poolTxWorker.ts | 30 ++--- zp-relayer/workers/sentTxWorker.ts | 49 ++++---- 13 files changed, 171 insertions(+), 171 deletions(-) diff --git a/README.md b/README.md index 4e939da4..5265470a 100644 --- a/README.md +++ b/README.md @@ -60,13 +60,10 @@ sequenceDiagram ## API For a detailed description of each method's payload you can refer to [`zp-relayer/validation/validation.ts`](zp-relayer/validation/validation.ts) file with JSON validation schemas. - -- `/sendTransaction` - submit a transaction to relayer (deprecated). +Note, that requests to all endpoints (except `/`, `/info`, `/params/hash/tree`, `/params/hash/tx`) also require a special `zkbob-support-id` header if `RELAYER_REQUIRE_TRACE_ID` env is set to `true`. This header should be automatically set by the client application. - `/sendTransactions` - submit batch of transaction to relayer. -- `/transactions?limit=${limit}&offset=${offset}&optimistic=${true|false}` - list of transaction memo blocks (deprecated). - - `/transactions/v2?limit=${limit}&offset=${offset}` - list of encoded transactions data in the following format `"${stateBit}${txHash}${outCommit}${memo}"`. `stateBit` is `1` if transaction is in confirmed state and `0` otherwise. - `/merkle/root/:index?` - get Merkle Tree root at specified index. @@ -120,11 +117,11 @@ For a detailed description of each method's payload you can refer to [`zp-relaye { deposit: { singleOperation // Limit for single pool operation - daylyForAddress: { // Daily deposit limits for address + dailyForAddress: { // Daily deposit limits for address total available }, - daylyForAll: { // Daily deposit limits for all users + dailyForAll: { // Daily deposit limits for all users total available }, @@ -134,7 +131,7 @@ For a detailed description of each method's payload you can refer to [`zp-relaye }, }, withdraw: { - daylyForAll: { // Daily withdraw limit for all users + dailyForAll: { // Daily withdraw limit for all users total available }, diff --git a/zp-relayer/config.ts b/zp-relayer/config.ts index a846ece2..e2016814 100644 --- a/zp-relayer/config.ts +++ b/zp-relayer/config.ts @@ -45,6 +45,7 @@ const config = { relayerJsonRpcErrorCodes: (process.env.RELAYER_JSONRPC_ERROR_CODES || '-32603,-32002,-32005') .split(',') .map(s => parseInt(s, 10)), + requireTraceId: process.env.RELAYER_REQUIRE_TRACE_ID === 'true', requireHTTPS: process.env.RELAYER_REQUIRE_HTTPS === 'true', } diff --git a/zp-relayer/endpoints.ts b/zp-relayer/endpoints.ts index 8780958a..ef4ad3d3 100644 --- a/zp-relayer/endpoints.ts +++ b/zp-relayer/endpoints.ts @@ -1,30 +1,30 @@ -import { Request, Response, NextFunction } from 'express' -import { pool } from './pool' -import { logger } from './services/appLogger' +import { Request, Response } from 'express' +import { pool, PoolTx } from './pool' import { poolTxQueue } from './queue/poolTxQueue' import config from './config' import { checkGetLimits, checkGetSiblings, - checkGetTransactions, checkGetTransactionsV2, checkMerkleRootErrors, - checkSendTransactionErrors, checkSendTransactionsErrors, + checkTraceId, + validateBatch, } from './validation/validation' import { sentTxQueue, SentTxState } from './queue/sentTxQueue' import type { Queue } from 'bullmq' +import { TRACE_ID } from './utils/constants' -async function sendTransactions(req: Request, res: Response, next: NextFunction) { - const errors = checkSendTransactionsErrors(req.body) - if (errors) { - logger.info('Request errors: %o', errors) - res.status(400).json({ errors }) - return - } +async function sendTransactions(req: Request, res: Response) { + validateBatch([ + [checkTraceId, req.headers], + [checkSendTransactionsErrors, req.body], + ]) - const rawTxs = req.body - const txs = rawTxs.map((tx: any) => { + const rawTxs = req.body as PoolTx[] + const traceId = req.headers[TRACE_ID] as string + + const txs = rawTxs.map(tx => { const { proof, memo, txType, depositSignature } = tx return { proof, @@ -33,59 +33,26 @@ async function sendTransactions(req: Request, res: Response, next: NextFunction) depositSignature, } }) - const jobId = await pool.transact(txs) + const jobId = await pool.transact(txs, traceId) res.json({ jobId }) } -async function sendTransaction(req: Request, res: Response, next: NextFunction) { - const errors = checkSendTransactionErrors(req.body) - if (errors) { - logger.info('Request errors: %o', errors) - res.status(400).json({ errors }) - return - } - - const { proof, memo, txType, depositSignature } = req.body - const tx = [{ proof, memo, txType, depositSignature }] - const jobId = await pool.transact(tx) - res.json({ jobId }) -} - -async function merkleRoot(req: Request, res: Response, next: NextFunction) { - const errors = checkMerkleRootErrors(req.params) - if (errors) { - logger.info('Request errors: %o', errors) - res.status(400).json({ errors }) - return - } +async function merkleRoot(req: Request, res: Response) { + validateBatch([ + [checkTraceId, req.headers], + [checkMerkleRootErrors, req.params], + ]) const index = req.params.index const root = await pool.getContractMerkleRoot(index) res.json(root) } -async function getTransactions(req: Request, res: Response, next: NextFunction) { - const errors = checkGetTransactions(req.query) - if (errors) { - logger.info('Request errors: %o', errors) - res.status(400).json({ errors }) - return - } - - const state = req.query.optimistic ? pool.optimisticState : pool.state - // Types checked in validation stage - // @ts-ignore - const { txs } = await state.getTransactions(req.query.limit, req.query.offset) - res.json(txs) -} - -async function getTransactionsV2(req: Request, res: Response, next: NextFunction) { - const errors = checkGetTransactionsV2(req.query) - if (errors) { - logger.info('Request errors: %o', errors) - res.status(400).json({ errors }) - return - } +async function getTransactionsV2(req: Request, res: Response) { + validateBatch([ + [checkTraceId, req.headers], + [checkGetTransactionsV2, req.query], + ]) const toV2Format = (prefix: string) => (tx: string) => { const outCommit = tx.slice(0, 64) @@ -128,6 +95,8 @@ async function getJob(req: Request, res: Response) { txHash: null | string } + validateBatch([[checkTraceId, req.headers]]) + const jobId = req.params.id async function getPoolJobState(requestedJobId: string): Promise { @@ -233,18 +202,18 @@ function relayerInfo(req: Request, res: Response) { } function getFee(req: Request, res: Response) { + validateBatch([[checkTraceId, req.headers]]) + res.json({ fee: config.relayerFee.toString(10), }) } async function getLimits(req: Request, res: Response) { - const errors = checkGetLimits(req.query) - if (errors) { - logger.info('Request errors: %o', errors) - res.status(400).json({ errors }) - return - } + validateBatch([ + [checkTraceId, req.headers], + [checkGetLimits, req.query], + ]) const address = req.query.address as unknown as string const limits = await pool.getLimitsFor(address) @@ -253,12 +222,10 @@ async function getLimits(req: Request, res: Response) { } function getSiblings(req: Request, res: Response) { - const errors = checkGetSiblings(req.query) - if (errors) { - logger.info('Request errors: %o', errors) - res.status(400).json({ errors }) - return - } + validateBatch([ + [checkTraceId, req.headers], + [checkGetSiblings, req.query], + ]) const index = req.query.index as unknown as number @@ -290,10 +257,8 @@ function root(req: Request, res: Response) { } export default { - sendTransaction, sendTransactions, merkleRoot, - getTransactions, getTransactionsV2, getJob, relayerInfo, diff --git a/zp-relayer/pool.ts b/zp-relayer/pool.ts index 20389dcf..b0ce3548 100644 --- a/zp-relayer/pool.ts +++ b/zp-relayer/pool.ts @@ -42,11 +42,11 @@ export interface Limits { export interface LimitsFetch { deposit: { singleOperation: string - daylyForAddress: { + dailyForAddress: { total: string available: string } - daylyForAll: { + dailyForAll: { total: string available: string } @@ -56,7 +56,7 @@ export interface LimitsFetch { } } withdraw: { - daylyForAll: { + dailyForAll: { total: string available: string } @@ -109,7 +109,7 @@ class Pool { this.isInitialized = true } - async transact(txs: PoolTx[]) { + async transact(txs: PoolTx[], traceId?: string) { const queueTxs = txs.map(({ proof, txType, memo, depositSignature }) => { return { amount: '0', @@ -120,8 +120,8 @@ class Pool { depositSignature, } }) - const job = await poolTxQueue.add('tx', queueTxs) - logger.debug(`Added job: ${job.id}`) + const job = await poolTxQueue.add('tx', { transactions: queueTxs, traceId }) + logger.debug(`Added poolTxWorker job: ${job.id}`) return job.id } @@ -249,11 +249,11 @@ class Pool { const limitsFetch = { deposit: { singleOperation: limits.depositCap.toString(10), - daylyForAddress: { + dailyForAddress: { total: limits.dailyUserDepositCap.toString(10), available: limits.dailyUserDepositCap.sub(limits.dailyUserDepositCapUsage).toString(10), }, - daylyForAll: { + dailyForAll: { total: limits.dailyDepositCap.toString(10), available: limits.dailyDepositCap.sub(limits.dailyDepositCapUsage).toString(10), }, @@ -263,7 +263,7 @@ class Pool { }, }, withdraw: { - daylyForAll: { + dailyForAll: { total: limits.dailyWithdrawalCap.toString(10), available: limits.dailyWithdrawalCap.sub(limits.dailyWithdrawalCapUsage).toString(10), }, diff --git a/zp-relayer/queue/poolTxQueue.ts b/zp-relayer/queue/poolTxQueue.ts index c8c293e2..dd4f658b 100644 --- a/zp-relayer/queue/poolTxQueue.ts +++ b/zp-relayer/queue/poolTxQueue.ts @@ -13,8 +13,13 @@ export interface TxPayload { depositSignature: string | null } +export interface BatchTx { + transactions: TxPayload[] + traceId?: string +} + export type PoolTxResult = [string, string] -export const poolTxQueue = new Queue(TX_QUEUE_NAME, { +export const poolTxQueue = new Queue(TX_QUEUE_NAME, { connection: redis, }) diff --git a/zp-relayer/queue/sentTxQueue.ts b/zp-relayer/queue/sentTxQueue.ts index a12e6839..ddad2634 100644 --- a/zp-relayer/queue/sentTxQueue.ts +++ b/zp-relayer/queue/sentTxQueue.ts @@ -16,6 +16,7 @@ export interface SentTxPayload { nullifier: string txPayload: TxPayload prevAttempts: SendAttempt[] + traceId?: string } export enum SentTxState { diff --git a/zp-relayer/router.ts b/zp-relayer/router.ts index f52cb298..51a74343 100644 --- a/zp-relayer/router.ts +++ b/zp-relayer/router.ts @@ -2,6 +2,9 @@ import express, { NextFunction, Request, Response } from 'express' import cors from 'cors' import endpoints from './endpoints' import { logger } from './services/appLogger' +import { ValidationError } from './validation/validation' +import config from './config' +import { TRACE_ID } from './utils/constants' function wrapErr(f: (_req: Request, _res: Response, _next: NextFunction) => Promise | void) { return async (req: Request, res: Response, next: NextFunction) => { @@ -28,11 +31,16 @@ router.use((err: any, req: Request, res: Response, next: NextFunction) => { next() }) +router.use((req: Request, res: Response, next: NextFunction) => { + if (config.requireTraceId && req.headers[TRACE_ID]) { + logger.info('TraceId', { traceId: req.headers[TRACE_ID], path: req.path }) + } + next() +}) + router.get('/', endpoints.root) router.get('/version', endpoints.relayerVersion) -router.post('/sendTransaction', wrapErr(endpoints.sendTransaction)) router.post('/sendTransactions', wrapErr(endpoints.sendTransactions)) -router.get('/transactions', wrapErr(endpoints.getTransactions)) router.get('/transactions/v2', wrapErr(endpoints.getTransactionsV2)) router.get('/merkle/root/:index?', wrapErr(endpoints.merkleRoot)) router.get('/job/:id', wrapErr(endpoints.getJob)) @@ -43,4 +51,16 @@ router.get('/siblings', wrapErr(endpoints.getSiblings)) router.get('/params/hash/tree', wrapErr(endpoints.getParamsHash('tree'))) router.get('/params/hash/tx', wrapErr(endpoints.getParamsHash('transfer'))) +// Error handler middleware +router.use((err: any, req: Request, res: Response, next: NextFunction) => { + if (err instanceof ValidationError) { + const validationErrors = err.validationErrors + logger.info('Request errors: %o', validationErrors, { path: req.path }) + res.status(400).json(validationErrors) + next() + } else { + next(err) + } +}) + export default router diff --git a/zp-relayer/test/worker-tests/poolWorker.test.ts b/zp-relayer/test/worker-tests/poolWorker.test.ts index 8eb2bfe9..952f4944 100644 --- a/zp-relayer/test/worker-tests/poolWorker.test.ts +++ b/zp-relayer/test/worker-tests/poolWorker.test.ts @@ -8,7 +8,7 @@ import { web3 } from './web3' import { pool } from '../../pool' import config from '../../config' import { sentTxQueue, SentTxState } from '../../queue/sentTxQueue' -import { poolTxQueue, TxPayload, PoolTxResult } from '../../queue/poolTxQueue' +import { poolTxQueue, PoolTxResult, BatchTx } from '../../queue/poolTxQueue' import { createPoolTxWorker } from '../../workers/poolTxWorker' import { createSentTxWorker } from '../../workers/sentTxWorker' import { PoolState } from '../../state/PoolState' @@ -26,17 +26,20 @@ import flowZeroAddressWithdraw from '../flows/flow_zero-address_withdraw_2.json' chai.use(chaiAsPromised) const expect = chai.expect -async function submitJob(item: FlowOutputItem): Promise> { - const job = await poolTxQueue.add('test', [ - { - amount: '0', - gas: '2000000', - txProof: item.proof, - txType: item.txType, - rawMemo: item.transactionData.memo, - depositSignature: item.depositSignature, - }, - ]) +async function submitJob(item: FlowOutputItem): Promise> { + const job = await poolTxQueue.add('test', { + transactions: [ + { + amount: '0', + gas: '2000000', + txProof: item.proof, + txType: item.txType, + rawMemo: item.transactionData.memo, + depositSignature: item.depositSignature, + }, + ], + traceId: 'test', + }) return job } diff --git a/zp-relayer/txProcessor.ts b/zp-relayer/txProcessor.ts index 6ef7bf8b..3c4d38cc 100644 --- a/zp-relayer/txProcessor.ts +++ b/zp-relayer/txProcessor.ts @@ -62,20 +62,18 @@ function buildTxData(txData: TxData) { return data.join('') } -export async function processTx(id: string, tx: TxPayload) { +export async function processTx(tx: TxPayload) { const { txType, txProof, rawMemo: memo, depositSignature } = tx const nullifier = getTxProofField(txProof, 'nullifier') const outCommit = getTxProofField(txProof, 'out_commit') const delta = parseDelta(getTxProofField(txProof, 'delta')) - const logPrefix = `Job ${id}:` - const { pub, sec, commitIndex } = pool.optimisticState.getVirtualTreeProofInputs(outCommit) - logger.debug(`${logPrefix} Proving tree...`) + logger.debug(`Proving tree...`) const treeProof = await Proof.treeAsync(pool.treeParams, pub, sec) - logger.debug(`${logPrefix} Tree proved`) + logger.debug(`Tree proved`) const rootAfter = treeProof.inputs[1] const data = buildTxData({ diff --git a/zp-relayer/utils/constants.ts b/zp-relayer/utils/constants.ts index 2d78f479..04712a00 100644 --- a/zp-relayer/utils/constants.ts +++ b/zp-relayer/utils/constants.ts @@ -17,6 +17,7 @@ const constants = { maxTimeout: 60000, randomize: true, }, + TRACE_ID: 'zkbob-support-id' as const, } export = constants diff --git a/zp-relayer/validation/validation.ts b/zp-relayer/validation/validation.ts index de46f670..e14116c2 100644 --- a/zp-relayer/validation/validation.ts +++ b/zp-relayer/validation/validation.ts @@ -3,7 +3,8 @@ import { isAddress } from 'web3-utils' import { Proof, SnarkProof } from 'libzkbob-rs-node' import { TxType } from 'zp-memo-parser' import type { PoolTx } from '@/pool' -import { ZERO_ADDRESS } from '@/utils/constants' +import { TRACE_ID, ZERO_ADDRESS } from '@/utils/constants' +import config from '@/config' const ajv = new Ajv({ allErrors: true, coerceTypes: true, useDefaults: true }) @@ -24,6 +25,7 @@ ajv.addKeyword({ }) const AjvString: JSONSchemaType = { type: 'string' } +const AjvNullableString: JSONSchemaType = { type: 'string', nullable: true } const AjvNullableAddress: JSONSchemaType = { type: 'string', @@ -77,7 +79,7 @@ const AjvSendTransactionSchema: JSONSchemaType = { type: 'string', enum: [TxType.DEPOSIT, TxType.PERMITTABLE_DEPOSIT, TxType.TRANSFER, TxType.WITHDRAWAL], }, - depositSignature: { type: 'string', nullable: true }, + depositSignature: AjvNullableString, }, required: ['proof', 'memo', 'txType'], } @@ -87,31 +89,6 @@ const AjvSendTransactionsSchema: JSONSchemaType = { items: AjvSendTransactionSchema, } -const AjvGetTransactionsSchema: JSONSchemaType<{ - limit: number - offset: number - optimistic: boolean -}> = { - type: 'object', - properties: { - limit: { - type: 'integer', - minimum: 1, - default: 100, - }, - offset: { - type: 'integer', - minimum: 0, - default: 0, - }, - optimistic: { - type: 'boolean', - default: false, - }, - }, - required: [], -} - const AjvGetTransactionsV2Schema: JSONSchemaType<{ limit: number offset: number @@ -168,6 +145,12 @@ const AjvGetSiblingsSchema: JSONSchemaType<{ required: ['index'], } +const AjvTraceIdSchema: JSONSchemaType<{ [TRACE_ID]: string }> = { + type: 'object', + properties: { [TRACE_ID]: AjvNullableString }, + required: config.requireTraceId ? [TRACE_ID] : [], +} + function checkErrors(schema: JSONSchemaType) { const validate = ajv.compile(schema) return (data: any) => { @@ -181,10 +164,25 @@ function checkErrors(schema: JSONSchemaType) { } } +type ValidationFunction = ReturnType + +export class ValidationError extends Error { + constructor(public validationErrors: ReturnType) { + super() + } +} + +export function validateBatch(validationSet: [ValidationFunction, any][]) { + for (const [validate, data] of validationSet) { + const errors = validate(data) + if (errors) throw new ValidationError(errors) + } + return null +} + export const checkMerkleRootErrors = checkErrors(AjvMerkleRootSchema) -export const checkSendTransactionErrors = checkErrors(AjvSendTransactionSchema) export const checkSendTransactionsErrors = checkErrors(AjvSendTransactionsSchema) -export const checkGetTransactions = checkErrors(AjvGetTransactionsSchema) export const checkGetTransactionsV2 = checkErrors(AjvGetTransactionsV2Schema) export const checkGetLimits = checkErrors(AjvGetLimitsSchema) export const checkGetSiblings = checkErrors(AjvGetSiblingsSchema) +export const checkTraceId = checkErrors(AjvTraceIdSchema) diff --git a/zp-relayer/workers/poolTxWorker.ts b/zp-relayer/workers/poolTxWorker.ts index cfd43e01..531a9b91 100644 --- a/zp-relayer/workers/poolTxWorker.ts +++ b/zp-relayer/workers/poolTxWorker.ts @@ -2,7 +2,7 @@ import { toBN, toWei } from 'web3-utils' import { Job, Worker } from 'bullmq' import { web3, web3Redundant } from '@/services/web3' import { logger } from '@/services/appLogger' -import { poolTxQueue, PoolTxResult, TxPayload } from '@/queue/poolTxQueue' +import { poolTxQueue, BatchTx, PoolTxResult, TxPayload } from '@/queue/poolTxQueue' import { TX_QUEUE_NAME, OUTPLUSONE } from '@/utils/constants' import { readNonce, updateField, RelayerKeys, updateNonce } from '@/utils/redisFields' import { buildPrefixedMemo, truncateMemoTxPrefix, waitForFunds, withErrorLog, withMutex } from '@/utils/helpers' @@ -25,6 +25,7 @@ export async function createPoolTxWorker( mutex: Mutex, redis: Redis ) { + const workerLogger = logger.child({ worker: 'pool' }) const WORKER_OPTIONS = { autorun: false, connection: redis, @@ -35,17 +36,18 @@ export async function createPoolTxWorker( await updateNonce(nonce) const CHAIN_ID = await getChainId(web3) - const poolTxWorkerProcessor = async (job: Job) => { + const poolTxWorkerProcessor = async (job: Job) => { const sentTxNum = await sentTxQueue.count() if (sentTxNum >= config.maxSentQueueSize) { throw new Error('Optimistic state overflow') } - const txs = job.data + const txs = job.data.transactions + const traceId = job.data.traceId - const logPrefix = `POOL WORKER: Job ${job.id}:` - logger.info('%s processing...', logPrefix) - logger.info('Received %s txs', txs.length) + const jobLogger = workerLogger.child({ jobId: job.id, traceId }) + jobLogger.info('Processing...') + jobLogger.info('Received %s txs', txs.length) const txHashes: [string, string][] = [] for (const tx of txs) { @@ -53,9 +55,9 @@ export async function createPoolTxWorker( await validateTx(tx, pool) - const { data, commitIndex, rootAfter } = await processTx(job.id as string, tx) + const { data, commitIndex, rootAfter } = await processTx(tx) - logger.info(`${logPrefix} nonce: ${nonce}`) + jobLogger.info(`nonce: ${nonce}`) const txConfig = { data, @@ -81,7 +83,7 @@ export async function createPoolTxWorker( const err = e as Error if (isInsufficientBalanceError(err)) { const minimumBalance = toBN(gas).mul(toBN(getMaxRequiredGasPrice(gasPriceWithExtra))) - logger.error('Insufficient balance, waiting for funds', { minimumBalance: minimumBalance.toString(10) }) + jobLogger.error('Insufficient balance, waiting for funds', { minimumBalance: minimumBalance.toString(10) }) await Promise.all([poolTxQueue.pause(), sentTxQueue.pause()]) waitForFunds( web3, @@ -96,7 +98,7 @@ export async function createPoolTxWorker( await updateNonce(++nonce) - logger.info(`${logPrefix} TX hash ${txHash}`) + jobLogger.info('Sent tx', { txHash }) await updateField(RelayerKeys.TRANSFER_NUM, commitIndex * OUTPLUSONE) @@ -107,7 +109,7 @@ export async function createPoolTxWorker( const prefixedMemo = buildPrefixedMemo(outCommit, txHash, truncatedMemo) pool.optimisticState.updateState(commitIndex, outCommit, prefixedMemo) - logger.debug('Adding nullifier %s to OS', nullifier) + jobLogger.debug('Adding nullifier %s to OS', nullifier) await pool.optimisticState.nullifiers.add([nullifier]) const sentJob = await sentTxQueue.add( @@ -122,12 +124,14 @@ export async function createPoolTxWorker( txConfig, txPayload: tx, prevAttempts: [[txHash, gasPriceWithExtra]], + traceId, }, { delay: config.sentTxDelay, priority: nonce, } ) + jobLogger.info(`Added sentTxWorker job: ${sentJob.id}`) txHashes.push([txHash, sentJob.id as string]) } @@ -135,7 +139,7 @@ export async function createPoolTxWorker( return txHashes } - const poolTxWorker = new Worker( + const poolTxWorker = new Worker( TX_QUEUE_NAME, job => withErrorLog( withMutex(mutex, () => poolTxWorkerProcessor(job)), @@ -145,7 +149,7 @@ export async function createPoolTxWorker( ) poolTxWorker.on('error', e => { - logger.info('POOL_WORKER ERR: %o', e) + workerLogger.info('POOL_WORKER ERR: %o', e) }) return poolTxWorker diff --git a/zp-relayer/workers/sentTxWorker.ts b/zp-relayer/workers/sentTxWorker.ts index 39a493b0..f2f6f765 100644 --- a/zp-relayer/workers/sentTxWorker.ts +++ b/zp-relayer/workers/sentTxWorker.ts @@ -41,6 +41,7 @@ async function clearOptimisticState() { } export async function createSentTxWorker(gasPrice: GasPrice, mutex: Mutex, redis: Redis) { + const workerLogger = logger.child({ worker: 'sent-tx' }) const WORKER_OPTIONS = { autorun: false, connection: redis, @@ -62,11 +63,11 @@ export async function createSentTxWorker(gasPrice: Gas // Iterate in reverse order to check the latest hash first for (let i = prevAttempts.length - 1; i >= 0; i--) { const txHash = prevAttempts[i][0] - logger.info('Verifying %s ...', txHash) + logger.info('Verifying tx', { txHash }) try { tx = await web3.eth.getTransactionReceipt(txHash) } catch (e) { - logger.warn('Cannot get tx receipt for %s; RPC response: %s', txHash, (e as Error).message) + logger.warn('Cannot get tx receipt; RPC response: %s', (e as Error).message, { txHash }) // Exception should be caught by `withLoop` to re-run job throw e } @@ -78,8 +79,9 @@ export async function createSentTxWorker(gasPrice: Gas } const sentTxWorkerProcessor = async (job: Job) => { - const logPrefix = `SENT WORKER: Job ${job.id}:` - logger.info('%s processing...', logPrefix) + const jobLogger = workerLogger.child({ jobId: job.id, traceId: job.data.traceId }) + + jobLogger.info('Verifying job %s', job.data.poolJobId) const { truncatedMemo, commitIndex, outCommit, nullifier, root, prevAttempts, txConfig } = job.data // Any thrown web3 error will re-trigger re-send loop iteration @@ -98,7 +100,7 @@ export async function createSentTxWorker(gasPrice: Gas // Tx mined if (tx.status) { // Successful - logger.info('%s Transaction %s was successfully mined at block %s', logPrefix, txHash, tx.blockNumber) + jobLogger.info('Transaction was successfully mined', { txHash, blockNumber: tx.blockNumber }) const prefixedMemo = buildPrefixedMemo(outCommit, txHash, truncatedMemo) pool.state.updateState(commitIndex, outCommit, prefixedMemo) @@ -106,34 +108,34 @@ export async function createSentTxWorker(gasPrice: Gas pool.optimisticState.addTx(commitIndex * OUTPLUSONE, Buffer.from(prefixedMemo, 'hex')) // Add nullifier to confirmed state and remove from optimistic one - logger.info('Adding nullifier %s to PS', nullifier) + jobLogger.info('Adding nullifier %s to PS', nullifier) await pool.state.nullifiers.add([nullifier]) - logger.info('Removing nullifier %s from OS', nullifier) + jobLogger.info('Removing nullifier %s from OS', nullifier) await pool.optimisticState.nullifiers.remove([nullifier]) const node1 = pool.state.getCommitment(commitIndex) const node2 = pool.optimisticState.getCommitment(commitIndex) - logger.info(`Assert commitments are equal: ${node1}, ${node2}`) + jobLogger.info('Assert commitments are equal: %s, %s', node1, node2) if (node1 !== node2) { - logger.error('Commitments are not equal') + jobLogger.error('Commitments are not equal') } const rootConfirmed = pool.state.getMerkleRoot() - logger.info(`Assert roots are equal`) + jobLogger.info('Assert roots are equal') if (rootConfirmed !== root) { // TODO: Should be impossible but in such case // we should recover from some checkpoint - logger.error('Roots are not equal: %s should be %s', rootConfirmed, root) + jobLogger.error('Roots are not equal: %s should be %s', rootConfirmed, root) } return [SentTxState.MINED, txHash, []] as SentTxResult } else { // Revert - logger.error('%s Transaction %s reverted at block %s', logPrefix, txHash, tx.blockNumber) + jobLogger.error('Transaction reverted', { txHash, blockNumber: tx.blockNumber }) // Means that rollback was done previously, no need to do it now if (await checkMarked(redis, job.id as string)) { - logger.info('%s Job %s marked as failed, skipping', logPrefix, job.id) + jobLogger.info('Job marked as failed, skipping') return [SentTxState.REVERT, txHash, []] as SentTxResult } @@ -150,18 +152,23 @@ export async function createSentTxWorker(gasPrice: Gas // https://github.com/taskforcesh/bullmq/blob/master/src/commands/addJob-8.lua#L142-L143 if (!wj?.id) continue waitingJobIds.push(wj.id) - const reschedulePromise = poolTxQueue.add(txHash, [wj.data.txPayload]).then(j => { + + const { txPayload, traceId } = wj.data + const transactions = [txPayload] + + // To not mess up traceId we add each transaction separately + const reschedulePromise = poolTxQueue.add(txHash, { transactions, traceId }).then(j => { const newPoolJobId = j.id as string newPoolJobIdMapping[wj.data.poolJobId] = newPoolJobId return newPoolJobId }) reschedulePromises.push(reschedulePromise) } - logger.info('Marking ids %j as failed', waitingJobIds) + jobLogger.info('Marking ids %j as failed', waitingJobIds) await markFailed(redis, waitingJobIds) - logger.info('%s Rescheduling %d jobs to process...', logPrefix, waitingJobs.length) + jobLogger.info('Rescheduling %d jobs to process...', waitingJobs.length) const rescheduledIds = await Promise.all(reschedulePromises) - logger.info('%s Update pool job id mapping %j ...', logPrefix, newPoolJobIdMapping) + jobLogger.info('Update pool job id mapping %j ...', newPoolJobIdMapping) await pool.state.jobIdsMapping.add(newPoolJobIdMapping) return [SentTxState.REVERT, txHash, rescheduledIds] as SentTxResult @@ -174,7 +181,7 @@ export async function createSentTxWorker(gasPrice: Gas const newGasPrice = chooseGasPriceOptions(oldWithExtra, newWithExtra) - logger.warn('%s Tx %s is not mined; updating gasPrice: %o -> %o', logPrefix, lastHash, lastGasPrice, newGasPrice) + jobLogger.warn('Tx %s is not mined; updating gasPrice: %o -> %o', lastHash, lastGasPrice, newGasPrice) const newTxConfig = { ...txConfig, @@ -185,10 +192,10 @@ export async function createSentTxWorker(gasPrice: Gas job.data.prevAttempts.push([newTxHash, newGasPrice]) try { await sendTransaction(web3Redundant, rawTransaction) - logger.info(`${logPrefix} Re-send tx; New hash: ${newTxHash}`) + jobLogger.info('Re-send tx', { txHash: newTxHash }) } catch (e) { const err = e as Error - logger.warn('%s Tx resend failed for %s: %s', logPrefix, lastHash, err.message) + jobLogger.warn('Tx resend failed: %s', err.message, { txHash: newTxHash }) if (isGasPriceError(err) || isSameTransactionError(err)) { // Tx wasn't sent successfully, but still update last attempt's // gasPrice to be accounted in the next iteration @@ -237,7 +244,7 @@ export async function createSentTxWorker(gasPrice: Gas ) sentTxWorker.on('error', e => { - logger.info('SENT_WORKER ERR: %o', e) + workerLogger.info('SENT_WORKER ERR: %o', e) }) return sentTxWorker From 21fa17b6b9854f48d304a3fcacd35ef14273911b Mon Sep 17 00:00:00 2001 From: Leonid Tyurin Date: Mon, 2 Jan 2023 14:37:39 +0400 Subject: [PATCH 07/10] Fix missing configuration variables (#130) --- CONFIGURATION.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 1cf31e0c..f734f33f 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -29,7 +29,10 @@ | RELAYER_REDIS_URL | Url to redis instance | URL | | RPC_URL | The HTTPS URL(s) used to communicate to the RPC nodes. Several URLs can be specified, delimited by spaces. If the connection to one of these nodes is lost the next URL is used for connection. | URL | | RELAYER_TX_REDUNDANCY | If set to `true`, instructs relayer to send `eth_sendRawTransaction` requests through all available RPC urls defined in `RPC_URL` variables instead of using first available one. Defaults to `false` | boolean | -| RELAYER_RPC_SYNC_STATE_CHECK_INTERVAL | Interval for checking JSON RPC sync state, by requesting the latest block number. Relayer will switch to the fallback JSON RPC in case sync process is stuck. If this variable is `0` sync state check is disabled. Defaults to `0` | integer | +| RELAYER_RPC_SYNC_STATE_CHECK_INTERVAL | Interval in milliseconds for checking JSON RPC sync state, by requesting the latest block number. Relayer will switch to the fallback JSON RPC in case sync process is stuck. If this variable is `0` sync state check is disabled. Defaults to `0` | integer | +| INSUFFICIENT_BALANCE_CHECK_TIMEOUT | Interval in milliseconds to check for relayer balance update if transaction send failed with insufficient balance error. Default `60000` | integer | | SENT_TX_DELAY | Delay in milliseconds for sentTxWorker to verify submitted transactions | integer | | PERMIT_DEADLINE_THRESHOLD_INITIAL | Minimum time threshold in seconds for permit signature deadline to be valid (before initial transaction submission) | integer | | PERMIT_DEADLINE_THRESHOLD_RESEND | Minimum time threshold in seconds for permit signature deadline to be valid (for re-send attempts) | integer | +| RELAYER_REQUIRE_TRACE_ID | If set to `true`, then requests to relayer (except `/info`, `/version`, `/params/hash/tree`, `/params/hash/tx`) without `zkbob-support-id` header will be rejected. | boolean | +| RELAYER_REQUIRE_HTTPS | If set to `true`, then RPC URL(s) must be in HTTPS format. HTTP RPC URL(s) should be used in test environment only. | boolean | \ No newline at end of file From eb4f6e88813b22d94ba92c8ffd157d23207b3b59 Mon Sep 17 00:00:00 2001 From: Alexander Kolotov Date: Mon, 2 Jan 2023 13:40:07 +0300 Subject: [PATCH 08/10] Bump package version to 3.0.0 (#127) --- zp-relayer/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zp-relayer/package.json b/zp-relayer/package.json index f269eaed..99821084 100644 --- a/zp-relayer/package.json +++ b/zp-relayer/package.json @@ -1,6 +1,6 @@ { "name": "zp-relayer", - "version": "2.2.0", + "version": "3.0.0", "main": "build/index.js", "types": "build/index.d.ts", "scripts": { From fe1418008dc8dd689b6dae3da97c55f44be3e84e Mon Sep 17 00:00:00 2001 From: Leonid Tyurin Date: Wed, 4 Jan 2023 18:57:11 +0400 Subject: [PATCH 09/10] Fix events block range (#134) --- zp-relayer/pool.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/zp-relayer/pool.ts b/zp-relayer/pool.ts index b0ce3548..73bf4c6e 100644 --- a/zp-relayer/pool.ts +++ b/zp-relayer/pool.ts @@ -17,7 +17,7 @@ import { PoolState } from './state/PoolState' import { TxType } from 'zp-memo-parser' import { numToHex, toTxType, truncateHexPrefix, truncateMemoTxPrefix } from './utils/helpers' import { PoolCalldataParser } from './utils/PoolCalldataParser' -import { INIT_ROOT, OUTPLUSONE } from './utils/constants' +import { OUTPLUSONE } from './utils/constants' export interface PoolTx { proof: Proof @@ -130,8 +130,8 @@ class Pool { return lastBlockNumber } - async syncState(fromBlock: number) { - logger.debug('Syncing state; starting from block %d', fromBlock) + async syncState(startBlock: number) { + logger.debug('Syncing state; starting from block %d', startBlock) const localIndex = this.state.getNextIndex() const localRoot = this.state.getMerkleRoot() @@ -154,12 +154,12 @@ class Pool { } const lastBlockNumber = await this.getLastBlockToProcess() - let finishBlock = fromBlock - for (let startBlock = fromBlock; finishBlock <= lastBlockNumber; startBlock = finishBlock) { - finishBlock += config.eventsProcessingBatchSize + let toBlock = startBlock + for (let fromBlock = startBlock; toBlock <= lastBlockNumber + 1; startBlock = toBlock) { + toBlock += config.eventsProcessingBatchSize const events = await getEvents(this.PoolInstance, 'Message', { - fromBlock: startBlock, - toBlock: finishBlock, + fromBlock, + toBlock: toBlock - 1, filter: { index: missedIndices, }, From 8badee90a76a5a50a229424020222e2bdb8511fa Mon Sep 17 00:00:00 2001 From: Leonid Tyurin Date: Fri, 6 Jan 2023 16:31:01 +0400 Subject: [PATCH 10/10] Fix a bug in logic of sync loop (#137) --- zp-relayer/pool.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zp-relayer/pool.ts b/zp-relayer/pool.ts index 73bf4c6e..f6c345dd 100644 --- a/zp-relayer/pool.ts +++ b/zp-relayer/pool.ts @@ -155,7 +155,7 @@ class Pool { const lastBlockNumber = await this.getLastBlockToProcess() let toBlock = startBlock - for (let fromBlock = startBlock; toBlock <= lastBlockNumber + 1; startBlock = toBlock) { + for (let fromBlock = startBlock; toBlock <= lastBlockNumber + 1; fromBlock = toBlock) { toBlock += config.eventsProcessingBatchSize const events = await getEvents(this.PoolInstance, 'Message', { fromBlock,