Skip to content

Commit

Permalink
Shutdown when the logReader batch processing is stuck
Browse files Browse the repository at this point in the history
7.x fix was not forward ported to avoid introducing memory leaks
into the code. Instead of just unblocking the next batch processing
call and leaving the stuck one alive, now we shutdown the whole process.

7.x fix commit: dfcc2f5

Issue: BB-526
  • Loading branch information
Kerkesni committed Oct 10, 2024
1 parent a1e9895 commit 9c67ba8
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 0 deletions.
12 changes: 12 additions & 0 deletions lib/queuePopulator/LogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class LogReader {
this._metricsHandler = params.metricsHandler;
// TODO: use a common handler for zk metrics from all extensions
this._zkMetricsHandler = params.zkMetricsHandler;

this.batchTimeoutSeconds = parseInt(process.env.BATCH_TIMEOUT_SECONDS, 10) || 300;
}

_setEntryBatch(entryBatch) {
Expand Down Expand Up @@ -291,6 +293,15 @@ class LogReader {
timeoutMs: params.timeoutMs,
logger: this.log.newRequestLogger(),
};
// When using the RaftLogReader log consumer, The batch
// processing can get stuck sometimes for unknown reasons.
// As a temporary fix we set a timeout to kill the process.
const batchTimeoutTimer = setTimeout(() => {
this.log.error('queue populator batch timeout', {
logStats: batchState.logStats,
});
process.emit('SIGTERM');
}, this.batchTimeoutSeconds * 1000);
async.waterfall([
next => this._processReadRecords(params, batchState, next),
next => this._processPrepareEntries(batchState, next),
Expand All @@ -299,6 +310,7 @@ class LogReader {
next => this._processSaveLogOffset(batchState, next),
],
err => {
clearTimeout(batchTimeoutTimer);
if (err) {
return done(err);
}
Expand Down
34 changes: 34 additions & 0 deletions tests/unit/lib/queuePopulator/LogReader.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,38 @@ describe('LogReader', () => {
});
});
});

describe('processLogEntries', () => {
it('should shutdown when batch processing is stuck', done => {
logReader.batchTimeoutSeconds = 1;
// logReader will become stuck as _processReadRecords will never
// call the callback
sinon.stub(logReader, '_processReadRecords').returns();
let emmitted = false;
process.once('SIGTERM', () => {
emmitted = true;
});
logReader.processLogEntries({}, () => {});
setTimeout(() => {
assert.strictEqual(emmitted, true);
done();
}, 2000);
}).timeout(4000);

it('should not shutdown if timeout not reached', done => {
sinon.stub(logReader, '_processReadRecords').yields();
sinon.stub(logReader, '_processPrepareEntries').yields();
sinon.stub(logReader, '_processFilterEntries').yields();
sinon.stub(logReader, '_processPublishEntries').yields();
sinon.stub(logReader, '_processSaveLogOffset').yields();
let emmitted = false;
process.once('SIGTERM', () => {
emmitted = true;
});
logReader.processLogEntries({}, () => {
assert.strictEqual(emmitted, false);
done();
});
});
});
});

0 comments on commit 9c67ba8

Please sign in to comment.