Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redisForJobQueue の接続を使い回す #268

Merged
merged 1 commit into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions packages/backend/src/GlobalModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,22 @@ const $redisForTimelines: Provider = {
inject: [DI.config],
};

const $redisForJobQueue: Provider = {
provide: DI.redisForJobQueue,
useFactory: (config: Config) => {
return new Redis.Redis({
...config.redisForJobQueue,
keyPrefix: undefined,
});
},
inject: [DI.config],
};

@Global()
@Module({
imports: [RepositoryModule],
providers: [$config, $db, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines],
exports: [$config, $db, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines, RepositoryModule],
providers: [$config, $db, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines, $redisForJobQueue],
exports: [$config, $db, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines, $redisForJobQueue, RepositoryModule],
})
export class GlobalModule implements OnApplicationShutdown {
constructor(
Expand All @@ -92,6 +103,7 @@ export class GlobalModule implements OnApplicationShutdown {
@Inject(DI.redisForPub) private redisForPub: Redis.Redis,
@Inject(DI.redisForSub) private redisForSub: Redis.Redis,
@Inject(DI.redisForTimelines) private redisForTimelines: Redis.Redis,
@Inject(DI.redisForJobQueue) private redisForJobQueue: Redis.Redis,
) {}

public async dispose(): Promise<void> {
Expand All @@ -109,6 +121,7 @@ export class GlobalModule implements OnApplicationShutdown {
this.redisForPub.disconnect(),
this.redisForSub.disconnect(),
this.redisForTimelines.disconnect(),
this.redisForJobQueue.disconnect(),
]);
}

Expand Down
33 changes: 17 additions & 16 deletions packages/backend/src/core/QueueModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import { setTimeout } from 'node:timers/promises';
import { Inject, Module, OnApplicationShutdown } from '@nestjs/common';
import * as Bull from 'bullmq';
import * as Redis from 'ioredis';
import { DI } from '@/di-symbols.js';
import type { Config } from '@/config.js';
import { QUEUE, baseQueueOptions } from '@/queue/const.js';
Expand All @@ -23,50 +24,50 @@ export type WebhookDeliverQueue = Bull.Queue<WebhookDeliverJobData>;

const $system: Provider = {
provide: 'queue:system',
useFactory: (config: Config) => new Bull.Queue(QUEUE.SYSTEM, baseQueueOptions(config, QUEUE.SYSTEM)),
inject: [DI.config],
useFactory: (config: Config, redisForJobQueue: Redis.Redis) => new Bull.Queue(QUEUE.SYSTEM, baseQueueOptions(config, QUEUE.SYSTEM, redisForJobQueue)),
inject: [DI.config, DI.redisForJobQueue],
};

const $endedPollNotification: Provider = {
provide: 'queue:endedPollNotification',
useFactory: (config: Config) => new Bull.Queue(QUEUE.ENDED_POLL_NOTIFICATION, baseQueueOptions(config, QUEUE.ENDED_POLL_NOTIFICATION)),
inject: [DI.config],
useFactory: (config: Config, redisForJobQueue: Redis.Redis) => new Bull.Queue(QUEUE.ENDED_POLL_NOTIFICATION, baseQueueOptions(config, QUEUE.ENDED_POLL_NOTIFICATION, redisForJobQueue)),
inject: [DI.config, DI.redisForJobQueue],
};

const $deliver: Provider = {
provide: 'queue:deliver',
useFactory: (config: Config) => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(config, QUEUE.DELIVER)),
inject: [DI.config],
useFactory: (config: Config, redisForJobQueue: Redis.Redis) => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(config, QUEUE.DELIVER, redisForJobQueue)),
inject: [DI.config, DI.redisForJobQueue],
};

const $inbox: Provider = {
provide: 'queue:inbox',
useFactory: (config: Config) => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(config, QUEUE.INBOX)),
inject: [DI.config],
useFactory: (config: Config, redisForJobQueue: Redis.Redis) => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(config, QUEUE.INBOX, redisForJobQueue)),
inject: [DI.config, DI.redisForJobQueue],
};

