Skip to content

Commit

Permalink
feat: allow running/sandboxing message handlers using worker threads
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Jan 27, 2024
1 parent aad9010 commit 53095bd
Show file tree
Hide file tree
Showing 20 changed files with 426 additions and 217 deletions.
138 changes: 138 additions & 0 deletions src/lib/consumer/message-handler/consume-message-worker-thread.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright (c)
* Weyoss <weyoss@protonmail.com>
* https://github.com/weyoss
*
* This source code is licensed under the MIT license found in the LICENSE file
* in the root directory of this source tree.
*/

import {
isMainThread,
parentPort,
workerData, // type-coverage:ignore-line
MessagePort,
} from 'worker_threads';
import {
IMessageTransferable,
TConsumerMessageHandlerFn,
} from '../../../../types';

export enum EWorkerThreadMessageCodeExit {
WORKER_DATA_REQUIRED = 100,
INVALID_HANDLER_TYPE,
HANDLER_IMPORT_ERROR,
UNCAUGHT_EXCEPTION,
TERMINATED,
}

export enum EWorkerThreadMessageCodeConsume {
OK = 200,
MESSAGE_PROCESSING_ERROR,
MESSAGE_PROCESSING_CAUGHT_ERROR,
}

export type TWorkerThreadMessageCode =
| EWorkerThreadMessageCodeExit
| EWorkerThreadMessageCodeConsume;

export type TWorkerThreadError = {
name: string;
message: string;
};

export type TWorkerThreadMessage = {
code: TWorkerThreadMessageCode;
error: TWorkerThreadError | null;
};

function getHandlerFn(
filename: string,
cb: (fn: TConsumerMessageHandlerFn) => void,
) {
import(filename)
.then(
(
importedModule:
| { default?: TConsumerMessageHandlerFn }
| TConsumerMessageHandlerFn,
) => {
const fn =
typeof importedModule !== 'function' && importedModule.default
? importedModule.default
: importedModule;
if (typeof fn !== 'function') {
exit(EWorkerThreadMessageCodeExit.INVALID_HANDLER_TYPE);
} else cb(fn);
},
)
.catch((err: unknown) => {
console.error(err);
exit(EWorkerThreadMessageCodeExit.HANDLER_IMPORT_ERROR);
});
}

function formatMessage(
code: TWorkerThreadMessageCode,
err?: unknown,
): TWorkerThreadMessage {
const error =
err && err instanceof Error
? { name: err.name, message: err.message }
: null;
return {
code,
error,
};
}

function postMessage(
messagePort: MessagePort,
code: TWorkerThreadMessageCode,
err?: unknown,
) {
const msg = formatMessage(code, err);
messagePort.postMessage(msg);
}

function exit(code: TWorkerThreadMessageCode, err?: unknown) {
parentPort && postMessage(parentPort, code, err);
process.exit(code);
}

if (!isMainThread && parentPort) {
const messagePort: MessagePort = parentPort;

// type-coverage:ignore-next-line
if (!workerData) {
exit(EWorkerThreadMessageCodeExit.WORKER_DATA_REQUIRED);
}

getHandlerFn(workerData, (handlerFn) => {
messagePort.on('message', (msg: IMessageTransferable) => {
try {
handlerFn(msg, (err) => {
if (err) {
postMessage(
messagePort,
EWorkerThreadMessageCodeConsume.MESSAGE_PROCESSING_ERROR,
err,
);
} else {
postMessage(messagePort, EWorkerThreadMessageCodeConsume.OK);
}
});
} catch (err: unknown) {
postMessage(
messagePort,
EWorkerThreadMessageCodeConsume.MESSAGE_PROCESSING_CAUGHT_ERROR,
err,
);
}
});
});

process.on('uncaughtException', (err) => {
exit(EWorkerThreadMessageCodeExit.UNCAUGHT_EXCEPTION, err);
});
}
91 changes: 91 additions & 0 deletions src/lib/consumer/message-handler/consume-message-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c)
* Weyoss <weyoss@protonmail.com>
* https://github.com/weyoss
*
* This source code is licensed under the MIT license found in the LICENSE file
* in the root directory of this source tree.
*/

import path from 'path';
import { Worker } from 'worker_threads';
import { ICallback } from 'redis-smq-common';
import { IMessageParams } from '../../../../types';
import {
EWorkerThreadMessageCodeExit,
EWorkerThreadMessageCodeConsume,
TWorkerThreadMessage,
} from './consume-message-worker-thread';
import { ConsumerMessageHandlerWorkerError } from './errors';

