refactor: Replace the usage of bullMQ with the hoarder sqlite-based queue (#309)
MohamedBassem committed Jul 21, 2024
1 parent edbd98d commit 9edd154
Showing 13 changed files with 128 additions and 344 deletions.
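
At a high level, this commit swaps BullMQ's Redis-backed `Worker`/`Job` API for the `Runner`/`DequeuedJob` API from the new `@hoarder/queue` workspace package. The sketch below condenses the before/after wiring pattern using only the names that appear in the hunks that follow; treat it as an illustration of the migration, not the package's documented API.

```ts
import { DequeuedJob, Runner } from "@hoarder/queue";
import { OpenAIQueue } from "@hoarder/shared/queues";

type Payload = { bookmarkId: string };

// Before (BullMQ): a Worker bound to a Redis connection, with event listeners.
//
//   const worker = new Worker<Payload, void>(OpenAIQueue.name, runJob, {
//     connection: queueConnectionDetails,
//     autorun: false,
//   });
//   worker.on("completed", (job) => { /* ... */ });
//   worker.on("failed", (job, error) => { /* ... */ });

// After: a Runner polls the sqlite-backed queue and takes callbacks directly.
const worker = new Runner<Payload>(
  OpenAIQueue,
  {
    run: async (job: DequeuedJob<Payload>) => {
      // job.id and job.data mirror what the BullMQ Job used to expose.
    },
    onComplete: async (job) => {
      // fires once per successfully processed job
    },
    onError: async (job) => {
      // failure details are available on job.error
    },
  },
  { concurrency: 1, pollIntervalMs: 1000, timeoutSecs: 30 },
);

// Producers now enqueue payloads directly instead of queue.add(name, payload):
await OpenAIQueue.enqueue({ bookmarkId: "example-id" });
```

The crawler, OpenAI, and search-indexing workers below all follow this same pattern, differing only in their payload types and runner options.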
1 change: 0 additions & 1 deletion README.md
@@ -58,7 +58,6 @@ The demo is seeded with some content, but it's in read-only mode to prevent abus
- [tRPC](https://trpc.io) for client->server communication.
- [Puppeteer](https://pptr.dev/) for crawling the bookmarks.
- [OpenAI](https://openai.com/) because AI is so hot right now.
- [BullMQ](https://bullmq.io) for scheduling the background jobs.
- [Meilisearch](https://meilisearch.com) for the full content search.

## Why did I build it?
6 changes: 3 additions & 3 deletions apps/web/components/dashboard/admin/ServerStats.tsx
@@ -106,19 +106,19 @@ export default function ServerStats() {
<TableBody>
<TableRow>
<TableCell className="lg:w-2/3">Crawling Jobs</TableCell>
<TableCell>{serverStats.crawlStats.queuedInRedis}</TableCell>
<TableCell>{serverStats.crawlStats.queued}</TableCell>
<TableCell>{serverStats.crawlStats.pending}</TableCell>
<TableCell>{serverStats.crawlStats.failed}</TableCell>
</TableRow>
<TableRow>
<TableCell>Indexing Jobs</TableCell>
<TableCell>{serverStats.indexingStats.queuedInRedis}</TableCell>
<TableCell>{serverStats.indexingStats.queued}</TableCell>
<TableCell>-</TableCell>
<TableCell>-</TableCell>
</TableRow>
<TableRow>
<TableCell>Inference Jobs</TableCell>
<TableCell>{serverStats.inferenceStats.queuedInRedis}</TableCell>
<TableCell>{serverStats.inferenceStats.queued}</TableCell>
<TableCell>{serverStats.inferenceStats.pending}</TableCell>
<TableCell>{serverStats.inferenceStats.failed}</TableCell>
</TableRow>
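
For context, the rename from `queuedInRedis` to `queued` suggests the per-queue stats rendered by the admin page now look roughly like the sketch below; the grouping into interfaces is inferred from the table cells above and is not the actual tRPC router output.

```ts
// Inferred shape of the stats rendered by ServerStats; field names come from
// the cells above, everything else is an assumption for illustration.
interface QueueStats {
  queued: number; // was `queuedInRedis` when the counts came from BullMQ/Redis
  pending: number;
  failed: number;
}

interface ServerStatsPayload {
  crawlStats: QueueStats;
  indexingStats: Pick<QueueStats, "queued">; // the indexing row only renders `queued`
  inferenceStats: QueueStats;
}
```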
60 changes: 29 additions & 31 deletions apps/workers/crawlerWorker.ts
@@ -1,11 +1,9 @@
import assert from "assert";
import * as dns from "dns";
import * as path from "node:path";
import type { Job } from "bullmq";
import type { Browser } from "puppeteer";
import { Readability } from "@mozilla/readability";
import { Mutex } from "async-mutex";
import { Worker } from "bullmq";
import DOMPurify from "dompurify";
import { eq } from "drizzle-orm";
import { execa } from "execa";
@@ -34,6 +32,7 @@ import {
bookmarkLinks,
bookmarks,
} from "@hoarder/db/schema";
import { DequeuedJob, Runner } from "@hoarder/queue";
import {
ASSET_TYPES,
deleteAsset,
@@ -48,7 +47,6 @@ import logger from "@hoarder/shared/logger";
import {
LinkCrawlerQueue,
OpenAIQueue,
queueConnectionDetails,
triggerSearchReindex,
zCrawlLinkRequestSchema,
} from "@hoarder/shared/queues";
@@ -153,37 +151,37 @@ export class CrawlerWorker {
}

logger.info("Starting crawler worker ...");
const worker = new Worker<ZCrawlLinkRequest, void>(
LinkCrawlerQueue.name,
withTimeout(
runCrawler,
/* timeoutSec */ serverConfig.crawler.jobTimeoutSec,
),
const worker = new Runner<ZCrawlLinkRequest>(
LinkCrawlerQueue,
{
run: withTimeout(
runCrawler,
/* timeoutSec */ serverConfig.crawler.jobTimeoutSec,
),
onComplete: async (job) => {
const jobId = job?.id ?? "unknown";
logger.info(`[Crawler][${jobId}] Completed successfully`);
const bookmarkId = job?.data.bookmarkId;
if (bookmarkId) {
await changeBookmarkStatus(bookmarkId, "success");
}
},
onError: async (job) => {
const jobId = job?.id ?? "unknown";
logger.error(`[Crawler][${jobId}] Crawling job failed: ${job.error}`);
const bookmarkId = job.data?.bookmarkId;
if (bookmarkId) {
await changeBookmarkStatus(bookmarkId, "failure");
}
},
},
{
pollIntervalMs: 1000,
timeoutSecs: serverConfig.crawler.jobTimeoutSec,
concurrency: serverConfig.crawler.numWorkers,
connection: queueConnectionDetails,
autorun: false,
},
);

worker.on("completed", (job) => {
const jobId = job?.id ?? "unknown";
logger.info(`[Crawler][${jobId}] Completed successfully`);
const bookmarkId = job?.data.bookmarkId;
if (bookmarkId) {
changeBookmarkStatus(bookmarkId, "success");
}
});

worker.on("failed", (job, error) => {
const jobId = job?.id ?? "unknown";
logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`);
const bookmarkId = job?.data.bookmarkId;
if (bookmarkId) {
changeBookmarkStatus(bookmarkId, "failure");
}
});

return worker;
}
}
@@ -600,7 +598,7 @@ async function crawlAndParseUrl(
};
}

async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
const jobId = job.id ?? "unknown";

const request = zCrawlLinkRequestSchema.safeParse(job.data);
Expand Down Expand Up @@ -655,7 +653,7 @@ async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {

// Enqueue openai job (if not set, assume it's true for backward compatibility)
if (job.data.runInference !== false) {
OpenAIQueue.add("openai", {
OpenAIQueue.enqueue({
bookmarkId,
});
}
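
The job handler signature changes from BullMQ's `Job<ZCrawlLinkRequest, void>` to `DequeuedJob<ZCrawlLinkRequest>`. Judging from how the handlers above use it (`job.id`, `job.data`, and `job.error` inside `onError`), the type looks roughly like the sketch below; field optionality and the type of `error` are assumptions, and the real definition lives in `@hoarder/queue`.

```ts
// Rough shape of DequeuedJob<T> as used in this commit (not the real type).
interface DequeuedJobSketch<T> {
  id?: string; // handlers fall back to "unknown" when it is missing
  data: T; // the enqueued payload, e.g. { bookmarkId, runInference? }
  error?: unknown; // populated on jobs handed to the onError callback
}
```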
3 changes: 3 additions & 0 deletions apps/workers/index.ts
@@ -2,6 +2,7 @@ import "dotenv/config";

import serverConfig from "@hoarder/shared/config";
import logger from "@hoarder/shared/logger";
import { runQueueDBMigrations } from "@hoarder/shared/queues";

import { CrawlerWorker } from "./crawlerWorker";
import { shutdownPromise } from "./exit";
@@ -10,6 +11,8 @@ import { SearchIndexingWorker } from "./searchWorker";

async function main() {
logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
runQueueDBMigrations();

const [crawler, openai, search] = [
await CrawlerWorker.build(),
OpenAiWorker.build(),
43 changes: 22 additions & 21 deletions apps/workers/openaiWorker.ts
@@ -1,5 +1,3 @@
import type { Job } from "bullmq";
import { Worker } from "bullmq";
import { and, Column, eq, inArray, sql } from "drizzle-orm";
import { z } from "zod";

@@ -11,12 +9,12 @@ import {
bookmarkTags,
tagsOnBookmarks,
} from "@hoarder/db/schema";
import { DequeuedJob, Runner } from "@hoarder/queue";
import { readAsset } from "@hoarder/shared/assetdb";
import serverConfig from "@hoarder/shared/config";
import logger from "@hoarder/shared/logger";
import {
OpenAIQueue,
queueConnectionDetails,
triggerSearchReindex,
zOpenAIRequestSchema,
} from "@hoarder/shared/queues";
@@ -63,27 +61,30 @@ async function attemptMarkTaggingStatus(
export class OpenAiWorker {
static build() {
logger.info("Starting inference worker ...");
const worker = new Worker<ZOpenAIRequest, void>(
OpenAIQueue.name,
runOpenAI,
const worker = new Runner<ZOpenAIRequest>(
OpenAIQueue,
{
connection: queueConnectionDetails,
autorun: false,
run: runOpenAI,
onComplete: async (job) => {
const jobId = job?.id ?? "unknown";
logger.info(`[inference][${jobId}] Completed successfully`);
await attemptMarkTaggingStatus(job?.data, "success");
},
onError: async (job) => {
const jobId = job?.id ?? "unknown";
logger.error(
`[inference][${jobId}] inference job failed: ${job.error}`,
);
await attemptMarkTaggingStatus(job?.data, "failure");
},
},
{
concurrency: 1,
pollIntervalMs: 1000,
timeoutSecs: 30,
},
);

worker.on("completed", (job) => {
const jobId = job?.id ?? "unknown";
logger.info(`[inference][${jobId}] Completed successfully`);
attemptMarkTaggingStatus(job?.data, "success");
});

worker.on("failed", (job, error) => {
const jobId = job?.id ?? "unknown";
logger.error(`[inference][${jobId}] inference job failed: ${error}`);
attemptMarkTaggingStatus(job?.data, "failure");
});

return worker;
}
}
@@ -361,7 +362,7 @@ async function connectTags(
});
}

async function runOpenAI(job: Job<ZOpenAIRequest, void>) {
async function runOpenAI(job: DequeuedJob<ZOpenAIRequest>) {
const jobId = job.id ?? "unknown";

const inferenceClient = InferenceClientFactory.build();
2 changes: 1 addition & 1 deletion apps/workers/package.json
@@ -7,10 +7,10 @@
"@hoarder/db": "workspace:^0.1.0",
"@hoarder/shared": "workspace:^0.1.0",
"@hoarder/tsconfig": "workspace:^0.1.0",
"@hoarder/queue": "workspace:^0.1.0",
"@mozilla/readability": "^0.5.0",
"@tsconfig/node21": "^21.0.1",
"async-mutex": "^0.4.1",
"bullmq": "^5.1.9",
"dompurify": "^3.0.9",
"dotenv": "^16.4.1",
"drizzle-orm": "^0.29.4",
39 changes: 20 additions & 19 deletions apps/workers/searchWorker.ts
@@ -1,13 +1,11 @@
import type { Job } from "bullmq";
import { Worker } from "bullmq";
import { eq } from "drizzle-orm";

import type { ZSearchIndexingRequest } from "@hoarder/shared/queues";
import { db } from "@hoarder/db";
import { bookmarks } from "@hoarder/db/schema";
import { DequeuedJob, Runner } from "@hoarder/queue";
import logger from "@hoarder/shared/logger";
import {
queueConnectionDetails,
SearchIndexingQueue,
zSearchIndexingRequestSchema,
} from "@hoarder/shared/queues";
@@ -16,25 +14,28 @@ import { getSearchIdxClient } from "@hoarder/shared/search";
export class SearchIndexingWorker {
static build() {
logger.info("Starting search indexing worker ...");
const worker = new Worker<ZSearchIndexingRequest, void>(
SearchIndexingQueue.name,
runSearchIndexing,
const worker = new Runner<ZSearchIndexingRequest>(
SearchIndexingQueue,
{
connection: queueConnectionDetails,
autorun: false,
run: runSearchIndexing,
onComplete: (job) => {
const jobId = job?.id ?? "unknown";
logger.info(`[search][${jobId}] Completed successfully`);
return Promise.resolve();
},
onError: (job) => {
const jobId = job?.id ?? "unknown";
logger.error(`[search][${jobId}] search job failed: ${job.error}`);
return Promise.resolve();
},
},
{
concurrency: 1,
pollIntervalMs: 1000,
timeoutSecs: 30,
},
);

worker.on("completed", (job) => {
const jobId = job?.id ?? "unknown";
logger.info(`[search][${jobId}] Completed successfully`);
});

worker.on("failed", (job, error) => {
const jobId = job?.id ?? "unknown";
logger.error(`[search][${jobId}] search job failed: ${error}`);
});

return worker;
}
}
@@ -112,7 +113,7 @@ async function runDelete(
await ensureTaskSuccess(searchClient, task.taskUid);
}

async function runSearchIndexing(job: Job<ZSearchIndexingRequest, void>) {
async function runSearchIndexing(job: DequeuedJob<ZSearchIndexingRequest>) {
const jobId = job.id ?? "unknown";

const request = zSearchIndexingRequestSchema.safeParse(job.data);
10 changes: 0 additions & 10 deletions packages/shared/config.ts
@@ -15,10 +15,6 @@ const allEnv = z.object({
OLLAMA_BASE_URL: z.string().url().optional(),
INFERENCE_TEXT_MODEL: z.string().default("gpt-3.5-turbo-0125"),
INFERENCE_IMAGE_MODEL: z.string().default("gpt-4o-2024-05-13"),
REDIS_HOST: z.string().default("localhost"),
REDIS_PORT: z.coerce.number().default(6379),
REDIS_DB_IDX: z.coerce.number().optional(),
REDIS_PASSWORD: z.string().optional(),
CRAWLER_HEADLESS_BROWSER: stringBool("true"),
BROWSER_WEB_URL: z.string().url().optional(),
BROWSER_WEBSOCKET_URL: z.string().url().optional(),
@@ -58,12 +54,6 @@ const serverConfigSchema = allEnv.transform((val) => {
imageModel: val.INFERENCE_IMAGE_MODEL,
inferredTagLang: val.INFERENCE_LANG,
},
bullMQ: {
redisHost: val.REDIS_HOST,
redisPort: val.REDIS_PORT,
redisDBIdx: val.REDIS_DB_IDX,
redisPassword: val.REDIS_PASSWORD,
},
crawler: {
numWorkers: val.CRAWLER_NUM_WORKERS,
headlessBrowser: val.CRAWLER_HEADLESS_BROWSER,
2 changes: 1 addition & 1 deletion packages/shared/package.json
@@ -5,7 +5,7 @@
"private": true,
"type": "module",
"dependencies": {
"bullmq": "^5.1.9",
"@hoarder/queue": "workspace:^0.1.0",
"meilisearch": "^0.37.0",
"winston": "^3.11.0",
"zod": "^3.22.4"