Skip to content

Commit

Permalink
Remove fallback job fetching from /job endpoint (#114)
Browse files Browse the repository at this point in the history
  • Loading branch information
Leonid Tyurin authored Dec 21, 2022
1 parent d57b21e commit 48a6779
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 27 deletions.
8 changes: 4 additions & 4 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1259,10 +1259,10 @@ builtin-status-codes@^3.0.0:
resolved "https://registry.yarnpkg.com/builtin-status-codes/-/builtin-status-codes-3.0.0.tgz#85982878e21b98e1c66425e03d0174788f569ee8"
integrity sha1-hZgoeOIbmOHGZCXgPQF0eI9Wnug=

bullmq@3.2.0:
version "3.2.0"
resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-3.2.0.tgz#5e58bc574c0e51963ba3bc55ccb165c6564b7dad"
integrity sha512-jR0xM6NGdY/2d3GDVdNRgfXGxWZdE3BCmmUMA29oO+Z2stluPUgjlidIvwUzZ4hDKAG56NLr8+MiF3NoWxe+OA==
bullmq@3.5.0:
version "3.5.0"
resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-3.5.0.tgz#d757546c1d5f055267f73dcab306d06caf2efa7d"
integrity sha512-wdL6JJ78WULKeA2uYlq47YuR2f+4MArl37eKhe5JwNg08fCYQql1x7YqlupJBP6b2Dqkh21IPg1yZULCl9k3xw==
dependencies:
cron-parser "^4.6.0"
glob "^8.0.3"
Expand Down
54 changes: 32 additions & 22 deletions zp-relayer/endpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
checkSendTransactionsErrors,
} from './validation/validation'
import { sentTxQueue, SentTxState } from './queue/sentTxQueue'
import type { Queue } from 'bullmq'

async function sendTransactions(req: Request, res: Response, next: NextFunction) {
const errors = checkSendTransactionsErrors(req.body)
Expand Down Expand Up @@ -129,9 +130,23 @@ async function getJob(req: Request, res: Response) {
const jobId = req.params.id

async function getPoolJobState(requestedJobId: string): Promise<GetJobResponse | null> {
const INCONSISTENCY_ERR = 'Internal job inconsistency'

// Should be used in places where job is expected to exist
const safeGetJob = async (queue: Queue, id: string) => {
const job = await queue.getJob(id)
if (!job) {
throw new Error(INCONSISTENCY_ERR)
}
return job
}

const jobId = await pool.state.jobIdsMapping.get(requestedJobId)
let job = await poolTxQueue.getJob(jobId)
if (!job) return null

const poolJobState = await poolTxQueue.getJobState(jobId)
if (poolJobState === 'unknown') return null

const job = await safeGetJob(poolTxQueue, jobId)

// Default result object
let result: GetJobResponse = {
Expand All @@ -143,31 +158,27 @@ async function getJob(req: Request, res: Response) {
txHash: null,
}

const poolJobState = await job.getState()
if (poolJobState === 'completed') {
// Transaction was included in optimistic state, waiting to be mined
if (job.returnvalue === null) {
job = await poolTxQueue.getJob(jobId)
// Sanity check
if (!job || job.returnvalue === null) throw new Error('Internal job inconsistency')
}

// Sanity check
if (job.returnvalue === null) throw new Error(INCONSISTENCY_ERR)
const sentJobId = job.returnvalue[0][1]
let sentJob = await sentTxQueue.getJob(sentJobId)

const sentJobState = await sentTxQueue.getJobState(sentJobId)
// Should not happen here, but need to verify to be sure
if (!sentJob) throw new Error('Sent job not found')
if (sentJobState === 'unknown') throw new Error('Sent job not found')

const sentJobState = await sentJob.getState()
const sentJob = await safeGetJob(sentTxQueue, sentJobId)
if (sentJobState === 'waiting' || sentJobState === 'active' || sentJobState === 'delayed') {
// Transaction is in re-send loop
const txHash = sentJob.data.prevAttempts.at(-1)?.[0]
result.state = JobStatus.SENT
result.txHash = txHash || null
} else if (sentJobState === 'completed') {
if (sentJob.returnvalue === null) {
sentJob = await sentTxQueue.getJob(sentJobId)
// Sanity check
if (!sentJob || sentJob.returnvalue === null) throw new Error('Internal job inconsistency')
}
// Sanity check
if (sentJob.returnvalue === null) throw new Error(INCONSISTENCY_ERR)

const [txState, txHash] = sentJob.returnvalue
if (txState === SentTxState.MINED) {
// Transaction mined successfully
Expand All @@ -182,12 +193,11 @@ async function getJob(req: Request, res: Response) {
}
}
} else if (poolJobState === 'failed') {
// Either validation or tx sendind failed
if (!job.finishedOn) {
job = await poolTxQueue.getJob(jobId)
// Sanity check
if (!job || !job.finishedOn) throw new Error('Internal job inconsistency')
}
// Either validation or tx sending failed

// Sanity check
if (!job.finishedOn) throw new Error(INCONSISTENCY_ERR)

result.state = JobStatus.FAILED
result.failedReason = job.failedReason
result.finishedOn = job.finishedOn || null
Expand Down
2 changes: 1 addition & 1 deletion zp-relayer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"ajv": "8.11.0",
"async-mutex": "^0.3.2",
"bignumber.js": "9.1.0",
"bullmq": "3.2.0",
"bullmq": "3.5.0",
"cors": "^2.8.5",
"dotenv": "^10.0.0",
"express": "^4.17.1",
Expand Down

0 comments on commit 48a6779

Please sign in to comment.