From 6e0507fcee83fd533f8081161575266bba1511b3 Mon Sep 17 00:00:00 2001 From: Francois Ferrand Date: Thu, 26 Oct 2023 10:31:23 +0200 Subject: [PATCH] Add metrics on rebalance Track the number of rebalances, and especially their status: idle, drained, or timeout. Issue: BB-440 --- lib/BackbeatConsumer.js | 8 +++++--- lib/KafkaBacklogMetrics.js | 26 ++++++++++++++++++++++++++ lib/constants.js | 1 + 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/lib/BackbeatConsumer.js b/lib/BackbeatConsumer.js index a8cc196f2..1e71c3ffd 100644 --- a/lib/BackbeatConsumer.js +++ b/lib/BackbeatConsumer.js @@ -553,12 +553,14 @@ class BackbeatConsumer extends EventEmitter { running: this._processingQueue?.running(), }); - const unassign = jsutil.once(message => { - this._log.info(`processing queue ${message}, un-assigning`, { + const unassign = jsutil.once(status => { + this._log.info(`processing queue ${status}, un-assigning`, { queueLen: this._processingQueue?.length(), running: this._processingQueue?.running(), }); + KafkaBacklogMetrics.onRebalance(this._topic, this._groupId, status); + // Reset the drain callback if (this._processingQueue) { this._processingQueue.drain = () => { }; @@ -588,7 +590,7 @@ class BackbeatConsumer extends EventEmitter { }); if (!this._processingQueue || this._processingQueue.idle()) { - unassign('empty'); + unassign('idle'); return; } diff --git a/lib/KafkaBacklogMetrics.js b/lib/KafkaBacklogMetrics.js index d2f879f51..ec2e18132 100644 --- a/lib/KafkaBacklogMetrics.js +++ b/lib/KafkaBacklogMetrics.js @@ -37,6 +37,12 @@ const latestConsumeEventTimestampGauge = metrics.ZenkoMetrics.createGauge({ labelNames: ['topic', 'partition', 'consumergroup'], }); +const rebalanceTotalCounter = metrics.ZenkoMetrics.createCounter({ + name: promMetricNames.rebalanceTotal, + help: 'Number of rebalance events', + labelNames: ['topic', 'consumergroup', 'status'], +}); + const slowTasksCountGauge = metrics.ZenkoMetrics.createGauge({ name: promMetricNames.slowTasksCount, help: 'Current number of slow tasks', @@ -144,6 +150,26 @@ class KafkaBacklogMetrics extends EventEmitter { }, Date.now() / 1000); } + /** + * Increments the rebalance total counter + * @param {string} topic - The topic being rebalanced. + * @param {string} consumerGroup - The consumer group being rebalanced. + * @param {string} status - The status of the rebalance. + * @returns {void} + */ + static onRebalance(topic, consumerGroup, status) { + rebalanceTotalCounter.inc({ + topic, consumergroup: consumerGroup, status, + }); + } + + /** + * Increment the slow tasks count gauge + * @param {string} topic - The Kafka topic. + * @param {number} partition - The Kafka partition. + * @param {string} consumerGroup - The Kafka consumer group. + * @returns {void} + */ static onSlowTask(topic, partition, consumerGroup) { slowTasksCountGauge.inc({ topic, partition, consumergroup: consumerGroup, diff --git a/lib/constants.js b/lib/constants.js index 6abd86734..80798d047 100644 --- a/lib/constants.js +++ b/lib/constants.js @@ -5,6 +5,7 @@ const constants = { deliveryReportsTotal: 's3_zenko_queue_delivery_reports_total', latestConsumedMessageTimestamp: 's3_zenko_queue_latest_consumed_message_timestamp', latestConsumeEventTimestamp: 's3_zenko_queue_latest_consume_event_timestamp', + rebalanceTotal: 's3_zenko_queue_rebalance_total', slowTasksCount: 's3_zenko_queue_slowTasks_count', taskProcessingTime: 's3_zenko_queue_task_processing_time_seconds', },