Skip to content

Commit

Permalink
Remove unused code
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Feb 19, 2022
1 parent 424da04 commit 475fa3d
Show file tree
Hide file tree
Showing 10 changed files with 12 additions and 185 deletions.
16 changes: 0 additions & 16 deletions src/monitor-server/workers/websocket-heartbeat-stream.worker.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import {
ICallback,
TWebsocketHeartbeatOnlineIdsStreamPayload,
TWorkerParameters,
} from '../../../types';
import { RedisClient } from '../../system/common/redis-client/redis-client';
import { EmptyCallbackReplyError } from '../../system/common/errors/empty-callback-reply.error';
import { ConsumerHeartbeat } from '../../system/app/consumer/consumer-heartbeat';
import { setConfiguration } from '../../system/common/configuration/configuration';
import { Worker } from '../../system/common/worker/worker';
import { each } from '../../system/lib/async';

Expand Down Expand Up @@ -47,15 +43,3 @@ export class WebsocketHeartbeatStreamWorker extends Worker {
}

export default WebsocketHeartbeatStreamWorker;

process.on('message', (payload: string) => {
const params: TWorkerParameters = JSON.parse(payload);
setConfiguration(params.config);
RedisClient.getNewInstance((err, client) => {
if (err) throw err;
else if (!client) throw new EmptyCallbackReplyError();
else {
new WebsocketHeartbeatStreamWorker(client, params, false).run();
}
});
});
14 changes: 0 additions & 14 deletions src/monitor-server/workers/websocket-main-stream.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@ import {
TQueueParams,
TWebsocketMainStreamPayload,
TWebsocketMainStreamPayloadQueue,
TWorkerParameters,
} from '../../../types';
import { redisKeys } from '../../system/common/redis-keys/redis-keys';
import { RedisClient } from '../../system/common/redis-client/redis-client';
import { MessageManager } from '../../system/app/message-manager/message-manager';
import { EmptyCallbackReplyError } from '../../system/common/errors/empty-callback-reply.error';
import { Consumer } from '../../system/app/consumer/consumer';
import { queueManager } from '../../system/app/queue-manager/queue-manager';
import { setConfiguration } from '../../system/common/configuration/configuration';
import { Worker } from '../../system/common/worker/worker';
import { each, waterfall } from '../../system/lib/async';

Expand Down Expand Up @@ -215,13 +211,3 @@ export class WebsocketMainStreamWorker extends Worker {
}

export default WebsocketMainStreamWorker;

process.on('message', (payload: string) => {
const params: TWorkerParameters = JSON.parse(payload);
setConfiguration(params.config);
RedisClient.getNewInstance((err, client) => {
if (err) throw err;
else if (!client) throw new EmptyCallbackReplyError();
else new WebsocketMainStreamWorker(client, params, false).run();
});
});
15 changes: 1 addition & 14 deletions src/monitor-server/workers/websocket-online-stream.worker.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import { ICallback, TQueueParams, TWorkerParameters } from '../../../types';
import { RedisClient } from '../../system/common/redis-client/redis-client';
import { EmptyCallbackReplyError } from '../../system/common/errors/empty-callback-reply.error';
import { ICallback, TQueueParams } from '../../../types';
import { Consumer } from '../../system/app/consumer/consumer';
import { queueManager } from '../../system/app/queue-manager/queue-manager';
import { setConfiguration } from '../../system/common/configuration/configuration';
import { Worker } from '../../system/common/worker/worker';
import { each, waterfall } from '../../system/lib/async';

Expand Down Expand Up @@ -44,13 +41,3 @@ export class WebsocketOnlineStreamWorker extends Worker {
}

export default WebsocketOnlineStreamWorker;

