From 52d39d79c8ef558eab7332eb35a09e17e6b0c75f Mon Sep 17 00:00:00 2001
From: Kerkesni
Date: Thu, 10 Oct 2024 16:38:51 +0200
Subject: [PATCH 1/3] Shutdown when the logReader batch processing is stuck

7.x fix was not forward ported to avoid introducing memory leaks into the
code. Instead of just unblocking the next batch processing call and leaving
the stuck one alive, now we shutdown the whole process.

7.x fix commit: dfcc2f5786ab6da8bd3e2fc893d897aed0a77fe8

Issue: BB-526
---
 lib/queuePopulator/LogReader.js               | 12 +++++++
 .../unit/lib/queuePopulator/LogReader.spec.js | 32 +++++++++++++++++++
 2 files changed, 44 insertions(+)

diff --git a/lib/queuePopulator/LogReader.js b/lib/queuePopulator/LogReader.js
index 4189cdfea..750fb9989 100644
--- a/lib/queuePopulator/LogReader.js
+++ b/lib/queuePopulator/LogReader.js
@@ -52,6 +52,8 @@ class LogReader {
         this._metricsHandler = params.metricsHandler;
         // TODO: use a common handler for zk metrics from all extensions
         this._zkMetricsHandler = params.zkMetricsHandler;
+
+        this._batchTimeoutSeconds = parseInt(process.env.BATCH_TIMEOUT_SECONDS, 10) || 300;
     }
 
     _setEntryBatch(entryBatch) {
@@ -291,6 +293,15 @@ class LogReader {
             timeoutMs: params.timeoutMs,
             logger: this.log.newRequestLogger(),
         };
+        // When using the RaftLogReader log consumer, The batch
+        // processing can get stuck sometimes for unknown reasons.
+        // As a temporary fix we set a timeout to kill the process.
+        const batchTimeoutTimer = setTimeout(() => {
+            this.log.error('queue populator batch timeout', {
+                logStats: batchState.logStats,
+            });
+            process.emit('SIGTERM');
+        }, this._batchTimeoutSeconds * 1000);
         async.waterfall([
             next => this._processReadRecords(params, batchState, next),
             next => this._processPrepareEntries(batchState, next),
@@ -299,6 +310,7 @@ class LogReader {
             next => this._processSaveLogOffset(batchState, next),
         ],
         err => {
+            clearTimeout(batchTimeoutTimer);
             if (err) {
                 return done(err);
             }
diff --git a/tests/unit/lib/queuePopulator/LogReader.spec.js b/tests/unit/lib/queuePopulator/LogReader.spec.js
index 638878987..cf50b98f1 100644
--- a/tests/unit/lib/queuePopulator/LogReader.spec.js
+++ b/tests/unit/lib/queuePopulator/LogReader.spec.js
@@ -296,5 +296,37 @@ describe('LogReader', () => {
                 });
             });
         });
+
+        it('should shutdown when batch processing is stuck', done => {
+            logReader._batchTimeoutSeconds = 1;
+            // logReader will become stuck as _processReadRecords will never
+            // call the callback
+            sinon.stub(logReader, '_processReadRecords').returns();
+            let emmitted = false;
+            process.once('SIGTERM', () => {
+                emmitted = true;
+            });
+            logReader.processLogEntries({}, () => {});
+            setTimeout(() => {
+                assert.strictEqual(emmitted, true);
+                done();
+            }, 2000);
+        }).timeout(4000);
+
+        it('should not shutdown if timeout not reached', done => {
+            sinon.stub(logReader, '_processReadRecords').yields();
+            sinon.stub(logReader, '_processPrepareEntries').yields();
+            sinon.stub(logReader, '_processFilterEntries').yields();
+            sinon.stub(logReader, '_processPublishEntries').yields();
+            sinon.stub(logReader, '_processSaveLogOffset').yields();
+            let emmitted = false;
+            process.once('SIGTERM', () => {
+                emmitted = true;
+            });
+            logReader.processLogEntries({}, () => {
+                assert.strictEqual(emmitted, false);
+                done();
+            });
+        });
     });
 });

From 9d1905c5deba39467d331a151b96308dd3f9aaff Mon Sep 17 00:00:00 2001
From: Kerkesni
Date: Mon, 14 Oct 2024 15:13:09 +0200
Subject: [PATCH 2/3] fail healthcheck when batch processing gets stuck

Issue: BB-526
---
 lib/constants.js                              |  1 +
 lib/queuePopulator/LogReader.js               |  8 ++++++
 lib/queuePopulator/QueuePopulator.js          |  6 ++++
 tests/unit/QueuePopulator.spec.js             | 28 +++++++++++++++++++
 .../unit/lib/queuePopulator/LogReader.spec.js |  2 ++
 5 files changed, 45 insertions(+)

diff --git a/lib/constants.js b/lib/constants.js
index 932793a95..bcd3334b8 100644
--- a/lib/constants.js
+++ b/lib/constants.js
@@ -14,6 +14,7 @@ const constants = {
     statusUndefined: 'UNDEFINED',
     statusNotReady: 'NOT_READY',
     statusNotConnected: 'NOT_CONNECTED',
+    statusTimedOut: 'TIMED_OUT',
     authTypeAssumeRole: 'assumeRole',
     authTypeAccount: 'account',
     authTypeService: 'service',
diff --git a/lib/queuePopulator/LogReader.js b/lib/queuePopulator/LogReader.js
index 750fb9989..f076af4ad 100644
--- a/lib/queuePopulator/LogReader.js
+++ b/lib/queuePopulator/LogReader.js
@@ -54,6 +54,7 @@ class LogReader {
         this._zkMetricsHandler = params.zkMetricsHandler;
 
         this._batchTimeoutSeconds = parseInt(process.env.BATCH_TIMEOUT_SECONDS, 10) || 300;
+        this._batchTimedOut = false;
     }
 
     _setEntryBatch(entryBatch) {
@@ -300,6 +301,9 @@ class LogReader {
             this.log.error('queue populator batch timeout', {
                 logStats: batchState.logStats,
             });
+            this._batchTimedOut = true;
+            // S3C doesn't currently support restarts on healthcheck failure,
+            // so we just crash for now.
             process.emit('SIGTERM');
         }, this._batchTimeoutSeconds * 1000);
         async.waterfall([
@@ -738,6 +742,10 @@ class LogReader {
         });
         return statuses;
     }
+
+    batchProcessTimedOut() {
+        return this._batchTimedOut;
+    }
 }
 
 module.exports = LogReader;
diff --git a/lib/queuePopulator/QueuePopulator.js b/lib/queuePopulator/QueuePopulator.js
index f16d6fd80..a5ef48155 100644
--- a/lib/queuePopulator/QueuePopulator.js
+++ b/lib/queuePopulator/QueuePopulator.js
@@ -592,6 +592,12 @@ class QueuePopulator {
                 });
             }
         });
+        if (reader.batchProcessTimedOut()) {
+            responses.push({
+                component: 'log reader',
+                status: constants.statusTimedOut,
+            });
+        }
     });
 
     log.debug('verbose liveness', verboseLiveness);
diff --git a/tests/unit/QueuePopulator.spec.js b/tests/unit/QueuePopulator.spec.js
index 583030888..cb9ef7a6f 100644
--- a/tests/unit/QueuePopulator.spec.js
+++ b/tests/unit/QueuePopulator.spec.js
@@ -49,6 +49,7 @@ describe('QueuePopulator', () => {
             const mockLogReader = sinon.spy();
             mockLogReader.getProducerStatus = sinon.fake(() => prodStatus);
             mockLogReader.getLogInfo = sinon.fake(() => logInfo);
+            mockLogReader.batchProcessTimedOut = sinon.fake(() => false);
             qp.logReaders = [
                 mockLogReader,
             ];
@@ -72,6 +73,7 @@ describe('QueuePopulator', () => {
             };
             mockLogReader.getProducerStatus = sinon.fake(() => prodStatus);
             mockLogReader.getLogInfo = sinon.fake(() => logInfo);
+            mockLogReader.batchProcessTimedOut = sinon.fake(() => false);
             qp.logReaders = [
                 mockLogReader,
             ];
@@ -91,5 +93,31 @@ describe('QueuePopulator', () => {
                 ])
             );
         });
+
+        it('returns proper details when batch process timed out', () => {
+            const mockLogReader = sinon.spy();
+            mockLogReader.getProducerStatus = sinon.fake(() => ({
+                topicA: true,
+            }));
+            mockLogReader.getLogInfo = sinon.fake(() => {});
+            mockLogReader.batchProcessTimedOut = sinon.fake(() => true);
+            qp.logReaders = [
+                mockLogReader,
+            ];
+            qp.zkClient = {
+                getState: () => zookeeper.State.SYNC_CONNECTED,
+            };
+            qp.handleLiveness(mockRes, mockLog);
+            sinon.assert.calledOnceWithExactly(mockRes.writeHead, 500);
+            sinon.assert.calledOnceWithExactly(
+                mockRes.end,
+                JSON.stringify([
+                    {
+                        component: 'log reader',
+                        status: constants.statusTimedOut,
+                    },
+                ])
+            );
+        });
     });
 });
diff --git a/tests/unit/lib/queuePopulator/LogReader.spec.js b/tests/unit/lib/queuePopulator/LogReader.spec.js
index cf50b98f1..8c6f6e526 100644
--- a/tests/unit/lib/queuePopulator/LogReader.spec.js
+++ b/tests/unit/lib/queuePopulator/LogReader.spec.js
@@ -309,6 +309,7 @@ describe('LogReader', () => {
             logReader.processLogEntries({}, () => {});
             setTimeout(() => {
                 assert.strictEqual(emmitted, true);
+                assert.strictEqual(logReader.batchProcessTimedOut(), true);
                 done();
             }, 2000);
         }).timeout(4000);
@@ -325,6 +326,7 @@ describe('LogReader', () => {
             });
             logReader.processLogEntries({}, () => {
                 assert.strictEqual(emmitted, false);
+                assert.strictEqual(logReader.batchProcessTimedOut(), false);
                 done();
             });
         });

From 377906274f122a35d6016d5d41cf0c720a37640b Mon Sep 17 00:00:00 2001
From: Kerkesni
Date: Tue, 15 Oct 2024 15:17:58 +0200
Subject: [PATCH 3/3] only crash on S3C

Issue: BB-526
---
 lib/queuePopulator/LogReader.js               |  7 +++-
 .../unit/lib/queuePopulator/LogReader.spec.js | 37 ++++++++++++++++++++++++--
 2 files changed, 41 insertions(+), 3 deletions(-)

diff --git a/lib/queuePopulator/LogReader.js b/lib/queuePopulator/LogReader.js
index f076af4ad..7c187d7ea 100644
--- a/lib/queuePopulator/LogReader.js
+++ b/lib/queuePopulator/LogReader.js
@@ -301,10 +301,13 @@ class LogReader {
             this.log.error('queue populator batch timeout', {
                 logStats: batchState.logStats,
             });
-            this._batchTimedOut = true;
             // S3C doesn't currently support restarts on healthcheck failure,
             // so we just crash for now.
-            process.emit('SIGTERM');
+            if (process.env.CRASH_ON_BATCH_TIMEOUT) {
+                process.emit('SIGTERM');
+            } else {
+                this._batchTimedOut = true;
+            }
         }, this._batchTimeoutSeconds * 1000);
         async.waterfall([
             next => this._processReadRecords(params, batchState, next),
diff --git a/tests/unit/lib/queuePopulator/LogReader.spec.js b/tests/unit/lib/queuePopulator/LogReader.spec.js
index 8c6f6e526..2c515d1dd 100644
--- a/tests/unit/lib/queuePopulator/LogReader.spec.js
+++ b/tests/unit/lib/queuePopulator/LogReader.spec.js
@@ -296,8 +296,11 @@ describe('LogReader', () => {
                 });
             });
         });
+    });
 
-        it('should shutdown when batch processing is stuck', done => {
+    describe('processLogEntries', () => {
+        it('should shutdown when batch processing is stuck and CRASH_ON_BATCH_TIMEOUT is set', done => {
+            process.env.CRASH_ON_BATCH_TIMEOUT = true;
             logReader._batchTimeoutSeconds = 1;
             // logReader will become stuck as _processReadRecords will never
             // call the callback
             sinon.stub(logReader, '_processReadRecords').returns();
             let emmitted = false;
             process.once('SIGTERM', () => {
                 emmitted = true;
             });
             logReader.processLogEntries({}, () => {});
             setTimeout(() => {
                 assert.strictEqual(emmitted, true);
+                delete process.env.CRASH_ON_BATCH_TIMEOUT;
                 done();
             }, 2000);
         }).timeout(4000);
+
+        it('should fail healthcheck when batch processing is stuck', done => {
+            delete process.env.CRASH_ON_BATCH_TIMEOUT;
+            logReader._batchTimeoutSeconds = 1;
+            // logReader will become stuck as _processReadRecords will never
+            // call the callback
+            sinon.stub(logReader, '_processReadRecords').returns();
+            let emmitted = false;
+            process.once('SIGTERM', () => {
+                emmitted = true;
+            });
+            logReader.processLogEntries({}, () => {});
+            setTimeout(() => {
+                assert.strictEqual(emmitted, false);
                 assert.strictEqual(logReader.batchProcessTimedOut(), true);
                 done();
             }, 2000);
         }).timeout(4000);
 
         it('should not shutdown if timeout not reached', done => {
+            process.env.CRASH_ON_BATCH_TIMEOUT = true;
             sinon.stub(logReader, '_processReadRecords').yields();
             sinon.stub(logReader, '_processPrepareEntries').yields();
             sinon.stub(logReader, '_processFilterEntries').yields();
             sinon.stub(logReader, '_processPublishEntries').yields();
             sinon.stub(logReader, '_processSaveLogOffset').yields();
             let emmitted = false;
             process.once('SIGTERM', () => {
                 emmitted = true;
             });
             logReader.processLogEntries({}, () => {
                 assert.strictEqual(emmitted, false);
+                delete process.env.CRASH_ON_BATCH_TIMEOUT;
+                done();
+            });
+        });
+
+        it('should not fail healthcheck if timeout not reached', done => {
+            delete process.env.CRASH_ON_BATCH_TIMEOUT;
+            sinon.stub(logReader, '_processReadRecords').yields();
+            sinon.stub(logReader, '_processPrepareEntries').yields();
+            sinon.stub(logReader, '_processFilterEntries').yields();
+            sinon.stub(logReader, '_processPublishEntries').yields();
+            sinon.stub(logReader, '_processSaveLogOffset').yields();
+            logReader.processLogEntries({}, () => {
                 assert.strictEqual(logReader.batchProcessTimedOut(), false);
                 done();
             });
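
Taken together, the series arms a watchdog timer around every batch: LogReader.processLogEntries() starts the timer before its async.waterfall pipeline and clears it in the final callback; if the timer fires, the process either emits SIGTERM (when CRASH_ON_BATCH_TIMEOUT is set, the S3C case) or records the timeout so QueuePopulator.handleLiveness() answers 500 with status TIMED_OUT. The sketch below shows the same flow in isolation, under stated assumptions: BatchWatchdog, run() and processBatch are illustrative names invented for this note, not identifiers from the repository; only the two environment variables come from the patches.

'use strict';

// Minimal stand-in for the timeout handling added to LogReader.
// BATCH_TIMEOUT_SECONDS and CRASH_ON_BATCH_TIMEOUT are the environment
// variables introduced by the patches; everything else is hypothetical.
const BATCH_TIMEOUT_SECONDS =
    parseInt(process.env.BATCH_TIMEOUT_SECONDS, 10) || 300;

class BatchWatchdog {
    constructor() {
        // Exposed to the healthcheck, like LogReader.batchProcessTimedOut().
        this.batchTimedOut = false;
    }

    run(processBatch, done) {
        // Arm the watchdog before the batch starts.
        const timer = setTimeout(() => {
            if (process.env.CRASH_ON_BATCH_TIMEOUT) {
                // S3C-style deployments: crash so the process gets restarted.
                process.emit('SIGTERM');
            } else {
                // Other deployments: only mark the reader as timed out so the
                // liveness probe can report TIMED_OUT and fail with a 500.
                this.batchTimedOut = true;
            }
        }, BATCH_TIMEOUT_SECONDS * 1000);

        processBatch(err => {
            // The batch completed (or errored) in time: disarm the watchdog.
            clearTimeout(timer);
            done(err);
        });
    }
}

// Usage: a batch that calls back promptly never trips the watchdog.
new BatchWatchdog().run(cb => setImmediate(cb), err => {
    console.log('batch done', err || 'ok');
});

Because the timer is cleared only in the pipeline's final callback, a step that never calls back leaves it armed, which is exactly the stuck-batch condition the fix is meant to catch.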