Actualize fork
foxdalas committed Feb 11, 2022
1 parent 4b8cf7e commit e03a7cf
Showing 5 changed files with 41 additions and 70 deletions.
charts/kafka-exporter/values.yaml (2 changes: 1 addition & 1 deletion)
@@ -5,7 +5,7 @@
 replicaCount: 1

 image:
-  repository: danielqsj/kafka-exporter
+  repository: foxdalas/kafka-exporter
   tag: latest
   pullPolicy: IfNotPresent
deploy/base/deployment.yaml (4 changes: 2 additions & 2 deletions)
@@ -9,8 +9,8 @@ spec:
       containers:
       - name: kafka-exporter
         imagePullPolicy: IfNotPresent
-        image: danielqsj/kafka-exporter
+        image: foxdalas/kafka-exporter
        ports:
        - name: http-metrics
          containerPort: 9308
-         protocol: TCP
+         protocol: TCP
deploy/base/kustomization.yaml (4 changes: 2 additions & 2 deletions)
@@ -11,5 +11,5 @@ resources:
   - service.yaml

 images:
-  - name: danielqsj/kafka-exporter
-    newTag: latest
+  - name: foxdalas/kafka-exporter
+    newTag: latest
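
For context, kustomize's built-in images transformer is what this relies on: at render time it rewrites any matching image reference in the listed resources, so the base Deployment picks up the fork's image without the manifest itself changing. A minimal sketch of the mechanism (file contents illustrative, not copied from this commit):

    # kustomization.yaml (illustrative sketch)
    apiVersion: kustomize.config.k8s.io/v1beta1
    kind: Kustomization
    resources:
      - deployment.yaml          # declares image: foxdalas/kafka-exporter
    images:
      - name: foxdalas/kafka-exporter
        newTag: latest           # tag swapped wherever the name matches

Rendering with "kubectl kustomize deploy/base" prints the manifests with the rewritten image.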
kafka_exporter.go (95 changes: 34 additions & 61 deletions)
@@ -72,13 +72,13 @@ type Exporter struct {
 	metrics    []prometheus.Metric
 	metricsMtx sync.RWMutex

-	offsetShowAll         bool
-	topicWorkers          int
-	allowConcurrent       bool
-	sgMutex               sync.Mutex
-	sgWaitCh              chan struct{}
-	sgChans               []chan<- prometheus.Metric
-	consumerGroupFetchAll bool
+	offsetShowAll         bool
+	topicWorkers          int
+	allowConcurrent       bool
+	sgMutex               sync.Mutex
+	sgWaitCh              chan struct{}
+	sgChans               []chan<- prometheus.Metric
+	consumerGroupFetchAll bool
 }

 type kafkaOpts struct {
@@ -105,7 +105,7 @@ type kafkaOpts struct {
 	uriZookeeper            []string
 	labels                  string
 	metadataRefreshInterval string
-	collectMetricsInterval  time.Duration
+	collectMetricsInterval  string
 	serviceName             string
 	kerberosConfigPath      string
 	realm                   string
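
Since collectMetricsInterval is now a plain string, validation moves to the time.ParseDuration call in the NewExporter hunk below, and the value must be a Go duration literal. A self-contained illustration (not from the commit):

    package main

    import (
    	"fmt"
    	"log"
    	"time"
    )

    func main() {
    	// "15s" matches the collect.interval default added in main();
    	// forms like "500ms" or "1m30s" also parse, anything else errors.
    	d, err := time.ParseDuration("15s")
    	if err != nil {
    		log.Fatal(err)
    	}
    	fmt.Println(d) // prints 15s
    }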
@@ -271,33 +271,25 @@ func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string) (*Exporter, error) {
 		nextMetadataRefresh:     time.Now(),
 		metadataRefreshInterval: interval,

-		offsetShowAll:         opts.offsetShowAll,
-		topicWorkers:          opts.topicWorkers,
-		allowConcurrent:       opts.allowConcurrent,
-		sgMutex:               sync.Mutex{},
-		sgWaitCh:              nil,
-		sgChans:               []chan<- prometheus.Metric{},
-		consumerGroupFetchAll: config.Version.IsAtLeast(sarama.V2_0_0_0),
+		offsetShowAll:         opts.offsetShowAll,
+		topicWorkers:          opts.topicWorkers,
+		allowConcurrent:       opts.allowConcurrent,
+		sgMutex:               sync.Mutex{},
+		sgWaitCh:              nil,
+		sgChans:               []chan<- prometheus.Metric{},
+		consumerGroupFetchAll: config.Version.IsAtLeast(sarama.V2_0_0_0),
 	}

-	// start background collect metrics
-	go exporter.backgroundCollect(opts.collectMetricsInterval)
+	interval, err = time.ParseDuration(opts.collectMetricsInterval)
+	if err != nil {
+		return nil, errors.Wrap(err, "Cannot parse collect metrics interval")
+	}
+	go exporter.backgroundCollect(interval)

 	return exporter, nil
 }

-func (e *Exporter) fetchOffsetVersion() int16 {
-	version := e.client.Config().Version
-	if e.client.Config().Version.IsAtLeast(sarama.V2_0_0_0) {
-		return 4
-	} else if version.IsAtLeast(sarama.V0_10_2_0) {
-		return 2
-	} else if version.IsAtLeast(sarama.V0_8_2_2) {
-		return 1
-	}
-	return 0
-}
-
 // Describe describes all the metrics ever exported by the Kafka exporter. It
 // implements prometheus.Collector.
 func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
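
backgroundCollect's body is outside this diff, but its signature appears in the next hunk's header, and the parsed interval suggests a ticker loop that refreshes a cached snapshot. A hedged sketch of that pattern, assuming the metrics and metricsMtx fields from the Exporter struct above (not the project's actual implementation):

    func (e *Exporter) backgroundCollect(interval time.Duration) {
    	ticker := time.NewTicker(interval)
    	defer ticker.Stop()
    	for range ticker.C {
    		// Drain one full scrape into a fresh slice...
    		ch := make(chan prometheus.Metric)
    		done := make(chan struct{})
    		var snapshot []prometheus.Metric
    		go func() {
    			for m := range ch {
    				snapshot = append(snapshot, m)
    			}
    			close(done)
    		}()
    		e.collect(ch)
    		close(ch)
    		<-done
    		// ...then swap it in under the write lock.
    		e.metricsMtx.Lock()
    		e.metrics = snapshot
    		e.metricsMtx.Unlock()
    	}
    }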
@@ -347,32 +339,9 @@ func (e *Exporter) backgroundCollect(interval time.Duration) {
 	}
 }

-func (e *Exporter) collectChans(quit chan struct{}) {
-	original := make(chan prometheus.Metric)
-	container := make([]prometheus.Metric, 0, 100)
-	go func() {
-		for metric := range original {
-			container = append(container, metric)
-		}
-	}()
-	e.collect(original)
-	close(original)
-	// Lock to avoid modification on the channel slice
-	e.sgMutex.Lock()
-	for _, ch := range e.sgChans {
-		for _, metric := range container {
-			ch <- metric
-		}
-	}
-	// Reset the slice
-	e.sgChans = e.sgChans[:0]
-	// Notify remaining waiting Collect they can return
-	close(quit)
-	// Release the lock so Collect can append to the slice again
-	e.sgMutex.Unlock()
-}
-
 func (e *Exporter) collect(ch chan<- prometheus.Metric) {
+	glog.Info("Collecting metrics")
+
 	var wg = sync.WaitGroup{}
 	ch <- prometheus.MustNewConstMetric(
 		clusterBrokers, prometheus.GaugeValue, float64(len(e.client.Brokers())),
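
With collectChans deleted, the shared fan-out over registered sgChans is gone; presumably Collect now just replays the snapshot that backgroundCollect maintains. A sketch of that serving path, under the same field assumptions as the sketch above:

    func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
    	e.metricsMtx.RLock()
    	defer e.metricsMtx.RUnlock()
    	// Replay the most recent background snapshot to Prometheus.
    	for _, metric := range e.metrics {
    		ch <- metric
    	}
    }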
@@ -679,10 +648,6 @@ func init() {
 	metrics.UseNilMetrics = true
 	prometheus.MustRegister(version.NewCollector("kafka_exporter"))
 }
-func toFlag(name string, help string) *kingpin.FlagClause {
-	flag.CommandLine.String(name, "", help) // hack around flag.Parse and glog.init flags
-	return kingpin.Flag(name, help)
-}

 // hack around flag.Parse and glog.init flags
 func toFlagString(name string, help string, value string) *string {
@@ -758,6 +723,7 @@ func main() {
 	toFlagBoolVar("concurrent.enable", "If true, all scrapes will trigger kafka operations otherwise, they will share results. WARN: This should be disabled on large clusters", false, "false", &opts.allowConcurrent)
 	toFlagIntVar("topic.workers", "Number of topic workers", 100, "100", &opts.topicWorkers)
 	toFlagIntVar("verbosity", "Verbosity log level", 0, "0", &opts.verbosityLogLevel)
+	toFlagStringVar("collect.interval", "Collect metrics interval", "15s", &opts.collectMetricsInterval)

 	plConfig := plog.Config{}
 	plogflag.AddFlags(kingpin.CommandLine, &plConfig)
@@ -780,7 +746,6 @@ func main() {
 	setup(*listenAddress, *metricsPath, *topicFilter, *groupFilter, *logSarama, opts, labels)
 }

-
 func setup(
 	listenAddress string,
 	metricsPath string,
@@ -793,7 +758,9 @@ func setup(
 	if err := flag.Set("logtostderr", "true"); err != nil {
 		glog.Errorf("Error on setting logtostderr to true")
 	}
-	flag.Set("v", strconv.Itoa(opts.verbosityLogLevel))
+	if err := flag.Set("v", strconv.Itoa(opts.verbosityLogLevel)); err != nil {
+		glog.Error(err)
+	}
 	flag.Parse()
 	defer glog.Flush()
@@ -900,17 +867,23 @@

 	http.Handle(metricsPath, promhttp.Handler())
 	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
-		w.Write([]byte(`<html>
+		_, err := w.Write([]byte(`<html>
 	        <head><title>Kafka Exporter</title></head>
 	        <body>
 	        <h1>Kafka Exporter</h1>
 	        <p><a href='` + metricsPath + `'>Metrics</a></p>
 	        </body>
 	        </html>`))
+		if err != nil {
+			glog.Error(err)
+		}
 	})
 	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
 		// need more specific sarama check
-		w.Write([]byte("ok"))
+		_, err := w.Write([]byte("ok"))
+		if err != nil {
+			glog.Error(err)
+		}
 	})

 	if opts.serverUseTLS {
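
Both handlers now repeat the same write-then-log pattern; if it spreads further, a small helper would keep it in one place. A hypothetical refactor, not part of this commit (assumes the file's existing net/http and glog imports):

    // writeOrLog writes the response body and logs, rather than
    // ignores, any failure. Hypothetical helper, not in the repo.
    func writeOrLog(w http.ResponseWriter, body []byte) {
    	if _, err := w.Write(body); err != nil {
    		glog.Error(err)
    	}
    }

The /healthz handler would then reduce to writeOrLog(w, []byte("ok")).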
simple_test.go (6 changes: 2 additions & 4 deletions)
@@ -42,10 +42,8 @@ func assumeKafka(t *testing.T) bool {
 	}
 	defer client.Close()
 	_, err = client.Topics()
-	if err != nil {
-		return false
-	}
-	return true
+
+	return err == nil
 }

 func execute(handler func(response *http.Response)) {
