
Commit

Fix issues
EvgenKor committed Sep 5, 2024
1 parent de9b88c commit 96f4701
Showing 3 changed files with 16 additions and 12 deletions.
18 changes: 10 additions & 8 deletions zp-relayer/pool/RelayPool.ts
@@ -2,7 +2,7 @@ import config from '@/configs/relayerConfig'
import { logger } from '@/lib/appLogger'
import { Network } from '@/lib/network'
import { redis } from '@/lib/redisClient'
import { BasePoolTx, JobState, PoolTx, poolTxQueue, TxPayload, WorkerTxType } from '@/queue/poolTxQueue'
import { JobState, PoolTx, poolTxQueue, TxPayload, WorkerTxType } from '@/queue/poolTxQueue'
import { TxStore } from '@/state/TxStore'
import { ENERGY_SIZE, MOCK_CALLDATA, OUTPLUSONE, PERMIT2_CONTRACT, TOKEN_SIZE, TRANSFER_INDEX_SIZE } from '@/utils/constants'
import {
@@ -44,6 +44,7 @@ import { bytesToHex, toBN } from 'web3-utils'
import { getTxDataProverV2, TxDataProverV2, TxType } from 'zp-memo-parser'
import { BasePool } from './BasePool'
import { OptionalChecks, PermitConfig, ProcessResult } from './types'
import BigNumber from 'bignumber.js'

const ZERO = toBN(0)

@@ -262,9 +263,10 @@ export class RelayPool extends BasePool<Network> {
}

// cache transaction locally
const indexerOptimisticIndex = Number((await this.getIndexerInfo()).deltaIndex);
await this.cacheTxLocally(outCommit, txHash, memo, commitIndex);
// start monitoring local cache against the indexer to cleanup already indexed txs
this.startLocalCacheObserver(commitIndex);
this.startLocalCacheObserver(indexerOptimisticIndex);
}

async onConfirmed(res: ProcessResult<RelayPool>, txHash: string, callback?: () => Promise<void>, jobId?: string): Promise<void> {
@@ -277,8 +279,6 @@
poolJob.data.transaction.state = JobState.COMPLETED;
poolJob.data.transaction.txHash = txHash;
await poolJob.update(poolJob.data);

await this.cacheTxLocally(res.outCommit, txHash, res.memo, res.commitIndex);
}
}
}
@@ -311,7 +311,7 @@ export class RelayPool extends BasePool<Network> {
memo
);
await this.txStore.add(commit, prefixedMemo, index);
logger.info(`Tx with commit ${commit} has been CACHED locally`, { index });
logger.info('Tx has been CACHED locally', { commit, index });
}

private async getIndexerInfo() {
@@ -368,22 +368,24 @@ export class RelayPool extends BasePool<Network> {
try {
const indexerOptimisticIndex = Number((await this.getIndexerInfo()).optimisticDeltaIndex);
const limit = (indexerOptimisticIndex - fromIndex) / OUTPLUSONE + localEntries.length + EXTEND_LIMIT_TO_FETCH;
const indexerCommitments = (await this.getIndexerTxs(fromIndex, limit)).map(tx => tx.slice(65, 129));
const indexerCommitments = (await this.getIndexerTxs(fromIndex, limit)).map(tx => BigNumber(tx.slice(65, 129), 16).toString(10));

// find cached commitments in the indexer's response
for (const [commit, {memo, index}] of localEntries) {
if (indexerCommitments.includes(commit) || index < indexerOptimisticIndex) {
logger.info('Deleting cached entry', { commit })
logger.info('Deleting cached entry', { commit, index })
await this.txStore.remove(commit)
localEntriesCnt--;
} else {
//logger.info('Cached entry is still in the local cache', { commit, index });
}
}
} catch(e) {
logger.error(`Cannot check local cache against indexer : ${(e as Error).message}`);
}

if (localEntriesCnt > 0) {
sleep(CACHE_OBSERVE_INTERVAL_MS);
await sleep(CACHE_OBSERVE_INTERVAL_MS);
}
}

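Note on the commitment comparison in the cache observer above: the indexer returns each transaction as a hex string whose characters 65–129 carry the commitment, while the local tx store appears to key entries by the commitment as a decimal string (hence the toString(10) conversion). Normalizing the indexer value through bignumber.js makes the includes() check compare like with like. A minimal sketch of that normalization, assuming bignumber.js is available; the helper name is illustrative, not from the repository:

import BigNumber from 'bignumber.js'

// Illustrative helper: pull the commitment out of an indexer tx string and
// return it in the decimal form assumed to be used as the local cache key.
function commitmentFromIndexerTx(tx: string): string {
  const hexCommit = tx.slice(65, 129) // 64 hex chars = 32 bytes
  return new BigNumber(hexCommit, 16).toString(10)
}

The result of commitmentFromIndexerTx(indexerTxs[i]) can then be compared directly against the keys returned by txStore.getAll().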
2 changes: 1 addition & 1 deletion zp-relayer/services/indexer/router.ts
@@ -83,7 +83,7 @@ function getRoot(req: Request, res: Response, { pool }: PoolInjection) {
validateBatch([[checkGetRoot, req.query]])

const index = req.query.index as unknown as number
const root = pool.state.getMerkleRootAt(index)
const root = pool.state.getMerkleRootAt(index) ?? pool.optimisticState.getMerkleRootAt(index)

res.json({ root })
}
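The router change above lets the root lookup fall back to the optimistic state when the finalized state has no entry at the requested index, so roots for not-yet-finalized transactions can still be served. A hedged sketch of the same lookup order, assuming getMerkleRootAt returns undefined (or null) for an unknown index; the wrapper function is illustrative only:

// Try the finalized Merkle state first, then the optimistic one.
function rootAt(pool: BasePool, index: number): string | undefined {
  return pool.state.getMerkleRootAt(index) ?? pool.optimisticState.getMerkleRootAt(index)
}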
8 changes: 5 additions & 3 deletions zp-relayer/services/relayer/endpoints.ts
@@ -20,6 +20,7 @@ import {
validateCountryIP,
ValidationFunction,
} from '../../validation/api/validation'
import BigNumber from 'bignumber.js'

interface PoolInjection {
pool: BasePool
@@ -98,16 +99,17 @@ async function getTransactionsV2(req: Request, res: Response, { pool }: PoolInje
const indexerTxs: string[] = await response.json()

const lastRequestedIndex = offset + limit * OUTPLUSONE;
const lastReceivedIndex = offset + indexerTxs.length * OUTPLUSONE;
const txStore = (pool as RelayPool).txStore;
const localEntries = await txStore.getAll().then(entries =>
Object.entries(entries)
.filter(([commit, {memo, index}]) => offset <= index && index < lastRequestedIndex)
);

const indexerCommitments = indexerTxs.map(tx => tx.slice(65, 129));
const indexerCommitments = indexerTxs.map(tx => BigNumber(tx.slice(65, 129), 16).toString(10));
const optimisticTxs: string[] = []
for (const [commit, {memo, index}] of localEntries) {
if (indexerCommitments.includes(commit)) {
if (indexerCommitments.includes(commit) || index < lastReceivedIndex) {
// !!! we shouldn't modify local cache from here. Just filter entries to return correct response
//logger.info('Deleting index from optimistic state', { index })
//await txStore.remove(commit)
@@ -192,7 +194,7 @@ async function relayerInfo(req: Request, res: Response, { pool }: PoolInjection)
.then(keys => {
return Object.entries(keys)
.map(([commit, {memo, index}]) => index)
.filter(i => indexerMaxIdx <= i)
.filter(i => i >= indexerMaxIdx)
})
.then(a => a.length);

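The two endpoints.ts hunks adjust how locally cached (optimistic) transactions are reported: getTransactionsV2 now treats a cached entry as already covered by the indexer when its commitment appears in the indexer response or its index falls below the last index actually received, and relayerInfo counts cached entries at or beyond the indexer's max index. A sketch of that filtering under simplified, assumed types; CachedTx and pendingEntries are illustrative names, not from the repository:

interface CachedTx { memo: string; index: number }

// Keep only cached entries the indexer has not covered yet: the commitment is
// absent from the indexer response and the index is at or beyond the last
// index the indexer returned (De Morgan of the skip condition above).
function pendingEntries(
  localEntries: [string, CachedTx][],
  indexerCommitments: string[],
  lastReceivedIndex: number
): [string, CachedTx][] {
  return localEntries.filter(
    ([commit, { index }]) => !indexerCommitments.includes(commit) && index >= lastReceivedIndex
  )
}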
