diff --git a/kafka_exporter.go b/kafka_exporter.go index 532205cd..ab3a51c5 100644 --- a/kafka_exporter.go +++ b/kafka_exporter.go @@ -10,6 +10,7 @@ import ( "os" "regexp" "strconv" + "strings" "sync" "github.com/Shopify/sarama" @@ -27,71 +28,17 @@ const ( ) var ( - clusterBrokers = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "", "brokers"), - "Number of Brokers in the Kafka Cluster.", - nil, nil, - ) - - topicPartitions = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "topic", "partitions"), - "Number of partitions for this Topic", - []string{"topic"}, nil, - ) - - topicCurrentOffset = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "topic", "partition_current_offset"), - "Current Offset of a Broker at Topic/Partition", - []string{"topic", "partition"}, nil, - ) - - topicOldestOffset = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "topic", "partition_oldest_offset"), - "Oldest Offset of a Broker at Topic/Partition", - []string{"topic", "partition"}, nil, - ) - - topicPartitionLeader = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "topic", "partition_leader"), - "Leader Broker ID of this Topic/Partition", - []string{"topic", "partition"}, nil, - ) - - topicPartitionReplicas = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "topic", "partition_replicas"), - "Number of Replicas for this Topic/Partition", - []string{"topic", "partition"}, nil, - ) - - topicPartitionInSyncReplicas = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "topic", "partition_in_sync_replica"), - "Number of In-Sync Replicas for this Topic/Partition", - []string{"topic", "partition"}, nil, - ) - - topicPartitionUsesPreferredReplica = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "topic", "partition_leader_is_preferred"), - "1 if Topic/Partition is using the Preferred Broker", - []string{"topic", "partition"}, nil, - ) - - topicUnderReplicatedPartition = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "topic", "partition_under_replicated_partition"), - "1 if Topic/Partition is under Replicated", - []string{"topic", "partition"}, nil, - ) - - consumergroupCurrentOffset = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "consumergroup", "current_offset"), - "Current Offset of a ConsumerGroup at Topic/Partition", - []string{"consumergroup", "topic", "partition"}, nil, - ) - - consumergroupLag = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "consumergroup", "lag"), - "Current Approximate Lag of a ConsumerGroup at Topic/Partition", - []string{"consumergroup", "topic", "partition"}, nil, - ) + clusterBrokers *prometheus.Desc + topicPartitions *prometheus.Desc + topicCurrentOffset *prometheus.Desc + topicOldestOffset *prometheus.Desc + topicPartitionLeader *prometheus.Desc + topicPartitionReplicas *prometheus.Desc + topicPartitionInSyncReplicas *prometheus.Desc + topicPartitionUsesPreferredReplica *prometheus.Desc + topicUnderReplicatedPartition *prometheus.Desc + consumergroupCurrentOffset *prometheus.Desc + consumergroupLag *prometheus.Desc ) // Exporter collects Kafka stats from the given server and exports them using @@ -115,6 +62,7 @@ type kafkaOpts struct { tlsKeyFile string tlsInsecureSkipTLSVerify bool kafkaVersion string + labels string } // CanReadCertAndKey returns true if the certificate and key files already exists, @@ -465,6 +413,7 @@ func main() { kingpin.Flag("tls.key-file", "The optional key file for client authentication.").Default("").StringVar(&opts.tlsKeyFile) kingpin.Flag("tls.insecure-skip-tls-verify", "If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure.").Default("false").BoolVar(&opts.tlsInsecureSkipTLSVerify) kingpin.Flag("kafka.version", "Kafka broker version").Default(sarama.V1_0_0_0.String()).StringVar(&opts.kafkaVersion) + kingpin.Flag("kafka.labels", "Kafka cluster name").Default(sarama.V1_0_0_0.String()).StringVar(&opts.labels) plog.AddFlags(kingpin.CommandLine) kingpin.Version(version.Print("kafka_exporter")) @@ -474,6 +423,75 @@ func main() { plog.Infoln("Starting kafka_exporter", version.Info()) plog.Infoln("Build context", version.BuildContext()) + labels := make(map[string]string) + for _, label := range strings.Split(opts.labels, ",") { + splitted := strings.Split(label, "=") + labels[splitted[0]] = splitted[1] + } + + clusterBrokers = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "", "brokers"), + "Number of Brokers in the Kafka Cluster.", + nil, labels, + ) + topicPartitions = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "topic", "partitions"), + "Number of partitions for this Topic", + []string{"topic"}, labels, + ) + topicCurrentOffset = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "topic", "partition_current_offset"), + "Current Offset of a Broker at Topic/Partition", + []string{"topic", "partition"}, labels, + ) + topicOldestOffset = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "topic", "partition_oldest_offset"), + "Oldest Offset of a Broker at Topic/Partition", + []string{"topic", "partition"}, labels, + ) + + topicPartitionLeader = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "topic", "partition_leader"), + "Leader Broker ID of this Topic/Partition", + []string{"topic", "partition"}, labels, + ) + + topicPartitionReplicas = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "topic", "partition_replicas"), + "Number of Replicas for this Topic/Partition", + []string{"topic", "partition"}, labels, + ) + + topicPartitionInSyncReplicas = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "topic", "partition_in_sync_replica"), + "Number of In-Sync Replicas for this Topic/Partition", + []string{"topic", "partition"}, labels, + ) + + topicPartitionUsesPreferredReplica = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "topic", "partition_leader_is_preferred"), + "1 if Topic/Partition is using the Preferred Broker", + []string{"topic", "partition"}, labels, + ) + + topicUnderReplicatedPartition = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "topic", "partition_under_replicated_partition"), + "1 if Topic/Partition is under Replicated", + []string{"topic", "partition"}, labels, + ) + + consumergroupCurrentOffset = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "consumergroup", "current_offset"), + "Current Offset of a ConsumerGroup at Topic/Partition", + []string{"consumergroup", "topic", "partition"}, labels, + ) + + consumergroupLag = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "consumergroup", "lag"), + "Current Approximate Lag of a ConsumerGroup at Topic/Partition", + []string{"consumergroup", "topic", "partition"}, labels, + ) + if *logSarama { sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) }