Skip to content

Commit

Permalink
New metric holder (#411)
Browse files Browse the repository at this point in the history
* New metric holder

* Rename metric.Ctl constructor

* Fix prometheus import name

* Fix build

* Add tests

* Refactor metric controller

* Remove updateTime & beautify

* Fix nowTime & tests

* Refactor metric controller

* Add xtime package

* Use xtime in metric holder

* Rework holder & wrappers

* Rework

* Fix go mod

* Some performance optimizations to the MetricHolder (#568)

* Some performance optimizations

* Rename RenameReleaser -> MetricDeleter

* Benchmark prometheus.MetricVec

* Replace parallel benchmark with the loop

* Fix string copy

* Add tests for the HeldMetric

* Fix naming & build (#569)

* Fix naming & build

* Rollback comment

* Update MetricHolder test

* Add metric_hold_duration to pipeline settings

* go mod tidy

* Fix tests

* Fix after merge

* Fix tests
  • Loading branch information
kirillov6 authored Jan 31, 2024
1 parent 2f1335d commit 10521d4
Show file tree
Hide file tree
Showing 44 changed files with 859 additions and 446 deletions.
4 changes: 2 additions & 2 deletions fd/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func (f *FileD) Start() {
}

func (f *FileD) initMetrics() {
f.metricCtl = metric.New("file_d", f.registry)
f.versionMetric = f.metricCtl.RegisterCounter("version", "", "version")
f.metricCtl = metric.NewCtl("file_d", f.registry)
f.versionMetric = f.metricCtl.RegisterCounterVec("version", "", "version")
f.versionMetric.WithLabelValues(buildinfo.Version).Inc()
}

Expand Down
11 changes: 11 additions & 0 deletions fd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
decoder := "auto"
isStrict := false
eventTimeout := pipeline.DefaultEventTimeout
metricHoldDuration := pipeline.DefaultMetricHoldDuration

if settings != nil {
val := settings.Get("capacity").MustInt()
Expand Down Expand Up @@ -85,6 +86,15 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
antispamExceptions.Prepare()

isStrict = settings.Get("is_strict").MustBool()

str = settings.Get("metric_hold_duration").MustString()
if str != "" {
i, err := time.ParseDuration(str)
if err != nil {
logger.Fatalf("can't parse pipeline metric hold duration: %s", err.Error())
}
metricHoldDuration = i
}
}

return &pipeline.Settings{
Expand All @@ -98,6 +108,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
EventTimeout: eventTimeout,
StreamField: streamField,
IsStrict: isStrict,
MetricHoldDuration: metricHoldDuration,
}
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
github.com/alicebob/miniredis/v2 v2.30.5
github.com/bitly/go-simplejson v0.5.1
github.com/cespare/xxhash/v2 v2.2.0
github.com/euank/go-kmsg-parser v2.0.0+incompatible
github.com/go-redis/redis v6.15.9+incompatible
github.com/golang/mock v1.6.0
Expand Down Expand Up @@ -48,7 +49,6 @@ require (
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v3 v3.0.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cilium/ebpf v0.9.1 // indirect
github.com/containerd/cgroups/v3 v3.0.1 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
Expand Down
113 changes: 63 additions & 50 deletions metric/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,90 +20,103 @@ type Ctl struct {
subsystem string
register *prometheus.Registry

counters map[string]*prometheus.CounterVec
counterMx *sync.Mutex

gauges map[string]*prometheus.GaugeVec
gaugeMx *sync.Mutex

histograms map[string]*prometheus.HistogramVec
histogramMx *sync.Mutex
metrics map[string]prometheus.Collector
mu sync.RWMutex
}

func New(subsystem string, registry *prometheus.Registry) *Ctl {
func NewCtl(subsystem string, registry *prometheus.Registry) *Ctl {
ctl := &Ctl{
subsystem: subsystem,
counters: make(map[string]*prometheus.CounterVec),
counterMx: new(sync.Mutex),
gauges: make(map[string]*prometheus.GaugeVec),
gaugeMx: new(sync.Mutex),
histograms: make(map[string]*prometheus.HistogramVec),
histogramMx: new(sync.Mutex),
register: registry,
subsystem: subsystem,
register: registry,
metrics: make(map[string]prometheus.Collector),
}
return ctl
}

func (mc *Ctl) RegisterCounter(name, help string, labels ...string) *prometheus.CounterVec {
mc.counterMx.Lock()
defer mc.counterMx.Unlock()
func (mc *Ctl) RegisterCounter(name, help string) prometheus.Counter {
counter := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: PromNamespace,
Subsystem: mc.subsystem,
Name: name,
Help: help,
})

if metric, hasCounter := mc.counters[name]; hasCounter {
return metric
}
return mc.registerMetric(name, counter).(prometheus.Counter)
}

promCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
func (mc *Ctl) RegisterCounterVec(name, help string, labels ...string) *prometheus.CounterVec {
counterVec := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: PromNamespace,
Subsystem: mc.subsystem,
Name: name,
Help: help,
}, labels)

mc.counters[name] = promCounter
mc.register.Unregister(promCounter)
mc.register.MustRegister(promCounter)
return promCounter
return mc.registerMetric(name, counterVec).(*prometheus.CounterVec)
}

func (mc *Ctl) RegisterGauge(name, help string, labels ...string) *prometheus.GaugeVec {
mc.gaugeMx.Lock()
defer mc.gaugeMx.Unlock()
func (mc *Ctl) RegisterGauge(name, help string) prometheus.Gauge {
gauge := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: PromNamespace,
Subsystem: mc.subsystem,
Name: name,
Help: help,
})

if metric, hasGauge := mc.gauges[name]; hasGauge {
return metric
}
return mc.registerMetric(name, gauge).(prometheus.Gauge)
}

promGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
func (mc *Ctl) RegisterGaugeVec(name, help string, labels ...string) *prometheus.GaugeVec {
gaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: PromNamespace,
Subsystem: mc.subsystem,
Name: name,
Help: help,
}, labels)

