From aaf4e991fa4f72aeeefff76ebf58f8f5029b8e98 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Tue, 5 Mar 2019 11:09:41 +0100 Subject: [PATCH] Ensure kubernetes caches don't expire if they are being read (#10946) (#11059) Some metrics in metricbeat kubernetes module are cached during a time, if they are not updated they are removed. But it is usual to have pods or containers that are not updated during more time that the expiration cache. Current implementation was not renovating expiration times for cache entries so all were eventually removed if updates for them are not received. Replace it with the cache implementation available in libbeat, but keeping the existing interface. Also, use slashes instead of dashes to generate unique container uids. Dashes can be used by kubernetes names, what could lead to ambiguous keys for the caches. Fix #10658 (cherry picked from commit db4b4c25bcb6cd059ca81ff8736fc88d631a539d) --- CHANGELOG.next.asciidoc | 2 + .../module/kubernetes/util/metrics_cache.go | 95 +++++++++---------- .../kubernetes/util/metrics_cache_test.go | 37 +------- 3 files changed, 47 insertions(+), 87 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index dc293903188..559f671decd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -42,6 +42,8 @@ https://github.com/elastic/beats/compare/v6.6.1...6.6[Check the HEAD diff] *Metricbeat* +- Fix issue in kubernetes module preventing usage percentages to be properly calculated. {pull}10946[10946] + *Packetbeat* - Avoid reporting unknown MongoDB opcodes more than once. {pull}10878[10878] diff --git a/metricbeat/module/kubernetes/util/metrics_cache.go b/metricbeat/module/kubernetes/util/metrics_cache.go index ffa8f235ed5..7ffff06edfe 100644 --- a/metricbeat/module/kubernetes/util/metrics_cache.go +++ b/metricbeat/module/kubernetes/util/metrics_cache.go @@ -18,17 +18,19 @@ package util import ( - "sync" "time" + + "github.com/elastic/beats/libbeat/common" ) // PerfMetrics stores known metrics from Kubernetes nodes and containers var PerfMetrics = NewPerfMetricsCache() -const defaultTimeout = 120 * time.Second +func init() { + PerfMetrics.Start() +} -var now = time.Now -var sleep = time.Sleep +const defaultTimeout = 120 * time.Second // NewPerfMetricsCache initializes and returns a new PerfMetricsCache func NewPerfMetricsCache() *PerfMetricsCache { @@ -43,7 +45,6 @@ func NewPerfMetricsCache() *PerfMetricsCache { // PerfMetricsCache stores known metrics from Kubernetes nodes and containers type PerfMetricsCache struct { - mutex sync.RWMutex NodeMemAllocatable *valueMap NodeCoresAllocatable *valueMap @@ -51,72 +52,64 @@ type PerfMetricsCache struct { ContainerCoresLimit *valueMap } -func newValueMap(timeout time.Duration) *valueMap { - return &valueMap{ - values: map[string]value{}, - timeout: timeout, - } +// Start cache workers +func (c *PerfMetricsCache) Start() { + c.NodeMemAllocatable.Start() + c.NodeCoresAllocatable.Start() + c.ContainerMemLimit.Start() + c.ContainerCoresLimit.Start() } -type valueMap struct { - sync.RWMutex - running bool - timeout time.Duration - values map[string]value +// Stop cache workers +func (c *PerfMetricsCache) Stop() { + c.NodeMemAllocatable.Stop() + c.NodeCoresAllocatable.Stop() + c.ContainerMemLimit.Stop() + c.ContainerCoresLimit.Stop() } -type value struct { - value float64 - expires int64 +type valueMap struct { + cache *common.Cache + timeout time.Duration } -// ContainerUID creates an unique ID for from namespace, pod name and container name -func ContainerUID(namespace, pod, container string) string { - return namespace + "-" + pod + "-" + container +func newValueMap(timeout time.Duration) *valueMap { + return &valueMap{ + cache: common.NewCache(timeout, 0), + timeout: timeout, + } } // Get value func (m *valueMap) Get(name string) float64 { - m.RLock() - defer m.RUnlock() - return m.values[name].value + return m.GetWithDefault(name, 0.0) } // Get value func (m *valueMap) GetWithDefault(name string, def float64) float64 { - m.RLock() - defer m.RUnlock() - val, ok := m.values[name] - if ok { - return val.value + v := m.cache.Get(name) + if v, ok := v.(float64); ok { + return v } return def } // Set value func (m *valueMap) Set(name string, val float64) { - m.Lock() - defer m.Unlock() - m.ensureCleanupWorker() - m.values[name] = value{val, now().Add(m.timeout).Unix()} + m.cache.PutWithTimeout(name, val, m.timeout) } -func (m *valueMap) ensureCleanupWorker() { - if !m.running { - // Run worker to cleanup expired entries - m.running = true - go func() { - for { - sleep(m.timeout) - m.Lock() - now := now().Unix() - for name, val := range m.values { - if now > val.expires { - delete(m.values, name) - } - } - m.Unlock() - } - }() - } +// Start cache workers +func (m *valueMap) Start() { + m.cache.StartJanitor(m.timeout) +} + +// Stop cache workers +func (m *valueMap) Stop() { + m.cache.StopJanitor() +} + +// ContainerUID creates an unique ID for from namespace, pod name and container name +func ContainerUID(namespace, pod, container string) string { + return namespace + "/" + pod + "/" + container } diff --git a/metricbeat/module/kubernetes/util/metrics_cache_test.go b/metricbeat/module/kubernetes/util/metrics_cache_test.go index d5ce7bd2bb8..649c1f5fb86 100644 --- a/metricbeat/module/kubernetes/util/metrics_cache_test.go +++ b/metricbeat/module/kubernetes/util/metrics_cache_test.go @@ -19,45 +19,10 @@ package util import ( "testing" - "time" "github.com/stretchr/testify/assert" ) -func TestTimeout(t *testing.T) { - // Mock monotonic time: - fakeTimeCh := make(chan int64) - go func() { - fakeTime := time.Now().Unix() - for { - fakeTime++ - fakeTimeCh <- fakeTime - } - }() - - now = func() time.Time { - return time.Unix(<-fakeTimeCh, 0) - } - - // Blocking sleep: - sleepCh := make(chan struct{}) - sleep = func(time.Duration) { - <-sleepCh - } - - test := newValueMap(1 * time.Second) - - test.Set("foo", 3.14) - - // Let cleanup do its job - sleepCh <- struct{}{} - sleepCh <- struct{}{} - sleepCh <- struct{}{} - - // Check it expired - assert.Equal(t, 0.0, test.Get("foo")) -} - func TestValueMap(t *testing.T) { test := newValueMap(defaultTimeout) @@ -82,5 +47,5 @@ func TestGetWithDefault(t *testing.T) { } func TestContainerUID(t *testing.T) { - assert.Equal(t, "a-b-c", ContainerUID("a", "b", "c")) + assert.Equal(t, "a/b/c", ContainerUID("a", "b", "c")) }