diff --git a/src/monitor-server/workers/websocket-heartbeat-stream.worker.ts b/src/monitor-server/workers/websocket-heartbeat-stream.worker.ts index f3e8e0bd..64bb3a3a 100644 --- a/src/monitor-server/workers/websocket-heartbeat-stream.worker.ts +++ b/src/monitor-server/workers/websocket-heartbeat-stream.worker.ts @@ -1,12 +1,8 @@ import { ICallback, TWebsocketHeartbeatOnlineIdsStreamPayload, - TWorkerParameters, } from '../../../types'; -import { RedisClient } from '../../system/common/redis-client/redis-client'; -import { EmptyCallbackReplyError } from '../../system/common/errors/empty-callback-reply.error'; import { ConsumerHeartbeat } from '../../system/app/consumer/consumer-heartbeat'; -import { setConfiguration } from '../../system/common/configuration/configuration'; import { Worker } from '../../system/common/worker/worker'; import { each } from '../../system/lib/async'; @@ -47,15 +43,3 @@ export class WebsocketHeartbeatStreamWorker extends Worker { } export default WebsocketHeartbeatStreamWorker; - -process.on('message', (payload: string) => { - const params: TWorkerParameters = JSON.parse(payload); - setConfiguration(params.config); - RedisClient.getNewInstance((err, client) => { - if (err) throw err; - else if (!client) throw new EmptyCallbackReplyError(); - else { - new WebsocketHeartbeatStreamWorker(client, params, false).run(); - } - }); -}); diff --git a/src/monitor-server/workers/websocket-main-stream.worker.ts b/src/monitor-server/workers/websocket-main-stream.worker.ts index 0466179b..b8164446 100644 --- a/src/monitor-server/workers/websocket-main-stream.worker.ts +++ b/src/monitor-server/workers/websocket-main-stream.worker.ts @@ -3,15 +3,11 @@ import { TQueueParams, TWebsocketMainStreamPayload, TWebsocketMainStreamPayloadQueue, - TWorkerParameters, } from '../../../types'; import { redisKeys } from '../../system/common/redis-keys/redis-keys'; -import { RedisClient } from '../../system/common/redis-client/redis-client'; import { MessageManager } from '../../system/app/message-manager/message-manager'; -import { EmptyCallbackReplyError } from '../../system/common/errors/empty-callback-reply.error'; import { Consumer } from '../../system/app/consumer/consumer'; import { queueManager } from '../../system/app/queue-manager/queue-manager'; -import { setConfiguration } from '../../system/common/configuration/configuration'; import { Worker } from '../../system/common/worker/worker'; import { each, waterfall } from '../../system/lib/async'; @@ -215,13 +211,3 @@ export class WebsocketMainStreamWorker extends Worker { } export default WebsocketMainStreamWorker; - -process.on('message', (payload: string) => { - const params: TWorkerParameters = JSON.parse(payload); - setConfiguration(params.config); - RedisClient.getNewInstance((err, client) => { - if (err) throw err; - else if (!client) throw new EmptyCallbackReplyError(); - else new WebsocketMainStreamWorker(client, params, false).run(); - }); -}); diff --git a/src/monitor-server/workers/websocket-online-stream.worker.ts b/src/monitor-server/workers/websocket-online-stream.worker.ts index a023bd2f..e0d3ea9b 100644 --- a/src/monitor-server/workers/websocket-online-stream.worker.ts +++ b/src/monitor-server/workers/websocket-online-stream.worker.ts @@ -1,9 +1,6 @@ -import { ICallback, TQueueParams, TWorkerParameters } from '../../../types'; -import { RedisClient } from '../../system/common/redis-client/redis-client'; -import { EmptyCallbackReplyError } from '../../system/common/errors/empty-callback-reply.error'; +import { ICallback, TQueueParams } from '../../../types'; import { Consumer } from '../../system/app/consumer/consumer'; import { queueManager } from '../../system/app/queue-manager/queue-manager'; -import { setConfiguration } from '../../system/common/configuration/configuration'; import { Worker } from '../../system/common/worker/worker'; import { each, waterfall } from '../../system/lib/async'; @@ -44,13 +41,3 @@ export class WebsocketOnlineStreamWorker extends Worker { } export default WebsocketOnlineStreamWorker; - -process.on('message', (payload: string) => { - const params: TWorkerParameters = JSON.parse(payload); - setConfiguration(params.config); - RedisClient.getNewInstance((err, client) => { - if (err) throw err; - else if (!client) throw new EmptyCallbackReplyError(); - else new WebsocketOnlineStreamWorker(client, params, false).run(); - }); -}); diff --git a/src/monitor-server/workers/websocket-rate-stream.worker.ts b/src/monitor-server/workers/websocket-rate-stream.worker.ts index 4a605673..9b08816a 100644 --- a/src/monitor-server/workers/websocket-rate-stream.worker.ts +++ b/src/monitor-server/workers/websocket-rate-stream.worker.ts @@ -1,7 +1,5 @@ -import { ICallback, TQueueParams, TWorkerParameters } from '../../../types'; +import { ICallback, TQueueParams } from '../../../types'; import { redisKeys } from '../../system/common/redis-keys/redis-keys'; -import { RedisClient } from '../../system/common/redis-client/redis-client'; -import { EmptyCallbackReplyError } from '../../system/common/errors/empty-callback-reply.error'; import { ConsumerHeartbeat } from '../../system/app/consumer/consumer-heartbeat'; import { TimeSeries } from '../../system/common/time-series/time-series'; import { QueuePublishedTimeSeries } from '../../system/app/producer/producer-time-series/queue-published-time-series'; @@ -13,7 +11,6 @@ import { GlobalDeadLetteredTimeSeries } from '../../system/app/consumer/consumer import { ConsumerAcknowledgedTimeSeries } from '../../system/app/consumer/consumer-time-series/consumer-acknowledged-time-series'; import { ConsumerDeadLetteredTimeSeries } from '../../system/app/consumer/consumer-time-series/consumer-dead-lettered-time-series'; import { consumerQueues } from '../../system/app/consumer/consumer-queues'; -import { setConfiguration } from '../../system/common/configuration/configuration'; import { Worker } from '../../system/common/worker/worker'; import { each, waterfall } from '../../system/lib/async'; @@ -292,15 +289,3 @@ export class WebsocketRateStreamWorker extends Worker { } export default WebsocketRateStreamWorker; - -process.on('message', (payload: string) => { - const params: TWorkerParameters = JSON.parse(payload); - setConfiguration(params.config); - RedisClient.getNewInstance((err, client) => { - if (err) throw err; - else if (!client) throw new EmptyCallbackReplyError(); - else { - new WebsocketRateStreamWorker(client, params, false).run(); - } - }); -}); diff --git a/src/system/common/worker/worker-runner/worker-runner.ts b/src/system/common/worker/worker-runner/worker-runner.ts index d18d0b9f..3ed8748f 100644 --- a/src/system/common/worker/worker-runner/worker-runner.ts +++ b/src/system/common/worker/worker-runner/worker-runner.ts @@ -1,4 +1,3 @@ -import { ChildProcess, fork } from 'child_process'; import { join } from 'path'; import { readdir } from 'fs'; import { @@ -8,7 +7,6 @@ import { TWorkerParameters, } from '../../../../../types'; import { PowerManager } from '../../power-manager/power-manager'; -import { WorkerRunnerError } from './worker-runner.error'; import { EventEmitter } from 'events'; import { Ticker } from '../../ticker/ticker'; import { LockManager } from '../../lock-manager/lock-manager'; @@ -29,10 +27,9 @@ export class WorkerRunner< private readonly powerManager: PowerManager; private readonly ticker: Ticker; private readonly lockManager: LockManager; - private readonly workerThreads: ChildProcess[] = []; private readonly redisClient: RedisClient; private readonly logger: ICompatibleLogger; - private readonly workerPool: WorkerPool | null = null; + private readonly workerPool: WorkerPool; private initialized = false; constructor( @@ -40,7 +37,7 @@ export class WorkerRunner< workersDir: string, keyLock: string, workerParameters: WorkerParameters, - workerPool?: WorkerPool, + workerPool: WorkerPool, ) { super(); this.powerManager = new PowerManager(); @@ -50,9 +47,7 @@ export class WorkerRunner< this.logger = getNamespacedLogger(this.constructor.name); this.lockManager = new LockManager(redisClient, keyLock, 10000, false); this.ticker = new Ticker(this.onTick, 1000); - if (workerPool) { - this.workerPool = workerPool; - } + this.workerPool = workerPool; } private onTick = (): void => { @@ -78,11 +73,6 @@ export class WorkerRunner< }); }; - private onProcessExit = (): void => { - this.workerThreads.forEach((i) => i.kill()); - if (this.workerPool) this.workerPool.clear(() => void 0); - }; - private init = (cb: ICallback): void => { readdir(this.workersDir, undefined, (err, reply) => { if (err) cb(err); @@ -91,11 +81,7 @@ export class WorkerRunner< reply ?? [], (filename: string, _, done) => { if (filename.match(/\.worker\.js$/)) { - if (this.workerPool) this.addToWorkerPool(filename, done); - else { - this.forkWorkerThread(filename); - done(); - } + this.addToWorkerPool(filename, done); } else done(); }, cb, @@ -136,36 +122,6 @@ export class WorkerRunner< .catch(cb); }; - private forkWorkerThread = (filename: string): void => { - const filepath = join(this.workersDir, filename); - const thread = fork(filepath); - thread.on('error', (err) => { - if (this.powerManager.isGoingUp() || this.powerManager.isRunning()) { - this.emit(events.ERROR, err); - } - }); - thread.on('exit', (code, signal) => { - if (this.powerManager.isGoingUp() || this.powerManager.isRunning()) { - this.emit( - events.ERROR, - new WorkerRunnerError( - `Thread [${filepath}] exited with code ${code} and signal ${signal}`, - ), - ); - } - }); - thread.send(JSON.stringify(this.workerParameters)); - this.workerThreads.push(thread); - }; - - private shutdownWorkerThreads = (cb: ICallback): void => { - const thread = this.workerThreads.pop(); - if (thread) { - thread.once('exit', () => this.shutdownWorkerThreads(cb)); - thread.kill('SIGHUP'); - } else cb(); - }; - private clearWorkerPool = (cb: ICallback): void => { if (this.workerPool) this.workerPool.clear(cb); else cb(); @@ -182,7 +138,6 @@ export class WorkerRunner< run = (): void => { this.powerManager.goingUp(); - process.once('exit', this.onProcessExit); this.ticker.nextTick(); this.powerManager.commit(); this.emit(events.UP); @@ -190,20 +145,11 @@ export class WorkerRunner< quit = (cb: ICallback): void => { this.powerManager.goingDown(); - waterfall( - [ - this.stopTicker, - this.shutdownWorkerThreads, - this.clearWorkerPool, - this.releaseLock, - ], - () => { - process.removeListener('exit', this.onProcessExit); - this.initialized = false; - this.powerManager.commit(); - this.emit(events.DOWN); - cb(); - }, - ); + waterfall([this.stopTicker, this.clearWorkerPool, this.releaseLock], () => { + this.initialized = false; + this.powerManager.commit(); + this.emit(events.DOWN); + cb(); + }); }; } diff --git a/src/system/workers/delay.worker.ts b/src/system/workers/delay.worker.ts index a7c5138f..2572ceab 100644 --- a/src/system/workers/delay.worker.ts +++ b/src/system/workers/delay.worker.ts @@ -1,11 +1,9 @@ import { RedisClient } from '../common/redis-client/redis-client'; import { redisKeys } from '../common/redis-keys/redis-keys'; import { ICallback, IConsumerWorkerParameters } from '../../../types'; -import { EmptyCallbackReplyError } from '../common/errors/empty-callback-reply.error'; import { Message } from '../app/message/message'; import { broker } from '../common/broker/broker'; import { Worker } from '../common/worker/worker'; -import { setConfiguration } from '../common/configuration/configuration'; import { each } from '../lib/async'; export class DelayWorker extends Worker { @@ -51,13 +49,3 @@ export class DelayWorker extends Worker { } export default DelayWorker; - -process.on('message', (payload: string) => { - const params: IConsumerWorkerParameters = JSON.parse(payload); - setConfiguration(params.config); - RedisClient.getNewInstance((err, client) => { - if (err) throw err; - else if (!client) throw new EmptyCallbackReplyError(); - else new DelayWorker(client, params, false).run(); - }); -}); diff --git a/src/system/workers/heartbeat-monitor.worker.ts b/src/system/workers/heartbeat-monitor.worker.ts index a10b5160..3357d545 100644 --- a/src/system/workers/heartbeat-monitor.worker.ts +++ b/src/system/workers/heartbeat-monitor.worker.ts @@ -1,9 +1,6 @@ import { ICallback, IConsumerWorkerParameters } from '../../../types'; import { ConsumerHeartbeat } from '../app/consumer/consumer-heartbeat'; -import { RedisClient } from '../common/redis-client/redis-client'; -import { EmptyCallbackReplyError } from '../common/errors/empty-callback-reply.error'; import { Worker } from '../common/worker/worker'; -import { setConfiguration } from '../common/configuration/configuration'; import { waterfall } from '../lib/async'; export class HeartbeatMonitorWorker extends Worker { @@ -26,13 +23,3 @@ export class HeartbeatMonitorWorker extends Worker { } export default HeartbeatMonitorWorker; - -process.on('message', (payload: string) => { - const params: IConsumerWorkerParameters = JSON.parse(payload); - setConfiguration(params.config); - RedisClient.getNewInstance((err, client) => { - if (err) throw err; - else if (!client) throw new EmptyCallbackReplyError(); - else new HeartbeatMonitorWorker(client, params, false).run(); - }); -}); diff --git a/src/system/workers/requeue.worker.ts b/src/system/workers/requeue.worker.ts index 952892c6..8638ca0b 100644 --- a/src/system/workers/requeue.worker.ts +++ b/src/system/workers/requeue.worker.ts @@ -2,10 +2,8 @@ import { RedisClient } from '../common/redis-client/redis-client'; import { redisKeys } from '../common/redis-keys/redis-keys'; import { Message } from '../app/message/message'; import { ICallback, IConsumerWorkerParameters } from '../../../types'; -import { EmptyCallbackReplyError } from '../common/errors/empty-callback-reply.error'; import { PanicError } from '../common/errors/panic.error'; import { Worker } from '../common/worker/worker'; -import { setConfiguration } from '../common/configuration/configuration'; import { each } from '../lib/async'; export class RequeueWorker extends Worker { @@ -67,13 +65,3 @@ export class RequeueWorker extends Worker { } export default RequeueWorker; - -process.on('message', (payload: string) => { - const params: IConsumerWorkerParameters = JSON.parse(payload); - setConfiguration(params.config); - RedisClient.getNewInstance((err, client) => { - if (err) throw err; - else if (!client) throw new EmptyCallbackReplyError(); - else new RequeueWorker(client, params, false).run(); - }); -}); diff --git a/src/system/workers/schedule.worker.ts b/src/system/workers/schedule.worker.ts index 47f9e73e..c9f10f0d 100644 --- a/src/system/workers/schedule.worker.ts +++ b/src/system/workers/schedule.worker.ts @@ -1,11 +1,9 @@ import { ICallback, IConsumerWorkerParameters } from '../../../types'; import { redisKeys } from '../common/redis-keys/redis-keys'; -import { RedisClient } from '../common/redis-client/redis-client'; import { EmptyCallbackReplyError } from '../common/errors/empty-callback-reply.error'; import { Message } from '../app/message/message'; import { ELuaScriptName } from '../common/redis-client/lua-scripts'; import { Worker } from '../common/worker/worker'; -import { setConfiguration } from '../common/configuration/configuration'; import { each, waterfall } from '../lib/async'; export class ScheduleWorker extends Worker { @@ -104,13 +102,3 @@ export class ScheduleWorker extends Worker { } export default ScheduleWorker; - -process.on('message', (payload: string) => { - const params: IConsumerWorkerParameters = JSON.parse(payload); - setConfiguration(params.config); - RedisClient.getNewInstance((err, client) => { - if (err) throw err; - else if (!client) throw new EmptyCallbackReplyError(); - else new ScheduleWorker(client, params, false).run(); - }); -}); diff --git a/src/system/workers/time-series.worker.ts b/src/system/workers/time-series.worker.ts index 86454b93..ffe3186f 100644 --- a/src/system/workers/time-series.worker.ts +++ b/src/system/workers/time-series.worker.ts @@ -4,9 +4,7 @@ import { IConsumerWorkerParameters, TQueueParams, } from '../../../types'; -import { EmptyCallbackReplyError } from '../common/errors/empty-callback-reply.error'; import { Worker } from '../common/worker/worker'; -import { setConfiguration } from '../common/configuration/configuration'; import { queueManager } from '../app/queue-manager/queue-manager'; import { eachOf, waterfall } from '../lib/async'; import { QueueAcknowledgedTimeSeries } from '../app/consumer/consumer-time-series/queue-acknowledged-time-series'; @@ -139,13 +137,3 @@ export class TimeSeriesWorker extends Worker { } export default TimeSeriesWorker; - -process.on('message', (payload: string) => { - const params: IConsumerWorkerParameters = JSON.parse(payload); - setConfiguration(params.config); - RedisClient.getNewInstance((err, client) => { - if (err) throw err; - else if (!client) throw new EmptyCallbackReplyError(); - else new TimeSeriesWorker(client, params, false).run(); - }); -});