const $db: Provider = {
provide: 'queue:db',
useFactory: (config: Config) => new Bull.Queue(QUEUE.DB, baseQueueOptions(config, QUEUE.DB)),
inject: [DI.config],
useFactory: (config: Config, redisForJobQueue: Redis.Redis) => new Bull.Queue(QUEUE.DB, baseQueueOptions(config, QUEUE.DB, redisForJobQueue)),
inject: [DI.config, DI.redisForJobQueue],
};

const $relationship: Provider = {
provide: 'queue:relationship',
useFactory: (config: Config) => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(config, QUEUE.RELATIONSHIP)),
inject: [DI.config],
useFactory: (config: Config, redisForJobQueue: Redis.Redis) => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(config, QUEUE.RELATIONSHIP, redisForJobQueue)),
inject: [DI.config, DI.redisForJobQueue],
};

const $objectStorage: Provider = {
provide: 'queue:objectStorage',
useFactory: (config: Config) => new Bull.Queue(QUEUE.OBJECT_STORAGE, baseQueueOptions(config, QUEUE.OBJECT_STORAGE)),
inject: [DI.config],
useFactory: (config: Config, redisForJobQueue: Redis.Redis) => new Bull.Queue(QUEUE.OBJECT_STORAGE, baseQueueOptions(config, QUEUE.OBJECT_STORAGE, redisForJobQueue)),
inject: [DI.config, DI.redisForJobQueue],
};

const $webhookDeliver: Provider = {
provide: 'queue:webhookDeliver',
useFactory: (config: Config) => new Bull.Queue(QUEUE.WEBHOOK_DELIVER, baseQueueOptions(config, QUEUE.WEBHOOK_DELIVER)),
inject: [DI.config],
useFactory: (config: Config, redisForJobQueue: Redis.Redis) => new Bull.Queue(QUEUE.WEBHOOK_DELIVER, baseQueueOptions(config, QUEUE.WEBHOOK_DELIVER, redisForJobQueue)),
inject: [DI.config, DI.redisForJobQueue],
};

@Module({
Expand Down
8 changes: 6 additions & 2 deletions packages/backend/src/daemons/QueueStatsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import { Inject, Injectable } from '@nestjs/common';
import Xev from 'xev';
import * as Bull from 'bullmq';
import * as Redis from 'ioredis';
import { QueueService } from '@/core/QueueService.js';
import { bindThis } from '@/decorators.js';
import { DI } from '@/di-symbols.js';
Expand All @@ -25,6 +26,9 @@ export class QueueStatsService implements OnApplicationShutdown {
@Inject(DI.config)
private config: Config,

@Inject(DI.redisForJobQueue)
private redisForJobQueue: Redis.Redis,

private queueService: QueueService,
) {
}
Expand All @@ -43,8 +47,8 @@ export class QueueStatsService implements OnApplicationShutdown {
let activeDeliverJobs = 0;
let activeInboxJobs = 0;

const deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER));
const inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX));
const deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER, this.redisForJobQueue));
const inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX, this.redisForJobQueue));

deliverQueueEvents.on('active', () => {
activeDeliverJobs++;
Expand Down
1 change: 1 addition & 0 deletions packages/backend/src/di-symbols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export const DI = {
redisForPub: Symbol('redisForPub'),
redisForSub: Symbol('redisForSub'),
redisForTimelines: Symbol('redisForTimelines'),
redisForJobQueue: Symbol('redisForJobQueue'),

//#region Repositories
usersRepository: Symbol('usersRepository'),
Expand Down
20 changes: 12 additions & 8 deletions packages/backend/src/queue/QueueProcessorService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import * as Bull from 'bullmq';
import * as Redis from 'ioredis';
import type { Config } from '@/config.js';
import { DI } from '@/di-symbols.js';
import type Logger from '@/logger.js';
Expand Down Expand Up @@ -84,6 +85,9 @@ export class QueueProcessorService implements OnApplicationShutdown {
@Inject(DI.config)
private config: Config,

@Inject(DI.redisForJobQueue)
private redisForJobQueue: Redis.Redis,

private queueLoggerService: QueueLoggerService,
private webhookDeliverProcessorService: WebhookDeliverProcessorService,
private endedPollNotificationProcessorService: EndedPollNotificationProcessorService,
Expand Down Expand Up @@ -146,7 +150,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
default: throw new Error(`unrecognized job type ${job.name} for system`);
}
}, {
...baseQueueOptions(this.config, QUEUE.SYSTEM),
...baseQueueOptions(this.config, QUEUE.SYSTEM, this.redisForJobQueue),
autorun: false,
});

Expand Down Expand Up @@ -185,7 +189,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
default: throw new Error(`unrecognized job type ${job.name} for db`);
}
}, {
...baseQueueOptions(this.config, QUEUE.DB),
...baseQueueOptions(this.config, QUEUE.DB, this.redisForJobQueue),
autorun: false,
});

