From 33875069e020ffb73b5c0209563bd22435f13070 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 10 Jun 2020 14:31:44 +0300 Subject: [PATCH 1/9] Stop counterCache only when already started Signed-off-by: chrismark --- .../module/prometheus/collector/counter.go | 12 ++++++++++++ .../module/prometheus/collector/data.go | 16 ++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/x-pack/metricbeat/module/prometheus/collector/counter.go b/x-pack/metricbeat/module/prometheus/collector/counter.go index 6f0f72d80eb8..854ffa1439b3 100644 --- a/x-pack/metricbeat/module/prometheus/collector/counter.go +++ b/x-pack/metricbeat/module/prometheus/collector/counter.go @@ -30,12 +30,16 @@ type CounterCache interface { // and the value that was given in a previous call, and true if a previous value existed. // It will return 0 and false on the first call. RateFloat64(counterName string, value float64) (float64, bool) + + // Started returns started state of the counterCache + Started() bool } type counterCache struct { ints *common.Cache floats *common.Cache timeout time.Duration + started bool } // NewCounterCache initializes and returns a CounterCache. The timeout parameter will be @@ -45,6 +49,7 @@ func NewCounterCache(timeout time.Duration) CounterCache { ints: common.NewCache(timeout, 0), floats: common.NewCache(timeout, 0), timeout: timeout, + started: false, } } @@ -87,10 +92,17 @@ func (c *counterCache) RateFloat64(counterName string, value float64) (float64, func (c *counterCache) Start() { c.ints.StartJanitor(c.timeout) c.floats.StartJanitor(c.timeout) + c.started = true } // Stop the cache cleanup worker. It mus be called when the cache is disposed func (c *counterCache) Stop() { c.ints.StopJanitor() c.floats.StopJanitor() + c.started = false +} + +// Started returns started state of the counterCache +func (c *counterCache) Started() bool { + return c.started } diff --git a/x-pack/metricbeat/module/prometheus/collector/data.go b/x-pack/metricbeat/module/prometheus/collector/data.go index 8f747ce16ffc..04c90f6d70c7 100644 --- a/x-pack/metricbeat/module/prometheus/collector/data.go +++ b/x-pack/metricbeat/module/prometheus/collector/data.go @@ -7,9 +7,11 @@ package collector import ( "math" "strconv" + "time" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/module/prometheus/collector" @@ -54,6 +56,20 @@ func (g *typedGenerator) Start() { } func (g *typedGenerator) Stop() { + attempts := 0 + maxAttempts := 5 + // this will wait for some time before trying to stop the counterCache in order to handel race conditions + // that might occur with runners started with some delay by autodiscover + for !g.counterCache.Started() { + logp.Debug("prometheus.collector.cache", "counterCache cannot be stopped yet, since it is not started") + if attempts > maxAttempts { + logp.Warn("leaving without stopping counterCache (is not started and cannot be stopped)") + return + } + attempts++ + time.Sleep(5*time.Second) + } + logp.Debug("prometheus.collector.cache", "stopping counterCache") g.counterCache.Stop() } From 4388bd3dcb441e3ec23268bf54dd51a75faa1a9a Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 10 Jun 2020 15:13:53 +0300 Subject: [PATCH 2/9] fmt Signed-off-by: chrismark --- x-pack/metricbeat/module/prometheus/collector/data.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/metricbeat/module/prometheus/collector/data.go b/x-pack/metricbeat/module/prometheus/collector/data.go index 04c90f6d70c7..5a272f4a7720 100644 --- a/x-pack/metricbeat/module/prometheus/collector/data.go +++ b/x-pack/metricbeat/module/prometheus/collector/data.go @@ -67,7 +67,7 @@ func (g *typedGenerator) Stop() { return } attempts++ - time.Sleep(5*time.Second) + time.Sleep(5 * time.Second) } logp.Debug("prometheus.collector.cache", "stopping counterCache") g.counterCache.Stop() From 509e84f760de31983718fa24638c687d6d1df50b Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 16 Jun 2020 11:38:34 +0300 Subject: [PATCH 3/9] Move check after first fetch Signed-off-by: chrismark --- .../module/prometheus/collector/collector.go | 42 +++++++++++++------ .../module/prometheus/collector/config.go | 5 +++ .../module/prometheus/collector/counter.go | 11 ----- .../module/prometheus/collector/data.go | 14 ------- 4 files changed, 35 insertions(+), 37 deletions(-) diff --git a/metricbeat/module/prometheus/collector/collector.go b/metricbeat/module/prometheus/collector/collector.go index 690672ff8297..69f48e00c8fb 100644 --- a/metricbeat/module/prometheus/collector/collector.go +++ b/metricbeat/module/prometheus/collector/collector.go @@ -20,6 +20,7 @@ package collector import ( "regexp" "sync" + "time" "github.com/pkg/errors" dto "github.com/prometheus/client_model/go" @@ -76,13 +77,15 @@ type PromEventsGeneratorFactory func(ms mb.BaseMetricSet) (PromEventsGenerator, // MetricSet for fetching prometheus data type MetricSet struct { mb.BaseMetricSet - prometheus p.Prometheus - includeMetrics []*regexp.Regexp - excludeMetrics []*regexp.Regexp - namespace string - promEventsGen PromEventsGenerator - once sync.Once - host string + prometheus p.Prometheus + includeMetrics []*regexp.Regexp + excludeMetrics []*regexp.Regexp + namespace string + promEventsGen PromEventsGenerator + once sync.Once + host string + eventGenStarted bool + period time.Duration } // MetricSetBuilder returns a builder function for a new Prometheus metricset using @@ -104,10 +107,12 @@ func MetricSetBuilder(namespace string, genFactory PromEventsGeneratorFactory) f } ms := &MetricSet{ - BaseMetricSet: base, - prometheus: prometheus, - namespace: namespace, - promEventsGen: promEventsGen, + BaseMetricSet: base, + prometheus: prometheus, + namespace: namespace, + promEventsGen: promEventsGen, + eventGenStarted: false, + period: config.Period, } // store host here to use it as a pointer when building `up` metric ms.host = ms.Host() @@ -126,7 +131,10 @@ func MetricSetBuilder(namespace string, genFactory PromEventsGeneratorFactory) f // Fetch fetches data and reports it func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { - m.once.Do(m.promEventsGen.Start) + m.once.Do(func() { + m.promEventsGen.Start() + m.eventGenStarted = true + }) families, err := m.prometheus.GetFamilies() eventList := map[string]common.MapStr{} @@ -186,6 +194,16 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { // Close stops the metricset func (m *MetricSet) Close() error { + attempts := 0 + for !m.eventGenStarted { + time.Sleep(m.period) + // waiting 2 periods so as the first Fetch to start the generator, otherwise return since + // Fetch is not called at all + if attempts > 2 { + return nil + } + attempts++ + } m.promEventsGen.Stop() return nil } diff --git a/metricbeat/module/prometheus/collector/config.go b/metricbeat/module/prometheus/collector/config.go index 1a2c5688177c..f2d3b720a15d 100644 --- a/metricbeat/module/prometheus/collector/config.go +++ b/metricbeat/module/prometheus/collector/config.go @@ -17,8 +17,13 @@ package collector +import ( + "time" +) + type metricsetConfig struct { MetricsFilters MetricFilters `config:"metrics_filters" yaml:"metrics_filters,omitempty"` + Period time.Duration `config:"period" yaml:"period"` } type MetricFilters struct { diff --git a/x-pack/metricbeat/module/prometheus/collector/counter.go b/x-pack/metricbeat/module/prometheus/collector/counter.go index 854ffa1439b3..a38f12ccb13b 100644 --- a/x-pack/metricbeat/module/prometheus/collector/counter.go +++ b/x-pack/metricbeat/module/prometheus/collector/counter.go @@ -30,16 +30,12 @@ type CounterCache interface { // and the value that was given in a previous call, and true if a previous value existed. // It will return 0 and false on the first call. RateFloat64(counterName string, value float64) (float64, bool) - - // Started returns started state of the counterCache - Started() bool } type counterCache struct { ints *common.Cache floats *common.Cache timeout time.Duration - started bool } // NewCounterCache initializes and returns a CounterCache. The timeout parameter will be @@ -92,17 +88,10 @@ func (c *counterCache) RateFloat64(counterName string, value float64) (float64, func (c *counterCache) Start() { c.ints.StartJanitor(c.timeout) c.floats.StartJanitor(c.timeout) - c.started = true } // Stop the cache cleanup worker. It mus be called when the cache is disposed func (c *counterCache) Stop() { c.ints.StopJanitor() c.floats.StopJanitor() - c.started = false -} - -// Started returns started state of the counterCache -func (c *counterCache) Started() bool { - return c.started } diff --git a/x-pack/metricbeat/module/prometheus/collector/data.go b/x-pack/metricbeat/module/prometheus/collector/data.go index 5a272f4a7720..23ef386291b1 100644 --- a/x-pack/metricbeat/module/prometheus/collector/data.go +++ b/x-pack/metricbeat/module/prometheus/collector/data.go @@ -7,7 +7,6 @@ package collector import ( "math" "strconv" - "time" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" @@ -56,19 +55,6 @@ func (g *typedGenerator) Start() { } func (g *typedGenerator) Stop() { - attempts := 0 - maxAttempts := 5 - // this will wait for some time before trying to stop the counterCache in order to handel race conditions - // that might occur with runners started with some delay by autodiscover - for !g.counterCache.Started() { - logp.Debug("prometheus.collector.cache", "counterCache cannot be stopped yet, since it is not started") - if attempts > maxAttempts { - logp.Warn("leaving without stopping counterCache (is not started and cannot be stopped)") - return - } - attempts++ - time.Sleep(5 * time.Second) - } logp.Debug("prometheus.collector.cache", "stopping counterCache") g.counterCache.Stop() } From 0d04d9f2f0c6cc84e569daf7ed218f02e7cf5548 Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 16 Jun 2020 11:44:24 +0300 Subject: [PATCH 4/9] fix Signed-off-by: chrismark --- x-pack/metricbeat/module/prometheus/collector/counter.go | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/metricbeat/module/prometheus/collector/counter.go b/x-pack/metricbeat/module/prometheus/collector/counter.go index a38f12ccb13b..6f0f72d80eb8 100644 --- a/x-pack/metricbeat/module/prometheus/collector/counter.go +++ b/x-pack/metricbeat/module/prometheus/collector/counter.go @@ -45,7 +45,6 @@ func NewCounterCache(timeout time.Duration) CounterCache { ints: common.NewCache(timeout, 0), floats: common.NewCache(timeout, 0), timeout: timeout, - started: false, } } From ec649f79f257b0cdb65bee7e200503691282c089 Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 16 Jun 2020 12:02:34 +0300 Subject: [PATCH 5/9] fmt Signed-off-by: chrismark --- x-pack/filebeat/input/cloudfoundry/input.go | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/filebeat/input/cloudfoundry/input.go b/x-pack/filebeat/input/cloudfoundry/input.go index 82bf9d668076..fc38d85fc348 100644 --- a/x-pack/filebeat/input/cloudfoundry/input.go +++ b/x-pack/filebeat/input/cloudfoundry/input.go @@ -9,7 +9,6 @@ import ( "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/input" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/logp" From 929a2d5f89eaf1264c84d2a9a3e18174774af134 Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 16 Jun 2020 12:07:03 +0300 Subject: [PATCH 6/9] Add changelog Signed-off-by: chrismark --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 356a067fb670..1040add809f2 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -248,6 +248,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix compute and pubsub dashboard for googlecloud module. {issue}18962[18962] {pull}18980[18980] - Fix crash on vsphere module when Host information is not available. {issue}18996[18996] {pull}19078[19078] - Fix incorrect usage of hints builder when exposed port is a substring of the hint {pull}19052[19052] +- Stop counterCache only when already started {pull}19103[19103] *Packetbeat* From 2e13cfa302b8c4dbbdb9285b1e5d3398171a7912 Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 16 Jun 2020 12:37:47 +0300 Subject: [PATCH 7/9] review fixes Signed-off-by: chrismark --- .../module/prometheus/collector/collector.go | 15 ++------------- metricbeat/module/prometheus/collector/config.go | 5 ----- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/metricbeat/module/prometheus/collector/collector.go b/metricbeat/module/prometheus/collector/collector.go index 69f48e00c8fb..30cd587a182a 100644 --- a/metricbeat/module/prometheus/collector/collector.go +++ b/metricbeat/module/prometheus/collector/collector.go @@ -20,7 +20,6 @@ package collector import ( "regexp" "sync" - "time" "github.com/pkg/errors" dto "github.com/prometheus/client_model/go" @@ -85,7 +84,6 @@ type MetricSet struct { once sync.Once host string eventGenStarted bool - period time.Duration } // MetricSetBuilder returns a builder function for a new Prometheus metricset using @@ -112,7 +110,6 @@ func MetricSetBuilder(namespace string, genFactory PromEventsGeneratorFactory) f namespace: namespace, promEventsGen: promEventsGen, eventGenStarted: false, - period: config.Period, } // store host here to use it as a pointer when building `up` metric ms.host = ms.Host() @@ -194,17 +191,9 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { // Close stops the metricset func (m *MetricSet) Close() error { - attempts := 0 - for !m.eventGenStarted { - time.Sleep(m.period) - // waiting 2 periods so as the first Fetch to start the generator, otherwise return since - // Fetch is not called at all - if attempts > 2 { - return nil - } - attempts++ + if m.eventGenStarted { + m.promEventsGen.Stop() } - m.promEventsGen.Stop() return nil } diff --git a/metricbeat/module/prometheus/collector/config.go b/metricbeat/module/prometheus/collector/config.go index f2d3b720a15d..1a2c5688177c 100644 --- a/metricbeat/module/prometheus/collector/config.go +++ b/metricbeat/module/prometheus/collector/config.go @@ -17,13 +17,8 @@ package collector -import ( - "time" -) - type metricsetConfig struct { MetricsFilters MetricFilters `config:"metrics_filters" yaml:"metrics_filters,omitempty"` - Period time.Duration `config:"period" yaml:"period"` } type MetricFilters struct { From e553dd02d70aa5e26078d01819816c81e0f808f4 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 17 Jun 2020 10:13:41 +0300 Subject: [PATCH 8/9] review changes Signed-off-by: chrismark --- metricbeat/module/prometheus/collector/collector.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/metricbeat/module/prometheus/collector/collector.go b/metricbeat/module/prometheus/collector/collector.go index 30cd587a182a..fbb64a10b7ca 100644 --- a/metricbeat/module/prometheus/collector/collector.go +++ b/metricbeat/module/prometheus/collector/collector.go @@ -19,8 +19,6 @@ package collector import ( "regexp" - "sync" - "github.com/pkg/errors" dto "github.com/prometheus/client_model/go" @@ -81,7 +79,6 @@ type MetricSet struct { excludeMetrics []*regexp.Regexp namespace string promEventsGen PromEventsGenerator - once sync.Once host string eventGenStarted bool } @@ -128,10 +125,10 @@ func MetricSetBuilder(namespace string, genFactory PromEventsGeneratorFactory) f // Fetch fetches data and reports it func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { - m.once.Do(func() { + if !m.eventGenStarted { m.promEventsGen.Start() m.eventGenStarted = true - }) + } families, err := m.prometheus.GetFamilies() eventList := map[string]common.MapStr{} From f9d2a9385ed6d3f5053d4e5ab42f183ace0d24d5 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 17 Jun 2020 10:15:33 +0300 Subject: [PATCH 9/9] fmt Signed-off-by: chrismark --- metricbeat/module/prometheus/collector/collector.go | 1 + 1 file changed, 1 insertion(+) diff --git a/metricbeat/module/prometheus/collector/collector.go b/metricbeat/module/prometheus/collector/collector.go index fbb64a10b7ca..6941f30bd8ae 100644 --- a/metricbeat/module/prometheus/collector/collector.go +++ b/metricbeat/module/prometheus/collector/collector.go @@ -19,6 +19,7 @@ package collector import ( "regexp" + "github.com/pkg/errors" dto "github.com/prometheus/client_model/go"