From 92e63f7c3ccb484856328cafb2c719ecabe537e0 Mon Sep 17 00:00:00 2001
From: Alexander Filippov
Date: Tue, 10 Sep 2024 11:29:07 +0300
Subject: [PATCH] Attempt to fix doublespend in optimistic nullifiers

---
 zp-relayer/pool/RelayPool.ts             | 20 +++++++++++++++-----
 zp-relayer/services/relayer/endpoints.ts | 27 ++++++++++++---------------
 zp-relayer/state/TxStore.ts              | 20 ++++++++++----------
 3 files changed, 37 insertions(+), 30 deletions(-)

diff --git a/zp-relayer/pool/RelayPool.ts b/zp-relayer/pool/RelayPool.ts
index f956825..c030d0d 100644
--- a/zp-relayer/pool/RelayPool.ts
+++ b/zp-relayer/pool/RelayPool.ts
@@ -264,7 +264,7 @@
 
       // cache transaction locally
       const indexerOptimisticIndex = Number((await this.getIndexerInfo()).deltaIndex);
-      await this.cacheTxLocally(outCommit, txHash, memo, commitIndex);
+      await this.cacheTxLocally(outCommit, txHash, memo, Date.now());
       // start monitoring local cache against the indexer to cleanup already indexed txs
       this.startLocalCacheObserver(indexerOptimisticIndex);
     }
@@ -352,15 +352,20 @@
   }
 
   protected async localCacheObserverWorker(fromIndex: number): Promise<void> {
+    // we start checking transactions slightly earlier than the current optimistic index
+    // to cover the case when the indexer was already updated before onSend was called
+    const OFFSET_MARGIN = 10;
+    fromIndex = Math.max(fromIndex - OFFSET_MARGIN, 0);
     logger.debug('Local cache observer worker was started', { fromIndex })
     const CACHE_OBSERVE_INTERVAL_MS = 1000; // waiting time between checks
     const EXTEND_LIMIT_TO_FETCH = 10; // taking into account the non-atomic nature of /info and /transactions/v2 requests
+    const EXPIRATION_MS = 1000 * 60 * 60 * 24; // we drop entries older than 24 hours, it is unlikely they will ever be indexed
+
     while (true) {
       const localEntries = Object.entries(await this.txStore.getAll());
       let localEntriesCnt = localEntries.length;
 
       if (localEntries.length == 0) {
-        // stop observer when localCache is empty
         break;
       }
 
@@ -371,12 +376,17 @@
       const indexerCommitments = (await this.getIndexerTxs(fromIndex, limit)).map(tx => BigNumber(tx.slice(65, 129), 16).toString(10));
 
       // find cached commitments in the indexer's response
-      for (const [commit, {memo, index}] of localEntries) {
-        if (indexerCommitments.includes(commit) || index < indexerOptimisticIndex) {
-          logger.info('Deleting cached entry', { commit, index })
+      for (const [commit, {memo, timestamp}] of localEntries) {
+        if (indexerCommitments.includes(commit)) {
+          logger.info('Deleting cached entry', { commit, timestamp })
           await this.txStore.remove(commit)
           localEntriesCnt--;
         } else {
+          if (Date.now() - timestamp > EXPIRATION_MS) {
+            logger.error('Cached transaction was not indexed for a long time, removing', { commit, timestamp });
+            await this.txStore.remove(commit)
+            localEntriesCnt--;
+          }
           //logger.info('Cached entry is still in the local cache', { commit, index });
         }
       }
diff --git a/zp-relayer/services/relayer/endpoints.ts b/zp-relayer/services/relayer/endpoints.ts
index ede301d..d893c93 100644
--- a/zp-relayer/services/relayer/endpoints.ts
+++ b/zp-relayer/services/relayer/endpoints.ts
@@ -98,18 +98,20 @@ async function getTransactionsV2(req: Request, res: Response, { pool }: PoolInjection) {
   }
 
   const indexerTxs: string[] = await response.json()
 
-  const lastRequestedIndex = offset + limit * OUTPLUSONE;
-  const lastReceivedIndex = offset + indexerTxs.length * OUTPLUSONE;
   const txStore = (pool as RelayPool).txStore;
   const localEntries = await txStore.getAll().then(entries => Object.entries(entries)
-    .filter(([commit, {memo, index}]) => offset <= index && index < lastRequestedIndex)
+    .sort((a, b) => a[1].timestamp - b[1].timestamp)
   );
 
   const indexerCommitments = indexerTxs.map(tx => BigNumber(tx.slice(65, 129), 16).toString(10));
   const optimisticTxs: string[] = []
-  for (const [commit, {memo, index}] of localEntries) {
-    if (indexerCommitments.includes(commit) || index < lastReceivedIndex) {
+  for (const [commit, {memo, timestamp}] of localEntries) {
+    if (indexerTxs.length + optimisticTxs.length >= limit) {
+      break
+    }
+
+    if (indexerCommitments.includes(commit)) {
       // !!! we shouldn't modify the local cache from here. Just filter entries to return a correct response
       //logger.info('Deleting index from optimistic state', { index })
       //await txStore.remove(commit)
@@ -190,16 +192,11 @@ async function relayerInfo(req: Request, res: Response, { pool }: PoolInjection) {
   const indexerMaxIdx = Math.max(parseInt(info.deltaIndex ?? '0'), parseInt(info.optimisticDeltaIndex ?? '0'))
   const txStore = (pool as RelayPool).txStore
-  const pendingCnt = await txStore.getAll()
-    .then(keys => {
-      return Object.entries(keys)
-        .map(([commit, {memo, index}]) => index)
-        .filter(i => i >= indexerMaxIdx)
-    })
-    .then(a => a.length);
-
-  info.pendingDeltaIndex = indexerMaxIdx + pendingCnt * OUTPLUSONE;
-
+  const pendingCnt = await txStore.getAll().then(map => Object.keys(map).length)
+  // This number is not accurate since some txs might already be included in the indexer
+  // but still be present in the local cache.
+  // It should be used ONLY as a rough estimate of the total number of txs, including pending ones
+  info.pendingDeltaIndex = indexerMaxIdx + pendingCnt * OUTPLUSONE;
   res.json(info)
 }
diff --git a/zp-relayer/state/TxStore.ts b/zp-relayer/state/TxStore.ts
index faccb10..05f802b 100644
--- a/zp-relayer/state/TxStore.ts
+++ b/zp-relayer/state/TxStore.ts
@@ -1,37 +1,37 @@
 import { hexToNumber, numberToHexPadded } from '@/utils/helpers';
 import type { Redis } from 'ioredis'
 
-const INDEX_BYTES = 6;
+const TIMESTAMP_BYTES = 6; // enough for another ~8000 years
 
 export class TxStore {
   constructor(public name: string, private redis: Redis) {}
 
-  async add(commitment: string, memo: string, index: number) {
-    await this.redis.hset(this.name, { [commitment]: `${numberToHexPadded(index, INDEX_BYTES)}${memo}` })
+  async add(commitment: string, memo: string, timestamp: number) {
+    await this.redis.hset(this.name, { [commitment]: `${numberToHexPadded(timestamp, TIMESTAMP_BYTES)}${memo}` })
   }
 
   async remove(commitment: string) {
     await this.redis.hdel(this.name, commitment)
   }
 
-  async get(commitment: string): Promise<{memo: string, index: number} | null> {
+  async get(commitment: string): Promise<{memo: string, timestamp: number} | null> {
     const data = await this.redis.hget(this.name, commitment);
     return data ? {
-      memo: data.slice(INDEX_BYTES * 2),
-      index: hexToNumber(data.slice(0, INDEX_BYTES * 2)),
+      memo: data.slice(TIMESTAMP_BYTES * 2),
+      timestamp: hexToNumber(data.slice(0, TIMESTAMP_BYTES * 2)),
     } : null;
   }
 
-  async getAll(): Promise<Record<string, {memo: string, index: number}>> {
+  async getAll(): Promise<Record<string, {memo: string, timestamp: number}>> {
     return this.redis.hgetall(this.name).then(keys => Object.fromEntries(
       Object.entries(keys)
         .map(([commit, data]) => [commit, {
-          memo: data.slice(INDEX_BYTES * 2),
-          index: hexToNumber(data.slice(0, INDEX_BYTES * 2)),
-        }] as [string, {memo: string, index: number}]
+          memo: data.slice(TIMESTAMP_BYTES * 2),
+          timestamp: hexToNumber(data.slice(0, TIMESTAMP_BYTES * 2)),
+        }] as [string, {memo: string, timestamp: number}]
       )
     ));
   }
 }
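
Notes (reviewer sketches, not part of the patch):

1) The TxStore packing: each Redis hash value is a fixed-width 12-hex-char
timestamp followed by the memo. A minimal round-trip sketch in TypeScript,
assuming the helpers from '@/utils/helpers' behave like the stand-ins below
(the stand-ins are illustrative, not the real implementations):

    // stand-ins for numberToHexPadded/hexToNumber from '@/utils/helpers' (assumed behavior)
    const numberToHexPadded = (n: number, bytes: number): string =>
      n.toString(16).padStart(bytes * 2, '0');
    const hexToNumber = (hex: string): number => parseInt(hex, 16);

    const TIMESTAMP_BYTES = 6; // 48 bits; 2^48 ms is roughly 8900 years, hence "~8000 years"

    // pack: fixed-width timestamp prefix, then the memo
    const memo = 'deadbeef'; // hypothetical memo payload
    const packed = `${numberToHexPadded(Date.now(), TIMESTAMP_BYTES)}${memo}`;

    // unpack: split at TIMESTAMP_BYTES * 2 hex chars, exactly as TxStore.get/getAll do
    const timestamp = hexToNumber(packed.slice(0, TIMESTAMP_BYTES * 2));
    const recoveredMemo = packed.slice(TIMESTAMP_BYTES * 2);
    console.log(new Date(timestamp).toISOString(), recoveredMemo === memo); // ..., true

The fixed-width prefix is what makes the slice offsets in TxStore.get and
TxStore.getAll safe regardless of the memo length.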
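2) The cleanup rule applied by localCacheObserverWorker, extracted as a pure
function for clarity; the names and the Set parameter are illustrative, not
part of the patch:

    type CachedTx = { memo: string; timestamp: number };
    const EXPIRATION_MS = 1000 * 60 * 60 * 24; // 24 hours

    // an entry leaves the local cache when the indexer has included it,
    // or when it has been pending for so long that it likely never will be
    function shouldEvict(
      commit: string,
      entry: CachedTx,
      indexerCommitments: Set<string>, // commitments already returned by the indexer
      now: number
    ): boolean {
      if (indexerCommitments.has(commit)) return true;
      return now - entry.timestamp > EXPIRATION_MS;
    }

The old index-based rule (evict when index < indexerOptimisticIndex) could
drop an entry the indexer had not actually included; the timestamp-based rule
above only evicts on a confirmed match or after the 24-hour expiration.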