Skip to content

Commit

Permalink
support custom labels for multi kafka clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Jul 27, 2018
1 parent 8306602 commit e4caa00
Showing 1 changed file with 83 additions and 65 deletions.
148 changes: 83 additions & 65 deletions kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"regexp"
"strconv"
"strings"
"sync"

"github.com/Shopify/sarama"
Expand All @@ -27,71 +28,17 @@ const (
)

var (
clusterBrokers = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "brokers"),
"Number of Brokers in the Kafka Cluster.",
nil, nil,
)

topicPartitions = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partitions"),
"Number of partitions for this Topic",
[]string{"topic"}, nil,
)

topicCurrentOffset = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_current_offset"),
"Current Offset of a Broker at Topic/Partition",
[]string{"topic", "partition"}, nil,
)

topicOldestOffset = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_oldest_offset"),
"Oldest Offset of a Broker at Topic/Partition",
[]string{"topic", "partition"}, nil,
)

topicPartitionLeader = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_leader"),
"Leader Broker ID of this Topic/Partition",
[]string{"topic", "partition"}, nil,
)

topicPartitionReplicas = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_replicas"),
"Number of Replicas for this Topic/Partition",
[]string{"topic", "partition"}, nil,
)

topicPartitionInSyncReplicas = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_in_sync_replica"),
"Number of In-Sync Replicas for this Topic/Partition",
[]string{"topic", "partition"}, nil,
)

topicPartitionUsesPreferredReplica = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_leader_is_preferred"),
"1 if Topic/Partition is using the Preferred Broker",
[]string{"topic", "partition"}, nil,
)

topicUnderReplicatedPartition = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_under_replicated_partition"),
"1 if Topic/Partition is under Replicated",
[]string{"topic", "partition"}, nil,
)

consumergroupCurrentOffset = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "current_offset"),
"Current Offset of a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, nil,
)

consumergroupLag = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "lag"),
"Current Approximate Lag of a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, nil,
)
clusterBrokers *prometheus.Desc
topicPartitions *prometheus.Desc
topicCurrentOffset *prometheus.Desc
topicOldestOffset *prometheus.Desc
topicPartitionLeader *prometheus.Desc
topicPartitionReplicas *prometheus.Desc
topicPartitionInSyncReplicas *prometheus.Desc
topicPartitionUsesPreferredReplica *prometheus.Desc
topicUnderReplicatedPartition *prometheus.Desc
consumergroupCurrentOffset *prometheus.Desc
consumergroupLag *prometheus.Desc
)

// Exporter collects Kafka stats from the given server and exports them using
Expand All @@ -115,6 +62,7 @@ type kafkaOpts struct {
tlsKeyFile string
tlsInsecureSkipTLSVerify bool
kafkaVersion string
labels string
}

// CanReadCertAndKey returns true if the certificate and key files already exists,
Expand Down Expand Up @@ -465,6 +413,7 @@ func main() {
kingpin.Flag("tls.key-file", "The optional key file for client authentication.").Default("").StringVar(&opts.tlsKeyFile)
kingpin.Flag("tls.insecure-skip-tls-verify", "If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure.").Default("false").BoolVar(&opts.tlsInsecureSkipTLSVerify)
kingpin.Flag("kafka.version", "Kafka broker version").Default(sarama.V1_0_0_0.String()).StringVar(&opts.kafkaVersion)
kingpin.Flag("kafka.labels", "Kafka cluster name").Default(sarama.V1_0_0_0.String()).StringVar(&opts.labels)

plog.AddFlags(kingpin.CommandLine)
kingpin.Version(version.Print("kafka_exporter"))
Expand All @@ -474,6 +423,75 @@ func main() {
plog.Infoln("Starting kafka_exporter", version.Info())
plog.Infoln("Build context", version.BuildContext())

labels := make(map[string]string)
for _, label := range strings.Split(opts.labels, ",") {
splitted := strings.Split(label, "=")
labels[splitted[0]] = splitted[1]
}

clusterBrokers = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "brokers"),
"Number of Brokers in the Kafka Cluster.",
nil, labels,
)
topicPartitions = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partitions"),
"Number of partitions for this Topic",
[]string{"topic"}, labels,
)
topicCurrentOffset = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_current_offset"),
"Current Offset of a Broker at Topic/Partition",
[]string{"topic", "partition"}, labels,
)
topicOldestOffset = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_oldest_offset"),
"Oldest Offset of a Broker at Topic/Partition",
[]string{"topic", "partition"}, labels,
)

topicPartitionLeader = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_leader"),
"Leader Broker ID of this Topic/Partition",
[]string{"topic", "partition"}, labels,
)

topicPartitionReplicas = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_replicas"),
"Number of Replicas for this Topic/Partition",
[]string{"topic", "partition"}, labels,
)

topicPartitionInSyncReplicas = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_in_sync_replica"),
"Number of In-Sync Replicas for this Topic/Partition",
[]string{"topic", "partition"}, labels,
)

topicPartitionUsesPreferredReplica = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_leader_is_preferred"),
"1 if Topic/Partition is using the Preferred Broker",
[]string{"topic", "partition"}, labels,
)

topicUnderReplicatedPartition = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_under_replicated_partition"),
"1 if Topic/Partition is under Replicated",
[]string{"topic", "partition"}, labels,
)

consumergroupCurrentOffset = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "current_offset"),
"Current Offset of a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, labels,
)

consumergroupLag = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "lag"),
"Current Approximate Lag of a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, labels,
)

if *logSarama {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}
Expand Down

0 comments on commit e4caa00

Please sign in to comment.