Skip to content

Commit

Permalink
fix(rabbitmq): scheduler declare exchange (#28)
Browse files Browse the repository at this point in the history
and simplify scheduler
  • Loading branch information
tuana9a authored Aug 16, 2024
1 parent 23d2ba7 commit 3b3fbe2
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 93 deletions.
17 changes: 0 additions & 17 deletions scheduler/src/auto-listener.ts

This file was deleted.

35 changes: 19 additions & 16 deletions scheduler/src/background-tasks/if-job-is-doing-too-long.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import ms from "ms";
import { bus } from "../bus";
import { cfg, CollectionName, JobStatus, AppEvent } from "../cfg";
import { mongoConnectionPool } from "../connections";
import { cfg, CollectionName, JobStatus, ExchangeName } from "../cfg";
import { mongoConnectionPool, rabbitmqConnectionPool } from "../connections";
import { DKHPTDJob } from "../entities";
import logger from "../loggers/logger";
import { loop } from "../utils";
import { loop, toBuffer } from "../utils";

export const setup = () => loop.infinity(async () => {
const cursor = mongoConnectionPool.getClient().db(cfg.DATABASE_NAME).collection(CollectionName.DKHPTD).find({
doingAt: { $lt: Date.now() - ms("1m") }, /* less than now - 1 minute then it's probaly dead or error */
status: JobStatus.DOING,
}, { sort: { timeToStart: 1 } });
while (await cursor.hasNext()) {
const entry = await cursor.next();
const job = new DKHPTDJob(entry);
logger.info(`Found stale job ${job._id}`);
bus.emit(AppEvent.STALE_JOB, job._id);
}
}, ms("10s"));
export const setup = () => {
rabbitmqConnectionPool.getChannel().assertExchange(ExchangeName.MAYBE_STALE_JOB, "fanout");
loop.infinity(async () => {
const cursor = mongoConnectionPool.getClient().db(cfg.DATABASE_NAME).collection(CollectionName.DKHPTD).find({
doingAt: { $lt: Date.now() - ms("1m") }, /* less than now - 1 minute then it's probaly dead or error */
status: JobStatus.DOING,
}, { sort: { timeToStart: 1 } });
while (await cursor.hasNext()) {
const entry = await cursor.next();
const job = new DKHPTDJob(entry);
const jobId = job._id;
logger.info(`Found stale job ${jobId}`);
rabbitmqConnectionPool.getChannel().publish(ExchangeName.MAYBE_STALE_JOB, "", toBuffer(jobId.toHexString()));
}
}, ms("10s"));
}
6 changes: 3 additions & 3 deletions scheduler/src/background-tasks/it's-time-to-run-job-v2.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { ObjectId } from "mongodb";
import ms from "ms";
import { bus } from "../bus";
import { cfg, CollectionName, JobStatus, AppEvent } from "../cfg";
import { cfg, CollectionName, JobStatus } from "../cfg";
import { mongoConnectionPool } from "../connections";
import { DKHPTDJobV2 } from "../entities";
import logger from "../loggers/logger";
Expand All @@ -15,7 +14,8 @@ export const setup = () => loop.infinity(async () => {
while (await cursor.hasNext()) {
const entry = await cursor.next();
const job = decryptJobV2(new DKHPTDJobV2(entry));
bus.emit(AppEvent.NEW_JOB_V2, toJobV2WorkerMessage(job));
logger.info("New job V2: " + job._id)
// TODO: send to execute
await mongoConnectionPool.getClient()
.db(cfg.DATABASE_NAME).collection(CollectionName.DKHPTDV2).updateOne({ _id: new ObjectId(job._id) }, {
$set: {
Expand Down
47 changes: 25 additions & 22 deletions scheduler/src/background-tasks/it's-time-to-run-job.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
import { ObjectId } from "mongodb";
import { bus } from "../bus";
import { AppEvent, cfg, CollectionName, JobStatus } from "../cfg";
import { mongoConnectionPool } from "../connections";
import { cfg, CollectionName, JobStatus, QueueName } from "../cfg";
import { mongoConnectionPool, rabbitmqConnectionPool } from "../connections";
import { DKHPTDJob } from "../entities";
import logger from "../loggers/logger";
import ms from "ms";
import { loop, toJobWorkerMessage } from "../utils";
import { loop, toBuffer, toJobWorkerMessage } from "../utils";

export const setup = () => loop.infinity(async () => {
const cursor = mongoConnectionPool.getClient().db(cfg.DATABASE_NAME).collection(CollectionName.DKHPTD).find({
timeToStart: { $lt: Date.now() }, /* less than now then it's time to run */
status: JobStatus.READY,
}, { sort: { timeToStart: 1 } });
while (await cursor.hasNext()) {
const entry = await cursor.next();
const job = new DKHPTDJob(entry);
bus.emit(AppEvent.NEW_JOB, toJobWorkerMessage(job));
await mongoConnectionPool.getClient()
.db(cfg.DATABASE_NAME).collection(CollectionName.DKHPTD).updateOne({ _id: new ObjectId(job._id) }, {
$set: {
status: JobStatus.DOING,
doingAt: Date.now(),
},
});
}
}, ms("10s"));
export const setup = () => {
rabbitmqConnectionPool.getChannel().assertQueue(QueueName.RUN_JOB, {});
loop.infinity(async () => {
const cursor = mongoConnectionPool.getClient().db(cfg.DATABASE_NAME).collection(CollectionName.DKHPTD).find({
timeToStart: { $lt: Date.now() }, /* less than now then it's time to run */
status: JobStatus.READY,
}, { sort: { timeToStart: 1 } });
while (await cursor.hasNext()) {
const entry = await cursor.next();
const job = new DKHPTDJob(entry);
logger.info("New job: " + job._id)
rabbitmqConnectionPool.getChannel().sendToQueue(QueueName.RUN_JOB, toBuffer(toJobWorkerMessage(job)));
await mongoConnectionPool.getClient()
.db(cfg.DATABASE_NAME).collection(CollectionName.DKHPTD).updateOne({ _id: new ObjectId(job._id) }, {
$set: {
status: JobStatus.DOING,
doingAt: Date.now(),
},
});
}
}, ms("10s"));
}
3 changes: 0 additions & 3 deletions scheduler/src/bus.ts

This file was deleted.

9 changes: 0 additions & 9 deletions scheduler/src/cfg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,3 @@ export const ExchangeName = {
MAYBE_STALE_JOB_V1: "maybe-stale-job-v1",
MAYBE_STALE_JOB_V2: "maybe-stale-job-v2",
};

export const AppEvent = {
NEW_JOB: nextStr(),
STALE_JOB: nextStr(),
NEW_JOB_V1: nextStr(),
STALE_JOB_V1: nextStr(),
NEW_JOB_V2: nextStr(),
STALE_JOB_V2: nextStr(),
};
1 change: 0 additions & 1 deletion scheduler/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ async function main() {
}
rabbitmqConnectionPool.addChannel(channel);
require("./auto-background-task").setup("./dist/background-tasks");
require("./auto-listener").setup("./dist/listeners");
});
});
}
Expand Down
22 changes: 0 additions & 22 deletions scheduler/src/listeners/on-new-job-v1.ts

This file was deleted.

0 comments on commit 3b3fbe2

Please sign in to comment.