diff --git a/.husky/pre-commit b/.husky/pre-commit index 48087ddca7..336cbd628b 100755 --- a/.husky/pre-commit +++ b/.husky/pre-commit @@ -1,4 +1,4 @@ #!/usr/bin/env sh . "$(dirname -- "$0")/_/husky.sh" -npx npm-run-all pretty:quick lint:staged +yarn npm-run-all pretty:quick lint:staged diff --git a/src/classes/child-pool.ts b/src/classes/child-pool.ts index 73abf2adba..61434c0c39 100644 --- a/src/classes/child-pool.ts +++ b/src/classes/child-pool.ts @@ -1,11 +1,11 @@ import * as path from 'path'; import { Child } from './child'; +import { SandboxedOptions } from '../interfaces'; const CHILD_KILL_TIMEOUT = 30_000; -interface ChildPoolOpts { +interface ChildPoolOpts extends SandboxedOptions { mainFile?: string; - useWorkerThreads?: boolean; } export class ChildPool { @@ -16,8 +16,15 @@ export class ChildPool { constructor({ mainFile = path.join(process.cwd(), 'dist/cjs/classes/main.js'), useWorkerThreads, + workerForkOptions, + workerThreadsOptions, }: ChildPoolOpts) { - this.opts = { mainFile, useWorkerThreads }; + this.opts = { + mainFile, + useWorkerThreads, + workerForkOptions, + workerThreadsOptions, + }; } async retain(processFile: string): Promise { @@ -30,6 +37,8 @@ export class ChildPool { child = new Child(this.opts.mainFile, processFile, { useWorkerThreads: this.opts.useWorkerThreads, + workerForkOptions: this.opts.workerForkOptions, + workerThreadsOptions: this.opts.workerThreadsOptions, }); child.on('exit', this.remove.bind(this, child)); diff --git a/src/classes/child.ts b/src/classes/child.ts index 5804855e46..45413409ba 100644 --- a/src/classes/child.ts +++ b/src/classes/child.ts @@ -1,7 +1,8 @@ import { ChildProcess, fork } from 'child_process'; import { AddressInfo, createServer } from 'net'; -import { Worker, WorkerOptions as WorkerThreadsOptions } from 'worker_threads'; +import { Worker } from 'worker_threads'; import { ChildCommand, ParentCommand } from '../enums'; +import { SandboxedOptions } from '../interfaces'; import { EventEmitter } from 'events'; /** @@ -40,11 +41,8 @@ export class Child extends EventEmitter { constructor( private mainFile: string, public processFile: string, - private opts: { - useWorkerThreads: boolean; - workerThreadsOptions?: WorkerThreadsOptions; - } = { - useWorkerThreads: false + private opts: SandboxedOptions = { + useWorkerThreads: false, }, ) { super(); @@ -86,12 +84,15 @@ export class Child extends EventEmitter { stdin: true, stdout: true, stderr: true, - ...(this.opts.workerThreadsOptions ? this.opts.workerThreadsOptions : {}) + ...(this.opts.workerThreadsOptions + ? this.opts.workerThreadsOptions + : {}), }); } else { this.childProcess = parent = fork(this.mainFile, [], { execArgv, stdio: 'pipe', + ...(this.opts.workerForkOptions ? this.opts.workerForkOptions : {}), }); } diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 881a25ef9a..7c96552956 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -287,6 +287,8 @@ export class Worker< this.childPool = new ChildPool({ mainFile: mainFilePath, useWorkerThreads: this.opts.useWorkerThreads, + workerForkOptions: this.opts.workerForkOptions, + workerThreadsOptions: this.opts.workerThreadsOptions, }); this.processFn = sandbox( diff --git a/src/interfaces/index.ts b/src/interfaces/index.ts index 65dfed0396..132c9bd6ed 100644 --- a/src/interfaces/index.ts +++ b/src/interfaces/index.ts @@ -22,4 +22,5 @@ export * from './repeatable-options'; export * from './repeat-options'; export * from './sandboxed-job-processor'; export * from './sandboxed-job'; +export * from './sandboxed-options'; export * from './worker-options'; diff --git a/src/interfaces/sandboxed-options.ts b/src/interfaces/sandboxed-options.ts new file mode 100644 index 0000000000..1dbca7e64c --- /dev/null +++ b/src/interfaces/sandboxed-options.ts @@ -0,0 +1,29 @@ +import { ForkOptions } from 'child_process'; +import { WorkerOptions as WorkerThreadsOptions } from 'worker_threads'; + +export interface SandboxedOptions { + /** + * Use Worker Threads instead of Child Processes. + * Note: This option can only be used when specifying + * a file for the processor argument. + * + * @default false + */ + useWorkerThreads?: boolean; + + /** + * Support passing Worker Fork Options. + * Note: This option can only be used when specifying + * a file for the processor argument and useWorkerThreads is passed as false (default value). + * @see {@link https://nodejs.org/api/child_process.html#child_processforkmodulepath-args-options} + */ + workerForkOptions?: ForkOptions; + + /** + * Support passing Worker Threads Options. + * Note: This option can only be used when specifying + * a file for the processor argument and useWorkerThreads is passed as true. + * @see {@link https://nodejs.org/api/worker_threads.html#new-workerfilename-options} + */ + workerThreadsOptions?: WorkerThreadsOptions; +} diff --git a/src/interfaces/worker-options.ts b/src/interfaces/worker-options.ts index 5cfb94d93a..73324e145d 100644 --- a/src/interfaces/worker-options.ts +++ b/src/interfaces/worker-options.ts @@ -1,10 +1,10 @@ -import { WorkerOptions as WorkerThreadsOptions } from 'worker_threads'; import { Job } from '../classes/job'; import { AdvancedOptions } from './advanced-options'; import { QueueBaseOptions } from './queue-options'; import { RateLimiterOptions } from './rate-limiter-options'; import { MetricsOptions } from './metrics-options'; import { KeepJobs } from './keep-jobs'; +import { SandboxedOptions } from './sandboxed-options'; /** * An async function that receives `Job`s and handles them. @@ -14,7 +14,7 @@ export type Processor = ( token?: string, ) => Promise; -export interface WorkerOptions extends QueueBaseOptions { +export interface WorkerOptions extends QueueBaseOptions, SandboxedOptions { /** * Optional worker name. The name will be stored on every job * processed by this worker instance, and can be used to monitor @@ -136,23 +136,6 @@ export interface WorkerOptions extends QueueBaseOptions { * More advanced options. */ settings?: AdvancedOptions; - - /** - * Use Worker Threads instead of Child Processes. - * Note: This option can only be used when specifying - * a file for the processor argument. - * - * @default false - */ - useWorkerThreads?: boolean; - - /** - * Support passing Worker Threads Options. - * Note: This option can only be used when specifying - * a file for the processor argument and useWorkerThreads is passed as true. - * @see {@link https://nodejs.org/api/worker_threads.html#new-workerfilename-options} - */ - workerThreadsOptions?: WorkerThreadsOptions; } export interface GetNextJobOptions { diff --git a/tests/test_sandboxed_process.ts b/tests/test_sandboxed_process.ts index 8f1fe238fb..da6e13e015 100644 --- a/tests/test_sandboxed_process.ts +++ b/tests/test_sandboxed_process.ts @@ -11,6 +11,73 @@ const { stdout, stderr } = require('test-console'); describe('Sandboxed process using child processes', () => { sandboxProcessTests(); + + describe('custom cases', () => { + const redisHost = process.env.REDIS_HOST || 'localhost'; + const prefix = process.env.BULLMQ_TEST_PREFIX || 'bull'; + let queue: Queue; + let queueEvents: QueueEvents; + let queueName: string; + + let connection; + before(async function () { + connection = new IORedis(redisHost, { maxRetriesPerRequest: null }); + }); + + beforeEach(async function () { + queueName = `test-${v4()}`; + queue = new Queue(queueName, { connection, prefix }); + queueEvents = new QueueEvents(queueName, { connection, prefix }); + await queueEvents.waitUntilReady(); + }); + + afterEach(async function () { + await queue.close(); + await queueEvents.close(); + await removeAllQueueData(new IORedis(), queueName); + }); + + afterAll(async function () { + await connection.quit(); + }); + + it('should allow to pass workerForkOptions', async function () { + const processFile = __dirname + '/fixtures/fixture_processor.js'; + + const worker = new Worker(queueName, processFile, { + connection, + prefix, + drainDelay: 1, + useWorkerThreads: false, + workerForkOptions: { + serialization: 'advanced', + }, + }); + + const completing = new Promise((resolve, reject) => { + worker.on('completed', async (job: Job, value: any) => { + try { + expect(job.returnvalue).to.be.eql(42); + expect(job.data).to.be.eql({ foo: 'bar' }); + expect(value).to.be.eql(42); + expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf( + 0, + ); + expect(worker['childPool'].free[processFile]).to.have.lengthOf(1); + resolve(); + } catch (err) { + reject(err); + } + }); + }); + + await queue.add('test', { foo: 'bar' }); + + await completing; + + await worker.close(); + }); + }); }); describe('Sandboxed process using worker threads', () => { @@ -55,9 +122,9 @@ describe('Sandboxed process using worker threads', () => { useWorkerThreads: true, workerThreadsOptions: { resourceLimits: { - maxOldGenerationSizeMb: 1 - } - } + maxOldGenerationSizeMb: 10, + }, + }, }); const completing = new Promise((resolve, reject) => {