mc.gauges[name] = promGauge
mc.register.Unregister(promGauge)
mc.register.MustRegister(promGauge)
return promGauge
return mc.registerMetric(name, gaugeVec).(*prometheus.GaugeVec)
}

func (mc *Ctl) RegisterHistogram(name, help string, buckets []float64, labels ...string) *prometheus.HistogramVec {
mc.histogramMx.Lock()
defer mc.histogramMx.Unlock()
func (mc *Ctl) RegisterHistogram(name, help string, buckets []float64) prometheus.Histogram {
histogram := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: PromNamespace,
Subsystem: mc.subsystem,
Name: name,
Help: help,
Buckets: buckets,
})

if metric, hasHistogram := mc.histograms[name]; hasHistogram {
return metric
}
return mc.registerMetric(name, histogram).(prometheus.Histogram)
}

promHistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{
func (mc *Ctl) RegisterHistogramVec(name, help string, buckets []float64, labels ...string) *prometheus.HistogramVec {
histogramVec := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: PromNamespace,
Subsystem: mc.subsystem,
Name: name,
Help: help,
Buckets: buckets,
}, labels)

mc.histograms[name] = promHistogram
mc.register.Unregister(promHistogram)
mc.register.MustRegister(promHistogram)
return promHistogram
return mc.registerMetric(name, histogramVec).(*prometheus.HistogramVec)
}

func (mc *Ctl) registerMetric(name string, newMetric prometheus.Collector) prometheus.Collector {
mc.mu.RLock()
metric, has := mc.metrics[name]
mc.mu.RUnlock()
if has {
return metric
}

mc.mu.Lock()
defer mc.mu.Unlock()
metric, has = mc.metrics[name]
if !has {
metric = newMetric
mc.metrics[name] = metric
mc.register.MustRegister(metric)
}

return metric
}
43 changes: 43 additions & 0 deletions metric/held_counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package metric

