Skip to content

Commit

Permalink
Merge pull request danielqsj#52 from matheusdaluz/master
Browse files Browse the repository at this point in the history
consume lag from zookeeper
  • Loading branch information
danielqsj authored Nov 6, 2018
2 parents 0728072 + dbbee53 commit 8488376
Show file tree
Hide file tree
Showing 55 changed files with 9,089 additions and 1,380 deletions.
Binary file added .DS_Store
Binary file not shown.
14 changes: 13 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 48 additions & 8 deletions kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"strconv"
"strings"
"sync"

"github.com/Shopify/sarama"
kazoo "github.com/krallistic/kazoo-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
plog "github.com/prometheus/common/log"
Expand Down Expand Up @@ -41,15 +41,18 @@ var (
consumergroupCurrentOffsetSum *prometheus.Desc
consumergroupLag *prometheus.Desc
consumergroupLagSum *prometheus.Desc
consumergroupLagZookeeper *prometheus.Desc
)

// Exporter collects Kafka stats from the given server and exports them using
// the prometheus metrics package.
type Exporter struct {
client sarama.Client
topicFilter *regexp.Regexp
groupFilter *regexp.Regexp
mu sync.Mutex
client sarama.Client
topicFilter *regexp.Regexp
groupFilter *regexp.Regexp
mu sync.Mutex
useZooKeeperLag bool
zookeeperClient *kazoo.Kazoo
}

type kafkaOpts struct {
Expand All @@ -64,6 +67,8 @@ type kafkaOpts struct {
tlsKeyFile string
tlsInsecureSkipTLSVerify bool
kafkaVersion string
useZooKeeperLag bool
uriZookeeper []string
labels string
}

Expand Down Expand Up @@ -103,6 +108,7 @@ func canReadFile(path string) bool {

// NewExporter returns an initialized Exporter.
func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string) (*Exporter, error) {
var zookeeperClient *kazoo.Kazoo
config := sarama.NewConfig()
config.ClientID = clientID
kafkaVersion, err := sarama.ParseKafkaVersion(opts.kafkaVersion)
Expand Down Expand Up @@ -154,6 +160,10 @@ func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string) (*Expor
}
}

if opts.useZooKeeperLag {
zookeeperClient, err = kazoo.NewKazoo(opts.uriZookeeper, nil)
}

client, err := sarama.NewClient(opts.uri, config)

if err != nil {
Expand All @@ -164,9 +174,11 @@ func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string) (*Expor

// Init our exporter.
return &Exporter{
client: client,
topicFilter: regexp.MustCompile(topicFilter),
groupFilter: regexp.MustCompile(groupFilter),
client: client,
topicFilter: regexp.MustCompile(topicFilter),
groupFilter: regexp.MustCompile(groupFilter),
useZooKeeperLag: opts.useZooKeeperLag,
zookeeperClient: zookeeperClient,
}, nil
}

Expand All @@ -185,6 +197,7 @@ func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
ch <- consumergroupCurrentOffset
ch <- consumergroupCurrentOffsetSum
ch <- consumergroupLag
ch <- consumergroupLagZookeeper
ch <- consumergroupLagSum
}

Expand Down Expand Up @@ -289,6 +302,25 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
topicUnderReplicatedPartition, prometheus.GaugeValue, float64(0), topic, strconv.FormatInt(int64(partition), 10),
)
}

if e.useZooKeeperLag {
ConsumerGroups, err := e.zookeeperClient.Consumergroups()

if err != nil {
plog.Errorf("Cannot get consumer group %v", err)
}

for _, group := range ConsumerGroups {
offset, _ := group.FetchOffset(topic, partition)
if offset > 0 {

consumerGroupLag := currentOffset - offset
ch <- prometheus.MustNewConstMetric(
consumergroupLagZookeeper, prometheus.GaugeValue, float64(consumerGroupLag), group.Name, topic, strconv.FormatInt(int64(partition), 10),
)
}
}
}
}
}
}
Expand Down Expand Up @@ -427,6 +459,8 @@ func main() {
kingpin.Flag("tls.key-file", "The optional key file for client authentication.").Default("").StringVar(&opts.tlsKeyFile)
kingpin.Flag("tls.insecure-skip-tls-verify", "If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure.").Default("false").BoolVar(&opts.tlsInsecureSkipTLSVerify)
kingpin.Flag("kafka.version", "Kafka broker version").Default(sarama.V1_0_0_0.String()).StringVar(&opts.kafkaVersion)
kingpin.Flag("use.consumelag.zookeeper", "if you need to use a group from zookeeper").Default("false").BoolVar(&opts.useZooKeeperLag)
kingpin.Flag("zookeeper.server", "Address (hosts) of zookeeper server.").Default("localhost:2181").StringsVar(&opts.uriZookeeper)
kingpin.Flag("kafka.labels", "Kafka cluster name").Default("").StringVar(&opts.labels)

plog.AddFlags(kingpin.CommandLine)
Expand Down Expand Up @@ -517,6 +551,12 @@ func main() {
"Current Approximate Lag of a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, labels,
)

consumergroupLagZookeeper = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroupzookeeper", "lag_zookeeper"),
"Current Approximate Lag(zookeeper) of a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, nil,
)

consumergroupLagSum = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "lag_sum"),
Expand Down
Loading

0 comments on commit 8488376

Please sign in to comment.