From 3b3fbe27558dc0f58f5cc0915f791db4d213a6c0 Mon Sep 17 00:00:00 2001 From: Nguyen Minh Tuan Date: Fri, 16 Aug 2024 12:55:47 +0700 Subject: [PATCH] fix(rabbitmq): scheduler declare exchange (#28) and simplify scheduler --- scheduler/src/auto-listener.ts | 17 ------- .../if-job-is-doing-too-long.ts | 35 +++++++------- .../it's-time-to-run-job-v2.ts | 6 +-- .../background-tasks/it's-time-to-run-job.ts | 47 ++++++++++--------- scheduler/src/bus.ts | 3 -- scheduler/src/cfg.ts | 9 ---- scheduler/src/index.ts | 1 - scheduler/src/listeners/on-new-job-v1.ts | 22 --------- 8 files changed, 47 insertions(+), 93 deletions(-) delete mode 100644 scheduler/src/auto-listener.ts delete mode 100644 scheduler/src/bus.ts delete mode 100644 scheduler/src/listeners/on-new-job-v1.ts diff --git a/scheduler/src/auto-listener.ts b/scheduler/src/auto-listener.ts deleted file mode 100644 index 3b891d7..0000000 --- a/scheduler/src/auto-listener.ts +++ /dev/null @@ -1,17 +0,0 @@ -/* eslint-disable @typescript-eslint/no-var-requires */ -import fs from "fs"; -import path from "path"; -import { toJson } from "./utils"; -import logger from "./loggers/logger"; - -export const setup = (dir: string) => { - const filepaths = fs.readdirSync(dir).filter(x => x.endsWith(".js")); - const loadedPaths = []; - for (const filepath of filepaths) { - const relativeFilepath = `${dir}/${filepath}`; - require(path.resolve(relativeFilepath)).setup(); - loadedPaths.push(relativeFilepath); - } - logger.info(`Loaded listeners: ${toJson(loadedPaths)}`); - return loadedPaths; -}; diff --git a/scheduler/src/background-tasks/if-job-is-doing-too-long.ts b/scheduler/src/background-tasks/if-job-is-doing-too-long.ts index b3f20e9..94e4057 100644 --- a/scheduler/src/background-tasks/if-job-is-doing-too-long.ts +++ b/scheduler/src/background-tasks/if-job-is-doing-too-long.ts @@ -1,20 +1,23 @@ import ms from "ms"; -import { bus } from "../bus"; -import { cfg, CollectionName, JobStatus, AppEvent } from "../cfg"; -import { mongoConnectionPool } from "../connections"; +import { cfg, CollectionName, JobStatus, ExchangeName } from "../cfg"; +import { mongoConnectionPool, rabbitmqConnectionPool } from "../connections"; import { DKHPTDJob } from "../entities"; import logger from "../loggers/logger"; -import { loop } from "../utils"; +import { loop, toBuffer } from "../utils"; -export const setup = () => loop.infinity(async () => { - const cursor = mongoConnectionPool.getClient().db(cfg.DATABASE_NAME).collection(CollectionName.DKHPTD).find({ - doingAt: { $lt: Date.now() - ms("1m") }, /* less than now - 1 minute then it's probaly dead or error */ - status: JobStatus.DOING, - }, { sort: { timeToStart: 1 } }); - while (await cursor.hasNext()) { - const entry = await cursor.next(); - const job = new DKHPTDJob(entry); - logger.info(`Found stale job ${job._id}`); - bus.emit(AppEvent.STALE_JOB, job._id); - } -}, ms("10s")); +export const setup = () => { + rabbitmqConnectionPool.getChannel().assertExchange(ExchangeName.MAYBE_STALE_JOB, "fanout"); + loop.infinity(async () => { + const cursor = mongoConnectionPool.getClient().db(cfg.DATABASE_NAME).collection(CollectionName.DKHPTD).find({ + doingAt: { $lt: Date.now() - ms("1m") }, /* less than now - 1 minute then it's probaly dead or error */ + status: JobStatus.DOING, + }, { sort: { timeToStart: 1 } }); + while (await cursor.hasNext()) { + const entry = await cursor.next(); + const job = new DKHPTDJob(entry); + const jobId = job._id; + logger.info(`Found stale job ${jobId}`); + rabbitmqConnectionPool.getChannel().publish(ExchangeName.MAYBE_STALE_JOB, "", toBuffer(jobId.toHexString())); + } + }, ms("10s")); +} diff --git a/scheduler/src/background-tasks/it's-time-to-run-job-v2.ts b/scheduler/src/background-tasks/it's-time-to-run-job-v2.ts index cc0172d..90e539c 100644 --- a/scheduler/src/background-tasks/it's-time-to-run-job-v2.ts +++ b/scheduler/src/background-tasks/it's-time-to-run-job-v2.ts @@ -1,7 +1,6 @@ import { ObjectId } from "mongodb"; import ms from "ms"; -import { bus } from "../bus"; -import { cfg, CollectionName, JobStatus, AppEvent } from "../cfg"; +import { cfg, CollectionName, JobStatus } from "../cfg"; import { mongoConnectionPool } from "../connections"; import { DKHPTDJobV2 } from "../entities"; import logger from "../loggers/logger"; @@ -15,7 +14,8 @@ export const setup = () => loop.infinity(async () => { while (await cursor.hasNext()) { const entry = await cursor.next(); const job = decryptJobV2(new DKHPTDJobV2(entry)); - bus.emit(AppEvent.NEW_JOB_V2, toJobV2WorkerMessage(job)); + logger.info("New job V2: " + job._id) + // TODO: send to execute await mongoConnectionPool.getClient() .db(cfg.DATABASE_NAME).collection(CollectionName.DKHPTDV2).updateOne({ _id: new ObjectId(job._id) }, { $set: { diff --git a/scheduler/src/background-tasks/it's-time-to-run-job.ts b/scheduler/src/background-tasks/it's-time-to-run-job.ts index 44304f0..ea19249 100644 --- a/scheduler/src/background-tasks/it's-time-to-run-job.ts +++ b/scheduler/src/background-tasks/it's-time-to-run-job.ts @@ -1,27 +1,30 @@ import { ObjectId } from "mongodb"; -import { bus } from "../bus"; -import { AppEvent, cfg, CollectionName, JobStatus } from "../cfg"; -import { mongoConnectionPool } from "../connections"; +import { cfg, CollectionName, JobStatus, QueueName } from "../cfg"; +import { mongoConnectionPool, rabbitmqConnectionPool } from "../connections"; import { DKHPTDJob } from "../entities"; import logger from "../loggers/logger"; import ms from "ms"; -import { loop, toJobWorkerMessage } from "../utils"; +import { loop, toBuffer, toJobWorkerMessage } from "../utils"; -export const setup = () => loop.infinity(async () => { - const cursor = mongoConnectionPool.getClient().db(cfg.DATABASE_NAME).collection(CollectionName.DKHPTD).find({ - timeToStart: { $lt: Date.now() }, /* less than now then it's time to run */ - status: JobStatus.READY, - }, { sort: { timeToStart: 1 } }); - while (await cursor.hasNext()) { - const entry = await cursor.next(); - const job = new DKHPTDJob(entry); - bus.emit(AppEvent.NEW_JOB, toJobWorkerMessage(job)); - await mongoConnectionPool.getClient() - .db(cfg.DATABASE_NAME).collection(CollectionName.DKHPTD).updateOne({ _id: new ObjectId(job._id) }, { - $set: { - status: JobStatus.DOING, - doingAt: Date.now(), - }, - }); - } -}, ms("10s")); +export const setup = () => { + rabbitmqConnectionPool.getChannel().assertQueue(QueueName.RUN_JOB, {}); + loop.infinity(async () => { + const cursor = mongoConnectionPool.getClient().db(cfg.DATABASE_NAME).collection(CollectionName.DKHPTD).find({ + timeToStart: { $lt: Date.now() }, /* less than now then it's time to run */ + status: JobStatus.READY, + }, { sort: { timeToStart: 1 } }); + while (await cursor.hasNext()) { + const entry = await cursor.next(); + const job = new DKHPTDJob(entry); + logger.info("New job: " + job._id) + rabbitmqConnectionPool.getChannel().sendToQueue(QueueName.RUN_JOB, toBuffer(toJobWorkerMessage(job))); + await mongoConnectionPool.getClient() + .db(cfg.DATABASE_NAME).collection(CollectionName.DKHPTD).updateOne({ _id: new ObjectId(job._id) }, { + $set: { + status: JobStatus.DOING, + doingAt: Date.now(), + }, + }); + } + }, ms("10s")); +} diff --git a/scheduler/src/bus.ts b/scheduler/src/bus.ts deleted file mode 100644 index b7a6f24..0000000 --- a/scheduler/src/bus.ts +++ /dev/null @@ -1,3 +0,0 @@ -import EventEmitter from "events"; - -export const bus = new EventEmitter(); \ No newline at end of file diff --git a/scheduler/src/cfg.ts b/scheduler/src/cfg.ts index 1177329..ad7b397 100644 --- a/scheduler/src/cfg.ts +++ b/scheduler/src/cfg.ts @@ -47,12 +47,3 @@ export const ExchangeName = { MAYBE_STALE_JOB_V1: "maybe-stale-job-v1", MAYBE_STALE_JOB_V2: "maybe-stale-job-v2", }; - -export const AppEvent = { - NEW_JOB: nextStr(), - STALE_JOB: nextStr(), - NEW_JOB_V1: nextStr(), - STALE_JOB_V1: nextStr(), - NEW_JOB_V2: nextStr(), - STALE_JOB_V2: nextStr(), -}; diff --git a/scheduler/src/index.ts b/scheduler/src/index.ts index 3e92158..ae8db70 100644 --- a/scheduler/src/index.ts +++ b/scheduler/src/index.ts @@ -24,7 +24,6 @@ async function main() { } rabbitmqConnectionPool.addChannel(channel); require("./auto-background-task").setup("./dist/background-tasks"); - require("./auto-listener").setup("./dist/listeners"); }); }); } diff --git a/scheduler/src/listeners/on-new-job-v1.ts b/scheduler/src/listeners/on-new-job-v1.ts deleted file mode 100644 index 0562788..0000000 --- a/scheduler/src/listeners/on-new-job-v1.ts +++ /dev/null @@ -1,22 +0,0 @@ -import crypto from "crypto"; - -import { bus } from "../bus"; -import { cfg, QueueName, AppEvent } from "../cfg"; -import { rabbitmqConnectionPool } from "../connections"; -import { c } from "../cypher"; -import logger from "../loggers/logger"; -import { toJson, toBuffer } from "../utils"; - -// DEPRECATED -export const setup = () => { - // rabbitmqConnectionPool.getChannel().assertQueue(QueueName.RUN_JOB_V1, {}); - // bus.on(AppEvent.NEW_JOB_V1, (job) => logger.info("New job V1: " + job.id)); - // bus.on(AppEvent.NEW_JOB_V1, (job) => { - // const iv = crypto.randomBytes(16).toString("hex"); - // rabbitmqConnectionPool.getChannel().sendToQueue(QueueName.RUN_JOB_V1, toBuffer(c(cfg.AMQP_ENCRYPTION_KEY).e(toJson(job), iv)), { - // headers: { - // iv: iv, - // } - // }); - // }); -};