Skip to content

Commit

Permalink
feat: hook mq
Browse files Browse the repository at this point in the history
  • Loading branch information
elrrrrrrr committed Jul 4, 2023
1 parent 18af011 commit 7bb896b
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 64 deletions.
6 changes: 6 additions & 0 deletions app/common/typing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ export interface QueueAdapter {
length(key: string): Promise<number>;
}

export interface MQAdapterType {
addJobs(key: string, taskId: string): Promise<boolean>;
pause(key: string): void;
resume(key: string): void;
}

export interface AuthUrlResult {
loginUrl: string;
doneUrl: string;
Expand Down
2 changes: 1 addition & 1 deletion app/core/service/CreateHookTriggerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export class CreateHookTriggerService extends AbstractService {
private async createTriggerTasks(hooks: Array<Hook>, hookEvent: HookEvent) {
await pMap(hooks, async hook => {
const triggerHookTask = Task.createTriggerHookTask(hookEvent, hook.hookId);
await this.taskService.createTask(triggerHookTask, true);
await this.taskService.createTask(triggerHookTask, false, true);
}, { concurrency: 5 });
}
}
19 changes: 14 additions & 5 deletions app/core/service/TaskService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { AbstractService } from '../../common/AbstractService';
import { TaskRepository } from '../../repository/TaskRepository';
import { Task, CreateSyncPackageTaskData } from '../entity/Task';
import { QueueAdapter } from '../../common/typing';
import { MQAdapter } from '../../infra/MQAdapter';

@SingletonProto({
accessLevel: AccessLevel.PUBLIC,
Expand All @@ -21,11 +22,14 @@ export class TaskService extends AbstractService {
@Inject()
private readonly queueAdapter: QueueAdapter;

@Inject()
private readonly mqAdapter: MQAdapter;

public async getTaskQueueLength(taskType: TaskType) {
return await this.queueAdapter.length(taskType);
}

public async createTask(task: Task, addTaskQueueOnExists: boolean) {
public async createTask(task: Task, addTaskQueueOnExists: boolean, useMQ = false) {
const existsTask = await this.taskRepository.findTaskByTargetName(task.targetName, task.type);
if (existsTask) {
// 如果任务还未被触发,就不继续重复创建
Expand Down Expand Up @@ -60,10 +64,15 @@ export class TaskService extends AbstractService {
return existsTask;
}
await this.taskRepository.saveTask(task);
await this.queueAdapter.push<string>(task.type, task.taskId);
const queueLength = await this.getTaskQueueLength(task.type);
this.logger.info('[TaskService.createTask:new] taskType: %s, targetName: %s, taskId: %s, queue size: %s',
task.type, task.targetName, task.taskId, queueLength);

if (useMQ) {
await this.mqAdapter.addJobs(task.type, task.taskId);
} else {
await this.queueAdapter.push<string>(task.type, task.taskId);
const queueLength = await this.getTaskQueueLength(task.type);
this.logger.info('[TaskService.createTask:new] taskType: %s, targetName: %s, taskId: %s, queue size: %s',
task.type, task.targetName, task.taskId, queueLength);
}
return task;
}

Expand Down
79 changes: 79 additions & 0 deletions app/core/woker/AbstractWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { Inject, LifecycleDestroy, LifecycleInit } from '@eggjs/tegg';
import { MQAdapter } from '../../infra/MQAdapter';
import { Job, UnrecoverableError, Worker } from 'bullmq';
import { EggAppConfig, EggLogger } from 'egg';
import { TaskService } from '../service/TaskService';

export abstract class AbstractWorker {
@Inject()
private readonly queueAdapter: MQAdapter;

@Inject()
private readonly config: EggAppConfig;

@Inject()
private readonly taskService: TaskService;

@Inject()
private readonly logger: EggLogger;

@Inject()
protected worker: Worker;

queueKey: string;
configKey: string;
service;

async initWorker() {
throw new Error('should implements in subclass');
}

@LifecycleInit()
protected async init() {
this.initWorker();
const queue = this.queueAdapter.initQueue(this.queueKey);
this.worker = new Worker(
queue.name,
async (job: Job) => {
const startTime = Date.now();
const task = await this.taskService.findTask(job.data.taskId);
if (!task) {
throw new UnrecoverableError('task not found');
}

this.logger.info(`[${this.queueKey}Worker:subscribe:executeTask:start][%s] taskId: %s, targetName: %s, attempts: %s, params: %j, updatedAt: %s, delay %sms`,
this.worker.concurrency, task.taskId, task.targetName, task.attempts, task.data, task.updatedAt,
startTime - task.updatedAt.getTime());
if (this.worker.concurrency !== this.config.cnpmcore[this.configKey]) {
this.worker.concurrency = this.config.cnpmcore[this.configKey];
}

// TODO bullmq 移除了 timeout 配置,需要自己实现一个 promise.race 执行 timeout
await this.service.executeTask(job.data);
},
{
concurrency: this.config.cnpmcore[this.configKey],
},
);

this.worker.on('completed', (job: Job) => {
this.logger.info(`[${this.queueKey}Worker:subscribe:executeTask:success][%s] taskId: %s, targetName: %s, use %sms`,
job.data.taskId, job.data.targetName, Date.now() - job.timestamp);
});

this.worker.on('failed', (job?: Job) => {
if (!job) {
return;
}
this.logger.info(`[${this.queueKey}Worker:subscribe:executeTask:failed][%s] taskId: %s, targetName: %s, attemptsMade %s`,
job.data.taskId, job.data.targetName, job.attemptsMade);
});

}

@LifecycleDestroy()
protected async destroy() {
await this.worker.close();
}

}
17 changes: 17 additions & 0 deletions app/core/woker/HookTriggerWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Inject } from '@eggjs/tegg';
import { TaskType } from '../../common/enum/Task';
import { HookTriggerService } from '../service/HookTriggerService';
import { AbstractWorker } from './AbstractWorker';

export class HookTriggerWorker extends AbstractWorker {

@Inject()
private readonly hookTriggerService: HookTriggerService;

async initWoker(): Promise<void> {
this.queueKey = TaskType.TriggerHook;
this.service = this.hookTriggerService;
this.configKey = 'triggerHookWorkerMaxConcurrentTasks';
}

}
71 changes: 71 additions & 0 deletions app/infra/MQAdapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import {
AccessLevel,
Inject,
SingletonProto,
} from '@eggjs/tegg';
import { Redis } from 'ioredis';
import { JobsOptions, Queue } from 'bullmq';
import { MQAdapterType } from '../common/typing';

/**
* Use sort set to keep queue in order and keep same value only insert once
*/
@SingletonProto({
accessLevel: AccessLevel.PUBLIC,
name: 'mqAdapter',
})
export class MQAdapter implements MQAdapterType {
@Inject()
private readonly redis: Redis; // 由 redis 插件引入

private queueMap: Record<string, Queue>;

private getQueueName(key: string) {
return `CNPMCORE_MQ_V1_${key}`;
}

initQueue(key: string) {
const queueName = this.getQueueName(key);
if (!this.queueMap[key]) {
this.queueMap[key] = new Queue(queueName, {
connection: this.redis,
});
}

return this.queueMap[key];
}

/**
* If queue has the same item, return false
* If queue not has the same item, return true
*/
async addJobs(key: string, taskId: string, options?: JobsOptions): Promise<boolean> {
try {
const queue = this.initQueue(key);
await queue.add(key, { jobId: taskId },
{
removeOnComplete: true,
removeOnFail: true,
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
...options,
},
);
return true;
} catch (e) {
return false;
}
}

async pause(key: string) {
await this.initQueue(key).pause();
}

async resume(key: string) {
await this.initQueue(key).pause();
}

}
58 changes: 0 additions & 58 deletions app/port/schedule/TriggerHookWorker.ts

This file was deleted.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
"base-x": "^3.0.9",
"base64url": "^3.0.1",
"bson-objectid": "^2.0.1",
"bullmq": "^4.2.0",
"dayjs": "^1.10.7",
"egg": "^3.9.2",
"egg-cors": "^2.2.3",
Expand Down
Empty file.

0 comments on commit 7bb896b

Please sign in to comment.