diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 9b521f72..28a30e60 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -17,6 +17,8 @@ | GAS_PRICE_SPEED_TYPE | This parameter specifies the desirable transaction speed | `instant` / `fast` / `standard` / `low` | | GAS_PRICE_FACTOR | A value that will multiply the gas price of the oracle to convert it to gwei. If the oracle API returns gas prices in gwei then this can be set to `1`. Also, it could be used to intentionally pay more gas than suggested by the oracle to guarantee the transaction verification. E.g. `1.25` or `1.5`. | integer | | GAS_PRICE_UPDATE_INTERVAL | Interval in milliseconds used to get the updated gas price value using specified estimation type | integer | +| START_BLOCK | The block number used to start searching for events when the relayer instance is run for the first time | integer +| EVENTS_PROCESSING_BATCH_SIZE | Batch size for one `eth_getLogs` request when reprocessing old logs. Defaults to `10000` | integer | RELAYER_LOG_LEVEL | Log level | Winston log level | | RELAYER_REDIS_URL | Url to redis instance | URL | | RPC_URL | Url to RPC node | URL | diff --git a/zp-relayer/config.ts b/zp-relayer/config.ts index 3362c2fc..19d5c04a 100644 --- a/zp-relayer/config.ts +++ b/zp-relayer/config.ts @@ -21,8 +21,10 @@ const config = { gasPriceFallback: process.env.GAS_PRICE_FALLBACK as string, gasPriceEstimationType: (process.env.GAS_PRICE_ESTIMATION_TYPE as EstimationType) || 'web3', gasPriceSpeedType: (process.env.GAS_PRICE_SPEED_TYPE as GasPriceKey) || 'fast', - gasPriceFactor: Number((process.env.GAS_PRICE_FACTOR as string) || '1'), + gasPriceFactor: parseInt(process.env.GAS_PRICE_FACTOR || '1'), gasPriceUpdateInterval: parseInt(process.env.GAS_PRICE_UPDATE_INTERVAL || '5000'), + startBlock: parseInt(process.env.START_BLOCK || '0'), + eventsProcessingBatchSize: parseInt(process.env.EVENTS_PROCESSING_BATCH_SIZE || '10000'), logLevel: process.env.RELAYER_LOG_LEVEL || 'debug', redisUrl: process.env.RELAYER_REDIS_URL, rpcUrl: process.env.RPC_URL as string, diff --git a/zp-relayer/pool.ts b/zp-relayer/pool.ts index 1e0bb7a4..077c9a55 100644 --- a/zp-relayer/pool.ts +++ b/zp-relayer/pool.ts @@ -7,7 +7,7 @@ import config from './config' import { web3 } from './services/web3' import { logger } from './services/appLogger' import { poolTxQueue } from './queue/poolTxQueue' -import { getEvents, getTransaction } from './utils/web3' +import { getBlockNumber, getEvents, getTransaction } from './utils/web3' import { Helpers, Params, Proof, SnarkProof, VK } from 'libzkbob-rs-node' import { validateTx } from './validateTx' import { PoolState } from './state' @@ -84,7 +84,7 @@ class Pool { if (this.isInitialized) return this.denominator = toBN(await this.PoolInstance.methods.denominator().call()) - await this.syncState() + await this.syncState(config.startBlock) this.isInitialized = true } @@ -108,8 +108,13 @@ class Pool { return job.id } - async syncState(fromBlock: number | string = 'earliest') { - logger.debug('Syncing state...') + async getLastBlockToProcess() { + const lastBlockNumber = await getBlockNumber(web3) + return lastBlockNumber + } + + async syncState(fromBlock: number) { + logger.debug('Syncing state; starting from block %d', fromBlock) const localIndex = this.state.getNextIndex() const localRoot = this.state.getMerkleRoot() @@ -131,53 +136,61 @@ class Pool { missedIndices[i] = localIndex + (i + 1) * OUTPLUSONE } - const events = await getEvents(this.PoolInstance, 'Message', { - fromBlock, - filter: { - index: missedIndices, - }, - }) - - if (events.length !== missedIndices.length) { - logger.error('Not all events found') - return - } + const lastBlockNumber = await this.getLastBlockToProcess() + let finishBlock = fromBlock + for (let startBlock = fromBlock; finishBlock <= lastBlockNumber; startBlock = finishBlock) { + finishBlock += config.eventsProcessingBatchSize + const events = await getEvents(this.PoolInstance, 'Message', { + fromBlock: startBlock, + toBlock: finishBlock, + filter: { + index: missedIndices, + }, + }) - for (let i = 0; i < events.length; i++) { - const { returnValues, transactionHash } = events[i] - const memoString: string = returnValues.message - if (!memoString) { - throw new Error('incorrect memo in event') - } + for (let i = 0; i < events.length; i++) { + const { returnValues, transactionHash } = events[i] + const memoString: string = returnValues.message + if (!memoString) { + throw new Error('incorrect memo in event') + } - const { input } = await getTransaction(web3, transactionHash) - const calldata = Buffer.from(truncateHexPrefix(input), 'hex') + const { input } = await getTransaction(web3, transactionHash) + const calldata = Buffer.from(truncateHexPrefix(input), 'hex') - const parser = new PoolCalldataParser(calldata) + const parser = new PoolCalldataParser(calldata) - const nullifier = parser.getField('nullifier') - await this.state.nullifiers.add([web3.utils.hexToNumberString(nullifier)]) + const nullifier = parser.getField('nullifier') + await this.state.nullifiers.add([web3.utils.hexToNumberString(nullifier)]) - const outCommitRaw = parser.getField('outCommit') - const outCommit = web3.utils.hexToNumberString(outCommitRaw) + const outCommitRaw = parser.getField('outCommit') + const outCommit = web3.utils.hexToNumberString(outCommitRaw) - const txTypeRaw = parser.getField('txType') - const txType = toTxType(txTypeRaw) + const txTypeRaw = parser.getField('txType') + const txType = toTxType(txTypeRaw) - const memoSize = web3.utils.hexToNumber(parser.getField('memoSize')) - const memoRaw = truncateHexPrefix(parser.getField('memo', memoSize)) + const memoSize = web3.utils.hexToNumber(parser.getField('memoSize')) + const memoRaw = truncateHexPrefix(parser.getField('memo', memoSize)) - const truncatedMemo = truncateMemoTxPrefix(memoRaw, txType) - const commitAndMemo = numToHex(toBN(outCommit)).concat(transactionHash.slice(2)).concat(truncatedMemo) + const truncatedMemo = truncateMemoTxPrefix(memoRaw, txType) + const commitAndMemo = numToHex(toBN(outCommit)).concat(transactionHash.slice(2)).concat(truncatedMemo) - const index = Number(returnValues.index) - OUTPLUSONE - for (let state of [this.state, this.optimisticState]) { - state.addCommitment(Math.floor(index / OUTPLUSONE), Helpers.strToNum(outCommit)) - state.addTx(index, Buffer.from(commitAndMemo, 'hex')) + const index = Number(returnValues.index) - OUTPLUSONE + for (let state of [this.state, this.optimisticState]) { + state.addCommitment(Math.floor(index / OUTPLUSONE), Helpers.strToNum(outCommit)) + state.addTx(index, Buffer.from(commitAndMemo, 'hex')) + } + console.log(returnValues.index, index) + console.log(this.state.getMerkleRoot()) + console.log(await this.PoolInstance.methods.roots(index).call()) } } - logger.debug(`LOCAL ROOT AFTER UPDATE ${this.state.getMerkleRoot()}`) + const newLocalRoot = this.state.getMerkleRoot() + logger.debug(`LOCAL ROOT AFTER UPDATE ${localRoot}`) + if (newLocalRoot !== contractRoot) { + logger.error('State is corrupted, roots mismatch') + } } verifyProof(proof: SnarkProof, inputs: Array) { diff --git a/zp-relayer/utils/helpers.ts b/zp-relayer/utils/helpers.ts index 8b03427b..23cb5efd 100644 --- a/zp-relayer/utils/helpers.ts +++ b/zp-relayer/utils/helpers.ts @@ -99,7 +99,6 @@ export async function setIntervalAndRun(f: () => Promise | void, interval: export async function withMutex(mutex: Mutex, f: () => Promise): Promise { const release = await mutex.acquire() - logger.info('ACQUIRED MUTEX') try { const res = await f() return res @@ -107,6 +106,5 @@ export async function withMutex(mutex: Mutex, f: () => Promise): Promise