Skip to content

Commit

Permalink
Add metrics on rebalance
Browse files Browse the repository at this point in the history
Track the number of rebalances, and especially their status: idle,
drained, or timeout.

Issue: BB-440
  • Loading branch information
francoisferrand committed Oct 27, 2023
1 parent 3fb173e commit 6e0507f
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 3 deletions.
8 changes: 5 additions & 3 deletions lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -553,12 +553,14 @@ class BackbeatConsumer extends EventEmitter {
running: this._processingQueue?.running(),
});

const unassign = jsutil.once(message => {
this._log.info(`processing queue ${message}, un-assigning`, {
const unassign = jsutil.once(status => {
this._log.info(`processing queue ${status}, un-assigning`, {
queueLen: this._processingQueue?.length(),
running: this._processingQueue?.running(),
});

KafkaBacklogMetrics.onRebalance(this._topic, this._groupId, status);

// Reset the drain callback
if (this._processingQueue) {
this._processingQueue.drain = () => { };
Expand Down Expand Up @@ -588,7 +590,7 @@ class BackbeatConsumer extends EventEmitter {
});

if (!this._processingQueue || this._processingQueue.idle()) {
unassign('empty');
unassign('idle');
return;
}

Expand Down
26 changes: 26 additions & 0 deletions lib/KafkaBacklogMetrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ const latestConsumeEventTimestampGauge = metrics.ZenkoMetrics.createGauge({
labelNames: ['topic', 'partition', 'consumergroup'],
});

// Counter of rebalance events, labelled by topic, consumer group and the
// status reported for the rebalance (e.g. 'idle').
const rebalanceTotalCounter = metrics.ZenkoMetrics.createCounter({
    name: promMetricNames.rebalanceTotal,
    help: 'Number of rebalance events',
    labelNames: [
        'topic',
        'consumergroup',
        'status',
    ],
});

const slowTasksCountGauge = metrics.ZenkoMetrics.createGauge({
name: promMetricNames.slowTasksCount,
help: 'Current number of slow tasks',
Expand Down Expand Up @@ -144,6 +150,26 @@ class KafkaBacklogMetrics extends EventEmitter {
}, Date.now() / 1000);
}

/**
* Increments the rebalance total counter
* @param {string} topic - The topic being rebalanced.
* @param {string} consumerGroup - The consumer group being rebalanced.
* @param {string} status - The status of the rebalance.
* @returns {void}
*/
static onRebalance(topic, consumerGroup, status) {
rebalanceTotalCounter.inc({
topic, consumergroup: consumerGroup, status,
});
}

/**
* Increment the slow tasks count gauge
* @param {string} topic - The Kafka topic.
* @param {number} partition - The Kafka partition.
* @param {string} consumerGroup - The Kafka consumer group.
* @returns {void}
*/
static onSlowTask(topic, partition, consumerGroup) {
slowTasksCountGauge.inc({
topic, partition, consumergroup: consumerGroup,
Expand Down
1 change: 1 addition & 0 deletions lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const constants = {
deliveryReportsTotal: 's3_zenko_queue_delivery_reports_total',
latestConsumedMessageTimestamp: 's3_zenko_queue_latest_consumed_message_timestamp',
latestConsumeEventTimestamp: 's3_zenko_queue_latest_consume_event_timestamp',
rebalanceTotal: 's3_zenko_queue_rebalance_total',
slowTasksCount: 's3_zenko_queue_slowTasks_count',
taskProcessingTime: 's3_zenko_queue_task_processing_time_seconds',
},
Expand Down

0 comments on commit 6e0507f

Please sign in to comment.