Skip to content

Commit

Permalink
feat: add testcase
Browse files Browse the repository at this point in the history
  • Loading branch information
elrrrrrrr committed Jul 4, 2023
1 parent 8830230 commit 7399dbb
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 63 deletions.
6 changes: 6 additions & 0 deletions app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import path from 'path';
import { readFile } from 'fs/promises';
import { Application } from 'egg';
import { ChangesStreamService } from './app/core/service/ChangesStreamService';
import { HookTriggerWorker } from './app/core/woker/HookTriggerWorker';
declare module 'egg' {
interface Application {
binaryHTML: string;
Expand All @@ -26,6 +27,7 @@ export default class CnpmcoreAppHook {
app.getLogger('sqlLogger').info('[%s] %s', duration, sql);
},
};

}

// https://eggjs.org/zh-cn/basics/app-start.html
Expand All @@ -34,6 +36,10 @@ export default class CnpmcoreAppHook {
const filepath = path.join(this.app.baseDir, 'app/port/binary.html');
const text = await readFile(filepath, 'utf-8');
this.app.binaryHTML = text.replace('{{registry}}', this.app.config.cnpmcore.registry);

// 由于 bullmq 内使用了异步 Worker 消费任务,脱离了 egg ctx 生命周期
// 需要手动初始化,内部使用 getEggObject 获取 egg 对象
new HookTriggerWorker(this.app);
}

// 应用退出时执行
Expand Down
7 changes: 6 additions & 1 deletion app/common/typing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,13 @@ export interface QueueAdapter {
length(key: string): Promise<number>;
}

export type JobData = {
taskId: string;
targetName: string;
};

export interface MQAdapterType {
addJobs(key: string, taskId: string): Promise<boolean>;
addJobs(key: string, data: JobData): Promise<boolean>;
pause(key: string): void;
resume(key: string): void;
}
Expand Down
2 changes: 1 addition & 1 deletion app/core/service/CreateHookTriggerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class CreateHookTriggerService extends AbstractService {
} catch (e) {
e.message = 'create trigger failed: ' + e.message;
await this.taskService.finishTask(task, TaskState.Fail, `[${isoNow()}][Hooks] ${e.stack} \n`);
return;
throw e;
}
}

Expand Down
2 changes: 1 addition & 1 deletion app/core/service/HookTriggerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export class HookTriggerService {
e.message = 'trigger hook failed: ' + e.message;
task.error = e.message;
await this.taskService.finishTask(task, TaskState.Fail, `[${isoNow()}][TriggerHooks] ${e.stack} \n`);
return;
throw e;
}
}