import (
"time"

"github.com/prometheus/client_golang/prometheus"
)

type HeldCounter struct {
*heldMetric[prometheus.Counter]
}

func (h HeldCounter) Inc() {
h.metric.Inc()
h.updateUsage()
}

func (h HeldCounter) Add(v float64) {
h.metric.Add(v)
h.updateUsage()
}

type HeldCounterVec struct {
store *heldMetricsStore[prometheus.Counter]
vec *prometheus.CounterVec
}

func NewHeldCounterVec(cv *prometheus.CounterVec) HeldCounterVec {
return HeldCounterVec{
vec: cv,
store: newHeldMetricsStore[prometheus.Counter](),
}
}

func (h HeldCounterVec) WithLabelValues(lvs ...string) HeldCounter {
return HeldCounter{
heldMetric: h.store.GetOrCreate(lvs, h.vec.WithLabelValues),
}
}

func (h HeldCounterVec) DeleteOldMetrics(holdDuration time.Duration) {
h.store.DeleteOldMetrics(holdDuration, h.vec)
}
58 changes: 58 additions & 0 deletions metric/held_gauge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package metric

import (
"time"

"github.com/prometheus/client_golang/prometheus"
)

type HeldGauge struct {
*heldMetric[prometheus.Gauge]
}

func (h HeldGauge) Set(v float64) {
h.metric.Set(v)
h.updateUsage()
}

func (h HeldGauge) Inc() {
h.metric.Inc()
h.updateUsage()
}

func (h HeldGauge) Dec() {
h.metric.Dec()
h.updateUsage()
}

func (h HeldGauge) Add(v float64) {
h.metric.Add(v)
h.updateUsage()
}

func (h HeldGauge) Sub(v float64) {
h.metric.Sub(v)
h.updateUsage()
}

type HeldGaugeVec struct {
store *heldMetricsStore[prometheus.Gauge]
vec *prometheus.GaugeVec
}

func NewHeldGaugeVec(gv *prometheus.GaugeVec) HeldGaugeVec {
return HeldGaugeVec{
vec: gv,
store: newHeldMetricsStore[prometheus.Gauge](),
}
}

func (h HeldGaugeVec) WithLabelValues(lvs ...string) HeldGauge {
return HeldGauge{
heldMetric: h.store.GetOrCreate(lvs, h.vec.WithLabelValues),
}
}

func (h HeldGaugeVec) DeleteOldMetrics(holdDuration time.Duration) {
h.store.DeleteOldMetrics(holdDuration, h.vec)
}
40 changes: 40 additions & 0 deletions metric/held_histogram.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package metric

import (
"time"

"github.com/prometheus/client_golang/prometheus"
)

type HeldHistogram struct {
*heldMetric[prometheus.Histogram]
}

func (h HeldHistogram) Observe(v float64) {
h.metric.Observe(v)
h.updateUsage()
}

type HeldHistogramVec struct {
store *heldMetricsStore[prometheus.Histogram]
vec *prometheus.HistogramVec
}

func NewHeldHistogramVec(hv *prometheus.HistogramVec) HeldHistogramVec {
return HeldHistogramVec{
vec: hv,
store: newHeldMetricsStore[prometheus.Histogram](),
}
}

func (h HeldHistogramVec) WithLabelValues(lvs ...string) HeldHistogram {
return HeldHistogram{
heldMetric: h.store.GetOrCreate(lvs, func(s ...string) prometheus.Histogram {
return h.vec.WithLabelValues(s...).(prometheus.Histogram)
}),
}
}

func (h HeldHistogramVec) DeleteOldMetrics(holdDuration time.Duration) {
h.store.DeleteOldMetrics(holdDuration, h.vec)
}
Loading

0 comments on commit 10521d4

Please sign in to comment.