Skip to content

Commit

Permalink
Merge pull request #71 from milsonian/lag-offset-sum
Browse files Browse the repository at this point in the history
Adds metrics:  kafka_consumergroup_current_offset_sum and kafka_consumergroup_lag_sum
  • Loading branch information
danielqsj authored Nov 6, 2018
2 parents 127f0e3 + 60b7d31 commit 6f5fb77
Showing 1 changed file with 28 additions and 2 deletions.
30 changes: 28 additions & 2 deletions kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ var (
topicPartitionUsesPreferredReplica *prometheus.Desc
topicUnderReplicatedPartition *prometheus.Desc
consumergroupCurrentOffset *prometheus.Desc
consumergroupCurrentOffsetSum *prometheus.Desc
consumergroupLag *prometheus.Desc
consumergroupLagSum *prometheus.Desc
)

// Exporter collects Kafka stats from the given server and exports them using
Expand Down Expand Up @@ -181,7 +183,9 @@ func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
ch <- topicPartitionUsesPreferredReplica
ch <- topicUnderReplicatedPartition
ch <- consumergroupCurrentOffset
ch <- consumergroupCurrentOffsetSum
ch <- consumergroupLag
ch <- consumergroupLagSum
}

// Collect fetches the stats from configured Kafka location and delivers them
Expand Down Expand Up @@ -342,15 +346,18 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
}
}
if topicConsumed {
var currentOffsetSum int64
var lagSum int64
for partition, offsetFetchResponseBlock := range partitions {
err := offsetFetchResponseBlock.Err
if err != sarama.ErrNoError {
plog.Errorln("Error for partition %d :%v", partition, err.Error())
continue
}

currentOffset := offsetFetchResponseBlock.Offset
currentOffsetSum += currentOffset
ch <- prometheus.MustNewConstMetric(
consumergroupCurrentOffset, prometheus.GaugeValue, float64(offsetFetchResponseBlock.Offset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
consumergroupCurrentOffset, prometheus.GaugeValue, float64(currentOffset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
)
e.mu.Lock()
if offset, ok := offset[topic][partition]; ok {
Expand All @@ -361,6 +368,7 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
lag = -1
} else {
lag = offset - offsetFetchResponseBlock.Offset
lagSum += lag
}
ch <- prometheus.MustNewConstMetric(
consumergroupLag, prometheus.GaugeValue, float64(lag), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
Expand All @@ -370,6 +378,12 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
}
e.mu.Unlock()
}
ch <- prometheus.MustNewConstMetric(
consumergroupCurrentOffsetSum, prometheus.GaugeValue, float64(currentOffsetSum), group.GroupId, topic,
)
ch <- prometheus.MustNewConstMetric(
consumergroupLagSum, prometheus.GaugeValue, float64(lagSum), group.GroupId, topic,
)
}
}
}
Expand Down Expand Up @@ -488,12 +502,24 @@ func main() {
[]string{"consumergroup", "topic", "partition"}, labels,
)

consumergroupCurrentOffsetSum = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "current_offset_sum"),
"Current Offset of a ConsumerGroup at Topic for all partitions",
[]string{"consumergroup", "topic"}, labels,
)

consumergroupLag = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "lag"),
"Current Approximate Lag of a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, labels,
)

consumergroupLagSum = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "lag_sum"),
"Current Approximate Lag of a ConsumerGroup at Topic for all partitions",
[]string{"consumergroup", "topic"}, labels,
)

if *logSarama {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}
Expand Down

0 comments on commit 6f5fb77

Please sign in to comment.