Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor kubelet metricsets to share response from endpoint #25782

Merged
merged 8 commits into from
May 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Refactor state_* metricsets to share response from endpoint. {pull}25640[25640]
- Add server id to zookeeper events. {pull}25550[25550]
- Add additional network metrics to docker/network {pull}25354[25354]
- Reduce number of requests done by kubernetes metricsets to kubelet. {pull}25782[25782]

*Packetbeat*

Expand Down
11 changes: 10 additions & 1 deletion metricbeat/module/kubernetes/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
package container

import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

Expand Down Expand Up @@ -58,6 +61,7 @@ type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
enricher util.Enricher
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -68,10 +72,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}
mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewContainerMetadataEnricher(base, true),
mod: mod,
}, nil
}

Expand All @@ -81,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

body, err := m.http.FetchContent()
body, err := m.mod.GetKubeletStats(m.http)
if err != nil {
return errors.Wrap(err, "error doing HTTP request to fetch 'container' Metricset data")

Expand Down
70 changes: 56 additions & 14 deletions metricbeat/module/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pkg/errors"
dto "github.com/prometheus/client_model/go"

"github.com/elastic/beats/v7/metricbeat/helper"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
)
Expand All @@ -38,7 +39,8 @@ func init() {

type Module interface {
mb.Module
GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error)
GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error)
GetKubeletStats(http *helper.HTTP) ([]byte, error)
}

type familiesCache struct {
Expand All @@ -53,55 +55,95 @@ type kubeStateMetricsCache struct {
}

func (c *kubeStateMetricsCache) getCacheMapEntry(hash uint64) *familiesCache {
c.lock.Lock()
defer c.lock.Unlock()
if _, ok := c.cacheMap[hash]; !ok {
c.cacheMap[hash] = &familiesCache{}
}
return c.cacheMap[hash]
}

type statsCache struct {
sharedStats []byte
lastFetchErr error
lastFetchTimestamp time.Time
}

type kubeletStatsCache struct {
cacheMap map[uint64]*statsCache
lock sync.Mutex
}

func (c *kubeletStatsCache) getCacheMapEntry(hash uint64) *statsCache {
if _, ok := c.cacheMap[hash]; !ok {
c.cacheMap[hash] = &statsCache{}
}
return c.cacheMap[hash]
}

type module struct {
mb.BaseModule

kubeStateMetricsCache *kubeStateMetricsCache
familiesCache *familiesCache
kubeletStatsCache *kubeletStatsCache
cacheHash uint64
}

func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) {
kubeStateMetricsCache := &kubeStateMetricsCache{
cacheMap: make(map[uint64]*familiesCache),
}
kubeletStatsCache := &kubeletStatsCache{
cacheMap: make(map[uint64]*statsCache),
}
return func(base mb.BaseModule) (mb.Module, error) {
hash, err := generateCacheHash(base.Config().Hosts)
if err != nil {
return nil, errors.Wrap(err, "error generating cache hash for kubeStateMetricsCache")
}
// NOTE: These entries will be never removed, this can be a leak if
// metricbeat is used to monitor clusters dynamically created.
// (https://github.com/elastic/beats/pull/25640#discussion_r633395213)
familiesCache := kubeStateMetricsCache.getCacheMapEntry(hash)
m := module{
BaseModule: base,
kubeStateMetricsCache: kubeStateMetricsCache,
familiesCache: familiesCache,
kubeletStatsCache: kubeletStatsCache,
cacheHash: hash,
}
return &m, nil
}
}

func (m *module) GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) {
func (m *module) GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) {
m.kubeStateMetricsCache.lock.Lock()
defer m.kubeStateMetricsCache.lock.Unlock()

now := time.Now()
// NOTE: These entries will be never removed, this can be a leak if
// metricbeat is used to monitor clusters dynamically created.
// (https://github.com/elastic/beats/pull/25640#discussion_r633395213)
familiesCache := m.kubeStateMetricsCache.getCacheMapEntry(m.cacheHash)

if familiesCache.lastFetchTimestamp.IsZero() || now.Sub(familiesCache.lastFetchTimestamp) > m.Config().Period {
familiesCache.sharedFamilies, familiesCache.lastFetchErr = prometheus.GetFamilies()
familiesCache.lastFetchTimestamp = now
}

return familiesCache.sharedFamilies, familiesCache.lastFetchErr
}

