Skip to content

Commit

Permalink
feat(worker-fork): allow passing fork options (#2795)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Oct 1, 2024
1 parent 626d9cd commit f7a4292
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 33 deletions.
2 changes: 1 addition & 1 deletion .husky/pre-commit
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env sh
. "$(dirname -- "$0")/_/husky.sh"

npx npm-run-all pretty:quick lint:staged
yarn npm-run-all pretty:quick lint:staged
15 changes: 12 additions & 3 deletions src/classes/child-pool.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import * as path from 'path';
import { Child } from './child';
import { SandboxedOptions } from '../interfaces';

const CHILD_KILL_TIMEOUT = 30_000;

interface ChildPoolOpts {
interface ChildPoolOpts extends SandboxedOptions {
mainFile?: string;
useWorkerThreads?: boolean;
}

export class ChildPool {
Expand All @@ -16,8 +16,15 @@ export class ChildPool {
constructor({
mainFile = path.join(process.cwd(), 'dist/cjs/classes/main.js'),
useWorkerThreads,
workerForkOptions,
workerThreadsOptions,
}: ChildPoolOpts) {
this.opts = { mainFile, useWorkerThreads };
this.opts = {
mainFile,
useWorkerThreads,
workerForkOptions,
workerThreadsOptions,
};
}

async retain(processFile: string): Promise<Child> {
Expand All @@ -30,6 +37,8 @@ export class ChildPool {

child = new Child(this.opts.mainFile, processFile, {
useWorkerThreads: this.opts.useWorkerThreads,
workerForkOptions: this.opts.workerForkOptions,
workerThreadsOptions: this.opts.workerThreadsOptions,
});
child.on('exit', this.remove.bind(this, child));

Expand Down
15 changes: 8 additions & 7 deletions src/classes/child.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { ChildProcess, fork } from 'child_process';
import { AddressInfo, createServer } from 'net';
import { Worker, WorkerOptions as WorkerThreadsOptions } from 'worker_threads';
import { Worker } from 'worker_threads';
import { ChildCommand, ParentCommand } from '../enums';
import { SandboxedOptions } from '../interfaces';
import { EventEmitter } from 'events';

/**
Expand Down Expand Up @@ -40,11 +41,8 @@ export class Child extends EventEmitter {
constructor(
private mainFile: string,
public processFile: string,
private opts: {
useWorkerThreads: boolean;
workerThreadsOptions?: WorkerThreadsOptions;
} = {
useWorkerThreads: false
private opts: SandboxedOptions = {
useWorkerThreads: false,
},
) {
super();
Expand Down Expand Up @@ -86,12 +84,15 @@ export class Child extends EventEmitter {
stdin: true,
stdout: true,
stderr: true,
...(this.opts.workerThreadsOptions ? this.opts.workerThreadsOptions : {})
...(this.opts.workerThreadsOptions
? this.opts.workerThreadsOptions
: {}),
});
} else {
this.childProcess = parent = fork(this.mainFile, [], {
execArgv,
stdio: 'pipe',
...(this.opts.workerForkOptions ? this.opts.workerForkOptions : {}),
});
}

Expand Down
2 changes: 2 additions & 0 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ export class Worker<
this.childPool = new ChildPool({
mainFile: mainFilePath,
useWorkerThreads: this.opts.useWorkerThreads,
workerForkOptions: this.opts.workerForkOptions,
workerThreadsOptions: this.opts.workerThreadsOptions,
});

this.processFn = sandbox<DataType, ResultType, NameType>(
Expand Down
1 change: 1 addition & 0 deletions src/interfaces/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ export * from './repeatable-options';
export * from './repeat-options';
export * from './sandboxed-job-processor';
export * from './sandboxed-job';
export * from './sandboxed-options';
export * from './worker-options';
29 changes: 29 additions & 0 deletions src/interfaces/sandboxed-options.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { ForkOptions } from 'child_process';
import { WorkerOptions as WorkerThreadsOptions } from 'worker_threads';

export interface SandboxedOptions {
/**
* Use Worker Threads instead of Child Processes.
* Note: This option can only be used when specifying
* a file for the processor argument.
*
* @default false
*/
useWorkerThreads?: boolean;

/**
* Support passing Worker Fork Options.
* Note: This option can only be used when specifying
* a file for the processor argument and useWorkerThreads is passed as false (default value).
* @see {@link https://nodejs.org/api/child_process.html#child_processforkmodulepath-args-options}
*/
workerForkOptions?: ForkOptions;

/**
* Support passing Worker Threads Options.
* Note: This option can only be used when specifying
* a file for the processor argument and useWorkerThreads is passed as true.
* @see {@link https://nodejs.org/api/worker_threads.html#new-workerfilename-options}
*/
workerThreadsOptions?: WorkerThreadsOptions;
}
21 changes: 2 additions & 19 deletions src/interfaces/worker-options.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { WorkerOptions as WorkerThreadsOptions } from 'worker_threads';
import { Job } from '../classes/job';
import { AdvancedOptions } from './advanced-options';
import { QueueBaseOptions } from './queue-options';
import { RateLimiterOptions } from './rate-limiter-options';
import { MetricsOptions } from './metrics-options';
import { KeepJobs } from './keep-jobs';
import { SandboxedOptions } from './sandboxed-options';

/**
* An async function that receives `Job`s and handles them.
Expand All @@ -14,7 +14,7 @@ export type Processor<T = any, R = any, N extends string = string> = (
token?: string,
) => Promise<R>;

export interface WorkerOptions extends QueueBaseOptions {
export interface WorkerOptions extends QueueBaseOptions, SandboxedOptions {
/**
* Optional worker name. The name will be stored on every job
* processed by this worker instance, and can be used to monitor
Expand Down Expand Up @@ -136,23 +136,6 @@ export interface WorkerOptions extends QueueBaseOptions {
* More advanced options.
*/
settings?: AdvancedOptions;

/**
* Use Worker Threads instead of Child Processes.
* Note: This option can only be used when specifying
* a file for the processor argument.
*
* @default false
*/
useWorkerThreads?: boolean;

/**
* Support passing Worker Threads Options.
* Note: This option can only be used when specifying
* a file for the processor argument and useWorkerThreads is passed as true.
* @see {@link https://nodejs.org/api/worker_threads.html#new-workerfilename-options}
*/
workerThreadsOptions?: WorkerThreadsOptions;
}

export interface GetNextJobOptions {
Expand Down
73 changes: 70 additions & 3 deletions tests/test_sandboxed_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,73 @@ const { stdout, stderr } = require('test-console');

describe('Sandboxed process using child processes', () => {
sandboxProcessTests();

describe('custom cases', () => {
const redisHost = process.env.REDIS_HOST || 'localhost';
const prefix = process.env.BULLMQ_TEST_PREFIX || 'bull';
let queue: Queue;
let queueEvents: QueueEvents;
let queueName: string;

let connection;
before(async function () {
connection = new IORedis(redisHost, { maxRetriesPerRequest: null });
});

beforeEach(async function () {
queueName = `test-${v4()}`;
queue = new Queue(queueName, { connection, prefix });
queueEvents = new QueueEvents(queueName, { connection, prefix });
await queueEvents.waitUntilReady();
});

afterEach(async function () {
await queue.close();
await queueEvents.close();
await removeAllQueueData(new IORedis(), queueName);
});

afterAll(async function () {
await connection.quit();
});

it('should allow to pass workerForkOptions', async function () {
const processFile = __dirname + '/fixtures/fixture_processor.js';

const worker = new Worker(queueName, processFile, {
connection,
prefix,
drainDelay: 1,
useWorkerThreads: false,
workerForkOptions: {
serialization: 'advanced',
},
});

const completing = new Promise<void>((resolve, reject) => {
worker.on('completed', async (job: Job, value: any) => {
try {
expect(job.returnvalue).to.be.eql(42);
expect(job.data).to.be.eql({ foo: 'bar' });
expect(value).to.be.eql(42);
expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(
0,
);
expect(worker['childPool'].free[processFile]).to.have.lengthOf(1);
resolve();
} catch (err) {
reject(err);
}
});
});

await queue.add('test', { foo: 'bar' });

await completing;

await worker.close();
});
});
});

describe('Sandboxed process using worker threads', () => {
Expand Down Expand Up @@ -55,9 +122,9 @@ describe('Sandboxed process using worker threads', () => {
useWorkerThreads: true,
workerThreadsOptions: {
resourceLimits: {
maxOldGenerationSizeMb: 1
}
}
maxOldGenerationSizeMb: 10,
},
},
});

const completing = new Promise<void>((resolve, reject) => {
Expand Down

0 comments on commit f7a4292

Please sign in to comment.