From 29b548125981153628f2f27211e7700bcdb5a890 Mon Sep 17 00:00:00 2001 From: Kerkesni Date: Thu, 10 Oct 2024 16:38:51 +0200 Subject: [PATCH] Shutdown when the logReader batch processing is stuck 7.x fix was not forward ported to avoid introducing memory leaks into the code. Instead of just unblocking the next batch processing call and leaving the stuck one alive, now we shutdown the whole process. 7.x fix commit: dfcc2f5786ab6da8bd3e2fc893d897aed0a77fe8 Issue: BB-526 --- lib/queuePopulator/LogReader.js | 12 +++++++ .../unit/lib/queuePopulator/LogReader.spec.js | 32 +++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/lib/queuePopulator/LogReader.js b/lib/queuePopulator/LogReader.js index 4189cdfea..0beacfe27 100644 --- a/lib/queuePopulator/LogReader.js +++ b/lib/queuePopulator/LogReader.js @@ -52,6 +52,8 @@ class LogReader { this._metricsHandler = params.metricsHandler; // TODO: use a common handler for zk metrics from all extensions this._zkMetricsHandler = params.zkMetricsHandler; + + this.batchTimeoutSeconds = parseInt(process.env.BATCH_TIMEOUT_SECONDS, 10) || 300; } _setEntryBatch(entryBatch) { @@ -291,6 +293,15 @@ class LogReader { timeoutMs: params.timeoutMs, logger: this.log.newRequestLogger(), }; + // When using the RaftLogReader log consumer, The batch + // processing can get stuck sometimes for unknown reasons. + // As a temporary fix we set a timeout to kill the process. + const batchTimeoutTimer = setTimeout(() => { + this.log.error('queue populator batch timeout', { + logStats: batchState.logStats, + }); + process.emit('SIGTERM'); + }, this.batchTimeoutSeconds * 1000); async.waterfall([ next => this._processReadRecords(params, batchState, next), next => this._processPrepareEntries(batchState, next), @@ -299,6 +310,7 @@ class LogReader { next => this._processSaveLogOffset(batchState, next), ], err => { + clearTimeout(batchTimeoutTimer); if (err) { return done(err); } diff --git a/tests/unit/lib/queuePopulator/LogReader.spec.js b/tests/unit/lib/queuePopulator/LogReader.spec.js index 638878987..a374dd54a 100644 --- a/tests/unit/lib/queuePopulator/LogReader.spec.js +++ b/tests/unit/lib/queuePopulator/LogReader.spec.js @@ -296,5 +296,37 @@ describe('LogReader', () => { }); }); }); + + it('should shutdown when batch processing is stuck', done => { + logReader.batchTimeoutSeconds = 1; + // logReader will become stuck as _processReadRecords will never + // call the callback + sinon.stub(logReader, '_processReadRecords').returns(); + let emmitted = false; + process.once('SIGTERM', () => { + emmitted = true; + }); + logReader.processLogEntries({}, () => {}); + setTimeout(() => { + assert.strictEqual(emmitted, true); + done(); + }, 2000); + }).timeout(4000); + + it('should not shutdown if timeout not reached', done => { + sinon.stub(logReader, '_processReadRecords').yields(); + sinon.stub(logReader, '_processPrepareEntries').yields(); + sinon.stub(logReader, '_processFilterEntries').yields(); + sinon.stub(logReader, '_processPublishEntries').yields(); + sinon.stub(logReader, '_processSaveLogOffset').yields(); + let emmitted = false; + process.once('SIGTERM', () => { + emmitted = true; + }); + logReader.processLogEntries({}, () => { + assert.strictEqual(emmitted, false); + done(); + }); + }); }); });