func (m *module) GetKubeletStats(http *helper.HTTP) ([]byte, error) {
m.kubeletStatsCache.lock.Lock()
defer m.kubeletStatsCache.lock.Unlock()

now := time.Now()

// NOTE: These entries will be never removed, this can be a leak if
// metricbeat is used to monitor clusters dynamically created.
// (https://github.com/elastic/beats/pull/25640#discussion_r633395213)
statsCache := m.kubeletStatsCache.getCacheMapEntry(m.cacheHash)

if m.familiesCache.lastFetchTimestamp.IsZero() || now.Sub(m.familiesCache.lastFetchTimestamp) > m.Config().Period {
m.familiesCache.sharedFamilies, m.familiesCache.lastFetchErr = prometheus.GetFamilies()
m.familiesCache.lastFetchTimestamp = now
if statsCache.lastFetchTimestamp.IsZero() || now.Sub(statsCache.lastFetchTimestamp) > m.Config().Period {
statsCache.sharedStats, statsCache.lastFetchErr = http.FetchContent()
statsCache.lastFetchTimestamp = now
}

return m.familiesCache.sharedFamilies, m.familiesCache.lastFetchErr
return statsCache.sharedStats, statsCache.lastFetchErr
}

func generateCacheHash(host []string) (uint64, error) {
Expand Down
12 changes: 10 additions & 2 deletions metricbeat/module/kubernetes/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package node

import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
Expand All @@ -26,6 +28,7 @@ import (
"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

Expand Down Expand Up @@ -60,6 +63,7 @@ type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
enricher util.Enricher
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -70,11 +74,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false),
mod: mod,
}, nil
}

Expand All @@ -84,7 +92,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

body, err := m.http.FetchContent()
body, err := m.mod.GetKubeletStats(m.http)
if err != nil {
return errors.Wrap(err, "error doing HTTP request to fetch 'node' Metricset data")

Expand Down
14 changes: 11 additions & 3 deletions metricbeat/module/kubernetes/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
package pod

import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

Expand Down Expand Up @@ -59,6 +62,7 @@ type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
enricher util.Enricher
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -69,11 +73,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, true),
mod: mod,
}, nil
}

Expand All @@ -83,9 +91,9 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

body, err := m.http.FetchContent()
body, err := m.mod.GetKubeletStats(m.http)
if err != nil {
return errors.Wrap(err, "error doing HTTP request to fetch 'pod' Metricset data")
return errors.Wrap(err, "error fetching shared data for 'pod' Metricset")
}

events, err := eventMapping(body, util.PerfMetrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
return errors.Wrap(err, "error getting families")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
//
// Copied from other kube state metrics.
func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) error {
families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
return errors.Wrap(err, "error getting family metrics")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/kubernetes/state_node/state_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
return errors.Wrap(err, "error doing HTTP request to fetch 'state_node' Metricset data")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func NewPersistentVolumeMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch prometheus metrics and treats those prefixed by mb.ModuleDataKey as
// module rooted fields at the event that gets reported
func (m *PersistentVolumeMetricSet) Fetch(reporter mb.ReporterV2) {
families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func NewpersistentvolumeclaimMetricSet(base mb.BaseMetricSet) (mb.MetricSet, err
// module rooted fields at the event that gets reported
func (m *persistentvolumeclaimMetricSet) Fetch(reporter mb.ReporterV2) error {

families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/kubernetes/state_pod/state_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewResourceQuotaMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch prometheus metrics and treats those prefixed by mb.ModuleDataKey as
// module rooted fields at the event that gets reported
func (m *ResourceQuotaMetricSet) Fetch(reporter mb.ReporterV2) {
families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func NewServiceMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *ServiceMetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

families, err := m.mod.GetSharedFamilies(m.prometheus)
families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Loading