Expand All @@ -201,7 +205,7 @@ export class QueueProcessorService implements OnApplicationShutdown {

//#region deliver
this.deliverQueueWorker = new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), {
...baseQueueOptions(this.config, QUEUE.DELIVER),
...baseQueueOptions(this.config, QUEUE.DELIVER, this.redisForJobQueue),
autorun: false,
concurrency: this.config.deliverJobConcurrency ?? 128,
limiter: {
Expand All @@ -225,7 +229,7 @@ export class QueueProcessorService implements OnApplicationShutdown {

//#region inbox
this.inboxQueueWorker = new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), {
...baseQueueOptions(this.config, QUEUE.INBOX),
...baseQueueOptions(this.config, QUEUE.INBOX, this.redisForJobQueue),
autorun: false,
concurrency: this.config.inboxJobConcurrency ?? 16,
limiter: {
Expand All @@ -249,7 +253,7 @@ export class QueueProcessorService implements OnApplicationShutdown {

//#region webhook deliver
this.webhookDeliverQueueWorker = new Bull.Worker(QUEUE.WEBHOOK_DELIVER, (job) => this.webhookDeliverProcessorService.process(job), {
...baseQueueOptions(this.config, QUEUE.WEBHOOK_DELIVER),
...baseQueueOptions(this.config, QUEUE.WEBHOOK_DELIVER, this.redisForJobQueue),
autorun: false,
concurrency: 64,
limiter: {
Expand Down Expand Up @@ -281,7 +285,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
default: throw new Error(`unrecognized job type ${job.name} for relationship`);
}
}, {
...baseQueueOptions(this.config, QUEUE.RELATIONSHIP),
...baseQueueOptions(this.config, QUEUE.RELATIONSHIP, this.redisForJobQueue),
autorun: false,
concurrency: this.config.relashionshipJobConcurrency ?? 16,
limiter: {
Expand All @@ -308,7 +312,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
default: throw new Error(`unrecognized job type ${job.name} for objectStorage`);
}
}, {
...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE),
...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE, this.redisForJobQueue),
autorun: false,
concurrency: 16,
});
Expand All @@ -325,7 +329,7 @@ export class QueueProcessorService implements OnApplicationShutdown {

//#region ended poll notification
this.endedPollNotificationQueueWorker = new Bull.Worker(QUEUE.ENDED_POLL_NOTIFICATION, (job) => this.endedPollNotificationProcessorService.process(job), {
...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION),
...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION, this.redisForJobQueue),
autorun: false,
});
//#endregion
Expand Down
8 changes: 3 additions & 5 deletions packages/backend/src/queue/const.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import { Config } from '@/config.js';
import type * as Bull from 'bullmq';
import type * as Redis from 'ioredis';

export const QUEUE = {
DELIVER: 'deliver',
Expand All @@ -17,12 +18,9 @@ export const QUEUE = {
WEBHOOK_DELIVER: 'webhookDeliver',
};

export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions {
export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE], redisConnection: Redis.Redis): Bull.QueueOptions {
return {
connection: {
...config.redisForJobQueue,
keyPrefix: undefined,
},
connection: redisConnection,
prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue:${queueName}` : `queue:${queueName}`,
};
}
Loading