From 6babb9e2f355feaf9bd1a8ed229c1001e6de7144 Mon Sep 17 00:00:00 2001
From: Rogger Valverde <rogger.valverde@uni.pe>
Date: Mon, 7 Oct 2024 09:53:34 -0600
Subject: [PATCH] fix(sandbox): catch exit errors (#2800)

---
 src/classes/child-pool.ts       |   5 +-
 src/classes/sandbox.ts          | 112 ++++++++++++++++++--------------
 tests/test_child-pool.ts        |  47 +++++++-------
 tests/test_sandboxed_process.ts |  63 ++++++++++++++++--
 4 files changed, 146 insertions(+), 81 deletions(-)

diff --git a/src/classes/child-pool.ts b/src/classes/child-pool.ts
index 61434c0c39..66147977aa 100644
--- a/src/classes/child-pool.ts
+++ b/src/classes/child-pool.ts
@@ -27,7 +27,7 @@ export class ChildPool {
     };
   }
 
-  async retain(processFile: string): Promise<Child> {
+  async retain(processFile: string, exitHandler: any): Promise<Child> {
     let child = this.getFree(processFile).pop();
 
     if (child) {
@@ -40,7 +40,8 @@ export class ChildPool {
       workerForkOptions: this.opts.workerForkOptions,
       workerThreadsOptions: this.opts.workerThreadsOptions,
     });
-    child.on('exit', this.remove.bind(this, child));
+
+    child.on('exit', exitHandler);
 
     try {
       await child.init();
diff --git a/src/classes/sandbox.ts b/src/classes/sandbox.ts
index c07e14cb93..e9793714d3 100644
--- a/src/classes/sandbox.ts
+++ b/src/classes/sandbox.ts
@@ -1,5 +1,6 @@
 import { ChildCommand, ParentCommand } from '../enums';
 import { ChildMessage } from '../interfaces';
+import { Child } from './child';
 import { ChildPool } from './child-pool';
 import { Job } from './job';
 
@@ -8,65 +9,78 @@ const sandbox = <T, R, N extends string>(
   childPool: ChildPool,
 ) => {
   return async function process(job: Job<T, R, N>, token?: string): Promise<R> {
-    const child = await childPool.retain(processFile);
+    let child: Child;
     let msgHandler: any;
     let exitHandler: any;
+    try {
+      const done: Promise<R> = new Promise((resolve, reject) => {
+        const initChild = async () => {
+          try {
+            exitHandler = (exitCode: any, signal: any) => {
+              reject(
+                new Error(
+                  'Unexpected exit code: ' + exitCode + ' signal: ' + signal,
+                ),
+              );
+            };

-    await child.send({
-      cmd: ChildCommand.Start,
-      job: job.asJSONSandbox(),
-      token,
-    });
+            child = await childPool.retain(processFile, exitHandler);

-    const done: Promise<R> = new Promise((resolve, reject) => {
-      msgHandler = async (msg: ChildMessage) => {
-        switch (msg.cmd) {
-          case ParentCommand.Completed:
-            resolve(msg.value);
-            break;
-          case ParentCommand.Failed:
-          case ParentCommand.Error: {
-            const err = new Error();
-            Object.assign(err, msg.value);
-            reject(err);
-            break;
-          }
-          case ParentCommand.Progress:
-            await job.updateProgress(msg.value);
-            break;
-          case ParentCommand.Log:
-            await job.log(msg.value);
-            break;
-          case ParentCommand.MoveToDelayed:
-            await job.moveToDelayed(msg.value?.timestamp, msg.value?.token);
-            break;
-          case ParentCommand.Update:
-            await job.updateData(msg.value);
-            break;
-        }
-      };
+            msgHandler = async (msg: ChildMessage) => {
+              switch (msg.cmd) {
+                case ParentCommand.Completed:
+                  resolve(msg.value);
+                  break;
+                case ParentCommand.Failed:
+                case ParentCommand.Error: {
+                  const err = new Error();
+                  Object.assign(err, msg.value);
+                  reject(err);
+                  break;
+                }
+                case ParentCommand.Progress:
+                  await job.updateProgress(msg.value);
+                  break;
+                case ParentCommand.Log:
+                  await job.log(msg.value);
+                  break;
+                case ParentCommand.MoveToDelayed:
+                  await job.moveToDelayed(
+                    msg.value?.timestamp,
+                    msg.value?.token,
+                  );
+                  break;
+                case ParentCommand.Update:
+                  await job.updateData(msg.value);
+                  break;
+              }
+            };

-      exitHandler = (exitCode: any, signal: any) => {
-        reject(
-          new Error('Unexpected exit code: ' + exitCode + ' signal: ' + signal),
-        );
-      };
+            child.on('message', msgHandler);

-      child.on('message', msgHandler);
-      child.on('exit', exitHandler);
-    });
+            await child.send({
+              cmd: ChildCommand.Start,
+              job: job.asJSONSandbox(),
+              token,
+            });
+          } catch (error) {
+            reject(error);
+          }
+        };
+        initChild();
+      });

-    try {
       await done;
       return done;
     } finally {
-      child.off('message', msgHandler);
-      child.off('exit', exitHandler);
-
-      if (child.exitCode !== null || /SIG.*/.test(`${child.signalCode}`)) {
-        childPool.remove(child);
-      } else {
-        childPool.release(child);
+      if (child) {
+        child.off('message', msgHandler);
+        child.off('exit', exitHandler);
+        if (child.exitCode !== null || /SIG.*/.test(`${child.signalCode}`)) {
+          childPool.remove(child);
+        } else {
+          childPool.release(child);
+        }
       }
     }
   };
diff --git a/tests/test_child-pool.ts b/tests/test_child-pool.ts
index 99ec80064d..c53769a447 100644
--- a/tests/test_child-pool.ts
+++ b/tests/test_child-pool.ts
@@ -2,6 +2,7 @@ import { expect } from 'chai';
 import { ChildPool } from '../src/classes';
 import { join } from 'path';
 
+const NoopProc = () => {};
 describe('Child pool for Child Processes', () => {
   sandboxProcessTests();
 });
@@ -32,70 +33,70 @@ function sandboxProcessTests(
 
     it('should return same child if free', async () => {
       const processor = __dirname + '/fixtures/fixture_processor_bar.js';
-      const child = await pool.retain(processor);
+      const child = await pool.retain(processor, NoopProc);
       expect(child).to.be.ok;
       pool.release(child);
       expect(pool.retained).to.be.empty;
-      const newChild = await pool.retain(processor);
+      const newChild = await pool.retain(processor, NoopProc);
       expect(child).to.be.eql(newChild);
     });
 
     it('should return a new child if reused the last free one', async () => {
       const processor = __dirname + '/fixtures/fixture_processor_bar.js';
-      let child = await pool.retain(processor);
+      let child = await pool.retain(processor, NoopProc);
       expect(child).to.be.ok;
       pool.release(child);
       expect(pool.retained).to.be.empty;
-      let newChild = await pool.retain(processor);
+      let newChild = await pool.retain(processor, NoopProc);
       expect(child).to.be.eql(newChild);
       child = newChild;
-      newChild = await pool.retain(processor);
+      newChild = await pool.retain(processor, NoopProc);
       expect(child).not.to.be.eql(newChild);
     });
 
     it('should return a new child if none free', async () => {
       const processor = __dirname + '/fixtures/fixture_processor_bar.js';
-      const child = await pool.retain(processor);
+      const child = await pool.retain(processor, NoopProc);
       expect(child).to.be.ok;
       expect(pool.retained).not.to.be.empty;
-      const newChild = await pool.retain(processor);
+      const newChild = await pool.retain(processor, NoopProc);
       expect(child).to.not.be.eql(newChild);
     });
 
     it('should return a new child if killed', async () => {
       const processor = __dirname + '/fixtures/fixture_processor_bar.js';
-      const child = await pool.retain(processor);
+      const child = await pool.retain(processor, NoopProc);
       expect(child).to.be.ok;
       await pool.kill(child);
       expect(pool.retained).to.be.empty;
-      const newChild = await pool.retain(processor);
+      const newChild = await pool.retain(processor, NoopProc);
       expect(child).to.not.be.eql(newChild);
     });
 
     it('should return a new child if many retained and none free', async () => {
       const processor = __dirname + '/fixtures/fixture_processor_bar.js';
       const children = await Promise.all([
-        pool.retain(processor),
-        pool.retain(processor),
-        pool.retain(processor),
-        pool.retain(processor),
-        pool.retain(processor),
-        pool.retain(processor),
+        pool.retain(processor, NoopProc),
+        pool.retain(processor, NoopProc),
+        pool.retain(processor, NoopProc),
+        pool.retain(processor, NoopProc),
+        pool.retain(processor, NoopProc),
+        pool.retain(processor, NoopProc),
       ]);
       expect(children).to.have.length(6);
-      const child = await pool.retain(processor);
+      const child = await pool.retain(processor, NoopProc);
       expect(children).not.to.include(child);
     }).timeout(10000);
 
     it('should return an old child if many retained and one free', async () => {
       const processor = __dirname + '/fixtures/fixture_processor_bar.js';
       const children = await Promise.all([
-        pool.retain(processor),
-        pool.retain(processor),
-        pool.retain(processor),
-        pool.retain(processor),
-        pool.retain(processor),
-        pool.retain(processor),
+        pool.retain(processor, NoopProc),
+        pool.retain(processor, NoopProc),
+        pool.retain(processor, NoopProc),
+        pool.retain(processor, NoopProc),
+        pool.retain(processor, NoopProc),
+        pool.retain(processor, NoopProc),
       ]);
 
       expect(children).to.have.length(6);
@@ -108,7 +109,7 @@ function sandboxProcessTests(
       const processor = __dirname + '/fixtures/fixture_processor_bar.js';
       process.execArgv.push('--no-warnings');
 
-      const child = await pool.retain(processor);
+      const child = await pool.retain(processor, NoopProc);
       expect(child).to.be.ok;
       if (!useWorkerThreads) {
         expect(child.childProcess.spawnargs).to.include('--no-warnings');
diff --git a/tests/test_sandboxed_process.ts b/tests/test_sandboxed_process.ts
index da6e13e015..4cf155dafb 100644
--- a/tests/test_sandboxed_process.ts
+++ b/tests/test_sandboxed_process.ts
@@ -77,6 +77,44 @@ describe('Sandboxed process using child processes', () => {
 
       await worker.close();
     });
+
+    it('should allow to pass workerForkOptions with timeout', async function () {
+      const processFile = __dirname + '/fixtures/fixture_processor.js';
+
+      const workerForkOptions = {
+        timeout: 50,
+      } as any;
+      const worker = new Worker(queueName, processFile, {
+        autorun: false,
+        connection,
+        prefix,
+        drainDelay: 1,
+        useWorkerThreads: false,
+        workerForkOptions,
+      });
+
+      const failing = new Promise<void>((resolve, reject) => {
+        worker.on('failed', async (job, error) => {
+          try {
+            expect([
+              'Unexpected exit code: null signal: SIGTERM',
+              'Unexpected exit code: 0 signal: null',
+            ]).to.include(error.message);
+            resolve();
+          } catch (err) {
+            reject(err);
+          }
+        });
+      });
+
+      await queue.add('test', { foo: 'bar' });
+
+      worker.run();
+
+      await failing;
+
+      await worker.close();
+    });
   });
 });
 
@@ -1019,12 +1057,20 @@ function sandboxProcessTests(
         useWorkerThreads,
       });
 
-      const job = await queue.add('test', { exitCode: 1 });
+      const failing = new Promise<void>((resolve, reject) => {
+        worker.on('failed', async (job, error) => {
+          try {
+            expect(error.message).to.be.equal('Broken file processor');
+            resolve();
+          } catch (err) {
+            reject(err);
+          }
+        });
+      });
 
-      await expect(job.waitUntilFinished(queueEvents)).to.be.rejectedWith(
-        'Broken file processor',
-      );
+      await queue.add('test', { exitCode: 1 });
 
+      await failing;
       await worker.close();
     });
 
@@ -1050,7 +1096,7 @@ function sandboxProcessTests(
       });
     });
 
-    it('should remove exited process', async () => {
+    it('should release exited process', async () => {
       const processFile = __dirname + '/fixtures/fixture_processor_exit.js';
 
       const worker = new Worker(queueName, processFile, {
@@ -1072,7 +1118,7 @@ function sandboxProcessTests(
             expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(
               0,
             );
-            expect(worker['childPool'].getAllFree()).to.have.lengthOf(0);
+            expect(worker['childPool'].getAllFree()).to.have.lengthOf(1);
             resolve();
           } catch (err) {
             reject(err);
@@ -1097,7 +1143,10 @@ function sandboxProcessTests(
       });
 
       // acquire and release a child here so we know it has it's full termination handler setup
-      const initializedChild = await worker['childPool'].retain(processFile);
+      const initializedChild = await worker['childPool'].retain(
+        processFile,
+        () => {},
+      );
       await worker['childPool'].release(initializedChild);
 
       // await this After we've added the job