diff --git a/.gitignore b/.gitignore index 3b1d9d7..0b76cfd 100644 --- a/.gitignore +++ b/.gitignore @@ -16,4 +16,4 @@ prover.js # Log file zp.log yarn-error.log -zp-relayer/state/ \ No newline at end of file +zp-relayer/POOL_STATE \ No newline at end of file diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 3e2bdb9..ade5c26 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -13,15 +13,18 @@ | TREE_UPDATE_PARAMS_PATH | Local path to tree update circuit parameters | string | | TRANSFER_PARAMS_PATH | Local path to transfer circuit parameters | string | | TX_VK_PATH | Local path to transaction curcuit verification key | string | -| STATE_DIR_PATH | Path to persistent state files related to tree and transactions storage. Default: `./state` | string | +| STATE_DIR_PATH | Path to persistent state files related to tree and transactions storage. Default: `./POOL_STATE` | string | | GAS_PRICE_FALLBACK | Default fallback gas price | integer | | GAS_PRICE_ESTIMATION_TYPE | Gas price estimation type | `web3` / `gas-price-oracle` / `eip1559-gas-estimation` / `polygon-gasstation-v2` | | GAS_PRICE_SPEED_TYPE | This parameter specifies the desirable transaction speed | `instant` / `fast` / `standard` / `low` | | GAS_PRICE_FACTOR | A value that will multiply the gas price of the oracle to convert it to gwei. If the oracle API returns gas prices in gwei then this can be set to `1`. Also, it could be used to intentionally pay more gas than suggested by the oracle to guarantee the transaction verification. E.g. `1.25` or `1.5`. 
| integer | | GAS_PRICE_UPDATE_INTERVAL | Interval in milliseconds used to get the updated gas price value using specified estimation type | integer | +| MAX_FEE_PER_GAS_LIMIT | Max limit on `maxFeePerGas` parameter for each transaction in wei | integer | | START_BLOCK | The block number used to start searching for events when the relayer instance is run for the first time | integer | EVENTS_PROCESSING_BATCH_SIZE | Batch size for one `eth_getLogs` request when reprocessing old logs. Defaults to `10000` | integer | RELAYER_LOG_LEVEL | Log level | Winston log level | | RELAYER_REDIS_URL | Url to redis instance | URL | | RPC_URL | Url to RPC node | URL | -| SENT_TX_DELAY | Delay in milliseconds for sentTxWorker to verify submitted transactions | integer +| SENT_TX_DELAY | Delay in milliseconds for sentTxWorker to verify submitted transactions | integer | +| PERMIT_DEADLINE_THRESHOLD_INITIAL | Minimum time threshold in seconds for permit signature deadline to be valid (before initial transaction submission) | integer | +| PERMIT_DEADLINE_THRESHOLD_RESEND | Minimum time threshold in seconds for permit signature deadline to be valid (for re-send attempts) | integer | diff --git a/yarn.lock b/yarn.lock index f5945d8..ef60261 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6765,6 +6765,3 @@ yocto-queue@^0.1.0: dependencies: borsh "^0.5.0" buffer "^6.0.3" - stream-http "^3.2.0" - web3 "1.7.4" - webpack "^5.46.0" diff --git a/zp-memo-parser/memo.ts b/zp-memo-parser/memo.ts index 935cee6..70174a2 100644 --- a/zp-memo-parser/memo.ts +++ b/zp-memo-parser/memo.ts @@ -1,7 +1,5 @@ -import BN from 'bn.js' import { Buffer } from 'buffer' import { deserialize, BinaryReader } from 'borsh' -import { toBN } from 'web3-utils' type Option = T | null @@ -13,16 +11,16 @@ export enum TxType { } interface DefaultTxData { - fee: BN + fee: string } export interface WithdrawTxData extends DefaultTxData { - nativeAmount: BN + nativeAmount: string reciever: Uint8Array } export interface 
PermittableDepositTxData extends DefaultTxData { - deadline: BN + deadline: string holder: Uint8Array } @@ -97,7 +95,7 @@ function getNoteHashes(rawHashes: Buffer, num: number, maxNotes: number): Uint8A export function getTxData(data: Buffer, txType: Option): TxData { function readU64(offset: number) { let uint = data.readBigUInt64BE(offset) - return toBN(uint.toString()) + return uint.toString(10) } let offset = 0 const fee = readU64(offset) diff --git a/zp-memo-parser/package.json b/zp-memo-parser/package.json index f458f14..50aa362 100644 --- a/zp-memo-parser/package.json +++ b/zp-memo-parser/package.json @@ -12,9 +12,6 @@ }, "dependencies": { "borsh": "^0.5.0", - "buffer": "^6.0.3", - "stream-http": "^3.2.0", - "webpack": "^5.46.0", - "web3": "1.7.4" + "buffer": "^6.0.3" } } diff --git a/zp-relayer/clear.sh b/zp-relayer/clear.sh index fbff0a5..b10c49e 100755 --- a/zp-relayer/clear.sh +++ b/zp-relayer/clear.sh @@ -1 +1 @@ -rm -rf ./state \ No newline at end of file +rm -rf ./POOL_STATE \ No newline at end of file diff --git a/zp-relayer/config.ts b/zp-relayer/config.ts index 7412cdb..27554bc 100644 --- a/zp-relayer/config.ts +++ b/zp-relayer/config.ts @@ -19,18 +19,21 @@ const config = { treeUpdateParamsPath: process.env.TREE_UPDATE_PARAMS_PATH || './params/tree_params.bin', transferParamsPath: process.env.TRANSFER_PARAMS_PATH || './params/transfer_params.bin', txVKPath: process.env.TX_VK_PATH || './params/transfer_verification_key.json', - stateDirPath: process.env.STATE_DIR_PATH || './state', + stateDirPath: process.env.STATE_DIR_PATH || './POOL_STATE', gasPriceFallback: process.env.GAS_PRICE_FALLBACK as string, gasPriceEstimationType: (process.env.GAS_PRICE_ESTIMATION_TYPE as EstimationType) || 'web3', gasPriceSpeedType: (process.env.GAS_PRICE_SPEED_TYPE as GasPriceKey) || 'fast', gasPriceFactor: parseInt(process.env.GAS_PRICE_FACTOR || '1'), gasPriceUpdateInterval: parseInt(process.env.GAS_PRICE_UPDATE_INTERVAL || '5000'), + maxFeeLimit: 
process.env.MAX_FEE_PER_GAS_LIMIT ? toBN(process.env.MAX_FEE_PER_GAS_LIMIT) : null, startBlock: parseInt(process.env.START_BLOCK || '0'), eventsProcessingBatchSize: parseInt(process.env.EVENTS_PROCESSING_BATCH_SIZE || '10000'), logLevel: process.env.RELAYER_LOG_LEVEL || 'debug', redisUrl: process.env.RELAYER_REDIS_URL, rpcUrl: process.env.RPC_URL as string, sentTxDelay: parseInt(process.env.SENT_TX_DELAY || '30000'), + permitDeadlineThresholdInitial: parseInt(process.env.PERMIT_DEADLINE_THRESHOLD_INITIAL || '300'), + permitDeadlineThresholdResend: parseInt(process.env.PERMIT_DEADLINE_THRESHOLD_RESEND || '10'), } export default config diff --git a/zp-relayer/init.ts b/zp-relayer/init.ts index 0381d61..f5a35cb 100644 --- a/zp-relayer/init.ts +++ b/zp-relayer/init.ts @@ -15,6 +15,7 @@ export async function init() { const gasPriceService = new GasPrice(web3, config.gasPriceUpdateInterval, config.gasPriceEstimationType, { speedType: config.gasPriceSpeedType, factor: config.gasPriceFactor, + maxFeeLimit: config.maxFeeLimit, }) await gasPriceService.start() const workerMutex = new Mutex() diff --git a/zp-relayer/nullifierSet.ts b/zp-relayer/nullifierSet.ts deleted file mode 100644 index 468a388..0000000 --- a/zp-relayer/nullifierSet.ts +++ /dev/null @@ -1,23 +0,0 @@ -import { redis } from './services/redisClient' - -export class NullifierSet { - constructor(public name: string) {} - - async add(nullifiers: string[]) { - if (nullifiers.length === 0) return - await redis.sadd(this.name, nullifiers) - } - - async remove(nullifiers: string[]) { - if (nullifiers.length === 0) return - await redis.srem(this.name, nullifiers) - } - - async isInSet(nullifier: string) { - return await redis.sismember(this.name, nullifier) - } - - async clear() { - await redis.del(this.name) - } -} diff --git a/zp-relayer/package.json b/zp-relayer/package.json index c885c83..189baaa 100644 --- a/zp-relayer/package.json +++ b/zp-relayer/package.json @@ -10,7 +10,7 @@ "dev:worker": "ts-node 
poolTxWorker.ts", "start:dev": "ts-node index.ts", "start:prod": "node index.js", - "test": "ts-mocha --timeout 1000000 test/**/*.test.ts" + "test": "ts-mocha --paths --timeout 1000000 test/**/*.test.ts" }, "dependencies": { "@metamask/eth-sig-util": "^4.0.1", diff --git a/zp-relayer/pool.ts b/zp-relayer/pool.ts index 5e9894b..f246dc6 100644 --- a/zp-relayer/pool.ts +++ b/zp-relayer/pool.ts @@ -8,15 +8,16 @@ import { Contract } from 'web3-eth-contract' import config from './config' import { web3 } from './services/web3' import { logger } from './services/appLogger' +import { redis } from './services/redisClient' import { poolTxQueue } from './queue/poolTxQueue' import { getBlockNumber, getEvents, getTransaction } from './utils/web3' import { Helpers, Params, Proof, SnarkProof, VK } from 'libzkbob-rs-node' -import { PoolState } from './state' +import { PoolState } from './state/PoolState' import { TxType } from 'zp-memo-parser' import { numToHex, toTxType, truncateHexPrefix, truncateMemoTxPrefix } from './utils/helpers' import { PoolCalldataParser } from './utils/PoolCalldataParser' -import { OUTPLUSONE } from './utils/constants' +import { INIT_ROOT, OUTPLUSONE } from './utils/constants' export interface PoolTx { proof: Proof @@ -84,8 +85,8 @@ class Pool { const txVK = require(config.txVKPath) this.txVK = txVK - this.state = new PoolState('pool', config.stateDirPath) - this.optimisticState = new PoolState('optimistic', config.stateDirPath) + this.state = new PoolState('pool', redis, config.stateDirPath) + this.optimisticState = new PoolState('optimistic', redis, config.stateDirPath) } private static getHash(path: string) { @@ -136,11 +137,19 @@ class Pool { logger.debug(`LOCAL ROOT: ${localRoot}; LOCAL INDEX: ${localIndex}`) logger.debug(`CONTRACT ROOT: ${contractRoot}; CONTRACT INDEX: ${contractIndex}`) - if (contractRoot === localRoot && contractIndex === localIndex) { + const rootSetRoot = await this.state.roots.get(localIndex.toString(10)) + logger.debug(`ROOT 
FROM ROOTSET: ${rootSetRoot}`) + + if (contractRoot === localRoot && rootSetRoot === localRoot && contractIndex === localIndex) { logger.info('State is ok, no need to resync') return } + // Set initial root + await this.state.roots.add({ + 0: INIT_ROOT, + }) + const numTxs = Math.floor((contractIndex - localIndex) / OUTPLUSONE) const missedIndices = Array(numTxs) for (let i = 0; i < numTxs; i++) { @@ -171,9 +180,6 @@ class Pool { const parser = new PoolCalldataParser(calldata) - const nullifier = parser.getField('nullifier') - await this.state.nullifiers.add([web3.utils.hexToNumberString(nullifier)]) - const outCommitRaw = parser.getField('outCommit') const outCommit = web3.utils.hexToNumberString(outCommitRaw) @@ -186,16 +192,29 @@ class Pool { const truncatedMemo = truncateMemoTxPrefix(memoRaw, txType) const commitAndMemo = numToHex(toBN(outCommit)).concat(transactionHash.slice(2)).concat(truncatedMemo) - const index = Number(returnValues.index) - OUTPLUSONE + const newPoolIndex = Number(returnValues.index) + const prevPoolIndex = newPoolIndex - OUTPLUSONE + const prevCommitIndex = Math.floor(Number(prevPoolIndex) / OUTPLUSONE) + for (let state of [this.state, this.optimisticState]) { - state.addCommitment(Math.floor(index / OUTPLUSONE), Helpers.strToNum(outCommit)) - state.addTx(index, Buffer.from(commitAndMemo, 'hex')) + state.addCommitment(prevCommitIndex, Helpers.strToNum(outCommit)) + state.addTx(prevPoolIndex, Buffer.from(commitAndMemo, 'hex')) } + + // Save nullifier in confirmed state + const nullifier = parser.getField('nullifier') + await this.state.nullifiers.add([web3.utils.hexToNumberString(nullifier)]) + + // Save root in confirmed state + const root = this.state.getMerkleRoot() + await this.state.roots.add({ + [newPoolIndex]: root, + }) } } const newLocalRoot = this.state.getMerkleRoot() - logger.debug(`LOCAL ROOT AFTER UPDATE ${localRoot}`) + logger.debug(`LOCAL ROOT AFTER UPDATE ${newLocalRoot}`) if (newLocalRoot !== contractRoot) { 
logger.error('State is corrupted, roots mismatch') } diff --git a/zp-relayer/queue/sentTxQueue.ts b/zp-relayer/queue/sentTxQueue.ts index 7120bcd..df3fdec 100644 --- a/zp-relayer/queue/sentTxQueue.ts +++ b/zp-relayer/queue/sentTxQueue.ts @@ -1,24 +1,37 @@ import { Queue, QueueScheduler } from 'bullmq' import { redis } from '@/services/redisClient' import { SENT_TX_QUEUE_NAME } from '@/utils/constants' -import { TxPayload } from './poolTxQueue' import type { TransactionConfig } from 'web3-core' +import { GasPriceValue } from '@/services/gas-price' +import { TxData, TxType } from 'zp-memo-parser' export interface SentTxPayload { - payload: TxPayload + txType: TxType + root: string outCommit: string commitIndex: number txHash: string - txData: string + prefixedMemo: string txConfig: TransactionConfig nullifier: string + gasPriceOptions: GasPriceValue + txData: TxData } +export enum SentTxState { + MINED = 'MINED', + REVERT = 'REVERT', + RESEND = 'RESEND', + FAILED = 'FAILED', +} + +export type SentTxResult = [SentTxState, string] + // Required for delayed jobs processing const sentTxQueueScheduler = new QueueScheduler(SENT_TX_QUEUE_NAME, { connection: redis, }) -export const sentTxQueue = new Queue(SENT_TX_QUEUE_NAME, { +export const sentTxQueue = new Queue(SENT_TX_QUEUE_NAME, { connection: redis, }) diff --git a/zp-relayer/services/gas-price/GasPrice.ts b/zp-relayer/services/gas-price/GasPrice.ts index b116de2..509c442 100644 --- a/zp-relayer/services/gas-price/GasPrice.ts +++ b/zp-relayer/services/gas-price/GasPrice.ts @@ -1,5 +1,6 @@ +import BN from 'bn.js' import type Web3 from 'web3' -import { toWei } from 'web3-utils' +import { toWei, toBN } from 'web3-utils' import config from '@/config' import { setIntervalAndRun } from '@/utils/helpers' import { estimateFees } from '@mycrypto/gas-estimation' @@ -17,6 +18,8 @@ import { PolygonGSV2Response, PolygonGSV2GasPriceKey, GasPriceKey, + LegacyGasPrice, + EIP1559GasPrice, } from './types' const 
polygonGasPriceKeyMapping: Record = { @@ -26,6 +29,43 @@ const polygonGasPriceKeyMapping: Record = { instant: 'fast', } +function isLegacyGasPrice(gp: GasPriceValue): gp is LegacyGasPrice { + return 'gasPrice' in gp +} + +function isEIP1559GasPrice(gp: GasPriceValue): gp is EIP1559GasPrice { + return 'maxFeePerGas' in gp && 'maxPriorityFeePerGas' in gp +} + +export function chooseGasPriceOptions(a: GasPriceValue, b: GasPriceValue): GasPriceValue { + if (isLegacyGasPrice(a) && isLegacyGasPrice(b)) { + return { gasPrice: BN.max(toBN(a.gasPrice), toBN(b.gasPrice)).toString(10) } + } + if (isEIP1559GasPrice(a) && isEIP1559GasPrice(b)) { + return { + maxFeePerGas: BN.max(toBN(a.maxFeePerGas), toBN(b.maxFeePerGas)).toString(10), + maxPriorityFeePerGas: BN.max(toBN(a.maxPriorityFeePerGas), toBN(b.maxPriorityFeePerGas)).toString(10), + } + } + return b +} + +export function EIP1559GasPriceWithinLimit(fees: EIP1559GasPrice, maxFeeLimit: BN | null): EIP1559GasPrice { + if (!maxFeeLimit) return fees + + const diff = toBN(fees.maxFeePerGas).sub(maxFeeLimit) + if (diff.isNeg()) { + return fees + } else { + const maxFeePerGas = maxFeeLimit.toString(10) + const maxPriorityFeePerGas = BN.min(toBN(fees.maxPriorityFeePerGas), maxFeeLimit).toString(10) + return { + maxFeePerGas, + maxPriorityFeePerGas, + } + } +} + export class GasPrice { private fetchGasPriceInterval: NodeJS.Timeout | null = null private cachedGasPrice: GasPriceValue @@ -99,10 +139,16 @@ export class GasPrice { const json: PolygonGSV2Response = await response.json() const speedType = polygonGasPriceKeyMapping[options.speedType] const { maxFee, maxPriorityFee } = json[speedType] - return { - maxFeePerGas: GasPrice.normalizeGasPrice(maxFee), - maxPriorityFeePerGas: GasPrice.normalizeGasPrice(maxPriorityFee), - } + + const gasPriceOptions = EIP1559GasPriceWithinLimit( + { + maxFeePerGas: GasPrice.normalizeGasPrice(maxFee), + maxPriorityFeePerGas: GasPrice.normalizeGasPrice(maxPriorityFee), + }, + options.maxFeeLimit + 
) + + return gasPriceOptions } static normalizeGasPrice(rawGasPrice: number, factor = 1) { diff --git a/zp-relayer/services/gas-price/types.ts b/zp-relayer/services/gas-price/types.ts index 2d19061..834fc3e 100644 --- a/zp-relayer/services/gas-price/types.ts +++ b/zp-relayer/services/gas-price/types.ts @@ -1,8 +1,10 @@ +import type BN from 'bn.js' + // GasPrice fields -interface LegacyGasPrice { +export interface LegacyGasPrice { gasPrice: string } -interface EIP1559GasPrice { +export interface EIP1559GasPrice { maxFeePerGas: string maxPriorityFeePerGas: string } @@ -38,7 +40,7 @@ export type EstimationPolygonGSV2 = 'polygon-gasstation-v2' export type EstimationType = EstimationEIP1559 | EstimationOracle | EstimationWeb3 | EstimationPolygonGSV2 export type EstimationOracleOptions = { speedType: GasPriceKey; factor: number } -export type EstimationPolygonGSV2Options = { speedType: GasPriceKey } +export type EstimationPolygonGSV2Options = { speedType: GasPriceKey; maxFeeLimit: BN | null } export type EstimationOptions = ET extends EstimationOracle ? 
EstimationOracleOptions : ET extends EstimationPolygonGSV2 diff --git a/zp-relayer/state.ts b/zp-relayer/state/PoolState.ts similarity index 90% rename from zp-relayer/state.ts rename to zp-relayer/state/PoolState.ts index 687e6f6..66b1aa6 100644 --- a/zp-relayer/state.ts +++ b/zp-relayer/state/PoolState.ts @@ -1,17 +1,21 @@ -import { logger } from './services/appLogger' -import { OUTPLUSONE } from './utils/constants' +import type { Redis } from 'ioredis' +import { logger } from '@/services/appLogger' +import { OUTPLUSONE } from '@/utils/constants' import { MerkleTree, TxStorage, MerkleProof, Constants, Helpers } from 'libzkbob-rs-node' import { NullifierSet } from './nullifierSet' +import { RootSet } from './rootSet' export class PoolState { private tree: MerkleTree private txs: TxStorage public nullifiers: NullifierSet + public roots: RootSet - constructor(private name: string, path: string) { + constructor(private name: string, redis: Redis, path: string) { this.tree = new MerkleTree(`${path}/${name}Tree.db`) this.txs = new TxStorage(`${path}/${name}Txs.db`) - this.nullifiers = new NullifierSet(`${name}-nullifiers`) + this.nullifiers = new NullifierSet(`${name}-nullifiers`, redis) + this.roots = new RootSet(`${name}-roots`, redis) } getVirtualTreeProofInputs(outCommit: string, transferNum?: number) { diff --git a/zp-relayer/state/nullifierSet.ts b/zp-relayer/state/nullifierSet.ts new file mode 100644 index 0000000..cac566f --- /dev/null +++ b/zp-relayer/state/nullifierSet.ts @@ -0,0 +1,22 @@ +import type { Redis } from 'ioredis' +export class NullifierSet { + constructor(public name: string, private redis: Redis) {} + + async add(nullifiers: string[]) { + if (nullifiers.length === 0) return + await this.redis.sadd(this.name, nullifiers) + } + + async remove(nullifiers: string[]) { + if (nullifiers.length === 0) return + await this.redis.srem(this.name, nullifiers) + } + + async isInSet(nullifier: string) { + return await this.redis.sismember(this.name, 
nullifier) + } + + async clear() { + await this.redis.del(this.name) + } +} diff --git a/zp-relayer/state/rootSet.ts b/zp-relayer/state/rootSet.ts new file mode 100644 index 0000000..70e3f64 --- /dev/null +++ b/zp-relayer/state/rootSet.ts @@ -0,0 +1,23 @@ +import type { Redis } from 'ioredis' + +export class RootSet { + constructor(public name: string, private redis: Redis) {} + + async add(roots: Record) { + if (Object.keys(roots).length === 0) return + await this.redis.hset(this.name, roots) + } + + async remove(indices: string[]) { + if (indices.length === 0) return + await this.redis.hdel(this.name, indices) + } + + async get(index: string) { + return this.redis.hget(this.name, index) + } + + async clear() { + await this.redis.del(this.name) + } +} diff --git a/zp-relayer/test/pool.test.ts b/zp-relayer/test/pool.test.ts index 6446868..368988f 100644 --- a/zp-relayer/test/pool.test.ts +++ b/zp-relayer/test/pool.test.ts @@ -1,30 +1,35 @@ import { expect } from 'chai' -import { decodeMemo } from 'zp-memo-parser/memo' -import { MerkleTree, Constants, Helpers } from 'libzkbob-rs-node' -import depositMemo from './depositMemo.json' -import fs from 'fs' -import { TxType } from 'zp-memo-parser' - -const DB_PATH = './test-tree.db' +import { toBN } from 'web3-utils' +import { EIP1559GasPriceWithinLimit } from '../services/gas-price/GasPrice' +import { checkDeadline } from '../validateTx' describe('Pool', () => { - it('calculates out commit', () => { - const tree = new MerkleTree(DB_PATH) - - const buf = Buffer.from(depositMemo, 'hex') - const memo = decodeMemo(buf, TxType.DEPOSIT) + it('correctly calculates fee limit', () => { + const fees = { + maxFeePerGas: '15', + maxPriorityFeePerGas: '7', + } - tree.appendHash(Buffer.from(memo.accHash)) - memo.noteHashes.forEach(n => tree.appendHash(Buffer.from(n))) + expect(EIP1559GasPriceWithinLimit(fees, toBN(100))).to.eql({ + maxFeePerGas: '15', + maxPriorityFeePerGas: '7', + }) - // Commit calculated from raw hashes - const 
hashes = [memo.accHash].concat(memo.noteHashes).map(Buffer.from) - const out_commit_calc = Helpers.outCommitmentHash(hashes) - // Commit as a root of subtree with inserted hashes - const out_commit_node = tree.getNode(Constants.OUTLOG, 0) + expect(EIP1559GasPriceWithinLimit(fees, toBN(10))).to.eql({ + maxFeePerGas: '10', + maxPriorityFeePerGas: '7', + }) - expect(out_commit_calc).eq(out_commit_node) + expect(EIP1559GasPriceWithinLimit(fees, toBN(6))).to.eql({ + maxFeePerGas: '6', + maxPriorityFeePerGas: '6', + }) + }) + it('correctly checks deadline', () => { + // current time + 10 sec + const signedDeadline = toBN(Math.floor(Date.now() / 1000) + 10) - fs.rmdirSync(DB_PATH, { recursive: true }) + expect(checkDeadline(signedDeadline, 7)).to.be.null + expect(checkDeadline(signedDeadline, 11)).to.be.instanceOf(Error) }) }) diff --git a/zp-relayer/txProcessor.ts b/zp-relayer/txProcessor.ts index 411343b..6ef7bf8 100644 --- a/zp-relayer/txProcessor.ts +++ b/zp-relayer/txProcessor.ts @@ -2,14 +2,13 @@ import Contract from 'web3-eth-contract' import PoolAbi from './abi/pool-abi.json' import { AbiItem, toBN } from 'web3-utils' import { logger } from './services/appLogger' -import { TxPayload } from './queue/poolTxQueue' import { TRANSFER_INDEX_SIZE, ENERGY_SIZE, TOKEN_SIZE } from './utils/constants' import { numToHex, flattenProof, truncateHexPrefix } from './utils/helpers' +import { Delta, getTxProofField, parseDelta } from './utils/proofInputs' import { SnarkProof, Proof } from 'libzkbob-rs-node' import { TxType } from 'zp-memo-parser' -import type { Pool } from './pool' - -import { Delta, parseDelta } from './validateTx' +import { pool } from './pool' +import { TxPayload } from './queue/poolTxQueue' // @ts-ignore const PoolInstance = new Contract(PoolAbi as AbiItem[]) @@ -63,32 +62,32 @@ function buildTxData(txData: TxData) { return data.join('') } -export async function processTx(id: string, tx: TxPayload, pool: Pool) { - const { amount, txProof, txType, rawMemo, 
depositSignature } = tx +export async function processTx(id: string, tx: TxPayload) { + const { txType, txProof, rawMemo: memo, depositSignature } = tx - const logPrefix = `Job ${id}:` - - logger.info(`${logPrefix} Recieved ${txType} tx with ${amount} native amount`) + const nullifier = getTxProofField(txProof, 'nullifier') + const outCommit = getTxProofField(txProof, 'out_commit') + const delta = parseDelta(getTxProofField(txProof, 'delta')) - const delta = parseDelta(txProof.inputs[3]) + const logPrefix = `Job ${id}:` - const outCommit = txProof.inputs[2] const { pub, sec, commitIndex } = pool.optimisticState.getVirtualTreeProofInputs(outCommit) logger.debug(`${logPrefix} Proving tree...`) const treeProof = await Proof.treeAsync(pool.treeParams, pub, sec) logger.debug(`${logPrefix} Tree proved`) + const rootAfter = treeProof.inputs[1] const data = buildTxData({ txProof: txProof.proof, treeProof: treeProof.proof, - nullifier: numToHex(toBN(txProof.inputs[1])), - outCommit: numToHex(toBN(treeProof.inputs[2])), - rootAfter: numToHex(toBN(treeProof.inputs[1])), + nullifier: numToHex(toBN(nullifier)), + outCommit: numToHex(toBN(outCommit)), + rootAfter: numToHex(toBN(rootAfter)), delta, txType, - memo: rawMemo, + memo, depositSignature, }) - return { data, commitIndex } + return { data, commitIndex, rootAfter } } diff --git a/zp-relayer/utils/constants.ts b/zp-relayer/utils/constants.ts index 7b0478d..abcba23 100644 --- a/zp-relayer/utils/constants.ts +++ b/zp-relayer/utils/constants.ts @@ -11,6 +11,7 @@ const constants = { TOKEN_SIZE: 16, POOL_ID_SIZE: 6, ZERO_ADDRESS: '0x0000000000000000000000000000000000000000', + INIT_ROOT: '11469701942666298368112882412133877458305516134926649826543144744382391691533', } export = constants diff --git a/zp-relayer/utils/helpers.ts b/zp-relayer/utils/helpers.ts index 23cb5ef..6c09dae 100644 --- a/zp-relayer/utils/helpers.ts +++ b/zp-relayer/utils/helpers.ts @@ -97,14 +97,22 @@ export async function setIntervalAndRun(f: () => 
Promise | void, interval: return handler } -export async function withMutex(mutex: Mutex, f: () => Promise): Promise { - const release = await mutex.acquire() +export function withMutex(mutex: Mutex, f: () => Promise): () => Promise { + return async () => { + const release = await mutex.acquire() + try { + return await f() + } finally { + release() + } + } +} + +export async function withErrorLog(f: () => Promise): Promise { try { - const res = await f() - return res + return await f() } catch (e) { + logger.error('Found error: %o', e) throw e - } finally { - release() } } diff --git a/zp-relayer/utils/proofInputs.ts b/zp-relayer/utils/proofInputs.ts new file mode 100644 index 0000000..2d63600 --- /dev/null +++ b/zp-relayer/utils/proofInputs.ts @@ -0,0 +1,36 @@ +import BN from 'bn.js' +import { toBN } from 'web3-utils' +import { Helpers, Proof } from 'libzkbob-rs-node' + +export interface Delta { + transferIndex: BN + energyAmount: BN + tokenAmount: BN + poolId: BN +} + +export function parseDelta(delta: string): Delta { + const { poolId, index, e, v } = Helpers.parseDelta(delta) + return { + transferIndex: toBN(index), + energyAmount: toBN(e), + tokenAmount: toBN(v), + poolId: toBN(poolId), + } +} + +type TxProofField = 'root' | 'nullifier' | 'out_commit' | 'delta' | 'memo' +type TxProofFieldMapping = { + [key in TxProofField]: number +} +const txProofFieldMapping: TxProofFieldMapping = { + root: 0, + nullifier: 1, + out_commit: 2, + delta: 3, + memo: 4, +} +export function getTxProofField({ inputs }: Proof, field: T): string { + if (inputs.length !== 5) throw new Error('Incorrect number of proof inputs') + return inputs[txProofFieldMapping[field]] +} diff --git a/zp-relayer/utils/redisFields.ts b/zp-relayer/utils/redisFields.ts index 19862aa..1d94707 100644 --- a/zp-relayer/utils/redisFields.ts +++ b/zp-relayer/utils/redisFields.ts @@ -41,6 +41,10 @@ export function updateField(key: RelayerKeys, val: any) { return redis.set(key, val) } +export function 
updateNonce(nonce: number) { + return updateField(RelayerKeys.NONCE, nonce) +} + export async function incrNonce() { const nonce = await redis.incr(RelayerKeys.NONCE) logger.info(`Incremented nonce to ${nonce}`) diff --git a/zp-relayer/utils/web3Errors.ts b/zp-relayer/utils/web3Errors.ts new file mode 100644 index 0000000..25cec2e --- /dev/null +++ b/zp-relayer/utils/web3Errors.ts @@ -0,0 +1,25 @@ +export function isGasPriceError(e: Error) { + const message = e.message.toLowerCase() + return message.includes('replacement transaction underpriced') +} + +export function isSameTransactionError(e: Error) { + const message = e.message.toLowerCase() + return ( + message.includes('transaction with the same hash was already imported') || + message.includes('already known') || + message.includes('alreadyknown') || + message.includes('transaction already imported') + ) +} + +export function isNonceError(e: Error) { + const message = e.message.toLowerCase() + return ( + message.includes('transaction nonce is too low') || + message.includes('nonce too low') || + message.includes('transaction with same nonce in the queue') || + message.includes('oldnonce') || + message.includes(`the tx doesn't have the correct nonce`) + ) +} diff --git a/zp-relayer/validateTx.ts b/zp-relayer/validateTx.ts index b71cb56..775a763 100644 --- a/zp-relayer/validateTx.ts +++ b/zp-relayer/validateTx.ts @@ -1,29 +1,24 @@ import BN from 'bn.js' import { toBN, AbiItem } from 'web3-utils' import { TxType, TxData, WithdrawTxData, PermittableDepositTxData, getTxData } from 'zp-memo-parser' -import { Helpers, Proof } from 'libzkbob-rs-node' +import { Proof, SnarkProof } from 'libzkbob-rs-node' import { logger } from './services/appLogger' import config from './config' -import { Limits, pool, PoolTx } from './pool' -import { NullifierSet } from './nullifierSet' +import type { Limits, Pool } from './pool' +import { NullifierSet } from './state/nullifierSet' import TokenAbi from './abi/token-abi.json' import { 
web3 } from './services/web3' import { numToHex, unpackSignature } from './utils/helpers' import { recoverSaltedPermit } from './utils/EIP712SaltedPermit' import { ZERO_ADDRESS } from './utils/constants' import { TxPayload } from './queue/poolTxQueue' +import { getTxProofField, parseDelta } from './utils/proofInputs' +import { RootSet } from './state/rootSet' const tokenContract = new web3.eth.Contract(TokenAbi as AbiItem[], config.tokenAddress) const ZERO = toBN(0) -export interface Delta { - transferIndex: BN - energyAmount: BN - tokenAmount: BN - poolId: BN -} - type OptionError = Error | null export async function checkAssertion(f: () => Promise | OptionError) { const err = await f() @@ -50,10 +45,10 @@ export function checkCommitment(treeProof: Proof, txProof: Proof) { return treeProof.inputs[2] === txProof.inputs[2] } -export function checkTxProof(txProof: Proof) { - const res = pool.verifyProof(txProof.proof, txProof.inputs) +export function checkProof(txProof: Proof, verify: (p: SnarkProof, i: Array) => boolean) { + const res = verify(txProof.proof, txProof.inputs) if (!res) { - return new Error('Incorrect transfer proof') + return new Error('Incorrect snark proof') } return null } @@ -69,23 +64,20 @@ export function checkTransferIndex(contractPoolIndex: BN, transferIndex: BN) { return new Error(`Incorrect transfer index`) } -export function checkTxSpecificFields(txType: TxType, tokenAmount: BN, energyAmount: BN, txData: TxData, msgValue: BN) { +export function checkTxSpecificFields(txType: TxType, tokenAmount: BN, energyAmount: BN, txData: TxData) { logger.debug( - 'TOKENS %s, ENERGY %s, TX DATA %s, MSG VALUE %s', + 'TOKENS %s, ENERGY %s, TX DATA %s', tokenAmount.toString(), energyAmount.toString(), - JSON.stringify(txData), - msgValue.toString() + JSON.stringify(txData) ) let isValid = false if (txType === TxType.DEPOSIT || txType === TxType.PERMITTABLE_DEPOSIT) { - isValid = tokenAmount.gte(ZERO) && energyAmount.eq(ZERO) && msgValue.eq(ZERO) + isValid = 
tokenAmount.gte(ZERO) && energyAmount.eq(ZERO) } else if (txType === TxType.TRANSFER) { - isValid = tokenAmount.eq(ZERO) && energyAmount.eq(ZERO) && msgValue.eq(ZERO) + isValid = tokenAmount.eq(ZERO) && energyAmount.eq(ZERO) } else if (txType === TxType.WITHDRAWAL) { - const nativeAmount = (txData as WithdrawTxData).nativeAmount isValid = tokenAmount.lte(ZERO) && energyAmount.lte(ZERO) - isValid = isValid && msgValue.eq(nativeAmount.mul(pool.denominator)) } if (!isValid) { return new Error('Tx specific fields are incorrect') @@ -93,16 +85,6 @@ export function checkTxSpecificFields(txType: TxType, tokenAmount: BN, energyAmo return null } -export function parseDelta(delta: string): Delta { - const { poolId, index, e, v } = Helpers.parseDelta(delta) - return { - transferIndex: toBN(index), - energyAmount: toBN(e), - tokenAmount: toBN(v), - poolId: toBN(poolId), - } -} - export function checkNativeAmount(nativeAmount: BN | null) { logger.debug(`Native amount: ${nativeAmount}`) // Check native amount (relayer faucet) @@ -120,11 +102,15 @@ export function checkFee(fee: BN) { return null } -export function checkDeadline(deadline: BN) { - logger.debug(`Deadline: ${deadline}`) +/** + * @param signedDeadline deadline signed by user, in seconds + * @param threshold "window" added to current relayer time, in seconds + */ +export function checkDeadline(signedDeadline: BN, threshold: number) { + logger.debug(`Deadline: ${signedDeadline}`) // Check native amount (relayer faucet) const currentTimestamp = new BN(Math.floor(Date.now() / 1000)) - if (deadline <= currentTimestamp) { + if (signedDeadline <= currentTimestamp.addn(threshold)) { return new Error(`Deadline is expired`) } return null } @@ -189,12 +175,10 @@ async function getRecoveredAddress( spender, value: tokenAmount.toString(10), nonce, - deadline: deadline.toString(10), + deadline, salt: nullifier, } recoveredAddress = recoverSaltedPermit(message, sig) - - await checkAssertion(() => checkDeadline(deadline)) } else { throw 
new Error('Unsupported txtype') } @@ -202,29 +186,46 @@ async function getRecoveredAddress( return recoveredAddress } -export async function validateTx( - { txType, rawMemo, txProof, depositSignature }: TxPayload, - delta: Delta, - nullifier: string -) { +async function checkRoot(index: BN, proofRoot: string, poolSet: RootSet, optimisticSet: RootSet) { + const indexStr = index.toString(10) + + const root = (await poolSet.get(indexStr)) || (await optimisticSet.get(indexStr)) + + if (root === null) { + return new Error(`Root ${proofRoot} at ${indexStr} not found`) + } + if (root !== proofRoot) { + return new Error(`Incorrect root at index ${indexStr}: given ${proofRoot}, expected ${root}`) + } + + return null +} + +export async function validateTx({ txType, rawMemo, txProof, depositSignature }: TxPayload, pool: Pool) { const buf = Buffer.from(rawMemo, 'hex') const txData = getTxData(buf, txType) + const root = getTxProofField(txProof, 'root') + const nullifier = getTxProofField(txProof, 'nullifier') + const delta = parseDelta(getTxProofField(txProof, 'delta')) + const fee = toBN(txData.fee) + + await checkAssertion(() => checkRoot(delta.transferIndex, root, pool.state.roots, pool.optimisticState.roots)) await checkAssertion(() => checkNullifier(nullifier, pool.state.nullifiers)) await checkAssertion(() => checkNullifier(nullifier, pool.optimisticState.nullifiers)) await checkAssertion(() => checkTransferIndex(toBN(pool.optimisticState.getNextIndex()), delta.transferIndex)) - await checkAssertion(() => checkFee(txData.fee)) + await checkAssertion(() => checkFee(fee)) if (txType === TxType.WITHDRAWAL) { const nativeAmount = (txData as WithdrawTxData).nativeAmount - await checkAssertion(() => checkNativeAmount(nativeAmount)) + await checkAssertion(() => checkNativeAmount(toBN(nativeAmount))) } - await checkAssertion(() => checkTxProof(txProof)) + await checkAssertion(() => checkProof(txProof, (p, i) => pool.verifyProof(p, i))) - const tokenAmountWithFee = 
delta.tokenAmount.add(txData.fee) - await checkAssertion(() => checkTxSpecificFields(txType, tokenAmountWithFee, delta.energyAmount, txData, toBN('0'))) + const tokenAmountWithFee = delta.tokenAmount.add(fee) + await checkAssertion(() => checkTxSpecificFields(txType, tokenAmountWithFee, delta.energyAmount, txData)) const requiredTokenAmount = tokenAmountWithFee.mul(pool.denominator) let userAddress = ZERO_ADDRESS @@ -232,7 +233,13 @@ export async function validateTx( userAddress = await getRecoveredAddress(txType, nullifier, txData, requiredTokenAmount, depositSignature) await checkAssertion(() => checkDepositEnoughBalance(userAddress, requiredTokenAmount)) } + if (txType === TxType.PERMITTABLE_DEPOSIT) { + const deadline = (txData as PermittableDepositTxData).deadline + await checkAssertion(() => checkDeadline(toBN(deadline), config.permitDeadlineThresholdInitial)) + } const limits = await pool.getLimitsFor(userAddress) await checkAssertion(() => checkLimits(limits, delta.tokenAmount)) + + return txData } diff --git a/zp-relayer/workers/poolTxWorker.ts b/zp-relayer/workers/poolTxWorker.ts index 2ad1ea7..0ce191e 100644 --- a/zp-relayer/workers/poolTxWorker.ts +++ b/zp-relayer/workers/poolTxWorker.ts @@ -4,18 +4,19 @@ import { web3 } from '@/services/web3' import { logger } from '@/services/appLogger' import { TxPayload } from '@/queue/poolTxQueue' import { TX_QUEUE_NAME, OUTPLUSONE, MAX_SENT_LIMIT } from '@/utils/constants' -import { readNonce, updateField, RelayerKeys, incrNonce } from '@/utils/redisFields' -import { numToHex, truncateMemoTxPrefix, withMutex } from '@/utils/helpers' +import { readNonce, updateField, RelayerKeys, incrNonce, updateNonce } from '@/utils/redisFields' +import { numToHex, truncateMemoTxPrefix, withErrorLog, withMutex } from '@/utils/helpers' import { signAndSend } from '@/tx/signAndSend' import { pool } from '@/pool' import { sentTxQueue } from '@/queue/sentTxQueue' import { processTx } from '@/txProcessor' import config from '@/config' 
import { redis } from '@/services/redisClient' -import { parseDelta, validateTx } from '@/validateTx' +import { validateTx } from '@/validateTx' import type { EstimationType, GasPrice } from '@/services/gas-price' import type { Mutex } from 'async-mutex' import { getChainId } from '@/utils/web3' +import { getTxProofField } from '@/utils/proofInputs' const WORKER_OPTIONS = { autorun: false, @@ -36,13 +37,9 @@ export async function createPoolTxWorker(gasPrice: Gas for (const tx of txs) { const { gas, amount, rawMemo, txType, txProof } = tx - const nullifier = txProof.inputs[1] - const outCommit = txProof.inputs[2] - const delta = parseDelta(txProof.inputs[3]) + const txData = await validateTx(tx, pool) - await validateTx(tx, delta, nullifier) - - const { data, commitIndex } = await processTx(job.id as string, tx, pool) + const { data, commitIndex, rootAfter } = await processTx(job.id as string, tx) const nonce = await incrNonce() logger.info(`${logPrefix} nonce: ${nonce}`) @@ -55,33 +52,50 @@ export async function createPoolTxWorker(gasPrice: Gas gas, to: config.poolAddress, chainId: CHAIN_ID, - ...gasPriceOptions, } try { - const txHash = await signAndSend(txConfig, config.relayerPrivateKey, web3) + const txHash = await signAndSend( + { + ...txConfig, + ...gasPriceOptions, + }, + config.relayerPrivateKey, + web3 + ) logger.debug(`${logPrefix} TX hash ${txHash}`) await updateField(RelayerKeys.TRANSFER_NUM, commitIndex * OUTPLUSONE) + const nullifier = getTxProofField(txProof, 'nullifier') + const outCommit = getTxProofField(txProof, 'out_commit') + const truncatedMemo = truncateMemoTxPrefix(rawMemo, txType) - const txData = numToHex(toBN(outCommit)).concat(txHash.slice(2)).concat(truncatedMemo) + const prefixedMemo = numToHex(toBN(outCommit)).concat(txHash.slice(2)).concat(truncatedMemo) - pool.optimisticState.updateState(commitIndex, outCommit, txData) - logger.info('Adding nullifier %s to OS', nullifier) + pool.optimisticState.updateState(commitIndex, outCommit, 
prefixedMemo) + logger.debug('Adding nullifier %s to OS', nullifier) await pool.optimisticState.nullifiers.add([nullifier]) + const poolIndex = (commitIndex + 1) * OUTPLUSONE + logger.debug('Adding root %s at %s to OS', rootAfter, poolIndex) + await pool.optimisticState.roots.add({ + [poolIndex]: rootAfter, + }) txHashes.push(txHash) await sentTxQueue.add( txHash, { - payload: tx, + txType, + root: rootAfter, outCommit, commitIndex, txHash, - txData, + prefixedMemo, nullifier, - txConfig: {}, + txConfig, + gasPriceOptions, + txData, }, { delay: config.sentTxDelay, @@ -102,10 +116,10 @@ export async function createPoolTxWorker(gasPrice: Gas return txHashes } - await updateField(RelayerKeys.NONCE, await readNonce(true)) + await updateNonce(await readNonce(true)) const poolTxWorker = new Worker( TX_QUEUE_NAME, - job => withMutex(mutex, () => poolTxWorkerProcessor(job)), + job => withErrorLog(withMutex(mutex, () => poolTxWorkerProcessor(job))), WORKER_OPTIONS ) diff --git a/zp-relayer/workers/sentTxWorker.ts b/zp-relayer/workers/sentTxWorker.ts index a9fa9a8..3f4e4eb 100644 --- a/zp-relayer/workers/sentTxWorker.ts +++ b/zp-relayer/workers/sentTxWorker.ts @@ -1,15 +1,20 @@ +import type { Mutex } from 'async-mutex' +import { toBN } from 'web3-utils' import { Job, Queue, Worker } from 'bullmq' +import { PermittableDepositTxData, TxType } from 'zp-memo-parser' +import config from '@/config' +import { pool } from '@/pool' import { web3 } from '@/services/web3' import { logger } from '@/services/appLogger' -import { SENT_TX_QUEUE_NAME } from '@/utils/constants' -import { pool } from '@/pool' -import { SentTxPayload, sentTxQueue } from '@/queue/sentTxQueue' import { redis } from '@/services/redisClient' -import type { GasPrice, EstimationType, GasPriceValue } from '@/services/gas-price' -import type { TransactionConfig } from 'web3-core' -import type { Mutex } from 'async-mutex' -import { withMutex } from '@/utils/helpers' -import config from '@/config' +import { GasPrice, 
EstimationType, chooseGasPriceOptions } from '@/services/gas-price' +import { withErrorLog, withMutex } from '@/utils/helpers' +import { readNonce, updateNonce } from '@/utils/redisFields' +import { OUTPLUSONE, SENT_TX_QUEUE_NAME } from '@/utils/constants' +import { isGasPriceError, isNonceError, isSameTransactionError } from '@/utils/web3Errors' +import { SentTxPayload, sentTxQueue, SentTxResult, SentTxState } from '@/queue/sentTxQueue' +import { signAndSend } from '@/tx/signAndSend' +import { checkAssertion, checkDeadline } from '@/validateTx' const token = 'RELAYER' @@ -31,14 +36,6 @@ async function checkMarked(id: string) { return Boolean(inSet) } -function updateTxGasPrice(txConfig: TransactionConfig, newGasPrice: GasPriceValue) { - const newTxConfig = { - ...txConfig, - ...newGasPrice, - } - return newTxConfig -} - async function collectBatch(queue: Queue) { const jobs = await queue.getJobs(['delayed', 'waiting']) @@ -58,18 +55,47 @@ async function collectBatch(queue: Queue) { return jobs } +async function clearOptimisticState() { + // TODO: a more efficient strategy would be to collect all other jobs + // and move them to 'failed' state as we know they will be reverted + // To do this we need to acquire a lock for each job. Did not find + // an easy way to do that yet. 
See 'collectBatch' + + // XXX: txs marked as failed potentially could mine + // We should either try to resend them until we are sure + // they have mined or try to make new replacement txs + // with higher gasPrice + const jobs = await sentTxQueue.getJobs(['delayed', 'waiting']) + const ids = jobs.map(j => j.id as string) + logger.info('Marking ids %j as failed', ids) + await markFailed(ids) + + logger.info('Rollback optimistic state...') + pool.optimisticState.rollbackTo(pool.state) + logger.info('Clearing optimistic nullifiers...') + await pool.optimisticState.nullifiers.clear() + logger.info('Clearing optimistic roots...') + await pool.optimisticState.roots.clear() + + const root1 = pool.state.getMerkleRoot() + const root2 = pool.optimisticState.getMerkleRoot() + logger.info(`Assert roots are equal: ${root1}, ${root2}, ${root1 === root2}`) +} + export async function createSentTxWorker(gasPrice: GasPrice, mutex: Mutex) { const sentTxWorkerProcessor = async (job: Job) => { const logPrefix = `SENT WORKER: Job ${job.id}:` logger.info('%s processing...', logPrefix) + const { txType, txHash, prefixedMemo, commitIndex, outCommit, nullifier, root, txData } = job.data + // TODO: it is possible that a tx marked as failed could be stuck + // in the mempool. 
Worker should either assure that it is mined + // or try to substitute such transaction with another one if (await checkMarked(job.id as string)) { logger.info('%s marked as failed, skipping', logPrefix) - return null + return [SentTxState.REVERT, txHash] as SentTxResult } - const { txHash, txData, commitIndex, outCommit, nullifier, payload } = job.data - const tx = await web3.eth.getTransactionReceipt(txHash) if (tx) { // Tx mined @@ -77,7 +103,7 @@ export async function createSentTxWorker(gasPrice: Gas // Successful logger.debug('%s Transaction %s was successfully mined at block %s', logPrefix, txHash, tx.blockNumber) - pool.state.updateState(commitIndex, outCommit, txData) + pool.state.updateState(commitIndex, outCommit, prefixedMemo) // Add nullifer to confirmed state and remove from optimistic one logger.info('Adding nullifier %s to PS', nullifier) @@ -85,6 +111,13 @@ export async function createSentTxWorker(gasPrice: Gas logger.info('Removing nullifier %s from OS', nullifier) await pool.optimisticState.nullifiers.remove([nullifier]) + // Add root to confirmed state and remove from optimistic one + const poolIndex = ((commitIndex + 1) * OUTPLUSONE).toString(10) + logger.info('Adding root %s %s to PS', poolIndex, root) + await pool.state.roots.add({ [poolIndex]: root }) + logger.info('Removing root %s %s from OS', poolIndex, root) + await pool.optimisticState.roots.remove([poolIndex]) + const node1 = pool.state.getCommitment(commitIndex) const node2 = pool.optimisticState.getCommitment(commitIndex) logger.info(`Assert commitments are equal: ${node1}, ${node2}`) @@ -92,52 +125,81 @@ export async function createSentTxWorker(gasPrice: Gas logger.error('Commitments are not equal') } - return txHash + return [SentTxState.MINED, txHash] as SentTxResult } else { // Revert logger.error('%s Transaction %s reverted at block %s', logPrefix, txHash, tx.blockNumber) - // TODO: a more efficient strategy would be to collect all other jobs - // and move them to 'failed' state 
as we know they will be reverted - // To do this we need to acquire a lock for each job. Did not find - // an easy way to do that yet. See 'collectBatch' - const jobs = await sentTxQueue.getJobs(['delayed', 'waiting']) - const ids = jobs.map(j => j.id as string) - logger.info('%s marking ids %j as failed', logPrefix, ids) - await markFailed(ids) - - logger.info('Rollback optimistic state...') - pool.optimisticState.rollbackTo(pool.state) - logger.info('Clearing optimistic nullifiers...') - await pool.optimisticState.nullifiers.clear() - const root1 = pool.state.getMerkleRoot() - const root2 = pool.optimisticState.getMerkleRoot() - logger.info(`Assert roots are equal: ${root1}, ${root2}, ${root1 === root2}`) + await clearOptimisticState() + return [SentTxState.REVERT, txHash] as SentTxResult } } else { + // Resend with updated gas price const txConfig = job.data.txConfig - const oldGasPrice = txConfig.gasPrice - const newGasPrice = gasPrice.getPrice() + const oldGasPrice = job.data.gasPriceOptions + const fetchedGasPrice = gasPrice.getPrice() - logger.warn('Tx %s is not mined; updating gasPrice: %o -> %o', txHash, oldGasPrice, newGasPrice) + const newGasPrice = chooseGasPriceOptions(oldGasPrice, fetchedGasPrice) - const newTxConfig = updateTxGasPrice(txConfig, newGasPrice) + logger.warn('Tx %s is not mined; updating gasPrice: %o -> %o', txHash, oldGasPrice, newGasPrice) - const newJobData = { - ...job.data, - txConfig: newTxConfig, + const newTxConfig = { + ...txConfig, + ...newGasPrice, } - await sentTxQueue.add(txHash, newJobData, { - priority: txConfig.nonce, - delay: config.sentTxDelay, - }) + try { + if (txType === TxType.PERMITTABLE_DEPOSIT) { + const deadline = (txData as PermittableDepositTxData).deadline + await checkAssertion(() => checkDeadline(toBN(deadline), config.permitDeadlineThresholdResend)) + } + + const newTxHash = await signAndSend(newTxConfig, config.relayerPrivateKey, web3) + + // Add updated job + await sentTxQueue.add( + newTxHash, + { + 
...job.data, + txHash: newTxHash, + txConfig: newTxConfig, + gasPriceOptions: newGasPrice, + }, + { + priority: txConfig.nonce, + delay: config.sentTxDelay, + } + ) + return [SentTxState.RESEND, newTxHash] as SentTxResult + } catch (e) { + const err = e as Error + logger.warn('%s: Tx resend failed for %s: %s', logPrefix, txHash, err.message) + if (isSameTransactionError(err) || isGasPriceError(err)) { + // Force update gas price + gasPrice.start() + } else if (isNonceError(err)) { + await updateNonce(await readNonce(true)) + } else { + // Error can't be handled + logger.error('%s: Error cannot be handled: %o', logPrefix, err) + // Rollback the tree + await clearOptimisticState() + return [SentTxState.FAILED, txHash] as SentTxResult + } + + // Add same job + await sentTxQueue.add(txHash, job.data, { + priority: txConfig.nonce, + delay: config.sentTxDelay, + }) + return [SentTxState.RESEND, txHash] as SentTxResult + } } } - const sentTxWorker = new Worker( + const sentTxWorker = new Worker( SENT_TX_QUEUE_NAME, - job => withMutex(mutex, () => sentTxWorkerProcessor(job)), + job => withErrorLog(withMutex(mutex, () => sentTxWorkerProcessor(job))), WORKER_OPTIONS )