feat: prevent processing same transaction twice (#282)
We've seen cases of pool transactions being processed multiple times.
This can happen when the RPC node returns outdated transactions from the pool:
we only cache seen pool transactions for a single block, so if a new block is minted
but the RPC still returns the pool state from before that block, the same transaction
can be handled twice.

Now we also cache all processed transactions - this should be manageable memory-wise
and prevents it from ever happening again.
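
In short, there are now two caches: one that is cleared on every block and only deduplicates within a single round of pool polling, and one that is never cleared and remembers every transaction that was actually handled. Below is a minimal sketch of the idea using a hypothetical TxDeduper helper; the real change lives directly in StarknetProvider.handleTx, as shown in the diff further down.

// Hypothetical sketch of the two-cache deduplication described above; set names mirror the diff.
class TxDeduper {
  // Cleared on every new block: only deduplicates within one round of pool polling.
  private seenPoolTransactions = new Set<string>();
  // Never cleared: guarantees a transaction hash is handled at most once overall.
  private processedTransactions = new Set<string>();

  shouldHandle(hash: string): boolean {
    return !this.seenPoolTransactions.has(hash) && !this.processedTransactions.has(hash);
  }

  markSeenInPool(hash: string): void {
    this.seenPoolTransactions.add(hash);
  }

  markProcessed(hash: string): void {
    this.processedTransactions.add(hash);
  }

  onNewBlock(): void {
    // Even if the RPC now returns a stale pool, shouldHandle() still rejects anything
    // already recorded in processedTransactions.
    this.seenPoolTransactions.clear();
  }
}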
Sekhmet authored Mar 15, 2024
1 parent 3a97296 commit d7ff07b
Showing 1 changed file with 24 additions and 5 deletions.
src/providers/starknet/index.ts (29 changes: 24 additions & 5 deletions)
@@ -16,7 +16,8 @@ import {

 export class StarknetProvider extends BaseProvider {
   private readonly provider: RpcProvider;
-  private processedPoolTransactions = new Set();
+  private seenPoolTransactions = new Set();
+  private processedTransactions = new Set();
   private startupLatestBlockNumber: number | undefined;

   constructor({ instance, log, abis }: ConstructorParameters<typeof BaseProvider>[0]) {
@@ -107,7 +108,7 @@ export class StarknetProvider extends BaseProvider {
     this.log.info({ blockNumber: block.block_number }, 'handling block');

     const txsToCheck = block.transactions.filter(
-      tx => !this.processedPoolTransactions.has(tx.transaction_hash)
+      tx => !this.seenPoolTransactions.has(tx.transaction_hash)
     );

     for (const [i, tx] of txsToCheck.entries()) {
@@ -120,15 +121,15 @@
       );
     }

-    this.processedPoolTransactions.clear();
+    this.seenPoolTransactions.clear();

     this.log.debug({ blockNumber: block.block_number }, 'handling block done');
   }

   private async handlePool(txs: PendingTransaction[], eventsMap: EventsMap, blockNumber: number) {
     this.log.info('handling pool');

-    const txsToCheck = txs.filter(tx => !this.processedPoolTransactions.has(tx.transaction_hash));
+    const txsToCheck = txs.filter(tx => !this.seenPoolTransactions.has(tx.transaction_hash));

     for (const [i, tx] of txsToCheck.entries()) {
       await this.handleTx(
@@ -139,7 +140,7 @@
         tx.transaction_hash ? eventsMap[tx.transaction_hash] || [] : []
       );

-      this.processedPoolTransactions.add(tx.transaction_hash);
+      this.seenPoolTransactions.add(tx.transaction_hash);
     }

     this.log.info('handling pool done');
@@ -154,6 +155,12 @@
   ) {
     this.log.debug({ txIndex }, 'handling transaction');

+    if (this.processedTransactions.has(tx.transaction_hash)) {
+      this.log.warn({ hash: tx.transaction_hash }, 'transaction already processed');
+      return;
+    }
+
+    let wasTransactionProcessed = false;
     const writerParams = await this.instance.getWriterParams();

     if (this.instance.config.tx_fn) {
@@ -163,6 +170,8 @@
         tx,
         ...writerParams
       });
+
+      wasTransactionProcessed = true;
     }

     if (this.instance.config.global_events) {
@@ -191,6 +200,8 @@
           eventIndex,
           ...writerParams
         });
+
+        wasTransactionProcessed = true;
       }
     }

@@ -218,6 +229,8 @@
           tx,
           ...writerParams
         });
+
+        wasTransactionProcessed = true;
       }

       for (const [eventIndex, event] of events.entries()) {
@@ -251,11 +264,17 @@
               eventIndex,
               ...writerParams
             });
+
+            wasTransactionProcessed = true;
           }
         }
       }
     }

+    if (wasTransactionProcessed) {
+      this.processedTransactions.add(tx.transaction_hash);
+    }
+
     const nextSources = this.instance.getCurrentSources(blockNumber);
     const newSources = nextSources.filter(
       nextSource => !lastSources.find(lastSource => lastSource.contract === nextSource.contract)