process.on('message', (payload: string) => {
const params: TWorkerParameters = JSON.parse(payload);
setConfiguration(params.config);
RedisClient.getNewInstance((err, client) => {
if (err) throw err;
else if (!client) throw new EmptyCallbackReplyError();
else new WebsocketOnlineStreamWorker(client, params, false).run();
});
});
17 changes: 1 addition & 16 deletions src/monitor-server/workers/websocket-rate-stream.worker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { ICallback, TQueueParams, TWorkerParameters } from '../../../types';
import { ICallback, TQueueParams } from '../../../types';
import { redisKeys } from '../../system/common/redis-keys/redis-keys';
import { RedisClient } from '../../system/common/redis-client/redis-client';
import { EmptyCallbackReplyError } from '../../system/common/errors/empty-callback-reply.error';
import { ConsumerHeartbeat } from '../../system/app/consumer/consumer-heartbeat';
import { TimeSeries } from '../../system/common/time-series/time-series';
import { QueuePublishedTimeSeries } from '../../system/app/producer/producer-time-series/queue-published-time-series';
Expand All @@ -13,7 +11,6 @@ import { GlobalDeadLetteredTimeSeries } from '../../system/app/consumer/consumer
import { ConsumerAcknowledgedTimeSeries } from '../../system/app/consumer/consumer-time-series/consumer-acknowledged-time-series';
import { ConsumerDeadLetteredTimeSeries } from '../../system/app/consumer/consumer-time-series/consumer-dead-lettered-time-series';
import { consumerQueues } from '../../system/app/consumer/consumer-queues';
import { setConfiguration } from '../../system/common/configuration/configuration';
import { Worker } from '../../system/common/worker/worker';
import { each, waterfall } from '../../system/lib/async';

Expand Down Expand Up @@ -292,15 +289,3 @@ export class WebsocketRateStreamWorker extends Worker {
}

export default WebsocketRateStreamWorker;

process.on('message', (payload: string) => {
const params: TWorkerParameters = JSON.parse(payload);
setConfiguration(params.config);
RedisClient.getNewInstance((err, client) => {
if (err) throw err;
else if (!client) throw new EmptyCallbackReplyError();
else {
new WebsocketRateStreamWorker(client, params, false).run();
}
});
});
74 changes: 10 additions & 64 deletions src/system/common/worker/worker-runner/worker-runner.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { ChildProcess, fork } from 'child_process';
import { join } from 'path';
import { readdir } from 'fs';
import {
Expand All @@ -8,7 +7,6 @@ import {
TWorkerParameters,
} from '../../../../../types';
import { PowerManager } from '../../power-manager/power-manager';
import { WorkerRunnerError } from './worker-runner.error';
import { EventEmitter } from 'events';
import { Ticker } from '../../ticker/ticker';
import { LockManager } from '../../lock-manager/lock-manager';
Expand All @@ -29,18 +27,17 @@ export class WorkerRunner<
private readonly powerManager: PowerManager;
private readonly ticker: Ticker;
private readonly lockManager: LockManager;
private readonly workerThreads: ChildProcess[] = [];
private readonly redisClient: RedisClient;
private readonly logger: ICompatibleLogger;
private readonly workerPool: WorkerPool | null = null;
private readonly workerPool: WorkerPool;
private initialized = false;

constructor(
redisClient: RedisClient,
workersDir: string,
keyLock: string,
workerParameters: WorkerParameters,
workerPool?: WorkerPool,
workerPool: WorkerPool,
) {
super();
this.powerManager = new PowerManager();
Expand All @@ -50,9 +47,7 @@ export class WorkerRunner<
this.logger = getNamespacedLogger(this.constructor.name);
this.lockManager = new LockManager(redisClient, keyLock, 10000, false);
this.ticker = new Ticker(this.onTick, 1000);
if (workerPool) {
this.workerPool = workerPool;
}
this.workerPool = workerPool;
}

private onTick = (): void => {
Expand All @@ -78,11 +73,6 @@ export class WorkerRunner<
});
};

private onProcessExit = (): void => {
this.workerThreads.forEach((i) => i.kill());
if (this.workerPool) this.workerPool.clear(() => void 0);
};

