From a87b0a087f3d9fd5131ec7964a0c4d8f28bbe70f Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 19 May 2021 12:43:05 +0300 Subject: [PATCH 1/8] Refactor kubelet metricsets to share response from endpoint Signed-off-by: chrismark --- metricbeat/module/kubernetes/kubernetes.go | 67 ++++++++++++++++--- metricbeat/module/kubernetes/node/node.go | 11 ++- metricbeat/module/kubernetes/pod/pod.go | 13 +++- metricbeat/module/kubernetes/system/system.go | 10 ++- metricbeat/module/kubernetes/volume/volume.go | 10 ++- 5 files changed, 94 insertions(+), 17 deletions(-) diff --git a/metricbeat/module/kubernetes/kubernetes.go b/metricbeat/module/kubernetes/kubernetes.go index 53cd037f5e0..e0c08bf6df5 100644 --- a/metricbeat/module/kubernetes/kubernetes.go +++ b/metricbeat/module/kubernetes/kubernetes.go @@ -18,6 +18,7 @@ package kubernetes import ( + "github.com/elastic/beats/v7/metricbeat/helper" "sync" "time" @@ -39,6 +40,7 @@ func init() { type Module interface { mb.Module GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) + GetSharedKubeletStats(http *helper.HTTP) ([]byte, error) } type familiesCache struct { @@ -61,30 +63,51 @@ func (c *kubeStateMetricsCache) getCacheMapEntry(hash uint64) *familiesCache { return c.cacheMap[hash] } +type statsCache struct { + sharedStats []byte + lastFetchErr error + lastFetchTimestamp time.Time +} + +type kubeletStatsCache struct { + cacheMap map[uint64]*statsCache + lock sync.Mutex +} + +func (c *kubeletStatsCache) getCacheMapEntry(hash uint64) *statsCache { + c.lock.Lock() + defer c.lock.Unlock() + if _, ok := c.cacheMap[hash]; !ok { + c.cacheMap[hash] = &statsCache{} + } + return c.cacheMap[hash] +} + type module struct { mb.BaseModule kubeStateMetricsCache *kubeStateMetricsCache - familiesCache *familiesCache + kubeletStatsCache *kubeletStatsCache + cacheHash uint64 } func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) { kubeStateMetricsCache := &kubeStateMetricsCache{ cacheMap: make(map[uint64]*familiesCache), } + kubeletStatsCache := &kubeletStatsCache{ + cacheMap: make(map[uint64]*statsCache), + } return func(base mb.BaseModule) (mb.Module, error) { hash, err := generateCacheHash(base.Config().Hosts) if err != nil { return nil, errors.Wrap(err, "error generating cache hash for kubeStateMetricsCache") } - // NOTE: These entries will be never removed, this can be a leak if - // metricbeat is used to monitor clusters dynamically created. - // (https://github.com/elastic/beats/pull/25640#discussion_r633395213) - familiesCache := kubeStateMetricsCache.getCacheMapEntry(hash) m := module{ BaseModule: base, kubeStateMetricsCache: kubeStateMetricsCache, - familiesCache: familiesCache, + kubeletStatsCache: kubeletStatsCache, + cacheHash: hash, } return &m, nil } @@ -96,12 +119,36 @@ func (m *module) GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily now := time.Now() - if m.familiesCache.lastFetchTimestamp.IsZero() || now.Sub(m.familiesCache.lastFetchTimestamp) > m.Config().Period { - m.familiesCache.sharedFamilies, m.familiesCache.lastFetchErr = prometheus.GetFamilies() - m.familiesCache.lastFetchTimestamp = now + // NOTE: These entries will be never removed, this can be a leak if + // metricbeat is used to monitor clusters dynamically created. + // (https://github.com/elastic/beats/pull/25640#discussion_r633395213) + familiesCache := m.kubeStateMetricsCache.getCacheMapEntry(m.cacheHash) + + if familiesCache.lastFetchTimestamp.IsZero() || now.Sub(familiesCache.lastFetchTimestamp) > m.Config().Period { + familiesCache.sharedFamilies, familiesCache.lastFetchErr = prometheus.GetFamilies() + familiesCache.lastFetchTimestamp = now + } + + return familiesCache.sharedFamilies, familiesCache.lastFetchErr +} + +func (m *module) GetSharedKubeletStats(http *helper.HTTP) ([]byte, error) { + m.kubeletStatsCache.lock.Lock() + defer m.kubeletStatsCache.lock.Unlock() + + now := time.Now() + + // NOTE: These entries will be never removed, this can be a leak if + // metricbeat is used to monitor clusters dynamically created. + // (https://github.com/elastic/beats/pull/25640#discussion_r633395213) + statsCache := m.kubeletStatsCache.getCacheMapEntry(m.cacheHash) + + if statsCache.lastFetchTimestamp.IsZero() || now.Sub(statsCache.lastFetchTimestamp) > m.Config().Period { + statsCache.sharedStats, statsCache.lastFetchErr = http.FetchContent() + statsCache.lastFetchTimestamp = now } - return m.familiesCache.sharedFamilies, m.familiesCache.lastFetchErr + return statsCache.sharedStats, statsCache.lastFetchErr } func generateCacheHash(host []string) (uint64, error) { diff --git a/metricbeat/module/kubernetes/node/node.go b/metricbeat/module/kubernetes/node/node.go index 9868f3ec2cd..f2b1d8c9931 100644 --- a/metricbeat/module/kubernetes/node/node.go +++ b/metricbeat/module/kubernetes/node/node.go @@ -18,6 +18,7 @@ package node import ( + "fmt" "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/common" @@ -26,6 +27,7 @@ import ( "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" ) @@ -60,6 +62,7 @@ type MetricSet struct { mb.BaseMetricSet http *helper.HTTP enricher util.Enricher + mod k8smod.Module } // New create a new instance of the MetricSet @@ -70,11 +73,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } - + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &MetricSet{ BaseMetricSet: base, http: http, enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false), + mod: mod, }, nil } @@ -84,7 +91,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() - body, err := m.http.FetchContent() + body, err := m.mod.GetSharedKubeletStats(m.http) if err != nil { return errors.Wrap(err, "error doing HTTP request to fetch 'node' Metricset data") diff --git a/metricbeat/module/kubernetes/pod/pod.go b/metricbeat/module/kubernetes/pod/pod.go index 4c47cb6863c..dd0e2c9b7e8 100644 --- a/metricbeat/module/kubernetes/pod/pod.go +++ b/metricbeat/module/kubernetes/pod/pod.go @@ -18,6 +18,7 @@ package pod import ( + "fmt" "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/common/kubernetes" @@ -25,6 +26,7 @@ import ( "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" ) @@ -59,6 +61,7 @@ type MetricSet struct { mb.BaseMetricSet http *helper.HTTP enricher util.Enricher + mod k8smod.Module } // New create a new instance of the MetricSet @@ -69,11 +72,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } - + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &MetricSet{ BaseMetricSet: base, http: http, enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, true), + mod: mod, }, nil } @@ -83,9 +90,9 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() - body, err := m.http.FetchContent() + body, err := m.mod.GetSharedKubeletStats(m.http) if err != nil { - return errors.Wrap(err, "error doing HTTP request to fetch 'pod' Metricset data") + return errors.Wrap(err, "error fetching shared data for 'pod' Metricset") } events, err := eventMapping(body, util.PerfMetrics) diff --git a/metricbeat/module/kubernetes/system/system.go b/metricbeat/module/kubernetes/system/system.go index b10c9c4afb3..007521edb5b 100644 --- a/metricbeat/module/kubernetes/system/system.go +++ b/metricbeat/module/kubernetes/system/system.go @@ -18,12 +18,14 @@ package system import ( + "fmt" "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" ) const ( @@ -56,6 +58,7 @@ func init() { type MetricSet struct { mb.BaseMetricSet http *helper.HTTP + mod k8smod.Module } // New create a new instance of the MetricSet @@ -66,9 +69,14 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &MetricSet{ BaseMetricSet: base, http: http, + mod: mod, }, nil } @@ -76,7 +84,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { - body, err := m.http.FetchContent() + body, err := m.mod.GetSharedKubeletStats(m.http) if err != nil { return errors.Wrap(err, "error doing HTTP request to fetch 'system' Metricset data") } diff --git a/metricbeat/module/kubernetes/volume/volume.go b/metricbeat/module/kubernetes/volume/volume.go index 102d8edfabc..4001c13a339 100644 --- a/metricbeat/module/kubernetes/volume/volume.go +++ b/metricbeat/module/kubernetes/volume/volume.go @@ -18,12 +18,14 @@ package volume import ( + "fmt" "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" ) const ( @@ -56,6 +58,7 @@ func init() { type MetricSet struct { mb.BaseMetricSet http *helper.HTTP + mod k8smod.Module } // New create a new instance of the MetricSet @@ -66,9 +69,14 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &MetricSet{ BaseMetricSet: base, http: http, + mod: mod, }, nil } @@ -76,7 +84,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { - body, err := m.http.FetchContent() + body, err := m.mod.GetSharedKubeletStats(m.http) if err != nil { return errors.Wrap(err, "error doing HTTP request to fetch 'volume' Metricset data") } From 7342b544e33a9fc828264280461da67eb0e81f72 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 19 May 2021 15:28:43 +0300 Subject: [PATCH 2/8] fmt Signed-off-by: chrismark --- metricbeat/module/kubernetes/kubernetes.go | 11 ++++++----- metricbeat/module/kubernetes/node/node.go | 5 +++-- metricbeat/module/kubernetes/pod/pod.go | 5 +++-- metricbeat/module/kubernetes/system/system.go | 5 +++-- metricbeat/module/kubernetes/volume/volume.go | 5 +++-- 5 files changed, 18 insertions(+), 13 deletions(-) diff --git a/metricbeat/module/kubernetes/kubernetes.go b/metricbeat/module/kubernetes/kubernetes.go index e0c08bf6df5..5808e41682d 100644 --- a/metricbeat/module/kubernetes/kubernetes.go +++ b/metricbeat/module/kubernetes/kubernetes.go @@ -18,10 +18,11 @@ package kubernetes import ( - "github.com/elastic/beats/v7/metricbeat/helper" "sync" "time" + "github.com/elastic/beats/v7/metricbeat/helper" + "github.com/mitchellh/hashstructure" "github.com/pkg/errors" dto "github.com/prometheus/client_model/go" @@ -87,8 +88,8 @@ type module struct { mb.BaseModule kubeStateMetricsCache *kubeStateMetricsCache - kubeletStatsCache *kubeletStatsCache - cacheHash uint64 + kubeletStatsCache *kubeletStatsCache + cacheHash uint64 } func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) { @@ -106,8 +107,8 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) { m := module{ BaseModule: base, kubeStateMetricsCache: kubeStateMetricsCache, - kubeletStatsCache: kubeletStatsCache, - cacheHash: hash, + kubeletStatsCache: kubeletStatsCache, + cacheHash: hash, } return &m, nil } diff --git a/metricbeat/module/kubernetes/node/node.go b/metricbeat/module/kubernetes/node/node.go index f2b1d8c9931..84e9ff61cc2 100644 --- a/metricbeat/module/kubernetes/node/node.go +++ b/metricbeat/module/kubernetes/node/node.go @@ -19,6 +19,7 @@ package node import ( "fmt" + "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/common" @@ -62,7 +63,7 @@ type MetricSet struct { mb.BaseMetricSet http *helper.HTTP enricher util.Enricher - mod k8smod.Module + mod k8smod.Module } // New create a new instance of the MetricSet @@ -81,7 +82,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { BaseMetricSet: base, http: http, enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false), - mod: mod, + mod: mod, }, nil } diff --git a/metricbeat/module/kubernetes/pod/pod.go b/metricbeat/module/kubernetes/pod/pod.go index dd0e2c9b7e8..dab5c744d59 100644 --- a/metricbeat/module/kubernetes/pod/pod.go +++ b/metricbeat/module/kubernetes/pod/pod.go @@ -19,6 +19,7 @@ package pod import ( "fmt" + "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/common/kubernetes" @@ -61,7 +62,7 @@ type MetricSet struct { mb.BaseMetricSet http *helper.HTTP enricher util.Enricher - mod k8smod.Module + mod k8smod.Module } // New create a new instance of the MetricSet @@ -80,7 +81,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { BaseMetricSet: base, http: http, enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, true), - mod: mod, + mod: mod, }, nil } diff --git a/metricbeat/module/kubernetes/system/system.go b/metricbeat/module/kubernetes/system/system.go index 007521edb5b..8b600133c70 100644 --- a/metricbeat/module/kubernetes/system/system.go +++ b/metricbeat/module/kubernetes/system/system.go @@ -19,6 +19,7 @@ package system import ( "fmt" + "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/logp" @@ -58,7 +59,7 @@ func init() { type MetricSet struct { mb.BaseMetricSet http *helper.HTTP - mod k8smod.Module + mod k8smod.Module } // New create a new instance of the MetricSet @@ -76,7 +77,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, http: http, - mod: mod, + mod: mod, }, nil } diff --git a/metricbeat/module/kubernetes/volume/volume.go b/metricbeat/module/kubernetes/volume/volume.go index 4001c13a339..8f866408129 100644 --- a/metricbeat/module/kubernetes/volume/volume.go +++ b/metricbeat/module/kubernetes/volume/volume.go @@ -19,6 +19,7 @@ package volume import ( "fmt" + "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/logp" @@ -58,7 +59,7 @@ func init() { type MetricSet struct { mb.BaseMetricSet http *helper.HTTP - mod k8smod.Module + mod k8smod.Module } // New create a new instance of the MetricSet @@ -76,7 +77,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, http: http, - mod: mod, + mod: mod, }, nil } From 2d3d4a673bf44260818a18382d362ba6bb349224 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 19 May 2021 16:48:08 +0300 Subject: [PATCH 3/8] fix locks Signed-off-by: chrismark --- metricbeat/module/kubernetes/kubernetes.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/metricbeat/module/kubernetes/kubernetes.go b/metricbeat/module/kubernetes/kubernetes.go index 5808e41682d..326155ebf31 100644 --- a/metricbeat/module/kubernetes/kubernetes.go +++ b/metricbeat/module/kubernetes/kubernetes.go @@ -18,6 +18,7 @@ package kubernetes import ( + "fmt" "sync" "time" @@ -56,8 +57,6 @@ type kubeStateMetricsCache struct { } func (c *kubeStateMetricsCache) getCacheMapEntry(hash uint64) *familiesCache { - c.lock.Lock() - defer c.lock.Unlock() if _, ok := c.cacheMap[hash]; !ok { c.cacheMap[hash] = &familiesCache{} } @@ -76,8 +75,6 @@ type kubeletStatsCache struct { } func (c *kubeletStatsCache) getCacheMapEntry(hash uint64) *statsCache { - c.lock.Lock() - defer c.lock.Unlock() if _, ok := c.cacheMap[hash]; !ok { c.cacheMap[hash] = &statsCache{} } @@ -119,7 +116,7 @@ func (m *module) GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily defer m.kubeStateMetricsCache.lock.Unlock() now := time.Now() - + fmt.Println("hahahah") // NOTE: These entries will be never removed, this can be a leak if // metricbeat is used to monitor clusters dynamically created. // (https://github.com/elastic/beats/pull/25640#discussion_r633395213) @@ -137,6 +134,7 @@ func (m *module) GetSharedKubeletStats(http *helper.HTTP) ([]byte, error) { m.kubeletStatsCache.lock.Lock() defer m.kubeletStatsCache.lock.Unlock() + fmt.Println("hehehehe") now := time.Now() // NOTE: These entries will be never removed, this can be a leak if From 7f3632a19203f37060711242df48027de2755084 Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 20 May 2021 13:36:38 +0300 Subject: [PATCH 4/8] fixup Signed-off-by: chrismark --- metricbeat/module/kubernetes/container/container.go | 11 ++++++++++- metricbeat/module/kubernetes/kubernetes.go | 3 --- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/metricbeat/module/kubernetes/container/container.go b/metricbeat/module/kubernetes/container/container.go index 718696a0a4f..cae43bf5e81 100644 --- a/metricbeat/module/kubernetes/container/container.go +++ b/metricbeat/module/kubernetes/container/container.go @@ -18,12 +18,15 @@ package container import ( + "fmt" + "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" ) @@ -58,6 +61,7 @@ type MetricSet struct { mb.BaseMetricSet http *helper.HTTP enricher util.Enricher + mod k8smod.Module } // New create a new instance of the MetricSet @@ -68,10 +72,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } return &MetricSet{ BaseMetricSet: base, http: http, enricher: util.NewContainerMetadataEnricher(base, true), + mod: mod, }, nil } @@ -81,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() - body, err := m.http.FetchContent() + body, err := m.mod.GetSharedKubeletStats(m.http) if err != nil { return errors.Wrap(err, "error doing HTTP request to fetch 'container' Metricset data") diff --git a/metricbeat/module/kubernetes/kubernetes.go b/metricbeat/module/kubernetes/kubernetes.go index 326155ebf31..ff2596147a2 100644 --- a/metricbeat/module/kubernetes/kubernetes.go +++ b/metricbeat/module/kubernetes/kubernetes.go @@ -18,7 +18,6 @@ package kubernetes import ( - "fmt" "sync" "time" @@ -116,7 +115,6 @@ func (m *module) GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily defer m.kubeStateMetricsCache.lock.Unlock() now := time.Now() - fmt.Println("hahahah") // NOTE: These entries will be never removed, this can be a leak if // metricbeat is used to monitor clusters dynamically created. // (https://github.com/elastic/beats/pull/25640#discussion_r633395213) @@ -134,7 +132,6 @@ func (m *module) GetSharedKubeletStats(http *helper.HTTP) ([]byte, error) { m.kubeletStatsCache.lock.Lock() defer m.kubeletStatsCache.lock.Unlock() - fmt.Println("hehehehe") now := time.Now() // NOTE: These entries will be never removed, this can be a leak if From c26d2c41a59b9e30aab1aa018ed96ef9fca9c23b Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 20 May 2021 13:56:05 +0300 Subject: [PATCH 5/8] Add changelog Signed-off-by: chrismark --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d85b8f1529c..162573c2770 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -992,6 +992,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Refactor state_* metricsets to share response from endpoint. {pull}25640[25640] - Add server id to zookeeper events. {pull}25550[25550] - Add additional network metrics to docker/network {pull}25354[25354] +- [Kubernetes] Refactor kubelet metricsets to share response from endpoint. {pull}25782[25782] *Packetbeat* From 2b7606a29a0fbdd81dc61ab138399b6a07beb5a4 Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 20 May 2021 13:58:17 +0300 Subject: [PATCH 6/8] Fix import order Signed-off-by: chrismark --- metricbeat/module/kubernetes/kubernetes.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/metricbeat/module/kubernetes/kubernetes.go b/metricbeat/module/kubernetes/kubernetes.go index ff2596147a2..b4f17978cbb 100644 --- a/metricbeat/module/kubernetes/kubernetes.go +++ b/metricbeat/module/kubernetes/kubernetes.go @@ -21,12 +21,11 @@ import ( "sync" "time" - "github.com/elastic/beats/v7/metricbeat/helper" - "github.com/mitchellh/hashstructure" "github.com/pkg/errors" dto "github.com/prometheus/client_model/go" + "github.com/elastic/beats/v7/metricbeat/helper" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" ) From 8fabd99790c8e8d9a5bcf038e58ec10a6c0245d7 Mon Sep 17 00:00:00 2001 From: Chris Mark Date: Thu, 27 May 2021 15:53:09 +0300 Subject: [PATCH 7/8] Update CHANGELOG.next.asciidoc Co-authored-by: Jaime Soriano Pastor --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 162573c2770..400b13f2720 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -992,7 +992,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Refactor state_* metricsets to share response from endpoint. {pull}25640[25640] - Add server id to zookeeper events. {pull}25550[25550] - Add additional network metrics to docker/network {pull}25354[25354] -- [Kubernetes] Refactor kubelet metricsets to share response from endpoint. {pull}25782[25782] +- Reduce number of requests done by kubernetes metricsets to kubelet. {pull}25782[25782] *Packetbeat* From 8cfa7ed2bf21eecba362ab61953a368f948b7666 Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 27 May 2021 15:58:27 +0300 Subject: [PATCH 8/8] Improve method naming Signed-off-by: chrismark --- metricbeat/module/kubernetes/container/container.go | 2 +- metricbeat/module/kubernetes/kubernetes.go | 8 ++++---- metricbeat/module/kubernetes/node/node.go | 2 +- metricbeat/module/kubernetes/pod/pod.go | 2 +- .../module/kubernetes/state_container/state_container.go | 2 +- .../module/kubernetes/state_cronjob/state_cronjob.go | 2 +- .../module/kubernetes/state_daemonset/state_daemonset.go | 2 +- .../kubernetes/state_deployment/state_deployment.go | 2 +- metricbeat/module/kubernetes/state_node/state_node.go | 2 +- .../state_persistentvolume/state_persistentvolume.go | 2 +- .../state_persistentvolumeclaim.go | 2 +- metricbeat/module/kubernetes/state_pod/state_pod.go | 2 +- .../kubernetes/state_replicaset/state_replicaset.go | 2 +- .../kubernetes/state_resourcequota/state_resourcequota.go | 2 +- .../module/kubernetes/state_service/state_service.go | 2 +- .../kubernetes/state_statefulset/state_statefulset.go | 2 +- .../kubernetes/state_storageclass/state_storageclass.go | 2 +- metricbeat/module/kubernetes/system/system.go | 2 +- metricbeat/module/kubernetes/volume/volume.go | 2 +- 19 files changed, 22 insertions(+), 22 deletions(-) diff --git a/metricbeat/module/kubernetes/container/container.go b/metricbeat/module/kubernetes/container/container.go index cae43bf5e81..8aa30094c70 100644 --- a/metricbeat/module/kubernetes/container/container.go +++ b/metricbeat/module/kubernetes/container/container.go @@ -90,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() - body, err := m.mod.GetSharedKubeletStats(m.http) + body, err := m.mod.GetKubeletStats(m.http) if err != nil { return errors.Wrap(err, "error doing HTTP request to fetch 'container' Metricset data") diff --git a/metricbeat/module/kubernetes/kubernetes.go b/metricbeat/module/kubernetes/kubernetes.go index b4f17978cbb..f9bd0e29e37 100644 --- a/metricbeat/module/kubernetes/kubernetes.go +++ b/metricbeat/module/kubernetes/kubernetes.go @@ -39,8 +39,8 @@ func init() { type Module interface { mb.Module - GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) - GetSharedKubeletStats(http *helper.HTTP) ([]byte, error) + GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) + GetKubeletStats(http *helper.HTTP) ([]byte, error) } type familiesCache struct { @@ -109,7 +109,7 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) { } } -func (m *module) GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) { +func (m *module) GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) { m.kubeStateMetricsCache.lock.Lock() defer m.kubeStateMetricsCache.lock.Unlock() @@ -127,7 +127,7 @@ func (m *module) GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily return familiesCache.sharedFamilies, familiesCache.lastFetchErr } -func (m *module) GetSharedKubeletStats(http *helper.HTTP) ([]byte, error) { +func (m *module) GetKubeletStats(http *helper.HTTP) ([]byte, error) { m.kubeletStatsCache.lock.Lock() defer m.kubeletStatsCache.lock.Unlock() diff --git a/metricbeat/module/kubernetes/node/node.go b/metricbeat/module/kubernetes/node/node.go index 84e9ff61cc2..14e606fc887 100644 --- a/metricbeat/module/kubernetes/node/node.go +++ b/metricbeat/module/kubernetes/node/node.go @@ -92,7 +92,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() - body, err := m.mod.GetSharedKubeletStats(m.http) + body, err := m.mod.GetKubeletStats(m.http) if err != nil { return errors.Wrap(err, "error doing HTTP request to fetch 'node' Metricset data") diff --git a/metricbeat/module/kubernetes/pod/pod.go b/metricbeat/module/kubernetes/pod/pod.go index dab5c744d59..d9cabf5a414 100644 --- a/metricbeat/module/kubernetes/pod/pod.go +++ b/metricbeat/module/kubernetes/pod/pod.go @@ -91,7 +91,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() - body, err := m.mod.GetSharedKubeletStats(m.http) + body, err := m.mod.GetKubeletStats(m.http) if err != nil { return errors.Wrap(err, "error fetching shared data for 'pod' Metricset") } diff --git a/metricbeat/module/kubernetes/state_container/state_container.go b/metricbeat/module/kubernetes/state_container/state_container.go index d2bab6fc216..4e6c416fca3 100644 --- a/metricbeat/module/kubernetes/state_container/state_container.go +++ b/metricbeat/module/kubernetes/state_container/state_container.go @@ -120,7 +120,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { return errors.Wrap(err, "error getting families") } diff --git a/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go b/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go index 809a9b82f29..272dc58aca6 100644 --- a/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go +++ b/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go @@ -87,7 +87,7 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { // // Copied from other kube state metrics. func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) error { - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { return errors.Wrap(err, "error getting family metrics") } diff --git a/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go b/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go index 484d42dd8c2..7a97692f120 100644 --- a/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go +++ b/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go @@ -101,7 +101,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) { m.enricher.Start() - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_deployment/state_deployment.go b/metricbeat/module/kubernetes/state_deployment/state_deployment.go index 64704e09d5d..fecad904c72 100644 --- a/metricbeat/module/kubernetes/state_deployment/state_deployment.go +++ b/metricbeat/module/kubernetes/state_deployment/state_deployment.go @@ -102,7 +102,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) { m.enricher.Start() - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_node/state_node.go b/metricbeat/module/kubernetes/state_node/state_node.go index 4a31b2170a8..3a14dce8357 100644 --- a/metricbeat/module/kubernetes/state_node/state_node.go +++ b/metricbeat/module/kubernetes/state_node/state_node.go @@ -113,7 +113,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.enricher.Start() - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { return errors.Wrap(err, "error doing HTTP request to fetch 'state_node' Metricset data") } diff --git a/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go b/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go index f61172d3b39..90847c63137 100644 --- a/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go +++ b/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go @@ -77,7 +77,7 @@ func NewPersistentVolumeMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch prometheus metrics and treats those prefixed by mb.ModuleDataKey as // module rooted fields at the event that gets reported func (m *PersistentVolumeMetricSet) Fetch(reporter mb.ReporterV2) { - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go b/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go index 54d26c57a3a..3cf414c498a 100644 --- a/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go +++ b/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go @@ -82,7 +82,7 @@ func NewpersistentvolumeclaimMetricSet(base mb.BaseMetricSet) (mb.MetricSet, err // module rooted fields at the event that gets reported func (m *persistentvolumeclaimMetricSet) Fetch(reporter mb.ReporterV2) error { - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { return err } diff --git a/metricbeat/module/kubernetes/state_pod/state_pod.go b/metricbeat/module/kubernetes/state_pod/state_pod.go index 4be4aea5023..89d7325537a 100644 --- a/metricbeat/module/kubernetes/state_pod/state_pod.go +++ b/metricbeat/module/kubernetes/state_pod/state_pod.go @@ -104,7 +104,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) { m.enricher.Start() - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go b/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go index ee955f691a3..72f5e50fb33 100644 --- a/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go +++ b/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go @@ -102,7 +102,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) { m.enricher.Start() - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go b/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go index ccb3d94a935..eb8bc6ddf35 100644 --- a/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go +++ b/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go @@ -76,7 +76,7 @@ func NewResourceQuotaMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch prometheus metrics and treats those prefixed by mb.ModuleDataKey as // module rooted fields at the event that gets reported func (m *ResourceQuotaMetricSet) Fetch(reporter mb.ReporterV2) { - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_service/state_service.go b/metricbeat/module/kubernetes/state_service/state_service.go index a656d21e86c..efc489da8e0 100644 --- a/metricbeat/module/kubernetes/state_service/state_service.go +++ b/metricbeat/module/kubernetes/state_service/state_service.go @@ -95,7 +95,7 @@ func NewServiceMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *ServiceMetricSet) Fetch(reporter mb.ReporterV2) { m.enricher.Start() - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go b/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go index 9641b62df51..eb59e4127f4 100644 --- a/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go +++ b/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go @@ -101,7 +101,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(reporter mb.ReporterV2) { m.enricher.Start() - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go b/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go index 85e598d388d..820a052a05a 100644 --- a/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go +++ b/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go @@ -78,7 +78,7 @@ func NewStorageClassMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch prometheus metrics and treats those prefixed by mb.ModuleDataKey as // module rooted fields at the event that gets reported func (m *StorageClassMetricSet) Fetch(reporter mb.ReporterV2) { - families, err := m.mod.GetSharedFamilies(m.prometheus) + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/system/system.go b/metricbeat/module/kubernetes/system/system.go index 8b600133c70..80e520d22a7 100644 --- a/metricbeat/module/kubernetes/system/system.go +++ b/metricbeat/module/kubernetes/system/system.go @@ -85,7 +85,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { - body, err := m.mod.GetSharedKubeletStats(m.http) + body, err := m.mod.GetKubeletStats(m.http) if err != nil { return errors.Wrap(err, "error doing HTTP request to fetch 'system' Metricset data") } diff --git a/metricbeat/module/kubernetes/volume/volume.go b/metricbeat/module/kubernetes/volume/volume.go index 8f866408129..77b446a56f1 100644 --- a/metricbeat/module/kubernetes/volume/volume.go +++ b/metricbeat/module/kubernetes/volume/volume.go @@ -85,7 +85,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { - body, err := m.mod.GetSharedKubeletStats(m.http) + body, err := m.mod.GetKubeletStats(m.http) if err != nil { return errors.Wrap(err, "error doing HTTP request to fetch 'volume' Metricset data") }