diff --git a/lib/queuePopulator/LogReader.js b/lib/queuePopulator/LogReader.js index 4189cdfea..0beacfe27 100644 --- a/lib/queuePopulator/LogReader.js +++ b/lib/queuePopulator/LogReader.js @@ -52,6 +52,8 @@ class LogReader { this._metricsHandler = params.metricsHandler; // TODO: use a common handler for zk metrics from all extensions this._zkMetricsHandler = params.zkMetricsHandler; + + this.batchTimeoutSeconds = parseInt(process.env.BATCH_TIMEOUT_SECONDS, 10) || 300; } _setEntryBatch(entryBatch) { @@ -291,6 +293,15 @@ class LogReader { timeoutMs: params.timeoutMs, logger: this.log.newRequestLogger(), }; + // When using the RaftLogReader log consumer, batch processing + // can sometimes get stuck for unknown reasons. As a temporary fix, + // a timer logs an error and emits SIGTERM if the batch stalls. + const batchTimeoutTimer = setTimeout(() => { + this.log.error('queue populator batch timeout', { + logStats: batchState.logStats, + }); + process.emit('SIGTERM'); + }, this.batchTimeoutSeconds * 1000); async.waterfall([ next => this._processReadRecords(params, batchState, next), next => this._processPrepareEntries(batchState, next), @@ -299,6 +310,7 @@ class LogReader { next => this._processSaveLogOffset(batchState, next), ], err => { + clearTimeout(batchTimeoutTimer); if (err) { return done(err); } diff --git a/tests/unit/lib/queuePopulator/LogReader.spec.js b/tests/unit/lib/queuePopulator/LogReader.spec.js index 638878987..a374dd54a 100644 --- a/tests/unit/lib/queuePopulator/LogReader.spec.js +++ b/tests/unit/lib/queuePopulator/LogReader.spec.js @@ -296,5 +296,37 @@ describe('LogReader', () => { }); }); }); + + it('should shutdown when batch processing is stuck', done => { + logReader.batchTimeoutSeconds = 1; + // logReader will become stuck as _processReadRecords will never + // call the callback + sinon.stub(logReader, '_processReadRecords').returns(); + let emmitted = false; + process.once('SIGTERM', () => { + emmitted = true; + }); + logReader.processLogEntries({}, () => {}); + 
setTimeout(() => { + assert.strictEqual(emmitted, true); + done(); + }, 2000); + }).timeout(4000); + + it('should not shutdown if timeout not reached', done => { + sinon.stub(logReader, '_processReadRecords').yields(); + sinon.stub(logReader, '_processPrepareEntries').yields(); + sinon.stub(logReader, '_processFilterEntries').yields(); + sinon.stub(logReader, '_processPublishEntries').yields(); + sinon.stub(logReader, '_processSaveLogOffset').yields(); + let emmitted = false; + process.once('SIGTERM', () => { + emmitted = true; + }); + logReader.processLogEntries({}, () => { + assert.strictEqual(emmitted, false); + done(); + }); + }); }); });