private init = (cb: ICallback<void>): void => {
readdir(this.workersDir, undefined, (err, reply) => {
if (err) cb(err);
Expand All @@ -91,11 +81,7 @@ export class WorkerRunner<
reply ?? [],
(filename: string, _, done) => {
if (filename.match(/\.worker\.js$/)) {
if (this.workerPool) this.addToWorkerPool(filename, done);
else {
this.forkWorkerThread(filename);
done();
}
this.addToWorkerPool(filename, done);
} else done();
},
cb,
Expand Down Expand Up @@ -136,36 +122,6 @@ export class WorkerRunner<
.catch(cb);
};

private forkWorkerThread = (filename: string): void => {
const filepath = join(this.workersDir, filename);
const thread = fork(filepath);
thread.on('error', (err) => {
if (this.powerManager.isGoingUp() || this.powerManager.isRunning()) {
this.emit(events.ERROR, err);
}
});
thread.on('exit', (code, signal) => {
if (this.powerManager.isGoingUp() || this.powerManager.isRunning()) {
this.emit(
events.ERROR,
new WorkerRunnerError(
`Thread [${filepath}] exited with code ${code} and signal ${signal}`,
),
);
}
});
thread.send(JSON.stringify(this.workerParameters));
this.workerThreads.push(thread);
};

private shutdownWorkerThreads = (cb: ICallback<void>): void => {
const thread = this.workerThreads.pop();
if (thread) {
thread.once('exit', () => this.shutdownWorkerThreads(cb));
thread.kill('SIGHUP');
} else cb();
};

private clearWorkerPool = (cb: ICallback<void>): void => {
if (this.workerPool) this.workerPool.clear(cb);
else cb();
Expand All @@ -182,28 +138,18 @@ export class WorkerRunner<

run = (): void => {
this.powerManager.goingUp();
process.once('exit', this.onProcessExit);
this.ticker.nextTick();
this.powerManager.commit();
this.emit(events.UP);
};

quit = (cb: ICallback<void>): void => {
this.powerManager.goingDown();
waterfall(
[
this.stopTicker,
this.shutdownWorkerThreads,
this.clearWorkerPool,
this.releaseLock,
],
() => {
process.removeListener('exit', this.onProcessExit);
this.initialized = false;
this.powerManager.commit();
this.emit(events.DOWN);
cb();
},
);
waterfall([this.stopTicker, this.clearWorkerPool, this.releaseLock], () => {
this.initialized = false;
this.powerManager.commit();
this.emit(events.DOWN);
cb();
});
};
}
12 changes: 0 additions & 12 deletions src/system/workers/delay.worker.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import { RedisClient } from '../common/redis-client/redis-client';
import { redisKeys } from '../common/redis-keys/redis-keys';
import { ICallback, IConsumerWorkerParameters } from '../../../types';
import { EmptyCallbackReplyError } from '../common/errors/empty-callback-reply.error';
import { Message } from '../app/message/message';
import { broker } from '../common/broker/broker';
import { Worker } from '../common/worker/worker';
import { setConfiguration } from '../common/configuration/configuration';
import { each } from '../lib/async';

export class DelayWorker extends Worker<IConsumerWorkerParameters> {
Expand Down Expand Up @@ -51,13 +49,3 @@ export class DelayWorker extends Worker<IConsumerWorkerParameters> {
}

export default DelayWorker;

process.on('message', (payload: string) => {
const params: IConsumerWorkerParameters = JSON.parse(payload);
setConfiguration(params.config);
RedisClient.getNewInstance((err, client) => {
if (err) throw err;
else if (!client) throw new EmptyCallbackReplyError();
else new DelayWorker(client, params, false).run();
});
});
13 changes: 0 additions & 13 deletions src/system/workers/heartbeat-monitor.worker.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import { ICallback, IConsumerWorkerParameters } from '../../../types';
import { ConsumerHeartbeat } from '../app/consumer/consumer-heartbeat';
import { RedisClient } from '../common/redis-client/redis-client';
import { EmptyCallbackReplyError } from '../common/errors/empty-callback-reply.error';
import { Worker } from '../common/worker/worker';
import { setConfiguration } from '../common/configuration/configuration';
import { waterfall } from '../lib/async';

export class HeartbeatMonitorWorker extends Worker<IConsumerWorkerParameters> {
Expand All @@ -26,13 +23,3 @@ export class HeartbeatMonitorWorker extends Worker<IConsumerWorkerParameters> {
}

export default HeartbeatMonitorWorker;

process.on('message', (payload: string) => {
const params: IConsumerWorkerParameters = JSON.parse(payload);
setConfiguration(params.config);
RedisClient.getNewInstance((err, client) => {
if (err) throw err;
else if (!client) throw new EmptyCallbackReplyError();
else new HeartbeatMonitorWorker(client, params, false).run();
});
});
12 changes: 0 additions & 12 deletions src/system/workers/requeue.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ import { RedisClient } from '../common/redis-client/redis-client';
import { redisKeys } from '../common/redis-keys/redis-keys';
import { Message } from '../app/message/message';
import { ICallback, IConsumerWorkerParameters } from '../../../types';
import { EmptyCallbackReplyError } from '../common/errors/empty-callback-reply.error';
import { PanicError } from '../common/errors/panic.error';
import { Worker } from '../common/worker/worker';
import { setConfiguration } from '../common/configuration/configuration';
import { each } from '../lib/async';

export class RequeueWorker extends Worker<IConsumerWorkerParameters> {
Expand Down Expand Up @@ -67,13 +65,3 @@ export class RequeueWorker extends Worker<IConsumerWorkerParameters> {
}

export default RequeueWorker;

process.on('message', (payload: string) => {
const params: IConsumerWorkerParameters = JSON.parse(payload);
setConfiguration(params.config);
RedisClient.getNewInstance((err, client) => {
if (err) throw err;
else if (!client) throw new EmptyCallbackReplyError();
else new RequeueWorker(client, params, false).run();
});
});
12 changes: 0 additions & 12 deletions src/system/workers/schedule.worker.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import { ICallback, IConsumerWorkerParameters } from '../../../types';
import { redisKeys } from '../common/redis-keys/redis-keys';
import { RedisClient } from '../common/redis-client/redis-client';
import { EmptyCallbackReplyError } from '../common/errors/empty-callback-reply.error';
import { Message } from '../app/message/message';
import { ELuaScriptName } from '../common/redis-client/lua-scripts';
import { Worker } from '../common/worker/worker';
import { setConfiguration } from '../common/configuration/configuration';
import { each, waterfall } from '../lib/async';

export class ScheduleWorker extends Worker<IConsumerWorkerParameters> {
Expand Down Expand Up @@ -104,13 +102,3 @@ export class ScheduleWorker extends Worker<IConsumerWorkerParameters> {
}

export default ScheduleWorker;

process.on('message', (payload: string) => {
const params: IConsumerWorkerParameters = JSON.parse(payload);
setConfiguration(params.config);
RedisClient.getNewInstance((err, client) => {
if (err) throw err;
else if (!client) throw new EmptyCallbackReplyError();
else new ScheduleWorker(client, params, false).run();
});
});
12 changes: 0 additions & 12 deletions src/system/workers/time-series.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import {
IConsumerWorkerParameters,
TQueueParams,
} from '../../../types';
import { EmptyCallbackReplyError } from '../common/errors/empty-callback-reply.error';
import { Worker } from '../common/worker/worker';
import { setConfiguration } from '../common/configuration/configuration';
import { queueManager } from '../app/queue-manager/queue-manager';
import { eachOf, waterfall } from '../lib/async';
import { QueueAcknowledgedTimeSeries } from '../app/consumer/consumer-time-series/queue-acknowledged-time-series';
Expand Down Expand Up @@ -139,13 +137,3 @@ export class TimeSeriesWorker extends Worker<IConsumerWorkerParameters> {
}

export default TimeSeriesWorker;

process.on('message', (payload: string) => {
const params: IConsumerWorkerParameters = JSON.parse(payload);
setConfiguration(params.config);
RedisClient.getNewInstance((err, client) => {
if (err) throw err;
else if (!client) throw new EmptyCallbackReplyError();
else new TimeSeriesWorker(client, params, false).run();
});
});

0 comments on commit 475fa3d

Please sign in to comment.