Skip to content

Commit

Permalink
Batch event queries
Browse files Browse the repository at this point in the history
  • Loading branch information
Leonid committed Sep 14, 2022
1 parent 79f4a65 commit d9ed785
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 42 deletions.
2 changes: 2 additions & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
| GAS_PRICE_SPEED_TYPE | This parameter specifies the desirable transaction speed | `instant` / `fast` / `standard` / `low` |
| GAS_PRICE_FACTOR | A value that will multiply the gas price of the oracle to convert it to gwei. If the oracle API returns gas prices in gwei then this can be set to `1`. Also, it could be used to intentionally pay more gas than suggested by the oracle to guarantee the transaction verification. E.g. `1.25` or `1.5`. | integer |
| GAS_PRICE_UPDATE_INTERVAL | Interval in milliseconds used to get the updated gas price value using specified estimation type | integer |
| START_BLOCK | The block number used to start searching for events when the relayer instance is run for the first time | integer
| EVENTS_PROCESSING_BATCH_SIZE | Batch size for one `eth_getLogs` request when reprocessing old logs. Defaults to `10000` | integer
| RELAYER_LOG_LEVEL | Log level | Winston log level |
| RELAYER_REDIS_URL | Url to redis instance | URL |
| RPC_URL | Url to RPC node | URL |
Expand Down
4 changes: 3 additions & 1 deletion zp-relayer/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ const config = {
gasPriceFallback: process.env.GAS_PRICE_FALLBACK as string,
gasPriceEstimationType: (process.env.GAS_PRICE_ESTIMATION_TYPE as EstimationType) || 'web3',
gasPriceSpeedType: (process.env.GAS_PRICE_SPEED_TYPE as GasPriceKey) || 'fast',
gasPriceFactor: Number((process.env.GAS_PRICE_FACTOR as string) || '1'),
gasPriceFactor: parseInt(process.env.GAS_PRICE_FACTOR || '1'),
gasPriceUpdateInterval: parseInt(process.env.GAS_PRICE_UPDATE_INTERVAL || '5000'),
startBlock: parseInt(process.env.START_BLOCK || '0'),
eventsProcessingBatchSize: parseInt(process.env.EVENTS_PROCESSING_BATCH_SIZE || '10000'),
logLevel: process.env.RELAYER_LOG_LEVEL || 'debug',
redisUrl: process.env.RELAYER_REDIS_URL,
rpcUrl: process.env.RPC_URL as string,
Expand Down
91 changes: 52 additions & 39 deletions zp-relayer/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import config from './config'
import { web3 } from './services/web3'
import { logger } from './services/appLogger'
import { poolTxQueue } from './queue/poolTxQueue'
import { getEvents, getTransaction } from './utils/web3'
import { getBlockNumber, getEvents, getTransaction } from './utils/web3'
import { Helpers, Params, Proof, SnarkProof, VK } from 'libzkbob-rs-node'
import { validateTx } from './validateTx'
import { PoolState } from './state'
Expand Down Expand Up @@ -84,7 +84,7 @@ class Pool {
if (this.isInitialized) return

this.denominator = toBN(await this.PoolInstance.methods.denominator().call())
await this.syncState()
await this.syncState(config.startBlock)
this.isInitialized = true
}

Expand All @@ -108,8 +108,13 @@ class Pool {
return job.id
}

async syncState(fromBlock: number | string = 'earliest') {
logger.debug('Syncing state...')
async getLastBlockToProcess() {
const lastBlockNumber = await getBlockNumber(web3)
return lastBlockNumber
}

async syncState(fromBlock: number) {
logger.debug('Syncing state; starting from block %d', fromBlock)

const localIndex = this.state.getNextIndex()
const localRoot = this.state.getMerkleRoot()
Expand All @@ -131,53 +136,61 @@ class Pool {
missedIndices[i] = localIndex + (i + 1) * OUTPLUSONE
}

const events = await getEvents(this.PoolInstance, 'Message', {
fromBlock,
filter: {
index: missedIndices,
},
})

if (events.length !== missedIndices.length) {
logger.error('Not all events found')
return
}
const lastBlockNumber = await this.getLastBlockToProcess()
let finishBlock = fromBlock
for (let startBlock = fromBlock; finishBlock <= lastBlockNumber; startBlock = finishBlock) {
finishBlock += config.eventsProcessingBatchSize
const events = await getEvents(this.PoolInstance, 'Message', {
fromBlock: startBlock,
toBlock: finishBlock,
filter: {
index: missedIndices,
},
})

for (let i = 0; i < events.length; i++) {
const { returnValues, transactionHash } = events[i]
const memoString: string = returnValues.message
if (!memoString) {
throw new Error('incorrect memo in event')
}
for (let i = 0; i < events.length; i++) {
const { returnValues, transactionHash } = events[i]
const memoString: string = returnValues.message
if (!memoString) {
throw new Error('incorrect memo in event')
}

const { input } = await getTransaction(web3, transactionHash)
const calldata = Buffer.from(truncateHexPrefix(input), 'hex')
const { input } = await getTransaction(web3, transactionHash)
const calldata = Buffer.from(truncateHexPrefix(input), 'hex')

const parser = new PoolCalldataParser(calldata)
const parser = new PoolCalldataParser(calldata)

const nullifier = parser.getField('nullifier')
await this.state.nullifiers.add([web3.utils.hexToNumberString(nullifier)])
const nullifier = parser.getField('nullifier')
await this.state.nullifiers.add([web3.utils.hexToNumberString(nullifier)])

const outCommitRaw = parser.getField('outCommit')
const outCommit = web3.utils.hexToNumberString(outCommitRaw)
const outCommitRaw = parser.getField('outCommit')
const outCommit = web3.utils.hexToNumberString(outCommitRaw)

const txTypeRaw = parser.getField('txType')
const txType = toTxType(txTypeRaw)
const txTypeRaw = parser.getField('txType')
const txType = toTxType(txTypeRaw)

const memoSize = web3.utils.hexToNumber(parser.getField('memoSize'))
const memoRaw = truncateHexPrefix(parser.getField('memo', memoSize))
const memoSize = web3.utils.hexToNumber(parser.getField('memoSize'))
const memoRaw = truncateHexPrefix(parser.getField('memo', memoSize))

const truncatedMemo = truncateMemoTxPrefix(memoRaw, txType)
const commitAndMemo = numToHex(toBN(outCommit)).concat(transactionHash.slice(2)).concat(truncatedMemo)
const truncatedMemo = truncateMemoTxPrefix(memoRaw, txType)
const commitAndMemo = numToHex(toBN(outCommit)).concat(transactionHash.slice(2)).concat(truncatedMemo)

const index = Number(returnValues.index) - OUTPLUSONE
for (let state of [this.state, this.optimisticState]) {
state.addCommitment(Math.floor(index / OUTPLUSONE), Helpers.strToNum(outCommit))
state.addTx(index, Buffer.from(commitAndMemo, 'hex'))
const index = Number(returnValues.index) - OUTPLUSONE
for (let state of [this.state, this.optimisticState]) {
state.addCommitment(Math.floor(index / OUTPLUSONE), Helpers.strToNum(outCommit))
state.addTx(index, Buffer.from(commitAndMemo, 'hex'))
}
console.log(returnValues.index, index)
console.log(this.state.getMerkleRoot())
console.log(await this.PoolInstance.methods.roots(index).call())
}
}

logger.debug(`LOCAL ROOT AFTER UPDATE ${this.state.getMerkleRoot()}`)
const newLocalRoot = this.state.getMerkleRoot()
logger.debug(`LOCAL ROOT AFTER UPDATE ${localRoot}`)
if (newLocalRoot !== contractRoot) {
logger.error('State is corrupted, roots mismatch')
}
}

verifyProof(proof: SnarkProof, inputs: Array<string>) {
Expand Down
2 changes: 0 additions & 2 deletions zp-relayer/utils/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,12 @@ export async function setIntervalAndRun(f: () => Promise<void> | void, interval:

export async function withMutex<R>(mutex: Mutex, f: () => Promise<R>): Promise<R> {
const release = await mutex.acquire()
logger.info('ACQUIRED MUTEX')
try {
const res = await f()
return res
} catch (e) {
throw e
} finally {
release()
logger.info('RELEASED MUTEX')
}
}
12 changes: 12 additions & 0 deletions zp-relayer/utils/web3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,15 @@ export async function getChainId(web3: Web3) {
throw new Error('Chain Id cannot be obtained')
}
}

export async function getBlockNumber(web3: Web3) {
try {
logger.debug('Getting block number')
const blockNumber = await web3.eth.getBlockNumber()
logger.debug('Block number obtained %d', blockNumber)
return blockNumber
} catch (e) {
if (e instanceof Error) logger.error(e.message)
throw new Error(`Block Number cannot be obtained`)
}
}

0 comments on commit d9ed785

Please sign in to comment.