Skip to content

Commit

Permalink
Refactor watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
lok52 committed Sep 21, 2023
1 parent e603e42 commit d56036a
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 102 deletions.
35 changes: 0 additions & 35 deletions zp-relayer/direct-deposit/utils.ts

This file was deleted.

115 changes: 48 additions & 67 deletions zp-relayer/direct-deposit/watcher.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
// Reference implementation:
// https://github.com/omni/tokenbridge/blob/master/oracle/src/watcher.js
import type Web3 from 'web3'
import type { AbiItem } from 'web3-utils'
import type { DirectDeposit } from '@/queue/poolTxQueue'
import { web3 } from '@/services/web3'
Expand All @@ -9,81 +6,65 @@ import DirectDepositQueueAbi from '@/abi/direct-deposit-queue-abi.json'
import config from '@/configs/watcherConfig'
import { logger } from '@/services/appLogger'
import { redis } from '@/services/redisClient'
import { lastProcessedBlock, getLastProcessedBlock, updateLastProcessedBlock, parseDirectDepositEvent } from './utils'
import { BatchCache } from './BatchCache'
import { validateDirectDeposit } from '@/validation/tx/validateDirectDeposit'
import { getBlockNumber, getEvents } from '@/utils/web3'
import { directDepositQueue } from '@/queue/directDepositQueue'
import { EventWatcher } from '../services/EventWatcher'

const PoolInstance = new web3.eth.Contract(PoolAbi as AbiItem[], config.poolAddress)
const DirectDepositQueueInstance = new web3.eth.Contract(DirectDepositQueueAbi as AbiItem[])

const eventName = 'SubmitDirectDeposit'
export function parseDirectDepositEvent(o: Record<string, any>): DirectDeposit {
const dd: DirectDeposit = {
sender: o.sender,
nonce: o.nonce,
fallbackUser: o.fallbackUser,
zkAddress: {
diversifier: o.zkAddress.diversifier,
pk: o.zkAddress.pk,
},
deposit: o.deposit,
}

const batch = new BatchCache<DirectDeposit>(
config.directDepositBatchSize,
config.directDepositBatchTtl,
ds => {
logger.info('Adding direct-deposit events to queue', { count: ds.length })
directDepositQueue.add('', ds)
},
dd => validateDirectDeposit(dd, DirectDepositQueueInstance),
redis
)
return dd
}

async function init() {
await getLastProcessedBlock()
await batch.init()
const PoolInstance = new web3.eth.Contract(PoolAbi as AbiItem[], config.poolAddress)
const queueAddress = await PoolInstance.methods.direct_deposit_queue().call()
DirectDepositQueueInstance.options.address = queueAddress
runWatcher()
}
const DirectDepositQueueInstance = new web3.eth.Contract(DirectDepositQueueAbi as AbiItem[], queueAddress)

async function getLastBlockToProcess(web3: Web3) {
const lastBlockNumber = await getBlockNumber(web3)
return lastBlockNumber - config.blockConfirmations
}

