Skip to content

Commit

Permalink
forward port workarround for queue populator batch getting stuck
Browse files Browse the repository at this point in the history
original commit hash: dfcc2f5

Issue: BB-526
  • Loading branch information
Kerkesni committed Sep 24, 2024
1 parent a1e9895 commit f605ea3
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
7 changes: 6 additions & 1 deletion bin/queuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@ function queueBatch(queuePopulator, taskState) {
log.debug('skipping batch: previous one still in progress');
return undefined;
}
const onTimeout = () => {

Check warning on line 30 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L30

Added line #L30 was not covered by tests
// reset the flag to allow a new batch to start in case the
// previous batch timed out
taskState.batchInProgress = false;

Check warning on line 33 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L33

Added line #L33 was not covered by tests
};
log.debug('start queueing batch');
taskState.batchInProgress = true;
const maxRead = qpConfig.batchMaxRead;
const timeoutMs = qpConfig.batchTimeoutMs;
queuePopulator.processLogEntries({ maxRead, timeoutMs }, err => {
queuePopulator.processLogEntries({ maxRead, timeoutMs, onTimeout }, err => {

Check warning on line 39 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L39

Added line #L39 was not covered by tests
taskState.batchInProgress = false;
if (err) {
log.error('an error occurred during batch processing', {
Expand Down
11 changes: 11 additions & 0 deletions lib/queuePopulator/LogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const { metricsExtension, metricsTypeQueued } =
require('../../extensions/replication/constants');
const { transformKey } = require('../util/entry');

const BATCH_TIMEOUT_SECONDS = 300;

class LogReader {

/**
Expand Down Expand Up @@ -291,6 +293,14 @@ class LogReader {
timeoutMs: params.timeoutMs,
logger: this.log.newRequestLogger(),
};
const batchTimeoutTimer = setTimeout(() => {
this.log.error('queue populator batch timeout', {

Check warning on line 297 in lib/queuePopulator/LogReader.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

lib/queuePopulator/LogReader.js#L297

Added line #L297 was not covered by tests
logStats: batchState.logStats,
});
if (params.onTimeout) {
params.onTimeout();

Check warning on line 301 in lib/queuePopulator/LogReader.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

lib/queuePopulator/LogReader.js#L300-L301

Added lines #L300 - L301 were not covered by tests
}
}, BATCH_TIMEOUT_SECONDS * 1000);
async.waterfall([
next => this._processReadRecords(params, batchState, next),
next => this._processPrepareEntries(batchState, next),
Expand All @@ -299,6 +309,7 @@ class LogReader {
next => this._processSaveLogOffset(batchState, next),
],
err => {
clearTimeout(batchTimeoutTimer);
if (err) {
return done(err);
}
Expand Down

0 comments on commit f605ea3

Please sign in to comment.