diff --git a/yarn.lock b/yarn.lock index 78d9a330..2b5acd1b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1259,10 +1259,10 @@ builtin-status-codes@^3.0.0: resolved "https://registry.yarnpkg.com/builtin-status-codes/-/builtin-status-codes-3.0.0.tgz#85982878e21b98e1c66425e03d0174788f569ee8" integrity sha1-hZgoeOIbmOHGZCXgPQF0eI9Wnug= -bullmq@3.2.0: - version "3.2.0" - resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-3.2.0.tgz#5e58bc574c0e51963ba3bc55ccb165c6564b7dad" - integrity sha512-jR0xM6NGdY/2d3GDVdNRgfXGxWZdE3BCmmUMA29oO+Z2stluPUgjlidIvwUzZ4hDKAG56NLr8+MiF3NoWxe+OA== +bullmq@3.5.0: + version "3.5.0" + resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-3.5.0.tgz#d757546c1d5f055267f73dcab306d06caf2efa7d" + integrity sha512-wdL6JJ78WULKeA2uYlq47YuR2f+4MArl37eKhe5JwNg08fCYQql1x7YqlupJBP6b2Dqkh21IPg1yZULCl9k3xw== dependencies: cron-parser "^4.6.0" glob "^8.0.3" diff --git a/zp-relayer/endpoints.ts b/zp-relayer/endpoints.ts index 76836fe3..b0b855b9 100644 --- a/zp-relayer/endpoints.ts +++ b/zp-relayer/endpoints.ts @@ -12,6 +12,7 @@ import { checkSendTransactionsErrors, } from './validation/validation' import { sentTxQueue, SentTxState } from './queue/sentTxQueue' +import type { Queue } from 'bullmq' async function sendTransactions(req: Request, res: Response, next: NextFunction) { const errors = checkSendTransactionsErrors(req.body) @@ -129,9 +130,23 @@ async function getJob(req: Request, res: Response) { const jobId = req.params.id async function getPoolJobState(requestedJobId: string): Promise { + const INCONSISTENCY_ERR = 'Internal job inconsistency' + + // Should be used in places where job is expected to exist + const safeGetJob = async (queue: Queue, id: string) => { + const job = await queue.getJob(id) + if (!job) { + throw new Error(INCONSISTENCY_ERR) + } + return job + } + const jobId = await pool.state.jobIdsMapping.get(requestedJobId) - let job = await poolTxQueue.getJob(jobId) - if (!job) return null + + const poolJobState = await poolTxQueue.getJobState(jobId) + if (poolJobState === 'unknown') return null + + const job = await safeGetJob(poolTxQueue, jobId) // Default result object let result: GetJobResponse = { @@ -143,31 +158,27 @@ async function getJob(req: Request, res: Response) { txHash: null, } - const poolJobState = await job.getState() if (poolJobState === 'completed') { // Transaction was included in optimistic state, waiting to be mined - if (job.returnvalue === null) { - job = await poolTxQueue.getJob(jobId) - // Sanity check - if (!job || job.returnvalue === null) throw new Error('Internal job inconsistency') - } + + // Sanity check + if (job.returnvalue === null) throw new Error(INCONSISTENCY_ERR) const sentJobId = job.returnvalue[0][1] - let sentJob = await sentTxQueue.getJob(sentJobId) + + const sentJobState = await sentTxQueue.getJobState(sentJobId) // Should not happen here, but need to verify to be sure - if (!sentJob) throw new Error('Sent job not found') + if (sentJobState === 'unknown') throw new Error('Sent job not found') - const sentJobState = await sentJob.getState() + const sentJob = await safeGetJob(sentTxQueue, sentJobId) if (sentJobState === 'waiting' || sentJobState === 'active' || sentJobState === 'delayed') { // Transaction is in re-send loop const txHash = sentJob.data.prevAttempts.at(-1)?.[0] result.state = JobStatus.SENT result.txHash = txHash || null } else if (sentJobState === 'completed') { - if (sentJob.returnvalue === null) { - sentJob = await sentTxQueue.getJob(sentJobId) - // Sanity check - if (!sentJob || sentJob.returnvalue === null) throw new Error('Internal job inconsistency') - } + // Sanity check + if (sentJob.returnvalue === null) throw new Error(INCONSISTENCY_ERR) + const [txState, txHash] = sentJob.returnvalue if (txState === SentTxState.MINED) { // Transaction mined successfully @@ -182,12 +193,11 @@ async function getJob(req: Request, res: Response) { } } } else if (poolJobState === 'failed') { - // Either validation or tx sendind failed - if (!job.finishedOn) { - job = await poolTxQueue.getJob(jobId) - // Sanity check - if (!job || !job.finishedOn) throw new Error('Internal job inconsistency') - } + // Either validation or tx sending failed + + // Sanity check + if (!job.finishedOn) throw new Error(INCONSISTENCY_ERR) + result.state = JobStatus.FAILED result.failedReason = job.failedReason result.finishedOn = job.finishedOn || null diff --git a/zp-relayer/package.json b/zp-relayer/package.json index d3a5a1f5..e6b77302 100644 --- a/zp-relayer/package.json +++ b/zp-relayer/package.json @@ -19,7 +19,7 @@ "ajv": "8.11.0", "async-mutex": "^0.3.2", "bignumber.js": "9.1.0", - "bullmq": "3.2.0", + "bullmq": "3.5.0", "cors": "^2.8.5", "dotenv": "^10.0.0", "express": "^4.17.1",