Merge branch 'bugfix/BB-455-clean-shutdown-backbeat-consumer' into q/8.6
bert-e committed Oct 30, 2023
2 parents 3cba5de + 304d3d5 commit 02beb15
Showing 3 changed files with 209 additions and 27 deletions.
66 changes: 40 additions & 26 deletions lib/BackbeatConsumer.js
@@ -20,6 +20,12 @@ const CONCURRENCY_DEFAULT = 1;
const CLIENT_ID = 'BackbeatConsumer';
const { withTopicPrefix } = require('./util/topic');

const UNASSIGN_STATUS = {
    IDLE: 'idle',
    DRAINED: 'drained',
    TIMEOUT: 'timeout',
};

/**
 * Stats on how we are consuming Kafka
 * @typedef {Object} ConsumerStats
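These three values name the ways a partition unassign can complete; the rebalance handler below emits whichever applies with the 'unassign' event. As a rough sketch of what a listener might do with them (the handler here is illustrative, not part of this patch):

// Hypothetical listener, assuming `consumer` is a BackbeatConsumer instance
consumer.on('unassign', status => {
    if (status === 'timeout') {
        // a task was stuck past max.poll.interval.ms; the consumer
        // disconnects itself so the healthcheck fails and the process restarts
        console.error('partitions unassigned after timeout');
    } else {
        // 'idle' (nothing in flight) or 'drained' (in-flight tasks finished)
        console.log(`partitions unassigned cleanly (${status})`);
    }
});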
@@ -447,7 +453,6 @@ class BackbeatConsumer extends EventEmitter {
        });
    }


    /**
     * Track processing of a task.
     * @param {*} entry - kafka entry being processed
@@ -580,29 +585,40 @@ class BackbeatConsumer extends EventEmitter {
                logger.bind(this._log)('rdkafka.commit failed', { e: e.toString(), assignment });
            }

            try {
                this._consumer.unassign();
            } catch (e) {
                // Ignore exceptions if we are not connected
                const logger = this._consumer.isConnected() ? this._log.error : this._log.info;
                logger.bind(this._log)('rdkafka.unassign failed', { e: e.toString(), assignment });
            const doUnassign = () => {
                try {
                    this._consumer.unassign();
                } catch (e) {
                    // Ignore exceptions if we are not connected
                    const logger = this._consumer.isConnected() ? this._log.error : this._log.info;
                    logger.bind(this._log)('rdkafka.unassign failed', { e: e.toString(), assignment });
                }

                this.emit('unassign', status);
            };

            // publish offsets to zookeeper
            if (this._kafkaBacklogMetricsConfig) {
                this._publishOffsetsCron(doUnassign);
            } else {
                doUnassign();
            }
        });

        if (!this._processingQueue || this._processingQueue.idle()) {
            unassign('idle');
            unassign(UNASSIGN_STATUS.IDLE);
            return;
        }

        this._processingQueue.drain = () => unassign('drained');
        this._processingQueue.drain = () => unassign(UNASSIGN_STATUS.DRAINED);

        // Set a timeout of `max.poll.interval.ms` to ensure we eventually complete
        // the rebalance (even if a task is really stuck) and don't get kicked out
        // of the consumer group. If the timeout is reached because one (or more)
        // tasks are stuck, we disconnect the consumer, so that the healthcheck
        // fails and the process gets restarted.
        this._drainProcessQueueTimeout = setTimeout(() => {
            unassign('timeout');
            unassign(UNASSIGN_STATUS.TIMEOUT);

            this._log.error('rdkafka.rebalance timeout: consumer stuck, disconnecting');
            this._consumer.disconnect();
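Condensed, the revoke path now works as follows. This is a sketch under the assumption that the processing queue is an async.queue and that `unassign` fires at most once, not the verbatim method:

// Sketch of the deferred unassign introduced by this change
function onPartitionsRevoked(unassign, queue, maxPollIntervalMs) {
    if (!queue || queue.idle()) {
        // no in-flight work: release the partitions immediately
        return unassign(UNASSIGN_STATUS.IDLE);
    }
    // otherwise release once every in-flight task has completed...
    queue.drain = () => unassign(UNASSIGN_STATUS.DRAINED);
    // ...but never wait longer than max.poll.interval.ms, since the broker
    // would evict the consumer from the group at that point anyway
    return setTimeout(() => unassign(UNASSIGN_STATUS.TIMEOUT), maxPollIntervalMs);
}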
@@ -902,31 +918,29 @@ class BackbeatConsumer extends EventEmitter {
            return setTimeout(() => this.close(cb), 1000);
        }
        this._circuitBreaker.stop();
        return async.series([
        return async.waterfall([
            next => {
                if (this._consumer) {
                    try {
                        this._consumer.commit();
                    } catch (e) {
                        // Ignore exceptions if we are not connected
                        const logger = this._consumer.isConnected() ? this._log.error : this._log.debug;
                        logger.bind(this._log)('commit failed', { e: e.toString() });
                if (this._consumer.isConnected()) {
                    const subscription = this._consumer.subscription() || [];
                    if (subscription.length > 0) {
                        this._consumer.unsubscribe();
                        // Wait for the partition unassign to complete before
                        // disconnecting; the rebalance callback will handle
                        // waiting for current jobs to complete as well as
                        // committing the latest offsets
                        this.once('unassign', () => next());
                        return;
                    }
                }
                if (this._kafkaBacklogMetricsConfig) {
                    // publish offsets to zookeeper
                    return this._publishOffsetsCron(() => next());
                }
                return process.nextTick(next);
                process.nextTick(next);
            },
            next => {
                if (this._zookeeper) {
                    this._zookeeper.close();
                }
                if (this._consumer) {
                    this._consumer.unsubscribe();
                    if (this._consumer.isConnected()) {
                        this._consumer.disconnect();
                        this._consumer.on('disconnected', () => next());
                        this._consumer.once('disconnected', () => next());
                    } else {
                        process.nextTick(next);
                    }
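The net effect on shutdown ordering: `close()` now unsubscribes first, lets the rebalance callback wait for in-flight jobs and commit offsets, waits for the resulting 'unassign' event, and only then disconnects. A hypothetical caller-side sketch (the SIGTERM hook and log lines are illustrative, not part of the patch):

// Graceful shutdown of a service embedding a BackbeatConsumer
process.on('SIGTERM', () => {
    consumer.close(err => {
        if (err) {
            console.error('consumer close failed', err);
            return process.exit(1);
        }
        // in-flight jobs have completed (or the max.poll.interval.ms
        // timeout fired) and the latest offsets have been committed
        return process.exit(0);
    });
});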
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
  "name": "backbeat",
  "version": "8.6.28",
  "version": "8.6.29",
  "description": "Asynchronous queue and job manager",
  "main": "index.js",
  "scripts": {
168 changes: 168 additions & 0 deletions tests/functional/lib/BackbeatConsumer.js
@@ -6,6 +6,7 @@ const { metrics } = require('arsenal');
const zookeeperHelper = require('../../../lib/clients/zookeeper');
const BackbeatProducer = require('../../../lib/BackbeatProducer');
const BackbeatConsumer = require('../../../lib/BackbeatConsumer');
const { CircuitBreaker } = require('breakbeat').CircuitBreaker;
const { promMetricNames } =
require('../../../lib/constants').kafkaBacklogMetrics;
const zookeeperConf = { connectionString: 'localhost:2181' };
@@ -637,6 +638,11 @@ describe('BackbeatConsumer with circuit breaker', () => {
    afterEach(done => {
        consumedMessages = [];
        consumer.removeAllListeners('consumed');
        // Reset the circuit breaker to avoid a timeout when
        // closing the consumer: closing depends on a revoke
        // rebalance event that only gets triggered by polling
        // (calling consumer.consume())
        consumer._circuitBreaker = new CircuitBreaker();

        async.parallel([
            innerDone => producer.close(innerDone),
Expand Down Expand Up @@ -708,3 +714,165 @@ describe('BackbeatConsumer with circuit breaker', () => {
        test.breakerConf = t.breakerConf;
    });
});

describe('BackbeatConsumer shutdown tests', () => {
    const topic = 'backbeat-consumer-spec-shutdown';
    const groupId = `bucket-processor-${Math.random()}`;
    const messages = [
        { key: 'm1', message: '{"value":"1"}' },
        { key: 'm2', message: '{"value":"2"}' },
    ];
    let zookeeper;
    let producer;
    let consumer;

    function queueProcessor(message, cb) {
        // simulate a stuck task by never calling back for 'taskStuck'
        // messages; other messages complete after one second
        if (message.value.toString() !== 'taskStuck') {
            setTimeout(cb, 1000);
        }
    }

    before(function before(done) {
        this.timeout(60000);
        producer = new BackbeatProducer({
            topic,
            kafka: producerKafkaConf,
            pollIntervalMs: 100,
        });
        async.parallel([
            innerDone => producer.on('ready', innerDone),
            innerDone => {
                zookeeper = zookeeperHelper.createClient(
                    zookeeperConf.connectionString);
                zookeeper.connect();
                zookeeper.on('ready', innerDone);
            },
        ], done);
    });

    beforeEach(function beforeEach(done) {
        this.timeout(60000);
        consumer = new BackbeatConsumer({
            zookeeper: zookeeperConf,
            kafka: {
                maxPollIntervalMs: 45000,
                ...consumerKafkaConf,
            },
            queueProcessor,
            groupId,
            topic,
            bootstrap: true,
            concurrency: 2,
        });
        consumer.on('ready', () => {
            consumer.subscribe();
            done();
        });
    });

    afterEach(() => {
        consumer.removeAllListeners('consumed');
    });

    after(function after(done) {
        this.timeout(10000);
        async.parallel([
            innerDone => producer.close(innerDone),
            innerDone => {
                zookeeper.close();
                innerDone();
            },
        ], done);
    });

    it('should stop consuming and wait for current jobs to end before shutting down', done => {
        setTimeout(() => {
            producer.send(messages, assert.ifError);
        }, 3000);
        let totalConsumed = 0;
        consumer.on('consumed', messagesConsumed => {
            totalConsumed += messagesConsumed;
        });
        async.series([
            next => {
                const interval = setInterval(() => {
                    if (consumer._processingQueue.idle()) {
                        return;
                    }
                    clearInterval(interval);
                    next();
                }, 500);
            },
            next => {
                assert(!consumer._processingQueue.idle());
                consumer.close(() => {
                    assert(consumer._processingQueue.idle());
                    // concurrency set to 2, so should only consume the first two
                    // initial messages before shutting down
                    assert(totalConsumed <= 2);
                    assert.strictEqual(consumer.getOffsetLedger().getProcessingCount(topic), 0);
                    next();
                });
            },
        ], done);
    }).timeout(30000);

    it('should immediately shut down when there are no in-progress tasks', done => {
        setTimeout(() => {
            producer.send([messages[0]], assert.ifError);
        }, 3000);
        async.series([
            next => {
                const interval = setInterval(() => {
                    if (!consumer._processingQueue.idle()) {
                        return;
                    }
                    clearInterval(interval);
                    next();
                }, 500);
            },
            next => {
                assert(consumer._processingQueue.idle());
                consumer.close(() => {
                    assert(consumer._processingQueue.idle());
                    assert.strictEqual(consumer.getOffsetLedger().getProcessingCount(topic), 0);
                    next();
                });
            },
        ], done);
    }).timeout(30000);

    it('should shut down when the consumer has been disconnected', done => {
        async.series([
            next => {
                consumer._consumer.disconnect();
                consumer._consumer.on('disconnected', () => next());
            },
            next => consumer.close(next),
        ], done);
    }).timeout(30000);

    it('should close even when a job is stuck', done => {
        setTimeout(() => {
            producer.send([{ key: 'key', message: 'taskStuck' }], assert.ifError);
        }, 3000);
        async.series([
            next => {
                const interval = setInterval(() => {
                    if (consumer._processingQueue.idle()) {
                        return;
                    }
                    clearInterval(interval);
                    next();
                }, 500);
            },
            next => {
                assert(!consumer._processingQueue.idle());
                consumer.close(() => {
                    assert(!consumer._processingQueue.idle());
                    next();
                });
            },
        ], done);
    }).timeout(60000);
});
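One timing contract worth noting: the stuck-job test can only pass because the rebalance timeout, derived from max.poll.interval.ms (45s above), fires before the test's own 60s mocha timeout. A minimal guard one might add to make that assumption explicit (illustrative, not in the patch):

const assert = require('assert');

// The stuck job is only released by the rebalance timeout, so it must
// fire before mocha gives up on the test
const maxPollIntervalMs = 45000; // consumer kafka setting used above
const mochaTimeoutMs = 60000;    // .timeout(60000) on the stuck-job test
assert(maxPollIntervalMs < mochaTimeoutMs,
    'rebalance timeout must fire before the test itself times out');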
