From de9b88cf04a4ffde128b52b40001ef06e5cf4483 Mon Sep 17 00:00:00 2001 From: Evgen Date: Thu, 5 Sep 2024 17:35:11 +0300 Subject: [PATCH] Removing tx from the cache on fail --- zp-relayer/pool/RelayPool.ts | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/zp-relayer/pool/RelayPool.ts b/zp-relayer/pool/RelayPool.ts index e24af42..45c66c8 100644 --- a/zp-relayer/pool/RelayPool.ts +++ b/zp-relayer/pool/RelayPool.ts @@ -2,7 +2,7 @@ import config from '@/configs/relayerConfig' import { logger } from '@/lib/appLogger' import { Network } from '@/lib/network' import { redis } from '@/lib/redisClient' -import { JobState, PoolTx, poolTxQueue, WorkerTxType } from '@/queue/poolTxQueue' +import { BasePoolTx, JobState, PoolTx, poolTxQueue, TxPayload, WorkerTxType } from '@/queue/poolTxQueue' import { TxStore } from '@/state/TxStore' import { ENERGY_SIZE, MOCK_CALLDATA, OUTPLUSONE, PERMIT2_CONTRACT, TOKEN_SIZE, TRANSFER_INDEX_SIZE } from '@/utils/constants' import { @@ -285,13 +285,19 @@ export class RelayPool extends BasePool { async onFailed(txHash: string, jobId: string): Promise { super.onFailed(txHash, jobId); - this.txStore.remove(jobId); const poolJob = await poolTxQueue.getJob(jobId); if (!poolJob) { logger.error('Pool job not found', { jobId }); } else { poolJob.data.transaction.state = JobState.REVERTED; poolJob.data.transaction.txHash = txHash; + + const txPayload = poolJob.data.transaction as TxPayload; + if (txPayload.proof.inputs.length > 2) { + const commit = txPayload.proof.inputs[2]; + this.txStore.remove(commit); + logger.info('Removing local cached transaction', {commit}); + } await poolJob.update(poolJob.data); } } @@ -305,7 +311,7 @@ export class RelayPool extends BasePool { memo ); await this.txStore.add(commit, prefixedMemo, index); - logger.info(`Tx with commit ${commit} has been CACHED locally`); + logger.info(`Tx with commit ${commit} has been CACHED locally`, { index }); } private async getIndexerInfo() { @@ -317,7 +323,7 @@ export class RelayPool extends BasePool { private async assumeNextPendingTxIndex() { const [indexerInfo, localCache] = await Promise.all([this.getIndexerInfo(), this.txStore.getAll()]); - return Number(indexerInfo.optimisticDeltaIndex + Object.keys(localCache).length); + return Number(indexerInfo.optimisticDeltaIndex + Object.keys(localCache).length * OUTPLUSONE); } private async getIndexerTxs(offset: number, limit: number): Promise { @@ -366,7 +372,7 @@ export class RelayPool extends BasePool { // find cached commitments in the indexer's response for (const [commit, {memo, index}] of localEntries) { - if (indexerCommitments.includes(commit)) { + if (indexerCommitments.includes(commit) || index < indexerOptimisticIndex) { logger.info('Deleting cached entry', { commit }) await this.txStore.remove(commit) localEntriesCnt--;