Expand Down
2 changes: 1 addition & 1 deletion app/core/service/TaskService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export class TaskService extends AbstractService {
await this.taskRepository.saveTask(task);

if (useMQ) {
await this.mqAdapter.addJobs(task.type, task.taskId);
await this.mqAdapter.addJobs(task.type, task);
} else {
await this.queueAdapter.push<string>(task.type, task.taskId);
const queueLength = await this.getTaskQueueLength(task.type);
Expand Down
83 changes: 44 additions & 39 deletions app/core/woker/AbstractWorker.ts
Original file line number Diff line number Diff line change
@@ -1,79 +1,84 @@
import { Inject, LifecycleDestroy, LifecycleInit } from '@eggjs/tegg';
import { MQAdapter } from '../../infra/MQAdapter';
import { Job, UnrecoverableError, Worker } from 'bullmq';
import { EggAppConfig, EggLogger } from 'egg';
import { Application } from 'egg';
import { TaskService } from '../service/TaskService';

export abstract class AbstractWorker {
@Inject()
private readonly queueAdapter: MQAdapter;

@Inject()
private readonly config: EggAppConfig;

@Inject()
private readonly taskService: TaskService;

@Inject()
private readonly logger: EggLogger;
constructor(app: Application) {
this.app = app;
this.registerWorker();
}

protected worker: Worker;

app: Application;
queueKey: string;
configKey: string;
service;

queueAdapter: MQAdapter;
taskService: TaskService;
serviceClass: any;
service: any;

async initWorker() {
throw new Error('should implements in subclass');
await this.initWorkerInfo();
this.queueAdapter = await this.app.getEggObject(MQAdapter);
this.taskService = await this.app.getEggObject(TaskService);
this.service = await this.app.getEggObject(this.serviceClass);
}

@LifecycleInit()
protected async init() {
async initWorkerInfo() {
throw new Error('not implement');
}

async registerWorker() {
await this.initWorker();
const queue = this.queueAdapter.initQueue(this.queueKey);
this.worker = new Worker(
queue.name,
async (job: Job) => {
const startTime = Date.now();
const task = await this.taskService.findTask(job.data.taskId);
if (!task) {
throw new UnrecoverableError('task not found');
}
await this.app.runInAnonymousContextScope(async ctx => {
await ctx.beginModuleScope(async () => {
console.log('开始干活了');
const startTime = Date.now();
const task = await this.taskService.findTask(job.data.taskId);
if (!task) {
throw new UnrecoverableError('task not found');
}

this.logger.info(`[${this.queueKey}Worker:subscribe:executeTask:start][%s] taskId: %s, targetName: %s, attempts: %s, params: %j, updatedAt: %s, delay %sms`,
this.worker.concurrency, task.taskId, task.targetName, task.attempts, task.data, task.updatedAt,
startTime - task.updatedAt.getTime());
if (this.worker.concurrency !== this.config.cnpmcore[this.configKey]) {
this.worker.concurrency = this.config.cnpmcore[this.configKey];
}
if (this.worker.concurrency !== this.app.config.cnpmcore[this.configKey]) {
this.worker.concurrency = this.app.config.cnpmcore[this.configKey];
}
this.app.logger.info(`[${this.queueKey}_worker:subscribe:executeTask:start] taskId: %s, targetName: %s, attempts: %s, params: %j, updatedAt: %s, delay %sms`,
task.taskId, task.targetName, task.attempts, task.data, task.updatedAt,
startTime - task.updatedAt.getTime());

// TODO bullmq 移除了 timeout 配置,需要自己实现一个 promise.race 执行 timeout
await this.service.executeTask(job.data);
// TODO bullmq 移除了 timeout 配置,需要自己实现一个 promise.race 执行 timeout
console.log('搞');
await this.service.executeTask(task as any);
console.log('搞好');
});
});
},
{
concurrency: this.config.cnpmcore[this.configKey],
autorun: true,
concurrency: this.app.config.cnpmcore[this.configKey],
},
);

this.worker.on('completed', (job: Job) => {
this.logger.info(`[${this.queueKey}Worker:subscribe:executeTask:success][%s] taskId: %s, targetName: %s, use %sms`,
console.log('干好了');
this.app.logger.info(`[${this.queueKey}_worker:subscribe:executeTask:success] taskId: %s, targetName: %s, use %sms`,
job.data.taskId, job.data.targetName, Date.now() - job.timestamp);
});

this.worker.on('failed', (job?: Job) => {
if (!job) {
return;
}
this.logger.info(`[${this.queueKey}Worker:subscribe:executeTask:failed][%s] taskId: %s, targetName: %s, attemptsMade %s`,
this.app.logger.info(`[${this.queueKey}_worker:subscribe:executeTask:failed] taskId: %s, targetName: %s, attemptsMade %s`,
job.data.taskId, job.data.targetName, job.attemptsMade);
});

}

@LifecycleDestroy()
protected async destroy() {
await this.worker.close();
}

}
10 changes: 2 additions & 8 deletions app/core/woker/HookTriggerWorker.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
import { Inject, SingletonProto } from '@eggjs/tegg';
import { TaskType } from '../../common/enum/Task';
import { HookTriggerService } from '../service/HookTriggerService';
import { AbstractWorker } from './AbstractWorker';

@SingletonProto()
export class HookTriggerWorker extends AbstractWorker {

@Inject()
private readonly hookTriggerService: HookTriggerService;

async initWorker(): Promise<void> {
async initWorkerInfo(): Promise<void> {
this.queueKey = TaskType.TriggerHook;
this.service = this.hookTriggerService;
this.serviceClass = HookTriggerService;
this.configKey = 'triggerHookWorkerMaxConcurrentTasks';
}

}
8 changes: 5 additions & 3 deletions app/infra/MQAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
} from '@eggjs/tegg';
import { Redis } from 'ioredis';
import { JobsOptions, Queue } from 'bullmq';
import { MQAdapterType } from '../common/typing';
import { JobData, MQAdapterType } from '../common/typing';

/**
* Use sort set to keep queue in order and keep same value only insert once
Expand Down Expand Up @@ -39,10 +39,10 @@ export class MQAdapter implements MQAdapterType {
* If queue has the same item, return false
* If queue not has the same item, return true
*/
async addJobs(key: string, taskId: string, options?: JobsOptions): Promise<boolean> {
async addJobs(key: string, { taskId, targetName } : JobData, options?: JobsOptions): Promise<boolean> {
try {
const queue = this.initQueue(key);
await queue.add(key, { jobId: taskId },
await queue.add(key, { taskId, targetName },
{
removeOnComplete: true,
removeOnFail: true,
Expand All @@ -51,6 +51,8 @@ export class MQAdapter implements MQAdapterType {
type: 'exponential',
delay: 1000,
},
// remove duplicate job
jobId: taskId,
...options,
},
);
Expand Down
70 changes: 61 additions & 9 deletions test/core/worker/HookTriggerWorker.test.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,71 @@
import { app } from 'egg-mock/bootstrap';
import { Change } from '../../../app/core/entity/Change';
import assert from 'assert';
import { HookTriggerWorker } from '../../../app/core/woker/HookTriggerWorker';
import { setTimeout } from 'node:timers/promises';
import { PACKAGE_VERSION_ADDED } from '../../../app/core/event';
import { ChangeRepository } from '../../../app/repository/ChangeRepository';
import { HookManageService } from '../../../app/core/service/HookManageService';
import { HookType } from '../../../app/common/enum/Hook';
import { HookEvent } from '../../../app/core/entity/HookEvent';
import { Task } from '../../../app/core/entity/Task';
import { CreateHookTriggerService } from '../../../app/core/service/CreateHookTriggerService';
import { TestUtil } from '../../TestUtil';
import { UserRepository } from '../../../app/repository/UserRepository';
import { TaskState } from '../../../app/common/enum/Task';

describe('test/core/worker/HookTriggerWorker.test.ts', () => {
let hookTriggerWorker: HookTriggerWorker;

beforeEach(async () => {
hookTriggerWorker = await app.getEggObject(HookTriggerWorker);
});
describe('trigger hook', () => {

let change: Change;
let hookManageService: HookManageService;
let taskId: string;
const pkgName = '@cnpmcore/foo';
beforeEach(async () => {
app.mockLog();
const { name: username } = await TestUtil.createUser();
await TestUtil.createPackage({ name: pkgName });
change = Change.create({
type: PACKAGE_VERSION_ADDED,
targetName: pkgName,
data: {
version: '1.0.0',
},
});
app.mockHttpclient('http://foo.com', 'POST', {
status: 200,
});
const changeRepository = await app.getEggObject(ChangeRepository);
await changeRepository.addChange(change);
const userRepository = await app.getEggObject(UserRepository);
const user = await userRepository.findUserByName(username);
const userId = user!.userId;
hookManageService = await app.getEggObject(HookManageService);
await hookManageService.createHook({
type: HookType.Package,
ownerId: userId,
name: pkgName,
endpoint: 'http://foo.com',
secret: 'mock_secret',
});

});

it('should work', async () => {

describe('initWorker', () => {
it('should init worker', async () => {
await app.ready();
assert.equal(hookTriggerWorker.configKey, 'triggerHookWorkerMaxConcurrentTasks');
assert.equal(hookTriggerWorker.queueKey, 'trigger_hook');
const task = Task.createCreateHookTask(HookEvent.createPublishEvent(pkgName, change.changeId, '1.0.0', 'latest'));
taskId = task.taskId;
const createHookTriggerService = await app.getEggObject(CreateHookTriggerService);
await createHookTriggerService.executeTask(task);
assert.equal(task?.state, TaskState.Success);
assert(taskId);
await setTimeout(100);

app.expectLog('trigger_hook_worker:subscribe:executeTask:start');
app.expectLog('trigger_hook_worker:subscribe:executeTask:success');

});
});

});
61 changes: 61 additions & 0 deletions test/infra/MQAdapter.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import assert from 'assert';
import { setTimeout } from 'node:timers/promises';
import { Job, Worker } from 'bullmq';
import { app } from 'egg-mock/bootstrap';
import { MQAdapter } from '../../app/infra/MQAdapter';

describe('test/infra/MQAdapter.test.ts', () => {
let mqAdapter: MQAdapter;

beforeEach(async () => {
mqAdapter = await app.getEggObject(MQAdapter);
});

it('should remove duplicate task', async () => {
const queue = mqAdapter.initQueue('banana');

await mqAdapter.addJobs('banana', { taskId: '1', targetName: 'okk' });
await mqAdapter.addJobs('banana', { taskId: '2', targetName: 'okk' });
await mqAdapter.addJobs('banana', { taskId: '1', targetName: 'okk' });

const len = await queue.count();
assert.equal(len, 2);

});

it('should retry failed task', async () => {

const queue = mqAdapter.initQueue('apple');

// retry 1 time;
await mqAdapter.addJobs('apple', { taskId: '3', targetName: 'apple' }, {
attempts: 2,
backoff: {
type: 'exponential',
delay: 1,
},
});

let failed = 0;

const worker = new Worker(queue.name, async (job: Job) => {
// console.log(job.data);
throw new Error(`${job.data.taskId} error`);
});

worker.on('failed', job => {
// console.log('failed', job?.data?.taskId);
job && failed++;
});

let len = await queue.count();
assert.equal(len, 1);

// retry triggered
await setTimeout(50);

assert.equal(failed, 2);
len = await queue.count();
assert.equal(len, 0);
});
});

0 comments on commit 7399dbb

Please sign in to comment.