diff --git a/zp-relayer/endpoints.ts b/zp-relayer/endpoints.ts index 0156c614..bae8ab2e 100644 --- a/zp-relayer/endpoints.ts +++ b/zp-relayer/endpoints.ts @@ -11,6 +11,7 @@ import { checkSendTransactionErrors, checkSendTransactionsErrors, } from './validation/validation' +import { sentTxQueue, SentTxState } from './queue/sentTxQueue' async function sendTransactions(req: Request, res: Response, next: NextFunction) { const errors = checkSendTransactionsErrors(req.body) @@ -108,22 +109,84 @@ async function getTransactionsV2(req: Request, res: Response, next: NextFunction } async function getJob(req: Request, res: Response) { + enum JobStatus { + WAITING = 'waiting', + FAILED = 'failed', + SENT = 'sent', + REVERTED = 'reverted', + COMPLETED = 'completed', + } + + interface GetJobResponse { + resolvedJobId: string + createdOn: number + failedReason: null | string + finishedOn: null | number + state: JobStatus + txHash: null | string + } + const jobId = req.params.id - const job = await poolTxQueue.getJob(jobId) - if (job) { - const state = await job.getState() - const txHash = job.returnvalue - const failedReason = job.failedReason - const createdOn = job.timestamp - const finishedOn = job.finishedOn - - res.json({ - state, - txHash, - failedReason, - createdOn, - finishedOn, - }) + + async function getPoolJobState(requestedJobId: string): Promise { + const jobId = await pool.state.jobIdsMapping.get(requestedJobId) + const job = await poolTxQueue.getJob(jobId) + if (!job) return null + + // Default result object + let result: GetJobResponse = { + resolvedJobId: jobId, + createdOn: job.timestamp, + failedReason: null, + finishedOn: null, + state: JobStatus.WAITING, + txHash: null, + } + + const poolJobState = await job.getState() + if (poolJobState === 'completed') { + // Transaction was included in optimistic state, waiting to be mined + const sentJobId = job.returnvalue[0][1] + const sentJob = await sentTxQueue.getJob(sentJobId) + // Should not happen here, but need to verify to be sure + if (!sentJob) throw new Error('Sent job not found') + + const sentJobState = await sentJob.getState() + if (sentJobState === 'waiting' || sentJobState === 'active' || sentJobState === 'delayed') { + // Transaction is in re-send loop + const txHash = sentJob.data.prevAttempts.at(-1)?.[0] + result.state = JobStatus.SENT + result.txHash = txHash || null + } else if (sentJobState === 'completed') { + const [txState, txHash] = sentJob.returnvalue + if (txState === SentTxState.MINED) { + // Transaction mined successfully + result.state = JobStatus.COMPLETED + result.txHash = txHash + result.finishedOn = sentJob.finishedOn || null + } else if (txState === SentTxState.REVERT) { + // Transaction reverted + result.state = JobStatus.REVERTED + result.txHash = txHash + result.finishedOn = sentJob.finishedOn || null + } + } + } else if (poolJobState === 'failed') { + // Either validation or tx sendind failed + result.state = JobStatus.FAILED + result.failedReason = job.failedReason + result.finishedOn = job.finishedOn || null + } + // Other states mean that transaction is either waiting in queue + // or being processed by worker + // So, no need to update `result` object + + return result + } + + const jobState = await getPoolJobState(jobId) + if (jobState) { + res.json(jobState) } else { res.json(`Job ${jobId} not found`) } diff --git a/zp-relayer/queue/sentTxQueue.ts b/zp-relayer/queue/sentTxQueue.ts index 1436db4e..fa6df919 100644 --- a/zp-relayer/queue/sentTxQueue.ts +++ b/zp-relayer/queue/sentTxQueue.ts @@ -7,10 +7,11 @@ import { TxPayload } from './poolTxQueue' export type SendAttempt = [string, GasPriceValue] export interface SentTxPayload { + poolJobId: string root: string outCommit: string commitIndex: number - prefixedMemo: string + truncatedMemo: string txConfig: TransactionConfig nullifier: string txPayload: TxPayload diff --git a/zp-relayer/state/PoolState.ts b/zp-relayer/state/PoolState.ts index 66b1aa6b..9e98d766 100644 --- a/zp-relayer/state/PoolState.ts +++ b/zp-relayer/state/PoolState.ts @@ -4,18 +4,23 @@ import { OUTPLUSONE } from '@/utils/constants' import { MerkleTree, TxStorage, MerkleProof, Constants, Helpers } from 'libzkbob-rs-node' import { NullifierSet } from './nullifierSet' import { RootSet } from './rootSet' +import { JobIdsMapping } from './jobIdsMapping' export class PoolState { private tree: MerkleTree private txs: TxStorage public nullifiers: NullifierSet public roots: RootSet + public jobIdsMapping: JobIdsMapping constructor(private name: string, redis: Redis, path: string) { this.tree = new MerkleTree(`${path}/${name}Tree.db`) this.txs = new TxStorage(`${path}/${name}Txs.db`) this.nullifiers = new NullifierSet(`${name}-nullifiers`, redis) this.roots = new RootSet(`${name}-roots`, redis) + // This structure can be shared among different pool states + // So, use constant name + this.jobIdsMapping = new JobIdsMapping('job-id-mapping', redis) } getVirtualTreeProofInputs(outCommit: string, transferNum?: number) { diff --git a/zp-relayer/state/jobIdsMapping.ts b/zp-relayer/state/jobIdsMapping.ts new file mode 100644 index 00000000..70337193 --- /dev/null +++ b/zp-relayer/state/jobIdsMapping.ts @@ -0,0 +1,28 @@ +import type { Redis } from 'ioredis' + +export class JobIdsMapping { + constructor(public name: string, private redis: Redis) {} + + async add(mapping: Record) { + if (Object.keys(mapping).length === 0) return + await this.redis.hset(this.name, mapping) + } + + async remove(indices: string[]) { + if (indices.length === 0) return + await this.redis.hdel(this.name, ...indices) + } + + async get(id: string): Promise { + const mappedId = await this.redis.hget(this.name, id) + if (mappedId) { + return await this.get(mappedId) + } else { + return id + } + } + + async clear() { + await this.redis.del(this.name) + } +} diff --git a/zp-relayer/utils/helpers.ts b/zp-relayer/utils/helpers.ts index 3da36a52..2efe1716 100644 --- a/zp-relayer/utils/helpers.ts +++ b/zp-relayer/utils/helpers.ts @@ -4,6 +4,7 @@ import { logger } from '@/services/appLogger' import { SnarkProof } from 'libzkbob-rs-node' import { TxType } from 'zp-memo-parser' import type { Mutex } from 'async-mutex' +import { TxValidationError } from '@/validateTx' const S_MASK = toBN('0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff') const S_MAX = toBN('0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF5D576E7357A4501DDFE92F46681B20A0') @@ -91,6 +92,10 @@ export function flattenProof(p: SnarkProof): string { .join('') } +export function buildPrefixedMemo(outCommit: string, txHash: string, truncatedMemo: string) { + return numToHex(toBN(outCommit)).concat(txHash.slice(2)).concat(truncatedMemo) +} + export async function setIntervalAndRun(f: () => Promise | void, interval: number) { const handler = setInterval(f, interval) await f() @@ -112,7 +117,11 @@ export async function withErrorLog(f: () => Promise): Promise { try { return await f() } catch (e) { - logger.error('Found error: %s', (e as Error).message) + if (e instanceof TxValidationError) { + logger.warn('Validation error: %s', (e as Error).message) + } else { + logger.error('Found error: %s', (e as Error).message) + } throw e } } diff --git a/zp-relayer/validateTx.ts b/zp-relayer/validateTx.ts index 624e7936..bd7ee24b 100644 --- a/zp-relayer/validateTx.ts +++ b/zp-relayer/validateTx.ts @@ -19,11 +19,16 @@ const tokenContract = new web3.eth.Contract(TokenAbi as AbiItem[], config.tokenA const ZERO = toBN(0) +export class TxValidationError extends Error { + constructor(message: string) { + super(message) + } +} + type OptionError = Error | null export async function checkAssertion(f: () => Promise | OptionError) { const err = await f() if (err) { - logger.warn('Assertion error: %s', err.message) throw err } } @@ -36,7 +41,7 @@ export async function checkBalance(address: string, minBalance: string) { const balance = await tokenContract.methods.balanceOf(address).call() const res = toBN(balance).gte(toBN(minBalance)) if (!res) { - return new Error('Not enough balance for deposit') + return new TxValidationError('Not enough balance for deposit') } return null } @@ -48,7 +53,7 @@ export function checkCommitment(treeProof: Proof, txProof: Proof) { export function checkProof(txProof: Proof, verify: (p: SnarkProof, i: Array) => boolean) { const res = verify(txProof.proof, txProof.inputs) if (!res) { - return new Error('Incorrect snark proof') + return new TxValidationError('Incorrect snark proof') } return null } @@ -56,12 +61,12 @@ export function checkProof(txProof: Proof, verify: (p: SnarkProof, i: Array config.maxFaucet) { - return new Error('Native amount too high') + return new TxValidationError('Native amount too high') } return null } @@ -97,7 +102,7 @@ export function checkNativeAmount(nativeAmount: BN | null) { export function checkFee(fee: BN) { logger.debug(`Fee: ${fee}`) if (fee.lt(config.relayerFee)) { - return new Error('Fee too low') + return new TxValidationError('Fee too low') } return null } @@ -111,7 +116,7 @@ export function checkDeadline(signedDeadline: BN, threshold: number) { // Check native amount (relayer faucet) const currentTimestamp = new BN(Math.floor(Date.now() / 1000)) if (signedDeadline <= currentTimestamp.addn(threshold)) { - return new Error(`Deadline is expired`) + return new TxValidationError(`Deadline is expired`) } return null } @@ -119,20 +124,20 @@ export function checkDeadline(signedDeadline: BN, threshold: number) { export function checkLimits(limits: Limits, amount: BN) { if (amount.gt(toBN(0))) { if (amount.gt(limits.depositCap)) { - return new Error('Single deposit cap exceeded') + return new TxValidationError('Single deposit cap exceeded') } if (limits.tvl.add(amount).gte(limits.tvlCap)) { - return new Error('Tvl cap exceeded') + return new TxValidationError('Tvl cap exceeded') } if (limits.dailyUserDepositCapUsage.add(amount).gt(limits.dailyUserDepositCap)) { - return new Error('Daily user deposit cap exceeded') + return new TxValidationError('Daily user deposit cap exceeded') } if (limits.dailyDepositCapUsage.add(amount).gt(limits.dailyDepositCap)) { - return new Error('Daily deposit cap exceeded') + return new TxValidationError('Daily deposit cap exceeded') } } else { if (limits.dailyWithdrawalCapUsage.sub(amount).gt(limits.dailyWithdrawalCap)) { - return new Error('Daily withdrawal cap exceeded') + return new TxValidationError('Daily withdrawal cap exceeded') } } return null @@ -140,7 +145,7 @@ export function checkLimits(limits: Limits, amount: BN) { async function checkDepositEnoughBalance(address: string, requiredTokenAmount: BN) { if (requiredTokenAmount.lte(toBN(0))) { - throw new Error('Requested balance check for token amount <= 0') + throw new TxValidationError('Requested balance check for token amount <= 0') } return checkBalance(address, requiredTokenAmount.toString(10)) @@ -156,7 +161,7 @@ async function getRecoveredAddress( // Signature without `0x` prefix, size is 64*2=128 await checkAssertion(() => { if (depositSignature !== null && checkSize(depositSignature, 128)) return null - return new Error('Invalid deposit signature') + return new TxValidationError('Invalid deposit signature size') }) const nullifier = '0x' + numToHex(toBN(proofNullifier)) const sig = unpackSignature(depositSignature as string) @@ -179,8 +184,11 @@ async function getRecoveredAddress( salt: nullifier, } recoveredAddress = recoverSaltedPermit(message, sig) + if (recoveredAddress.toLowerCase() !== owner.toLowerCase()) { + throw new TxValidationError(`Invalid deposit signer; Restored: ${recoveredAddress}; Expected: ${owner}`) + } } else { - throw new Error('Unsupported txtype') + throw new TxValidationError('Unsupported txtype') } return recoveredAddress @@ -207,7 +215,7 @@ async function checkRoot( } if (root !== proofRoot) { - return new Error(`Incorrect root at index ${indexStr}: given ${proofRoot}, expected ${root}`) + return new TxValidationError(`Incorrect root at index ${indexStr}: given ${proofRoot}, expected ${root}`) } // If recieved correct root from contract update cache (only confirmed state) diff --git a/zp-relayer/workers/poolTxWorker.ts b/zp-relayer/workers/poolTxWorker.ts index 6cebddb2..cebfa042 100644 --- a/zp-relayer/workers/poolTxWorker.ts +++ b/zp-relayer/workers/poolTxWorker.ts @@ -5,7 +5,7 @@ import { logger } from '@/services/appLogger' import { PoolTxResult, TxPayload } from '@/queue/poolTxQueue' import { TX_QUEUE_NAME, OUTPLUSONE } from '@/utils/constants' import { readNonce, updateField, RelayerKeys, updateNonce } from '@/utils/redisFields' -import { numToHex, truncateMemoTxPrefix, withErrorLog, withMutex } from '@/utils/helpers' +import { buildPrefixedMemo, truncateMemoTxPrefix, withErrorLog, withMutex } from '@/utils/helpers' import { signTransaction, sendTransaction } from '@/tx/signAndSend' import { Pool, pool } from '@/pool' import { sentTxQueue } from '@/queue/sentTxQueue' @@ -78,7 +78,7 @@ export async function createPoolTxWorker( await updateNonce(++nonce) - logger.debug(`${logPrefix} TX hash ${txHash}`) + logger.info(`${logPrefix} TX hash ${txHash}`) await updateField(RelayerKeys.TRANSFER_NUM, commitIndex * OUTPLUSONE) @@ -86,7 +86,7 @@ export async function createPoolTxWorker( const outCommit = getTxProofField(txProof, 'out_commit') const truncatedMemo = truncateMemoTxPrefix(rawMemo, txType) - const prefixedMemo = numToHex(toBN(outCommit)).concat(txHash.slice(2)).concat(truncatedMemo) + const prefixedMemo = buildPrefixedMemo(outCommit, txHash, truncatedMemo) pool.optimisticState.updateState(commitIndex, outCommit, prefixedMemo) logger.debug('Adding nullifier %s to OS', nullifier) @@ -100,10 +100,11 @@ export async function createPoolTxWorker( const sentJob = await sentTxQueue.add( txHash, { + poolJobId: job.id as string, root: rootAfter, outCommit, commitIndex, - prefixedMemo, + truncatedMemo, nullifier, txConfig, txPayload: tx, diff --git a/zp-relayer/workers/sentTxWorker.ts b/zp-relayer/workers/sentTxWorker.ts index dce32318..2025b2f7 100644 --- a/zp-relayer/workers/sentTxWorker.ts +++ b/zp-relayer/workers/sentTxWorker.ts @@ -7,7 +7,7 @@ import { pool } from '@/pool' import { web3 } from '@/services/web3' import { logger } from '@/services/appLogger' import { GasPrice, EstimationType, chooseGasPriceOptions, addExtraGasPrice } from '@/services/gas-price' -import { withErrorLog, withLoop, withMutex } from '@/utils/helpers' +import { buildPrefixedMemo, withErrorLog, withLoop, withMutex } from '@/utils/helpers' import { OUTPLUSONE, SENT_TX_QUEUE_NAME } from '@/utils/constants' import { isGasPriceError, isSameTransactionError } from '@/utils/web3Errors' import { SendAttempt, SentTxPayload, sentTxQueue, SentTxResult, SentTxState } from '@/queue/sentTxQueue' @@ -81,7 +81,7 @@ export async function createSentTxWorker(gasPrice: Gas const sentTxWorkerProcessor = async (job: Job) => { const logPrefix = `SENT WORKER: Job ${job.id}:` logger.info('%s processing...', logPrefix) - const { prefixedMemo, commitIndex, outCommit, nullifier, root, prevAttempts, txConfig } = job.data + const { truncatedMemo, commitIndex, outCommit, nullifier, root, prevAttempts, txConfig } = job.data // Any thrown web3 error will re-trigger re-send loop iteration const [tx, shouldReprocess] = await checkMined(prevAttempts, txConfig.nonce as number) @@ -99,9 +99,12 @@ export async function createSentTxWorker(gasPrice: Gas // Tx mined if (tx.status) { // Successful - logger.debug('%s Transaction %s was successfully mined at block %s', logPrefix, txHash, tx.blockNumber) + logger.info('%s Transaction %s was successfully mined at block %s', logPrefix, txHash, tx.blockNumber) + const prefixedMemo = buildPrefixedMemo(outCommit, txHash, truncatedMemo) pool.state.updateState(commitIndex, outCommit, prefixedMemo) + // Update tx hash in optimistic state tx db + pool.optimisticState.addTx(commitIndex * OUTPLUSONE, Buffer.from(prefixedMemo, 'hex')) // Add nullifer to confirmed state and remove from optimistic one logger.info('Adding nullifier %s to PS', nullifier) @@ -140,18 +143,26 @@ export async function createSentTxWorker(gasPrice: Gas // Validation of these jobs will be done in `poolTxWorker` const waitingJobIds = [] const reschedulePromises = [] + const newPoolJobIdMapping: Record = {} const waitingJobs = await sentTxQueue.getJobs(['delayed', 'waiting']) for (let wj of waitingJobs) { // One of the jobs can be undefined, so we need to check it // https://github.com/taskforcesh/bullmq/blob/master/src/commands/addJob-8.lua#L142-L143 if (!wj?.id) continue waitingJobIds.push(wj.id) - reschedulePromises.push(poolTxQueue.add(txHash, [wj.data.txPayload]).then(j => j.id as string)) + const reschedulePromise = poolTxQueue.add(txHash, [wj.data.txPayload]).then(j => { + const newPoolJobId = j.id as string + newPoolJobIdMapping[wj.data.poolJobId] = newPoolJobId + return newPoolJobId + }) + reschedulePromises.push(reschedulePromise) } logger.info('Marking ids %j as failed', waitingJobIds) await markFailed(redis, waitingJobIds) logger.info('%s Rescheduling %d jobs to process...', logPrefix, waitingJobs.length) const rescheduledIds = await Promise.all(reschedulePromises) + logger.info('%s Update pool job id mapping %j ...', logPrefix, newPoolJobIdMapping) + await pool.state.jobIdsMapping.add(newPoolJobIdMapping) return [SentTxState.REVERT, txHash, rescheduledIds] as SentTxResult } @@ -174,6 +185,7 @@ export async function createSentTxWorker(gasPrice: Gas job.data.prevAttempts.push([newTxHash, newGasPrice]) try { await sendTransaction(web3, rawTransaction) + logger.info(`${logPrefix} Re-send tx; New hash: ${newTxHash}`) } catch (e) { const err = e as Error logger.warn('%s Tx resend failed for %s: %s', logPrefix, lastHash, err.message) @@ -188,6 +200,10 @@ export async function createSentTxWorker(gasPrice: Gas throw e } + // Overwrite old tx recorded in optimistic state db with new tx hash + const prefixedMemo = buildPrefixedMemo(outCommit, newTxHash, truncatedMemo) + pool.optimisticState.addTx(commitIndex * OUTPLUSONE, Buffer.from(prefixedMemo, 'hex')) + // Update job await job.update({ ...job.data,