Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/updated hash #97

Merged
merged 7 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 78 additions & 15 deletions zp-relayer/endpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
checkSendTransactionErrors,
checkSendTransactionsErrors,
} from './validation/validation'
import { sentTxQueue, SentTxState } from './queue/sentTxQueue'

async function sendTransactions(req: Request, res: Response, next: NextFunction) {
const errors = checkSendTransactionsErrors(req.body)
Expand Down Expand Up @@ -108,22 +109,84 @@ async function getTransactionsV2(req: Request, res: Response, next: NextFunction
}

async function getJob(req: Request, res: Response) {
enum JobStatus {
WAITING = 'waiting',
FAILED = 'failed',
SENT = 'sent',
REVERTED = 'reverted',
COMPLETED = 'completed',
}

interface GetJobResponse {
k1rill-fedoseev marked this conversation as resolved.
Show resolved Hide resolved
resolvedJobId: string
createdOn: number
failedReason: null | string
finishedOn: null | number
state: JobStatus
txHash: null | string
}

const jobId = req.params.id
const job = await poolTxQueue.getJob(jobId)
if (job) {
const state = await job.getState()
const txHash = job.returnvalue
const failedReason = job.failedReason
const createdOn = job.timestamp
const finishedOn = job.finishedOn

res.json({
state,
txHash,
failedReason,
createdOn,
finishedOn,
})

async function getPoolJobState(requestedJobId: string): Promise<GetJobResponse | null> {
const jobId = await pool.state.jobIdsMapping.get(requestedJobId)
k1rill-fedoseev marked this conversation as resolved.
Show resolved Hide resolved
const job = await poolTxQueue.getJob(jobId)
if (!job) return null

// Default result object
let result: GetJobResponse = {
resolvedJobId: jobId,
createdOn: job.timestamp,
failedReason: null,
finishedOn: null,
state: JobStatus.WAITING,
txHash: null,
}

const poolJobState = await job.getState()
if (poolJobState === 'completed') {
// Transaction was included in optimistic state, waiting to be mined
const sentJobId = job.returnvalue[0][1]
const sentJob = await sentTxQueue.getJob(sentJobId)
// Should not happen here, but need to verify to be sure
if (!sentJob) throw new Error('Sent job not found')

const sentJobState = await sentJob.getState()
if (sentJobState === 'waiting' || sentJobState === 'active' || sentJobState === 'delayed') {
// Transaction is in re-send loop
const txHash = sentJob.data.prevAttempts.at(-1)?.[0]
result.state = JobStatus.SENT
result.txHash = txHash || null
} else if (sentJobState === 'completed') {
const [txState, txHash] = sentJob.returnvalue
k1rill-fedoseev marked this conversation as resolved.
Show resolved Hide resolved
if (txState === SentTxState.MINED) {
// Transaction mined successfully
result.state = JobStatus.COMPLETED
result.txHash = txHash
result.finishedOn = sentJob.finishedOn || null
} else if (txState === SentTxState.REVERT) {
// Transaction reverted
result.state = JobStatus.REVERTED
result.txHash = txHash
result.finishedOn = sentJob.finishedOn || null
}
}
} else if (poolJobState === 'failed') {
// Either validation or tx sendind failed
result.state = JobStatus.FAILED
result.failedReason = job.failedReason
result.finishedOn = job.finishedOn || null
}
// Other states mean that transaction is either waiting in queue
// or being processed by worker
// So, no need to update `result` object

return result
}

const jobState = await getPoolJobState(jobId)
if (jobState) {
res.json(jobState)
} else {
res.json(`Job ${jobId} not found`)
}
Expand Down
3 changes: 2 additions & 1 deletion zp-relayer/queue/sentTxQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import { TxPayload } from './poolTxQueue'

export type SendAttempt = [string, GasPriceValue]
export interface SentTxPayload {
poolJobId: string
root: string
outCommit: string
commitIndex: number
prefixedMemo: string
truncatedMemo: string
txConfig: TransactionConfig
nullifier: string
txPayload: TxPayload
Expand Down
5 changes: 5 additions & 0 deletions zp-relayer/state/PoolState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@ import { OUTPLUSONE } from '@/utils/constants'
import { MerkleTree, TxStorage, MerkleProof, Constants, Helpers } from 'libzkbob-rs-node'
import { NullifierSet } from './nullifierSet'
import { RootSet } from './rootSet'
import { JobIdsMapping } from './jobIdsMapping'

export class PoolState {
private tree: MerkleTree
private txs: TxStorage
public nullifiers: NullifierSet
public roots: RootSet
public jobIdsMapping: JobIdsMapping

constructor(private name: string, redis: Redis, path: string) {
this.tree = new MerkleTree(`${path}/${name}Tree.db`)
this.txs = new TxStorage(`${path}/${name}Txs.db`)
this.nullifiers = new NullifierSet(`${name}-nullifiers`, redis)
this.roots = new RootSet(`${name}-roots`, redis)
// This structure can be shared among different pool states
// So, use constant name
this.jobIdsMapping = new JobIdsMapping('job-id-mapping', redis)
}

getVirtualTreeProofInputs(outCommit: string, transferNum?: number) {
Expand Down
28 changes: 28 additions & 0 deletions zp-relayer/state/jobIdsMapping.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import type { Redis } from 'ioredis'

export class JobIdsMapping {
constructor(public name: string, private redis: Redis) {}

async add(mapping: Record<string, string>) {
if (Object.keys(mapping).length === 0) return
await this.redis.hset(this.name, mapping)
}

async remove(indices: string[]) {
if (indices.length === 0) return
await this.redis.hdel(this.name, ...indices)
}

async get(id: string): Promise<string> {
const mappedId = await this.redis.hget(this.name, id)
if (mappedId) {
return await this.get(mappedId)
} else {
return id
}
}

async clear() {
await this.redis.del(this.name)
}
}
11 changes: 10 additions & 1 deletion zp-relayer/utils/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { logger } from '@/services/appLogger'
import { SnarkProof } from 'libzkbob-rs-node'
import { TxType } from 'zp-memo-parser'
import type { Mutex } from 'async-mutex'
import { TxValidationError } from '@/validateTx'

const S_MASK = toBN('0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff')
const S_MAX = toBN('0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF5D576E7357A4501DDFE92F46681B20A0')
Expand Down Expand Up @@ -91,6 +92,10 @@ export function flattenProof(p: SnarkProof): string {
.join('')
}

export function buildPrefixedMemo(outCommit: string, txHash: string, truncatedMemo: string) {
return numToHex(toBN(outCommit)).concat(txHash.slice(2)).concat(truncatedMemo)
}

export async function setIntervalAndRun(f: () => Promise<void> | void, interval: number) {
const handler = setInterval(f, interval)
await f()
Expand All @@ -112,7 +117,11 @@ export async function withErrorLog<R>(f: () => Promise<R>): Promise<R> {
try {
return await f()
} catch (e) {
logger.error('Found error: %s', (e as Error).message)
if (e instanceof TxValidationError) {
logger.warn('Validation error: %s', (e as Error).message)
} else {
logger.error('Found error: %s', (e as Error).message)
}
throw e
}
}
Expand Down
44 changes: 26 additions & 18 deletions zp-relayer/validateTx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ const tokenContract = new web3.eth.Contract(TokenAbi as AbiItem[], config.tokenA

const ZERO = toBN(0)

export class TxValidationError extends Error {
constructor(message: string) {
super(message)
}
}

type OptionError = Error | null
export async function checkAssertion(f: () => Promise<OptionError> | OptionError) {
const err = await f()
if (err) {
logger.warn('Assertion error: %s', err.message)
throw err
}
}
Expand All @@ -36,7 +41,7 @@ export async function checkBalance(address: string, minBalance: string) {
const balance = await tokenContract.methods.balanceOf(address).call()
const res = toBN(balance).gte(toBN(minBalance))
if (!res) {
return new Error('Not enough balance for deposit')
return new TxValidationError('Not enough balance for deposit')
}
return null
}
Expand All @@ -48,20 +53,20 @@ export function checkCommitment(treeProof: Proof, txProof: Proof) {
export function checkProof(txProof: Proof, verify: (p: SnarkProof, i: Array<string>) => boolean) {
const res = verify(txProof.proof, txProof.inputs)
if (!res) {
return new Error('Incorrect snark proof')
return new TxValidationError('Incorrect snark proof')
}
return null
}

export async function checkNullifier(nullifier: string, nullifierSet: NullifierSet) {
const inSet = await nullifierSet.isInSet(nullifier)
if (inSet === 0) return null
return new Error(`Doublespend detected in ${nullifierSet.name}`)
return new TxValidationError(`Doublespend detected in ${nullifierSet.name}`)
}

export function checkTransferIndex(contractPoolIndex: BN, transferIndex: BN) {
if (transferIndex.lte(contractPoolIndex)) return null
return new Error(`Incorrect transfer index`)
return new TxValidationError(`Incorrect transfer index`)
}

export function checkTxSpecificFields(txType: TxType, tokenAmount: BN, energyAmount: BN, txData: TxData) {
Expand All @@ -80,7 +85,7 @@ export function checkTxSpecificFields(txType: TxType, tokenAmount: BN, energyAmo
isValid = tokenAmount.lte(ZERO) && energyAmount.lte(ZERO)
}
if (!isValid) {
return new Error('Tx specific fields are incorrect')
return new TxValidationError('Tx specific fields are incorrect')
}
return null
}
Expand All @@ -89,15 +94,15 @@ export function checkNativeAmount(nativeAmount: BN | null) {
logger.debug(`Native amount: ${nativeAmount}`)
// Check native amount (relayer faucet)
if (nativeAmount && nativeAmount > config.maxFaucet) {
return new Error('Native amount too high')
return new TxValidationError('Native amount too high')
}
return null
}

export function checkFee(fee: BN) {
logger.debug(`Fee: ${fee}`)
if (fee.lt(config.relayerFee)) {
return new Error('Fee too low')
return new TxValidationError('Fee too low')
}
return null
}
Expand All @@ -111,36 +116,36 @@ export function checkDeadline(signedDeadline: BN, threshold: number) {
// Check native amount (relayer faucet)
const currentTimestamp = new BN(Math.floor(Date.now() / 1000))
if (signedDeadline <= currentTimestamp.addn(threshold)) {
return new Error(`Deadline is expired`)
return new TxValidationError(`Deadline is expired`)
}
return null
}

export function checkLimits(limits: Limits, amount: BN) {
if (amount.gt(toBN(0))) {
if (amount.gt(limits.depositCap)) {
return new Error('Single deposit cap exceeded')
return new TxValidationError('Single deposit cap exceeded')
}
if (limits.tvl.add(amount).gte(limits.tvlCap)) {
return new Error('Tvl cap exceeded')
return new TxValidationError('Tvl cap exceeded')
}
if (limits.dailyUserDepositCapUsage.add(amount).gt(limits.dailyUserDepositCap)) {
return new Error('Daily user deposit cap exceeded')
return new TxValidationError('Daily user deposit cap exceeded')
}
if (limits.dailyDepositCapUsage.add(amount).gt(limits.dailyDepositCap)) {
return new Error('Daily deposit cap exceeded')
return new TxValidationError('Daily deposit cap exceeded')
}
} else {
if (limits.dailyWithdrawalCapUsage.sub(amount).gt(limits.dailyWithdrawalCap)) {
return new Error('Daily withdrawal cap exceeded')
return new TxValidationError('Daily withdrawal cap exceeded')
}
}
return null
}

async function checkDepositEnoughBalance(address: string, requiredTokenAmount: BN) {
if (requiredTokenAmount.lte(toBN(0))) {
throw new Error('Requested balance check for token amount <= 0')
throw new TxValidationError('Requested balance check for token amount <= 0')
}

return checkBalance(address, requiredTokenAmount.toString(10))
Expand All @@ -156,7 +161,7 @@ async function getRecoveredAddress(
// Signature without `0x` prefix, size is 64*2=128
await checkAssertion(() => {
if (depositSignature !== null && checkSize(depositSignature, 128)) return null
return new Error('Invalid deposit signature')
return new TxValidationError('Invalid deposit signature size')
})
const nullifier = '0x' + numToHex(toBN(proofNullifier))
const sig = unpackSignature(depositSignature as string)
Expand All @@ -179,8 +184,11 @@ async function getRecoveredAddress(
salt: nullifier,
}
recoveredAddress = recoverSaltedPermit(message, sig)
if (recoveredAddress.toLowerCase() !== owner.toLowerCase()) {
throw new TxValidationError(`Invalid deposit signer; Restored: ${recoveredAddress}; Expected: ${owner}`)
}
} else {
throw new Error('Unsupported txtype')
throw new TxValidationError('Unsupported txtype')
}

return recoveredAddress
Expand All @@ -207,7 +215,7 @@ async function checkRoot(
}

if (root !== proofRoot) {
return new Error(`Incorrect root at index ${indexStr}: given ${proofRoot}, expected ${root}`)
return new TxValidationError(`Incorrect root at index ${indexStr}: given ${proofRoot}, expected ${root}`)
}

// If recieved correct root from contract update cache (only confirmed state)
Expand Down
9 changes: 5 additions & 4 deletions zp-relayer/workers/poolTxWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { logger } from '@/services/appLogger'
import { PoolTxResult, TxPayload } from '@/queue/poolTxQueue'
import { TX_QUEUE_NAME, OUTPLUSONE } from '@/utils/constants'
import { readNonce, updateField, RelayerKeys, updateNonce } from '@/utils/redisFields'
import { numToHex, truncateMemoTxPrefix, withErrorLog, withMutex } from '@/utils/helpers'
import { buildPrefixedMemo, truncateMemoTxPrefix, withErrorLog, withMutex } from '@/utils/helpers'
import { signTransaction, sendTransaction } from '@/tx/signAndSend'
import { Pool, pool } from '@/pool'
import { sentTxQueue } from '@/queue/sentTxQueue'
Expand Down Expand Up @@ -78,15 +78,15 @@ export async function createPoolTxWorker<T extends EstimationType>(

await updateNonce(++nonce)

logger.debug(`${logPrefix} TX hash ${txHash}`)
logger.info(`${logPrefix} TX hash ${txHash}`)

await updateField(RelayerKeys.TRANSFER_NUM, commitIndex * OUTPLUSONE)

const nullifier = getTxProofField(txProof, 'nullifier')
const outCommit = getTxProofField(txProof, 'out_commit')

const truncatedMemo = truncateMemoTxPrefix(rawMemo, txType)
const prefixedMemo = numToHex(toBN(outCommit)).concat(txHash.slice(2)).concat(truncatedMemo)
const prefixedMemo = buildPrefixedMemo(outCommit, txHash, truncatedMemo)

pool.optimisticState.updateState(commitIndex, outCommit, prefixedMemo)
logger.debug('Adding nullifier %s to OS', nullifier)
Expand All @@ -100,10 +100,11 @@ export async function createPoolTxWorker<T extends EstimationType>(
const sentJob = await sentTxQueue.add(
txHash,
{
poolJobId: job.id as string,
root: rootAfter,
outCommit,
commitIndex,
prefixedMemo,
truncatedMemo,
nullifier,
txConfig,
txPayload: tx,
Expand Down
Loading