async function watch() {
const lastBlockToProcess = await getLastBlockToProcess(web3)

if (lastBlockToProcess <= lastProcessedBlock) {
logger.debug('All blocks already processed')
return
}
const batch = new BatchCache<DirectDeposit>(
config.directDepositBatchSize,
config.directDepositBatchTtl,
ds => {
logger.info('Adding direct-deposit events to queue', { count: ds.length })
directDepositQueue.add('', ds)
},
dd => validateDirectDeposit(dd, DirectDepositQueueInstance),
redis
)
await batch.init()

const fromBlock = lastProcessedBlock + 1
const rangeEndBlock = fromBlock + config.eventsProcessingBatchSize
let toBlock = Math.min(lastBlockToProcess, rangeEndBlock)
const watcher = new EventWatcher({
name: 'direct-deposit',
startBlock: config.startBlock,
blockConfirmations: config.blockConfirmations,
eventName: 'SubmitDirectDeposit',
eventPollingInterval: config.eventPollingInterval,
eventsProcessingBatchSize: config.eventsProcessingBatchSize,
redis,
web3,
contract: DirectDepositQueueInstance,
callback: async events => {
const directDeposits: [string, DirectDeposit][] = []
for (let event of events) {
const dd = parseDirectDepositEvent(event.returnValues)
directDeposits.push([dd.nonce, dd])
}

let events = await getEvents(DirectDepositQueueInstance, eventName, {
fromBlock,
toBlock,
await batch.add(directDeposits)
},
})
logger.info(`Found ${events.length} direct-deposit events`)

const directDeposits: [string, DirectDeposit][] = []
for (let event of events) {
const dd = parseDirectDepositEvent(event.returnValues)
directDeposits.push([dd.nonce, dd])
}

await batch.add(directDeposits)

logger.debug('Updating last processed block', { lastProcessedBlock: toBlock.toString() })
await updateLastProcessedBlock(toBlock)
}

async function runWatcher() {
try {
await watch()
} catch (e) {
logger.error(e)
}

setTimeout(() => {
runWatcher()
}, config.eventPollingInterval)
await watcher.init()
return watcher
}

init()
init().then(w => w.run())
86 changes: 86 additions & 0 deletions zp-relayer/services/EventWatcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Reference implementation:
// https://github.com/omni/tokenbridge/blob/master/oracle/src/watcher.js
import { logger } from '@/services/appLogger'
import { getBlockNumber, getEvents } from '@/utils/web3'
import type { Redis } from 'ioredis'
import type Web3 from 'web3'
import type { Contract, EventData } from 'web3-eth-contract'

interface IWatcherConfig {
name: string
startBlock: number
blockConfirmations: number
eventName: string
eventPollingInterval: number
eventsProcessingBatchSize: number
redis: Redis
web3: Web3
contract: Contract
callback: (events: EventData[]) => Promise<void>
}

export class EventWatcher {
lastProcessedBlock: number
lastBlockRedisKey: string

constructor(private config: IWatcherConfig) {
this.lastBlockRedisKey = `${config.name}:lastProcessedBlock`
this.lastProcessedBlock = Math.max(config.startBlock - 1, 0)
}

async init() {
await this.getLastProcessedBlock()
}

private async getLastProcessedBlock() {
const result = await this.config.redis.get(this.lastBlockRedisKey)
logger.debug('Last Processed block obtained', { fromRedis: result, fromConfig: this.lastProcessedBlock })
this.lastProcessedBlock = result ? parseInt(result, 10) : this.lastProcessedBlock
}

private updateLastProcessedBlock(lastBlockNumber: number) {
this.lastProcessedBlock = lastBlockNumber
return this.config.redis.set(this.lastBlockRedisKey, this.lastProcessedBlock)
}

private async getLastBlockToProcess(web3: Web3) {
const lastBlockNumber = await getBlockNumber(web3)
return lastBlockNumber - this.config.blockConfirmations
}

private async watch() {
const lastBlockToProcess = await this.getLastBlockToProcess(this.config.web3)

if (lastBlockToProcess <= this.lastProcessedBlock) {
logger.debug('All blocks already processed')
return
}

const fromBlock = this.lastProcessedBlock + 1
const rangeEndBlock = fromBlock + this.config.eventsProcessingBatchSize
let toBlock = Math.min(lastBlockToProcess, rangeEndBlock)

let events = await getEvents(this.config.contract, this.config.eventName, {
fromBlock,
toBlock,
})
logger.info(`Found ${events.length} events`)

await this.config.callback(events)

logger.debug('Updating last processed block', { lastProcessedBlock: toBlock.toString() })
await this.updateLastProcessedBlock(toBlock)
}

async run() {
try {
await this.watch()
} catch (e) {
logger.error(e)
}

setTimeout(() => {
this.run()
}, this.config.eventPollingInterval)
}
}

0 comments on commit d56036a

Please sign in to comment.