Fix/doublespend in optimistic (#227)
* Attempt to fix doublespend in optimistic nullifiers

* Fix offset margin

* Fix logs
AllFi authored Sep 13, 2024
1 parent 8cde68d commit a662ce3
Showing 3 changed files with 40 additions and 33 deletions.
26 changes: 18 additions & 8 deletions zp-relayer/pool/RelayPool.ts
@@ -264,7 +264,7 @@ export class RelayPool extends BasePool<Network> {

     // cache transaction locally
     const indexerOptimisticIndex = Number((await this.getIndexerInfo()).deltaIndex);
-    await this.cacheTxLocally(outCommit, txHash, memo, commitIndex);
+    await this.cacheTxLocally(outCommit, txHash, memo, Date.now());
     // start monitoring the local cache against the indexer to clean up already-indexed txs
     this.startLocalCacheObserver(indexerOptimisticIndex);
   }
@@ -302,16 +302,16 @@ export class RelayPool extends BasePool<Network> {
     }
   }

-  protected async cacheTxLocally(commit: string, txHash: string, memo: string, index: number) {
+  protected async cacheTxLocally(commit: string, txHash: string, memo: string, timestamp: number) {
     // store or update the local tx store
     // (we should keep sent transactions until the indexer grabs them)
     const prefixedMemo = buildPrefixedMemo(
       commit,
       txHash,
       memo
     );
-    await this.txStore.add(commit, prefixedMemo, index);
-    logger.info('Tx has been CACHED locally', { commit, index });
+    await this.txStore.add(commit, prefixedMemo, timestamp);
+    logger.info('Tx has been CACHED locally', { commit, timestamp });
   }

   private async getIndexerInfo() {
@@ -352,15 +352,20 @@ export class RelayPool extends BasePool<Network> {
   }

   protected async localCacheObserverWorker(fromIndex: number): Promise<void> {
+    // we start checking transactions slightly earlier than the current optimistic index
+    // to cover the case when the indexer was already updated before onSend was called
+    const OFFSET_MARGIN = 10 * OUTPLUSONE;
+    fromIndex = Math.max(fromIndex - OFFSET_MARGIN, 0);
     logger.debug('Local cache observer worker was started', { fromIndex })
     const CACHE_OBSERVE_INTERVAL_MS = 1000; // waiting time between checks
     const EXTEND_LIMIT_TO_FETCH = 10; // accounts for the non-atomic nature of the /info and /transactions/v2 requests
+    const EXPIRATION_MS = 1000 * 60 * 60 * 24; // drop entries older than 24 hours; it is unlikely they will ever be indexed

     while (true) {
       const localEntries = Object.entries(await this.txStore.getAll());
       let localEntriesCnt = localEntries.length;

       if (localEntries.length == 0) {
         // stop the observer when the local cache is empty
         break;
       }

@@ -371,12 +376,17 @@ export class RelayPool extends BasePool<Network> {
       const indexerCommitments = (await this.getIndexerTxs(fromIndex, limit)).map(tx => BigNumber(tx.slice(65, 129), 16).toString(10));

       // find cached commitments in the indexer's response
-      for (const [commit, {memo, index}] of localEntries) {
-        if (indexerCommitments.includes(commit) || index < indexerOptimisticIndex) {
-          logger.info('Deleting cached entry', { commit, index })
+      for (const [commit, {memo, timestamp}] of localEntries) {
+        if (indexerCommitments.includes(commit)) {
+          logger.info('Deleting cached entry', { commit, timestamp })
           await this.txStore.remove(commit)
           localEntriesCnt--;
         } else {
+          if (Date.now() - timestamp > EXPIRATION_MS) {
+            logger.error('Cached transaction was not indexed for a long time, removing', { commit, timestamp });
+            await this.txStore.remove(commit)
+            localEntriesCnt--;
+          }
           //logger.info('Cached entry is still in the local cache', { commit, index });
         }
       }
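
For reference, the observer's new cleanup rule reads cleanly in isolation: an entry leaves the local cache either because the indexer now returns its commitment, or because it has sat unindexed for longer than the 24-hour expiration window. A minimal sketch of that rule, assuming entries shaped like TxStore.getAll()'s return value (entriesToDrop and the Set-based lookup are illustrative, not part of the commit):

const EXPIRATION_MS = 1000 * 60 * 60 * 24;

interface CachedEntry {
  memo: string;
  timestamp: number;
}

// Returns the commitments the observer would remove on one pass.
function entriesToDrop(
  local: Record<string, CachedEntry>,
  indexerCommitments: Set<string>,
  now: number = Date.now()
): string[] {
  return Object.entries(local)
    .filter(([commit, { timestamp }]) =>
      indexerCommitments.has(commit) || now - timestamp > EXPIRATION_MS)
    .map(([commit]) => commit);
}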
27 changes: 12 additions & 15 deletions zp-relayer/services/relayer/endpoints.ts
@@ -98,18 +98,20 @@ async function getTransactionsV2(req: Request, res: Response, { pool }: PoolInjection
   }
   const indexerTxs: string[] = await response.json()

-  const lastRequestedIndex = offset + limit * OUTPLUSONE;
-  const lastReceivedIndex = offset + indexerTxs.length * OUTPLUSONE;
   const txStore = (pool as RelayPool).txStore;
   const localEntries = await txStore.getAll().then(entries =>
     Object.entries(entries)
-      .filter(([commit, {memo, index}]) => offset <= index && index < lastRequestedIndex)
+      .sort((a, b) => a[1].timestamp - b[1].timestamp)
   );

   const indexerCommitments = indexerTxs.map(tx => BigNumber(tx.slice(65, 129), 16).toString(10));
   const optimisticTxs: string[] = []
-  for (const [commit, {memo, index}] of localEntries) {
-    if (indexerCommitments.includes(commit) || index < lastReceivedIndex) {
+  for (const [commit, {memo, timestamp}] of localEntries) {
+    if (indexerTxs.length + optimisticTxs.length >= limit) {
+      break
+    }
+
+    if (indexerCommitments.includes(commit)) {
       // !!! we shouldn't modify the local cache from here; just filter entries to return a correct response
       //logger.info('Deleting index from optimistic state', { index })
       //await txStore.remove(commit)
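
Net effect of this change: getTransactionsV2 no longer trusts a stored tree index to decide what is pending. It appends locally cached (optimistic) transactions, in timestamp order, after the indexer's results, skips anything the indexer already returned, and stops once the combined count reaches the requested limit. A hedged sketch of that merge (the function and its parameter shapes are illustrative):

function mergeTxs(
  indexerTxs: string[],
  localEntries: [string, { memo: string; timestamp: number }][], // pre-sorted by timestamp
  indexerCommitments: Set<string>,
  limit: number
): string[] {
  const optimisticTxs: string[] = [];
  for (const [commit, { memo }] of localEntries) {
    if (indexerTxs.length + optimisticTxs.length >= limit) break;
    if (indexerCommitments.has(commit)) continue; // already indexed; filter it out, don't mutate the cache
    optimisticTxs.push(memo);
  }
  return [...indexerTxs, ...optimisticTxs];
}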
@@ -190,16 +192,11 @@ async function relayerInfo(req: Request, res: Response, { pool }: PoolInjection)
   const indexerMaxIdx = Math.max(parseInt(info.deltaIndex ?? '0'), parseInt(info.optimisticDeltaIndex ?? '0'))

   const txStore = (pool as RelayPool).txStore
-  const pendingCnt = await txStore.getAll()
-    .then(keys => {
-      return Object.entries(keys)
-        .map(([commit, {memo, index}]) => index)
-        .filter(i => i >= indexerMaxIdx)
-    })
-    .then(a => a.length);
-
-  info.pendingDeltaIndex = indexerMaxIdx + pendingCnt * OUTPLUSONE;
-
+  const pendingCnt = await txStore.getAll().then(map => Object.keys(map).length)
+  // This number is not accurate since some txs might already be included in the indexer
+  // but still be present in the local cache.
+  // Use this value ONLY as a rough estimate of the total number of txs, including pending ones.
+  info.pendingDeltaIndex = indexerMaxIdx + pendingCnt * OUTPLUSONE;
   res.json(info)
 }
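
Since every cached entry now counts toward pendingCnt, whether or not the indexer has already picked it up, pendingDeltaIndex becomes an upper-bound estimate rather than an exact pointer, which is what the new comment warns about. Illustrative arithmetic, assuming OUTPLUSONE is the per-transaction index stride (the concrete numbers are hypothetical):

const OUTPLUSONE = 128;     // assumed stride per transaction; see the relayer's constants
const indexerMaxIdx = 1280; // hypothetical indexer position
const pendingCnt = 2;       // two txs still sitting in the local cache

const pendingDeltaIndex = indexerMaxIdx + pendingCnt * OUTPLUSONE; // 1536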

20 changes: 10 additions & 10 deletions zp-relayer/state/TxStore.ts
@@ -1,37 +1,37 @@
 import { hexToNumber, numberToHexPadded } from '@/utils/helpers';
 import type { Redis } from 'ioredis'

-const INDEX_BYTES = 6;
+const TIMESTAMP_BYTES = 6; // enough for another ~8000 years

 export class TxStore {
   constructor(public name: string, private redis: Redis) {}

-  async add(commitment: string, memo: string, index: number) {
-    await this.redis.hset(this.name, { [commitment]: `${numberToHexPadded(index, INDEX_BYTES)}${memo}` })
+  async add(commitment: string, memo: string, timestamp: number) {
+    await this.redis.hset(this.name, { [commitment]: `${numberToHexPadded(timestamp, TIMESTAMP_BYTES)}${memo}` })
   }

   async remove(commitment: string) {
     await this.redis.hdel(this.name, commitment)
   }

-  async get(commitment: string): Promise<{memo: string, index: number} | null> {
+  async get(commitment: string): Promise<{memo: string, timestamp: number} | null> {
     const data = await this.redis.hget(this.name, commitment);

     return data ? {
-      memo: data.slice(INDEX_BYTES * 2),
-      index: hexToNumber(data.slice(0, INDEX_BYTES * 2)),
+      memo: data.slice(TIMESTAMP_BYTES * 2),
+      timestamp: hexToNumber(data.slice(0, TIMESTAMP_BYTES * 2)),
     } : null;
   }

-  async getAll(): Promise<Record<string, {memo: string, index: number}>> {
+  async getAll(): Promise<Record<string, {memo: string, timestamp: number}>> {
     return this.redis.hgetall(this.name).then(keys => Object.fromEntries(
       Object.entries(keys)
         .map(([commit, data]) =>
           [commit,
           {
-            memo: data.slice(INDEX_BYTES * 2),
-            index: hexToNumber(data.slice(0, INDEX_BYTES * 2)),
-          }] as [string, {memo: string, index: number}]
+            memo: data.slice(TIMESTAMP_BYTES * 2),
+            timestamp: hexToNumber(data.slice(0, TIMESTAMP_BYTES * 2)),
+          }] as [string, {memo: string, timestamp: number}]
         )
       ));
   }
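
The Redis value layout keeps its shape: a fixed-width hex prefix followed by the raw memo; only the prefix's meaning changed from tree index to millisecond timestamp (2^48 ms is roughly 8,900 years, hence the constant's comment). A round-trip sketch; the two inline helpers mirror what numberToHexPadded and hexToNumber from @/utils/helpers are assumed to do:

const TIMESTAMP_BYTES = 6;

const numberToHexPadded = (n: number, bytes: number): string =>
  n.toString(16).padStart(bytes * 2, '0');

const hexToNumber = (hex: string): number => parseInt(hex, 16);

// encode: 12 hex chars of timestamp, then the memo
const stored = `${numberToHexPadded(Date.now(), TIMESTAMP_BYTES)}deadbeef`;

// decode: split at the fixed prefix width
const memo = stored.slice(TIMESTAMP_BYTES * 2);                      // 'deadbeef'
const timestamp = hexToNumber(stored.slice(0, TIMESTAMP_BYTES * 2)); // the original Date.now() value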