export class ConsumeMessageWorker {
protected messageHandlerFilename;
protected messageHandlerThread: Worker | null = null;

constructor(messageHandlerFilename: string) {
this.messageHandlerFilename = messageHandlerFilename;
}

protected getMessageHandlerThread(): Worker {
if (!this.messageHandlerThread) {
this.messageHandlerThread = new Worker(
path.resolve(__dirname, './consume-message-worker-thread.js'),
{
workerData: this.messageHandlerFilename,
},
);
this.messageHandlerThread.on('messageerror', (err) => {
console.error(err);
});
this.messageHandlerThread.on('error', (err) => {
console.error(err);
});
this.messageHandlerThread.on('exit', () => {
this.messageHandlerThread = null;
});
}
return this.messageHandlerThread;
}

consume(message: IMessageParams, cb: ICallback<void>): void {
const worker = this.getMessageHandlerThread();

const cleanUp = () => {
worker
.removeListener('message', onMessage)
.removeListener('exit', onExit);
};

const onMessage = (msg: TWorkerThreadMessage) => {
cleanUp();
if (msg.code !== EWorkerThreadMessageCodeConsume.OK) {
console.error(`ConsumerMessageHandlerWorkerError`, msg);
cb(new ConsumerMessageHandlerWorkerError(msg));
} else cb();
};

const onExit = () => {
cleanUp();
const msg = {
code: EWorkerThreadMessageCodeExit.TERMINATED,
error: null,
};
console.error('ConsumerMessageHandlerWorkerError', msg);
cb(new ConsumerMessageHandlerWorkerError(msg));
};

worker.once('message', onMessage);
worker.once('exit', onExit);
worker.postMessage(message);
}

quit(cb: ICallback<void>) {
const callback = () => {
this.messageHandlerThread = null;
cb();
};
if (this.messageHandlerThread) {
this.messageHandlerThread.terminate().then(callback).catch(callback);
} else cb();
}
}
35 changes: 32 additions & 3 deletions src/lib/consumer/message-handler/consume-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import {
EConsumeMessageUnacknowledgedCause,
EMessageProperty,
EMessagePropertyStatus,
IMessageTransferable,
IQueueParsedParams,
TConsumerMessageHandler,
} from '../../../../types';
import { redisKeys } from '../../../common/redis-keys/redis-keys';
import { MessageHandler } from './message-handler';
Expand All @@ -26,7 +28,7 @@ import { processingQueue } from './processing-queue';
import { ERetryAction } from './retry-message';
import { ELuaScriptName } from '../../../common/redis-client/redis-client';
import { Configuration } from '../../../config/configuration';
import { _createConsumableMessage } from '../../message/_create-consumable-message';
import { ConsumeMessageWorker } from './consume-message-worker';

export class ConsumeMessage {
protected messageHandler: MessageHandler;
Expand All @@ -35,6 +37,7 @@ export class ConsumeMessage {
protected keyQueueProcessing: string;
protected keyQueueAcknowledged: string;
protected queue: IQueueParsedParams;
protected consumeMessageWorker: ConsumeMessageWorker | null = null;

constructor(
messageHandler: MessageHandler,
Expand Down Expand Up @@ -134,6 +137,25 @@ export class ConsumeMessage {
);
}

protected getConsumeMessageWorker(messageHandlerFilename: string) {
if (!this.consumeMessageWorker) {
this.consumeMessageWorker = new ConsumeMessageWorker(
messageHandlerFilename,
);
}
return this.consumeMessageWorker;
}

protected invokeMessageHandler(
messageHandler: TConsumerMessageHandler,
msg: IMessageTransferable,
cb: ICallback<void>,
): void {
if (typeof messageHandler === 'string') {
this.getConsumeMessageWorker(messageHandler).consume(msg, cb);
} else messageHandler(msg, cb);
}

protected consumeMessage(msg: MessageEnvelope): void {
let isTimeout = false;
let timer: NodeJS.Timeout | null = null;
Expand Down Expand Up @@ -173,8 +195,9 @@ export class ConsumeMessage {
}
}
};
this.messageHandler.getHandler()(
_createConsumableMessage(msg),
this.invokeMessageHandler(
this.messageHandler.getHandler(),
msg.transfer(),
onConsumed,
);
} catch (error: unknown) {
Expand All @@ -194,4 +217,10 @@ export class ConsumeMessage {
);
} else this.consumeMessage(message);
}

quit(cb: ICallback<void>) {
if (this.consumeMessageWorker) {
this.consumeMessageWorker.quit(cb);
} else cb();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c)
* Weyoss <weyoss@protonmail.com>
* https://github.com/weyoss
*
* This source code is licensed under the MIT license found in the LICENSE file
* in the root directory of this source tree.
*/

import { TWorkerThreadMessage } from '../consume-message-worker-thread';
import { ConsumerMessageHandlerError } from './consumer-message-handler.error';

export class ConsumerMessageHandlerWorkerError extends ConsumerMessageHandlerError {
constructor(msg: TWorkerThreadMessage) {
const { code, error } = msg;
super(
`Error code: ${code}.${
error ? `Message: ${error.name}: ${error.message}` : ''
}`,
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright (c)
* Weyoss <weyoss@protonmail.com>
* https://github.com/weyoss
*
* This source code is licensed under the MIT license found in the LICENSE file
* in the root directory of this source tree.
*/

import { ConsumerError } from '../../errors';

export class ConsumerMessageHandlerError extends ConsumerError {
constructor(msg?: string) {
super(msg);
}
}
11 changes: 11 additions & 0 deletions src/lib/consumer/message-handler/errors/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright (c)
* Weyoss <weyoss@protonmail.com>
* https://github.com/weyoss
*
* This source code is licensed under the MIT license found in the LICENSE file
* in the root directory of this source tree.
*/

export { ConsumerMessageHandlerError } from './consumer-message-handler.error';
export { ConsumerMessageHandlerWorkerError } from './consumer-message-handler-worker.error';
6 changes: 5 additions & 1 deletion src/lib/consumer/message-handler/message-handler-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ export class MessageHandlerRunner {
client,
handlerParams,
);
handler.run(cb);
handler.run((err) => {
if (err)
this.removeMessageHandler(handlerParams.queue, () => cb(err));
else cb();
});
}
});
}
Expand Down
Loading

0 comments on commit 53095bd

Please sign in to comment.