From 4fa200031514da52c2606bf5038298bc7642ebe7 Mon Sep 17 00:00:00 2001 From: MichaelKatsoulis Date: Thu, 22 Sep 2022 13:44:06 +0300 Subject: [PATCH 1/8] Backport #29133 to 7.17 --- .../autodiscover/providers/kubernetes/pod.go | 3 - .../common/kubernetes/metadata/metadata.go | 2 +- .../kubernetes/metadata/namespace_test.go | 3 + libbeat/common/kubernetes/metadata/pod.go | 29 ++- .../common/kubernetes/metadata/pod_test.go | 154 +++++++++++++- .../common/kubernetes/metadata/resource.go | 2 +- .../add_kubernetes_metadata/indexers_test.go | 8 +- .../add_kubernetes_metadata/kubernetes.go | 3 - metricbeat/module/kubernetes/_meta/config.yml | 9 + .../module/kubernetes/util/kubernetes.go | 196 +++++++++++------- .../module/kubernetes/util/kubernetes_test.go | 4 +- metricbeat/modules.d/kubernetes.yml.disabled | 9 + .../composable/providers/kubernetes/pod.go | 4 +- 13 files changed, 323 insertions(+), 103 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index de63c3925e2..ecb9ede8c82 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -97,9 +97,6 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub options.Namespace = config.Namespace } metaConf := config.AddResourceMetadata - if metaConf == nil { - metaConf = metadata.GetDefaultResourceMetadataConfig() - } nodeWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, options, nil) if err != nil { logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) diff --git a/libbeat/common/kubernetes/metadata/metadata.go b/libbeat/common/kubernetes/metadata/metadata.go index 9b493210255..9398dc7afb9 100644 --- a/libbeat/common/kubernetes/metadata/metadata.go +++ b/libbeat/common/kubernetes/metadata/metadata.go @@ -101,7 +101,7 @@ func GetPodMetaGen( if namespaceWatcher != nil && metaConf.Namespace.Enabled() { namespaceMetaGen = NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), namespaceWatcher.Client()) } - metaGen := NewPodMetadataGenerator(cfg, podWatcher.Store(), podWatcher.Client(), nodeMetaGen, namespaceMetaGen) + metaGen := NewPodMetadataGenerator(cfg, podWatcher.Store(), podWatcher.Client(), nodeMetaGen, namespaceMetaGen, metaConf) return metaGen } diff --git a/libbeat/common/kubernetes/metadata/namespace_test.go b/libbeat/common/kubernetes/metadata/namespace_test.go index 50b52f086d1..d3df2d41a0d 100644 --- a/libbeat/common/kubernetes/metadata/namespace_test.go +++ b/libbeat/common/kubernetes/metadata/namespace_test.go @@ -51,9 +51,11 @@ func TestNamespace_Generate(t *testing.T) { UID: types.UID(uid), Labels: map[string]string{ "foo": "bar", + "key": "value", }, Annotations: map[string]string{ "spam": "baz", + "key": "value", }, }, TypeMeta: metav1.TypeMeta{ @@ -88,6 +90,7 @@ func TestNamespace_Generate(t *testing.T) { } cfg, err := common.NewConfigFrom(Config{ + IncludeLabels: []string{"foo"}, IncludeAnnotations: []string{"spam"}, }) if err != nil { diff --git a/libbeat/common/kubernetes/metadata/pod.go b/libbeat/common/kubernetes/metadata/pod.go index b0b8576f9b4..5db87bfbe79 100644 --- a/libbeat/common/kubernetes/metadata/pod.go +++ b/libbeat/common/kubernetes/metadata/pod.go @@ -29,11 +29,12 @@ import ( ) type pod struct { - store cache.Store - client k8s.Interface - node MetaGen - namespace MetaGen - resource *Resource + store cache.Store + client k8s.Interface + node MetaGen + namespace MetaGen + resource *Resource + addResourceMetadata *AddResourceMetadataConfig } // NewPodMetadataGenerator creates a metagen for pod resources @@ -42,13 +43,19 @@ func NewPodMetadataGenerator( pods cache.Store, client k8s.Interface, node MetaGen, - namespace MetaGen) MetaGen { + namespace MetaGen, + addResourceMetadata *AddResourceMetadataConfig) MetaGen { + + if addResourceMetadata == nil { + addResourceMetadata = GetDefaultResourceMetadataConfig() + } return &pod{ - resource: NewResourceMetadataGenerator(cfg, client), - store: pods, - node: node, - namespace: namespace, - client: client, + resource: NewResourceMetadataGenerator(cfg, client), + store: pods, + node: node, + namespace: namespace, + client: client, + addResourceMetadata: addResourceMetadata, } } diff --git a/libbeat/common/kubernetes/metadata/pod_test.go b/libbeat/common/kubernetes/metadata/pod_test.go index 341d4eb10a0..aeacda89eb8 100644 --- a/libbeat/common/kubernetes/metadata/pod_test.go +++ b/libbeat/common/kubernetes/metadata/pod_test.go @@ -429,7 +429,7 @@ func TestPod_Generate(t *testing.T) { }) assert.Nil(t, err) - metagen := NewPodMetadataGenerator(config, nil, client, nil, nil) + metagen := NewPodMetadataGenerator(config, nil, client, nil, nil, nil) for _, test := range tests { t.Run(test.name, func(t *testing.T) { assert.Equal(t, test.output, metagen.Generate(test.input)) @@ -551,7 +551,7 @@ func TestPod_GenerateFromName(t *testing.T) { assert.Nil(t, err) pods := cache.NewStore(cache.MetaNamespaceKeyFunc) pods.Add(test.input) - metagen := NewPodMetadataGenerator(config, pods, client, nil, nil) + metagen := NewPodMetadataGenerator(config, pods, client, nil, nil, nil) accessor, err := meta.Accessor(test.input) require.Nil(t, err) @@ -673,7 +673,155 @@ func TestPod_GenerateWithNodeNamespace(t *testing.T) { namespaces.Add(test.namespace) nsMeta := NewNamespaceMetadataGenerator(config, namespaces, client) - metagen := NewPodMetadataGenerator(config, pods, client, nodeMeta, nsMeta) + metagen := NewPodMetadataGenerator(config, pods, client, nodeMeta, nsMeta, nil) + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.output, metagen.Generate(test.input)) + }) + } +} + +func TestPod_GenerateWithNodeNamespaceWithAddResourceConfig(t *testing.T) { + client := k8sfake.NewSimpleClientset() + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + namespace := "default" + name := "obj" + boolean := true + + tests := []struct { + input kubernetes.Resource + node kubernetes.Resource + namespace kubernetes.Resource + output common.MapStr + name string + }{ + { + name: "test simple object", + input: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{ + "app.kubernetes.io/component": "exporter", + }, + Annotations: map[string]string{ + "app": "production", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps", + Kind: "ReplicaSet", + Name: "nginx-rs", + UID: "005f3b90-4b9d-12f8-acf0-31020a8409087", + Controller: &boolean, + }, + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + + Spec: v1.PodSpec{ + NodeName: "testnode", + }, + Status: v1.PodStatus{PodIP: "127.0.0.5"}, + }, + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testnode", + UID: types.UID(uid), + Labels: map[string]string{ + "nodekey": "nodevalue", + "nodekey2": "nodevalue2", + }, + Annotations: map[string]string{}, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Node", + APIVersion: "v1", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{{Type: v1.NodeHostName, Address: "node1"}}, + }, + }, + namespace: &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + UID: types.UID(uid), + Labels: map[string]string{ + "app.kubernetes.io/name": "kube-state-metrics", + "nskey2": "nsvalue2", + }, + Annotations: map[string]string{}, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + }, + output: common.MapStr{"kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "obj", + "uid": uid, + "ip": "127.0.0.5", + }, + "namespace": "default", + "namespace_uid": uid, + "namespace_labels": common.MapStr{ + "app_kubernetes_io/name": "kube-state-metrics", + }, + "node": common.MapStr{ + "name": "testnode", + "uid": uid, + "labels": common.MapStr{ + "nodekey2": "nodevalue2", + }, + "hostname": "node1", + }, + "labels": common.MapStr{ + "app_kubernetes_io/component": "exporter", + }, + "annotations": common.MapStr{ + "app": "production", + }, + "replicaset": common.MapStr{ + "name": "nginx-rs", + }, + }}, + }, + } + + for _, test := range tests { + config, err := common.NewConfigFrom(map[string]interface{}{ + "include_annotations": []string{"app"}, + }) + + assert.NoError(t, err) + + namespaceConfig, _ := common.NewConfigFrom(map[string]interface{}{ + "include_labels": []string{"app.kubernetes.io/name"}, + }) + nodeConfig, _ := common.NewConfigFrom(map[string]interface{}{ + "include_labels": []string{"nodekey2"}, + }) + metaConfig := AddResourceMetadataConfig{ + Namespace: namespaceConfig, + Node: nodeConfig, + } + + pods := cache.NewStore(cache.MetaNamespaceKeyFunc) + pods.Add(test.input) + + nodes := cache.NewStore(cache.MetaNamespaceKeyFunc) + nodes.Add(test.node) + nodeMeta := NewNodeMetadataGenerator(nodeConfig, nodes, client) + + namespaces := cache.NewStore(cache.MetaNamespaceKeyFunc) + namespaces.Add(test.namespace) + nsMeta := NewNamespaceMetadataGenerator(namespaceConfig, namespaces, client) + + metagen := NewPodMetadataGenerator(config, pods, client, nodeMeta, nsMeta, &metaConfig) t.Run(test.name, func(t *testing.T) { assert.Equal(t, test.output, metagen.Generate(test.input)) }) diff --git a/libbeat/common/kubernetes/metadata/resource.go b/libbeat/common/kubernetes/metadata/resource.go index 854f074a266..4fa71d26d11 100644 --- a/libbeat/common/kubernetes/metadata/resource.go +++ b/libbeat/common/kubernetes/metadata/resource.go @@ -85,7 +85,7 @@ func (r *Resource) GenerateK8s(kind string, obj kubernetes.Resource, options ... return nil } - labelMap := common.MapStr{} + var labelMap common.MapStr if len(r.config.IncludeLabels) == 0 { labelMap = GenerateMap(accessor.GetLabels(), r.config.LabelsDedot) } else { diff --git a/libbeat/processors/add_kubernetes_metadata/indexers_test.go b/libbeat/processors/add_kubernetes_metadata/indexers_test.go index 3e5169f1349..de3cc918400 100644 --- a/libbeat/processors/add_kubernetes_metadata/indexers_test.go +++ b/libbeat/processors/add_kubernetes_metadata/indexers_test.go @@ -32,7 +32,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/kubernetes" ) -var metagen = metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil, nil) +var metagen = metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil, nil, nil) func TestPodIndexer(t *testing.T) { var testConfig = common.NewConfig() @@ -90,7 +90,7 @@ func TestPodIndexer(t *testing.T) { func TestPodUIDIndexer(t *testing.T) { var testConfig = common.NewConfig() - metaGenWithPodUID := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil, nil) + metaGenWithPodUID := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil, nil, nil) podUIDIndexer, err := NewPodUIDIndexer(*testConfig, metaGenWithPodUID) assert.Nil(t, err) @@ -301,7 +301,7 @@ func TestFilteredGenMeta(t *testing.T) { }) assert.Nil(t, err) - filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil) + filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil, nil) podIndexer, err = NewPodNameIndexer(*testConfig, filteredGen) assert.Nil(t, err) @@ -338,7 +338,7 @@ func TestFilteredGenMetaExclusion(t *testing.T) { }) assert.Nil(t, err) - filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil) + filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil, nil) podIndexer, err := NewPodNameIndexer(*testConfig, filteredGen) assert.Nil(t, err) diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 2f68d6cb044..fb74a9908b2 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -191,9 +191,6 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *common.Confi } metaConf := config.AddResourceMetadata - if metaConf == nil { - metaConf = metadata.GetDefaultResourceMetadataConfig() - } options := kubernetes.WatchOptions{ SyncTimeout: config.SyncPeriod, diff --git a/metricbeat/module/kubernetes/_meta/config.yml b/metricbeat/module/kubernetes/_meta/config.yml index 7bf79176381..d728eb356e2 100644 --- a/metricbeat/module/kubernetes/_meta/config.yml +++ b/metricbeat/module/kubernetes/_meta/config.yml @@ -23,6 +23,15 @@ # If kube_config is not set, KUBECONFIG environment variable will be checked # and if not present it will fall back to InCluster #kube_config: ~/.kube/config + # To configure additionally node and namespace metadata, added to pod, service and container resource types, + # `add_resource_metadata` can be defined. + # By default all labels will be included while annotations are not added by default. + # add_resource_metadata: + # namespace: + # include_labels: ["namespacelabel1"] + # node: + # include_labels: ["nodelabel2"] + # include_annotations: ["nodeannotation1"] # State metrics from kube-state-metrics service: #- module: kubernetes diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index 34c5ea00dcc..cf234eb4e8f 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -48,11 +48,14 @@ type Enricher interface { } type kubernetesConfig struct { + KubeConfig string `config:"kube_config"` + + Host string `config:"host"` + SyncPeriod time.Duration `config:"sync_period"` + // AddMetadata enables enriching metricset events with metadata from the API server - AddMetadata bool `config:"add_metadata"` - KubeConfig string `config:"kube_config"` - Host string `config:"host"` - SyncPeriod time.Duration `config:"sync_period"` + AddMetadata bool `config:"add_metadata"` + AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"` } type enricher struct { @@ -62,57 +65,13 @@ type enricher struct { watcher kubernetes.Watcher watcherStarted bool watcherStartedLock sync.Mutex + namespaceWatcher kubernetes.Watcher + nodeWatcher kubernetes.Watcher isPod bool } const selector = "kubernetes" -// GetWatcher initializes a kubernetes watcher with the given -// scope (node or cluster), and resource type -func GetWatcher(base mb.BaseMetricSet, resource kubernetes.Resource, nodeScope bool) (kubernetes.Watcher, error) { - config := kubernetesConfig{ - AddMetadata: true, - SyncPeriod: time.Minute * 10, - } - if err := base.Module().UnpackConfig(&config); err != nil { - return nil, err - } - - // Return nil if metadata enriching is disabled: - if !config.AddMetadata { - return nil, nil - } - - client, err := kubernetes.GetKubernetesClient(config.KubeConfig) - if err != nil { - return nil, err - } - - options := kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - } - - log := logp.NewLogger(selector) - - // Watch objects in the node only - if nodeScope { - nd := &kubernetes.DiscoverKubernetesNodeParams{ - ConfigHost: config.Host, - Client: client, - IsInCluster: kubernetes.IsInCluster(config.KubeConfig), - HostUtils: &kubernetes.DefaultDiscoveryUtils{}, - } - options.Node, err = kubernetes.DiscoverKubernetesNode(log, nd) - if err != nil { - return nil, fmt.Errorf("couldn't discover kubernetes node: %w", err) - } - } - - log.Debugf("Initializing a new Kubernetes watcher using host: %v", config.Host) - - return kubernetes.NewWatcher(client, resource, options, nil) -} - // NewResourceMetadataEnricher returns an Enricher configured for kubernetes resource events func NewResourceMetadataEnricher( base mb.BaseMetricSet, @@ -120,29 +79,31 @@ func NewResourceMetadataEnricher( metricsRepo *MetricsRepo, nodeScope bool) Enricher { - watcher, err := GetWatcher(base, res, nodeScope) - if err != nil { - logp.Err("Error initializing Kubernetes metadata enricher: %s", err) + config := validatedConfig(base) + if config == nil { + logp.Info("Kubernetes metricset enriching is disabled") return &nilEnricher{} } + watcher, nodeWatcher, namespaceWatcher := getResourceMetadataWatchers(config, res, nodeScope) if watcher == nil { - logp.Info("Kubernetes metricset enriching is disabled") return &nilEnricher{} } - metaConfig := metadata.Config{} - if err := base.Module().UnpackConfig(&metaConfig); err != nil { + // GetPodMetaGen requires cfg of type Config + commonMetaConfig := metadata.Config{} + if err := base.Module().UnpackConfig(&commonMetaConfig); err != nil { logp.Err("Error initializing Kubernetes metadata enricher: %s", err) return &nilEnricher{} } - cfg, _ := common.NewConfigFrom(&metaConfig) + cfg, _ := common.NewConfigFrom(&commonMetaConfig) metaGen := metadata.NewResourceMetadataGenerator(cfg, watcher.Client()) - podMetaGen := metadata.NewPodMetadataGenerator(cfg, nil, watcher.Client(), nil, nil) - serviceMetaGen := metadata.NewServiceMetadataGenerator(cfg, nil, nil, watcher.Client()) - enricher := buildMetadataEnricher(watcher, + podMetaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, config.AddResourceMetadata) + namespaceMeta := metadata.NewNamespaceMetadataGenerator(config.AddResourceMetadata.Namespace, namespaceWatcher.Store(), watcher.Client()) + serviceMetaGen := metadata.NewServiceMetadataGenerator(cfg, watcher.Store(), namespaceMeta, watcher.Client()) + enricher := buildMetadataEnricher(watcher, nodeWatcher, namespaceWatcher, // update func(m map[string]common.MapStr, r kubernetes.Resource) { accessor, _ := meta.Accessor(r) @@ -220,27 +181,26 @@ func NewContainerMetadataEnricher( metricsRepo *MetricsRepo, nodeScope bool) Enricher { - watcher, err := GetWatcher(base, &kubernetes.Pod{}, nodeScope) - if err != nil { - logp.Err("Error initializing Kubernetes metadata enricher: %s", err) + config := validatedConfig(base) + if config == nil { + logp.Info("Kubernetes metricset enriching is disabled") return &nilEnricher{} } + watcher, nodeWatcher, namespaceWatcher := getResourceMetadataWatchers(config, &kubernetes.Pod{}, nodeScope) if watcher == nil { - logp.Info("Kubernetes metricset enriching is disabled") return &nilEnricher{} } - metaConfig := metadata.Config{} - if err := base.Module().UnpackConfig(&metaConfig); err != nil { + commonMetaConfig := metadata.Config{} + if err := base.Module().UnpackConfig(&commonMetaConfig); err != nil { logp.Err("Error initializing Kubernetes metadata enricher: %s", err) return &nilEnricher{} } + cfg, _ := common.NewConfigFrom(&commonMetaConfig) + metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, config.AddResourceMetadata) - cfg, _ := common.NewConfigFrom(&metaConfig) - - metaGen := metadata.NewPodMetadataGenerator(cfg, nil, watcher.Client(), nil, nil) - enricher := buildMetadataEnricher(watcher, + enricher := buildMetadataEnricher(watcher, nodeWatcher, namespaceWatcher, // update func(m map[string]common.MapStr, r kubernetes.Resource) { pod := r.(*kubernetes.Pod) @@ -294,6 +254,75 @@ func NewContainerMetadataEnricher( return enricher } +func getResourceMetadataWatchers(config *kubernetesConfig, resource kubernetes.Resource, nodeScope bool) (kubernetes.Watcher, kubernetes.Watcher, kubernetes.Watcher) { + client, err := kubernetes.GetKubernetesClient(config.KubeConfig) + if err != nil { + logp.Err("Error creating Kubernetes client: %s", err) + return nil, nil, nil + } + + options := kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + } + + log := logp.NewLogger(selector) + + // Watch objects in the node only + if nodeScope { + nd := &kubernetes.DiscoverKubernetesNodeParams{ + ConfigHost: config.Host, + Client: client, + IsInCluster: kubernetes.IsInCluster(config.KubeConfig), + HostUtils: &kubernetes.DefaultDiscoveryUtils{}, + } + options.Node, err = kubernetes.DiscoverKubernetesNode(log, nd) + if err != nil { + logp.Err("Couldn't discover kubernetes node: %s", err) + return nil, nil, nil + } + } + + log.Debugf("Initializing a new Kubernetes watcher using host: %v", config.Host) + + watcher, err := kubernetes.NewWatcher(client, resource, options, nil) + if err != nil { + logp.Err("Error initializing Kubernetes watcher: %s", err) + return nil, nil, nil + } + + nodeWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, options, nil) + if err != nil { + logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Node{}, err) + return watcher, nil, nil + } + + namespaceWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + }, nil) + if err != nil { + logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err) + return watcher, nodeWatcher, nil + } + + return watcher, nodeWatcher, namespaceWatcher +} + +func validatedConfig(base mb.BaseMetricSet) *kubernetesConfig { + config := kubernetesConfig{ + AddMetadata: true, + SyncPeriod: time.Minute * 10, + AddResourceMetadata: metadata.GetDefaultResourceMetadataConfig(), + } + if err := base.Module().UnpackConfig(&config); err != nil { + return nil + } + + // Return nil if metadata enriching is disabled: + if !config.AddMetadata { + return nil + } + return &config +} func getString(m common.MapStr, key string) string { val, err := m.GetValue(key) @@ -311,14 +340,18 @@ func join(fields ...string) string { func buildMetadataEnricher( watcher kubernetes.Watcher, + nodeWatcher kubernetes.Watcher, + namespaceWatcher kubernetes.Watcher, update func(map[string]common.MapStr, kubernetes.Resource), delete func(map[string]common.MapStr, kubernetes.Resource), index func(e common.MapStr) string) *enricher { enricher := enricher{ - metadata: map[string]common.MapStr{}, - index: index, - watcher: watcher, + metadata: map[string]common.MapStr{}, + index: index, + watcher: watcher, + nodeWatcher: nodeWatcher, + namespaceWatcher: namespaceWatcher, } watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ @@ -345,6 +378,16 @@ func buildMetadataEnricher( func (m *enricher) Start() { m.watcherStartedLock.Lock() defer m.watcherStartedLock.Unlock() + if m.nodeWatcher != nil { + if err := m.nodeWatcher.Start(); err != nil { + logp.Warn("Error starting node watcher: %s", err) + } + } + if m.namespaceWatcher != nil { + if err := m.namespaceWatcher.Start(); err != nil { + logp.Warn("Error starting namespace watcher: %s", err) + } + } if !m.watcherStarted { err := m.watcher.Start() if err != nil { @@ -361,6 +404,13 @@ func (m *enricher) Stop() { m.watcher.Stop() m.watcherStarted = false } + if m.namespaceWatcher != nil { + m.namespaceWatcher.Stop() + } + + if m.nodeWatcher != nil { + m.nodeWatcher.Stop() + } } func (m *enricher) Enrich(events []common.MapStr) { diff --git a/metricbeat/module/kubernetes/util/kubernetes_test.go b/metricbeat/module/kubernetes/util/kubernetes_test.go index 4ab9acc9d5d..83bbd3dd1f7 100644 --- a/metricbeat/module/kubernetes/util/kubernetes_test.go +++ b/metricbeat/module/kubernetes/util/kubernetes_test.go @@ -36,6 +36,8 @@ import ( func TestBuildMetadataEnricher(t *testing.T) { watcher := mockWatcher{} + nodeWatcher := mockWatcher{} + namespaceWatcher := mockWatcher{} funcs := mockFuncs{} resource := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -48,7 +50,7 @@ func TestBuildMetadataEnricher(t *testing.T) { }, } - enricher := buildMetadataEnricher(&watcher, funcs.update, funcs.delete, funcs.index) + enricher := buildMetadataEnricher(&watcher, &nodeWatcher, &namespaceWatcher, funcs.update, funcs.delete, funcs.index) assert.NotNil(t, watcher.handler) enricher.Start() diff --git a/metricbeat/modules.d/kubernetes.yml.disabled b/metricbeat/modules.d/kubernetes.yml.disabled index 451d1882e54..a0793347051 100644 --- a/metricbeat/modules.d/kubernetes.yml.disabled +++ b/metricbeat/modules.d/kubernetes.yml.disabled @@ -26,6 +26,15 @@ # If kube_config is not set, KUBECONFIG environment variable will be checked # and if not present it will fall back to InCluster #kube_config: ~/.kube/config + # To configure additionally node and namespace metadata, added to pod, service and container resource types, + # `add_resource_metadata` can be defined. + # By default all labels will be included while annotations are not added by default. + # add_resource_metadata: + # namespace: + # include_labels: ["namespacelabel1"] + # node: + # include_labels: ["nodelabel2"] + # include_annotations: ["nodeannotation1"] # State metrics from kube-state-metrics service: #- module: kubernetes diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index 80b28fcab47..fde83af20f8 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -89,9 +89,7 @@ func NewPodEventer( Node: cfg.Node, } metaConf := cfg.AddResourceMetadata - if metaConf == nil { - metaConf = metadata.GetDefaultResourceMetadataConfig() - } + nodeWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, options, nil) if err != nil { logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) From 33d58841f18be50c83ac709319e7a78f2f68abb2 Mon Sep 17 00:00:00 2001 From: MichaelKatsoulis Date: Thu, 22 Sep 2022 14:21:42 +0300 Subject: [PATCH 2/8] Test fail --- libbeat/autodiscover/providers/kubernetes/pod_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index 3c89a5acf20..5388eeb4e6d 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -1907,7 +1907,7 @@ func TestPod_EmitEvent(t *testing.T) { t.Fatal(err) } - metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, client, nil, nil) + metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, client, nil, nil, nil) p := &Provider{ config: defaultConfig(), bus: bus.New(logp.NewLogger("bus"), "test"), From 6d40104f5a8591ba7e5ce937a410c5579413e1c7 Mon Sep 17 00:00:00 2001 From: MichaelKatsoulis Date: Thu, 22 Sep 2022 14:43:58 +0300 Subject: [PATCH 3/8] Fix lint errors --- libbeat/autodiscover/providers/kubernetes/pod.go | 4 ++-- libbeat/common/kubernetes/metadata/metadata.go | 6 +++--- libbeat/common/kubernetes/metadata/namespace_test.go | 2 +- libbeat/common/kubernetes/metadata/pod.go | 8 ++++---- libbeat/common/kubernetes/metadata/pod_test.go | 2 +- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index ecb9ede8c82..d75e09def96 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -86,7 +86,7 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub HonorReSyncs: true, }, nil) if err != nil { - return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Pod{}, err) + return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Pod{}, err) } options := kubernetes.WatchOptions{ @@ -392,7 +392,7 @@ func (p *pod) containerPodEvents(flag string, pod *kubernetes.Pod, c *containerI var events []bus.Event portsMap := common.MapStr{} - meta.Put("container", cmeta) + _, _ = meta.Put("container", cmeta) for _, port := range ports { event := bus.Event{ diff --git a/libbeat/common/kubernetes/metadata/metadata.go b/libbeat/common/kubernetes/metadata/metadata.go index 9398dc7afb9..3dee3d26d59 100644 --- a/libbeat/common/kubernetes/metadata/metadata.go +++ b/libbeat/common/kubernetes/metadata/metadata.go @@ -110,7 +110,7 @@ func GetPodMetaGen( func GetKubernetesClusterIdentifier(cfg *common.Config, client k8sclient.Interface) (ClusterInfo, error) { // try with kube config file var config Config - config.Unmarshal(cfg) + _ = config.Unmarshal(cfg) clusterInfo, err := getClusterInfoFromKubeConfigFile(config.KubeConfig) if err == nil { return clusterInfo, nil @@ -130,7 +130,7 @@ func getClusterInfoFromKubeadmConfigMap(client k8sclient.Interface) (ClusterInfo } cm, err := client.CoreV1().ConfigMaps("kube-system").Get(context.TODO(), "kubeadm-config", metav1.GetOptions{}) if err != nil { - return clusterInfo, fmt.Errorf("unable to get cluster identifiers from kubeadm-config: %+v", err) + return clusterInfo, fmt.Errorf("unable to get cluster identifiers from kubeadm-config: %w", err) } p, ok := cm.Data["ClusterConfiguration"] if !ok { @@ -163,7 +163,7 @@ func getClusterInfoFromKubeConfigFile(kubeconfig string) (ClusterInfo, error) { cfg, err := kubernetes.BuildConfig(kubeconfig) if err != nil { - return ClusterInfo{}, fmt.Errorf("unable to build kube config due to error: %+v", err) + return ClusterInfo{}, fmt.Errorf("unable to build kube config due to error: %w", err) } kube_cfg, err := clientcmd.LoadFromFile(kubeconfig) diff --git a/libbeat/common/kubernetes/metadata/namespace_test.go b/libbeat/common/kubernetes/metadata/namespace_test.go index d3df2d41a0d..5ce7829c65c 100644 --- a/libbeat/common/kubernetes/metadata/namespace_test.go +++ b/libbeat/common/kubernetes/metadata/namespace_test.go @@ -165,7 +165,7 @@ func TestNamespace_GenerateFromName(t *testing.T) { } namespaces := cache.NewStore(cache.MetaNamespaceKeyFunc) - namespaces.Add(test.input) + _ = namespaces.Add(test.input) metagen := NewNamespaceMetadataGenerator(cfg, namespaces, client) accessor, err := meta.Accessor(test.input) diff --git a/libbeat/common/kubernetes/metadata/pod.go b/libbeat/common/kubernetes/metadata/pod.go index 5db87bfbe79..0c321522817 100644 --- a/libbeat/common/kubernetes/metadata/pod.go +++ b/libbeat/common/kubernetes/metadata/pod.go @@ -95,19 +95,19 @@ func (p *pod) GenerateK8s(obj kubernetes.Resource, opts ...FieldOptions) common. if rsName, ok := rsName.(string); ok { dep := p.getRSDeployment(rsName, po.GetNamespace()) if dep != "" { - out.Put("deployment.name", dep) + _, _ = out.Put("deployment.name", dep) } } if p.node != nil { meta := p.node.GenerateFromName(po.Spec.NodeName, WithMetadata("node")) if meta != nil { - out.Put("node", meta["node"]) + _, _ = out.Put("node", meta["node"]) } else { - out.Put("node.name", po.Spec.NodeName) + _, _ = out.Put("node.name", po.Spec.NodeName) } } else { - out.Put("node.name", po.Spec.NodeName) + _, _ = out.Put("node.name", po.Spec.NodeName) } if p.namespace != nil { diff --git a/libbeat/common/kubernetes/metadata/pod_test.go b/libbeat/common/kubernetes/metadata/pod_test.go index aeacda89eb8..1f9fc8def04 100644 --- a/libbeat/common/kubernetes/metadata/pod_test.go +++ b/libbeat/common/kubernetes/metadata/pod_test.go @@ -550,7 +550,7 @@ func TestPod_GenerateFromName(t *testing.T) { }) assert.Nil(t, err) pods := cache.NewStore(cache.MetaNamespaceKeyFunc) - pods.Add(test.input) + _ = pods.Add(test.input) metagen := NewPodMetadataGenerator(config, pods, client, nil, nil, nil) accessor, err := meta.Accessor(test.input) From d253c1579721db475492b51ae39f6e79546b98dc Mon Sep 17 00:00:00 2001 From: MichaelKatsoulis Date: Thu, 22 Sep 2022 14:56:01 +0300 Subject: [PATCH 4/8] Fix lint errors --- .../autodiscover/providers/kubernetes/pod_test.go | 2 +- libbeat/common/kubernetes/metadata/metadata.go | 2 +- libbeat/common/kubernetes/metadata/pod.go | 2 +- libbeat/common/kubernetes/metadata/pod_test.go | 12 ++++++------ libbeat/common/kubernetes/metadata/resource.go | 2 +- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index 5388eeb4e6d..96dcad66b04 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -2040,7 +2040,7 @@ func getNestedAnnotations(in common.MapStr) common.MapStr { out := common.MapStr{} for k, v := range in { - out.Put(k, v) + _, _ = out.Put(k, v) } return out } diff --git a/libbeat/common/kubernetes/metadata/metadata.go b/libbeat/common/kubernetes/metadata/metadata.go index 3dee3d26d59..927fe71c140 100644 --- a/libbeat/common/kubernetes/metadata/metadata.go +++ b/libbeat/common/kubernetes/metadata/metadata.go @@ -168,7 +168,7 @@ func getClusterInfoFromKubeConfigFile(kubeconfig string) (ClusterInfo, error) { kube_cfg, err := clientcmd.LoadFromFile(kubeconfig) if err != nil { - return ClusterInfo{}, fmt.Errorf("unable to load kube_config due to error: %+v", err) + return ClusterInfo{}, fmt.Errorf("unable to load kube_config due to error: %w", err) } for key, element := range kube_cfg.Clusters { diff --git a/libbeat/common/kubernetes/metadata/pod.go b/libbeat/common/kubernetes/metadata/pod.go index 0c321522817..a58ee1c9895 100644 --- a/libbeat/common/kubernetes/metadata/pod.go +++ b/libbeat/common/kubernetes/metadata/pod.go @@ -120,7 +120,7 @@ func (p *pod) GenerateK8s(obj kubernetes.Resource, opts ...FieldOptions) common. } if po.Status.PodIP != "" { - out.Put("pod.ip", po.Status.PodIP) + _, _ = out.Put("pod.ip", po.Status.PodIP) } return out diff --git a/libbeat/common/kubernetes/metadata/pod_test.go b/libbeat/common/kubernetes/metadata/pod_test.go index 1f9fc8def04..c117f1a0d7a 100644 --- a/libbeat/common/kubernetes/metadata/pod_test.go +++ b/libbeat/common/kubernetes/metadata/pod_test.go @@ -663,14 +663,14 @@ func TestPod_GenerateWithNodeNamespace(t *testing.T) { }) assert.Nil(t, err) pods := cache.NewStore(cache.MetaNamespaceKeyFunc) - pods.Add(test.input) + _ = pods.Add(test.input) nodes := cache.NewStore(cache.MetaNamespaceKeyFunc) - nodes.Add(test.node) + _ = nodes.Add(test.node) nodeMeta := NewNodeMetadataGenerator(config, nodes, client) namespaces := cache.NewStore(cache.MetaNamespaceKeyFunc) - namespaces.Add(test.namespace) + _ = namespaces.Add(test.namespace) nsMeta := NewNamespaceMetadataGenerator(config, namespaces, client) metagen := NewPodMetadataGenerator(config, pods, client, nodeMeta, nsMeta, nil) @@ -811,14 +811,14 @@ func TestPod_GenerateWithNodeNamespaceWithAddResourceConfig(t *testing.T) { } pods := cache.NewStore(cache.MetaNamespaceKeyFunc) - pods.Add(test.input) + _ = pods.Add(test.input) nodes := cache.NewStore(cache.MetaNamespaceKeyFunc) - nodes.Add(test.node) + _ = nodes.Add(test.node) nodeMeta := NewNodeMetadataGenerator(nodeConfig, nodes, client) namespaces := cache.NewStore(cache.MetaNamespaceKeyFunc) - namespaces.Add(test.namespace) + _ = namespaces.Add(test.namespace) nsMeta := NewNamespaceMetadataGenerator(namespaceConfig, namespaces, client) metagen := NewPodMetadataGenerator(config, pods, client, nodeMeta, nsMeta, &metaConfig) diff --git a/libbeat/common/kubernetes/metadata/resource.go b/libbeat/common/kubernetes/metadata/resource.go index 4fa71d26d11..e429a89f85c 100644 --- a/libbeat/common/kubernetes/metadata/resource.go +++ b/libbeat/common/kubernetes/metadata/resource.go @@ -37,7 +37,7 @@ type Resource struct { // NewResourceMetadataGenerator creates a metadata generator for a generic resource func NewResourceMetadataGenerator(cfg *common.Config, client k8s.Interface) *Resource { var config Config - config.Unmarshal(cfg) + _ = config.Unmarshal(cfg) r := &Resource{ config: &config, From 31b80c2c6101959abd705d03fb1423c4ecbda896 Mon Sep 17 00:00:00 2001 From: MichaelKatsoulis Date: Thu, 22 Sep 2022 15:04:37 +0300 Subject: [PATCH 5/8] Fix lint errors --- libbeat/common/kubernetes/metadata/resource.go | 10 +++++----- .../add_kubernetes_metadata/indexers_test.go | 6 +++--- .../processors/add_kubernetes_metadata/kubernetes.go | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/libbeat/common/kubernetes/metadata/resource.go b/libbeat/common/kubernetes/metadata/resource.go index e429a89f85c..a166c87d3bb 100644 --- a/libbeat/common/kubernetes/metadata/resource.go +++ b/libbeat/common/kubernetes/metadata/resource.go @@ -70,10 +70,10 @@ func (r *Resource) Generate(kind string, obj kubernetes.Resource, opts ...FieldO func (r *Resource) GenerateECS(obj kubernetes.Resource) common.MapStr { ecsMeta := common.MapStr{} if r.clusterInfo.Url != "" { - ecsMeta.Put("orchestrator.cluster.url", r.clusterInfo.Url) + _, _ = ecsMeta.Put("orchestrator.cluster.url", r.clusterInfo.Url) } if r.clusterInfo.Name != "" { - ecsMeta.Put("orchestrator.cluster.name", r.clusterInfo.Name) + _, _ = ecsMeta.Put("orchestrator.cluster.name", r.clusterInfo.Name) } return ecsMeta } @@ -94,7 +94,7 @@ func (r *Resource) GenerateK8s(kind string, obj kubernetes.Resource, options ... // Exclude any labels that are present in the exclude_labels config for _, label := range r.config.ExcludeLabels { - labelMap.Delete(label) + _ = labelMap.Delete(label) } annotationsMap := generateMapSubset(accessor.GetAnnotations(), r.config.IncludeAnnotations, r.config.AnnotationsDedot) @@ -154,7 +154,7 @@ func generateMapSubset(input map[string]string, keys []string, dedot bool) commo if ok { if dedot { dedotKey := common.DeDot(key) - output.Put(dedotKey, value) + _, _ = output.Put(dedotKey, value) } else { safemapstr.Put(output, key, value) } @@ -173,7 +173,7 @@ func GenerateMap(input map[string]string, dedot bool) common.MapStr { for k, v := range input { if dedot { label := common.DeDot(k) - output.Put(label, v) + _, _ = output.Put(label, v) } else { safemapstr.Put(output, k, v) } diff --git a/libbeat/processors/add_kubernetes_metadata/indexers_test.go b/libbeat/processors/add_kubernetes_metadata/indexers_test.go index de3cc918400..46221351bf7 100644 --- a/libbeat/processors/add_kubernetes_metadata/indexers_test.go +++ b/libbeat/processors/add_kubernetes_metadata/indexers_test.go @@ -230,7 +230,7 @@ func TestContainerIndexer(t *testing.T) { assert.Equal(t, indices[1], "fghij") assert.Equal(t, indices[2], "klmno") - expected.Put("kubernetes.container", + _, _ = expected.Put("kubernetes.container", common.MapStr{ "name": container, "image": containerImage, @@ -239,7 +239,7 @@ func TestContainerIndexer(t *testing.T) { }) assert.Equal(t, expected.String(), indexers[0].Data.String()) - expected.Put("kubernetes.container", + _, _ = expected.Put("kubernetes.container", common.MapStr{ "name": initContainer, "image": initContainerImage, @@ -248,7 +248,7 @@ func TestContainerIndexer(t *testing.T) { }) assert.Equal(t, expected.String(), indexers[1].Data.String()) - expected.Put("kubernetes.container", + _, _ = expected.Put("kubernetes.container", common.MapStr{ "name": ephemeralContainer, "image": ephemeralContainerImage, diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index fb74a9908b2..f532c3900d4 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -123,7 +123,7 @@ func newProcessorConfig(cfg *common.Config, register *Register) (kubeAnnotatorCo err := cfg.Unpack(&config) if err != nil { - return config, fmt.Errorf("fail to unpack the kubernetes configuration: %s", err) + return config, fmt.Errorf("fail to unpack the kubernetes configuration: %w", err) } //Load and append default indexer configs @@ -277,7 +277,7 @@ func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) { } metaClone := metadata.Clone() - metaClone.Delete("kubernetes.container.name") + _ = metaClone.Delete("kubernetes.container.name") containerImage, err := metadata.GetValue("kubernetes.container.image") if err == nil { metaClone.Delete("kubernetes.container.image") From 63fc3b35ccfe4b05a03bcbcd14c320b2e0d352ff Mon Sep 17 00:00:00 2001 From: MichaelKatsoulis Date: Thu, 22 Sep 2022 15:35:32 +0300 Subject: [PATCH 6/8] Fix lint errors --- libbeat/common/kubernetes/metadata/metadata.go | 6 +++--- .../common/kubernetes/metadata/resource_test.go | 2 +- .../processors/add_kubernetes_metadata/config.go | 2 +- .../add_kubernetes_metadata/indexers_test.go | 2 +- .../add_kubernetes_metadata/kubernetes.go | 10 +++++----- metricbeat/module/kubernetes/util/kubernetes.go | 2 +- .../module/kubernetes/util/kubernetes_test.go | 4 ++-- .../pkg/composable/providers/kubernetes/pod.go | 14 +++++++------- 8 files changed, 21 insertions(+), 21 deletions(-) diff --git a/libbeat/common/kubernetes/metadata/metadata.go b/libbeat/common/kubernetes/metadata/metadata.go index 927fe71c140..881e4053eb5 100644 --- a/libbeat/common/kubernetes/metadata/metadata.go +++ b/libbeat/common/kubernetes/metadata/metadata.go @@ -68,7 +68,7 @@ type ClusterConfiguration struct { // WithFields FieldOption allows adding specific fields into the generated metadata func WithFields(key string, value interface{}) FieldOptions { return func(meta common.MapStr) { - safemapstr.Put(meta, key, value) + _ = safemapstr.Put(meta, key, value) } } @@ -77,10 +77,10 @@ func WithFields(key string, value interface{}) FieldOptions { func WithMetadata(kind string) FieldOptions { return func(meta common.MapStr) { if meta["labels"] != nil { - safemapstr.Put(meta, strings.ToLower(kind)+".labels", meta["labels"]) + _ = safemapstr.Put(meta, strings.ToLower(kind)+".labels", meta["labels"]) } if meta["annotations"] != nil { - safemapstr.Put(meta, strings.ToLower(kind)+".annotations", meta["annotations"]) + _ = safemapstr.Put(meta, strings.ToLower(kind)+".annotations", meta["annotations"]) } } } diff --git a/libbeat/common/kubernetes/metadata/resource_test.go b/libbeat/common/kubernetes/metadata/resource_test.go index 5ed6f20fce0..b1997190c1a 100644 --- a/libbeat/common/kubernetes/metadata/resource_test.go +++ b/libbeat/common/kubernetes/metadata/resource_test.go @@ -116,7 +116,7 @@ func TestResource_Generate(t *testing.T) { } var cfg Config - ucfg.New().Unpack(&cfg) + _ = ucfg.New().Unpack(&cfg) metagen := &Resource{ config: &cfg, } diff --git a/libbeat/processors/add_kubernetes_metadata/config.go b/libbeat/processors/add_kubernetes_metadata/config.go index 1d0ca3bf9b3..6327ea75379 100644 --- a/libbeat/processors/add_kubernetes_metadata/config.go +++ b/libbeat/processors/add_kubernetes_metadata/config.go @@ -80,7 +80,7 @@ func (k *kubeAnnotatorConfig) Validate() error { err := matcherCfg.Unpack(&logsPathMatcher) if err != nil { - return fmt.Errorf("fail to unpack the `logs_path` matcher configuration: %s", err) + return fmt.Errorf("fail to unpack the `logs_path` matcher configuration: %w", err) } if logsPathMatcher.LogsPath == "" { return fmt.Errorf("invalid logs_path matcher configuration: when resource_type is defined, logs_path must be set as well") diff --git a/libbeat/processors/add_kubernetes_metadata/indexers_test.go b/libbeat/processors/add_kubernetes_metadata/indexers_test.go index 46221351bf7..4001e2f86eb 100644 --- a/libbeat/processors/add_kubernetes_metadata/indexers_test.go +++ b/libbeat/processors/add_kubernetes_metadata/indexers_test.go @@ -475,7 +475,7 @@ func TestIpPortIndexer(t *testing.T) { assert.Equal(t, fmt.Sprintf("%s:%d", ip, port), indices[1]) assert.Equal(t, expected.String(), indexers[0].Data.String()) - expected.Put("kubernetes.container", + _, _ = expected.Put("kubernetes.container", common.MapStr{ "name": container, "image": containerImage, diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index f532c3900d4..c7ce0ae73e6 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -280,8 +280,8 @@ func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) { _ = metaClone.Delete("kubernetes.container.name") containerImage, err := metadata.GetValue("kubernetes.container.image") if err == nil { - metaClone.Delete("kubernetes.container.image") - metaClone.Put("kubernetes.container.image.name", containerImage) + _ = metaClone.Delete("kubernetes.container.image") + _, _ = metaClone.Put("kubernetes.container.image.name", containerImage) } cmeta, err := metaClone.Clone().GetValue("kubernetes.container") if err == nil { @@ -292,9 +292,9 @@ func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) { kubeMeta := metadata.Clone() // remove container meta from kubernetes.container.* - kubeMeta.Delete("kubernetes.container.id") - kubeMeta.Delete("kubernetes.container.runtime") - kubeMeta.Delete("kubernetes.container.image") + _ = kubeMeta.Delete("kubernetes.container.id") + _ = kubeMeta.Delete("kubernetes.container.runtime") + _ = kubeMeta.Delete("kubernetes.container.image") event.Fields.DeepUpdate(kubeMeta) return event, nil diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index cf234eb4e8f..92fd9b7ecf5 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -438,7 +438,7 @@ func (m *enricher) Enrich(events []common.MapStr) { delete(k8sMeta, "pod") } ecsMeta := meta.Clone() - ecsMeta.Delete("kubernetes") + _ = ecsMeta.Delete("kubernetes") event.DeepUpdate(common.MapStr{ mb.ModuleDataKey: k8sMeta, "meta": ecsMeta, diff --git a/metricbeat/module/kubernetes/util/kubernetes_test.go b/metricbeat/module/kubernetes/util/kubernetes_test.go index 83bbd3dd1f7..72a3feae273 100644 --- a/metricbeat/module/kubernetes/util/kubernetes_test.go +++ b/metricbeat/module/kubernetes/util/kubernetes_test.go @@ -128,9 +128,9 @@ func (f *mockFuncs) update(m map[string]common.MapStr, obj kubernetes.Resource) }, } for k, v := range accessor.GetLabels() { - meta.Put(fmt.Sprintf("kubernetes.%v", k), v) + _, _ = meta.Put(fmt.Sprintf("kubernetes.%v", k), v) } - meta.Put("orchestrator.cluster.name", "gke-4242") + _, _ = meta.Put("orchestrator.cluster.name", "gke-4242") m[accessor.GetName()] = meta } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index fde83af20f8..78a549256c4 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -169,7 +169,7 @@ func (p *pod) emitRunning(pod *kubernetes.Pod) { // Emit the pod // We emit Pod + containers to ensure that configs matching Pod only // get Pod metadata (not specific to any container) - p.comm.AddOrUpdate(data.uid, PodPriority, data.mapping, data.processors) + _ = p.comm.AddOrUpdate(data.uid, PodPriority, data.mapping, data.processors) // Emit all containers in the pod // TODO: deal with init containers stopping after initialization @@ -257,7 +257,7 @@ func generatePodData( // Pass annotations to all events so that it can be used in templating and by annotation builders. annotations := common.MapStr{} for k, v := range pod.GetObjectMeta().GetAnnotations() { - safemapstr.Put(annotations, k, v) + _ = safemapstr.Put(annotations, k, v) } k8sMapping["annotations"] = annotations @@ -293,7 +293,7 @@ func generateContainerData( // Pass annotations to all events so that it can be used in templating and by annotation builders. annotations := common.MapStr{} for k, v := range pod.GetObjectMeta().GetAnnotations() { - safemapstr.Put(annotations, k, v) + _ = safemapstr.Put(annotations, k, v) } for _, c := range containers { @@ -364,14 +364,14 @@ func generateContainerData( } if len(c.spec.Ports) > 0 { for _, port := range c.spec.Ports { - containerMeta.Put("port", fmt.Sprintf("%v", port.ContainerPort)) - containerMeta.Put("port_name", port.Name) + _, _ = containerMeta.Put("port", fmt.Sprintf("%v", port.ContainerPort)) + _, _ = containerMeta.Put("port_name", port.Name) k8sMapping["container"] = containerMeta - comm.AddOrUpdate(eventID, ContainerPriority, k8sMapping, processors) + _ = comm.AddOrUpdate(eventID, ContainerPriority, k8sMapping, processors) } } else { k8sMapping["container"] = containerMeta - comm.AddOrUpdate(eventID, ContainerPriority, k8sMapping, processors) + _ = comm.AddOrUpdate(eventID, ContainerPriority, k8sMapping, processors) } } } From 58d0e3c4bb917758b6058b06c174f53486459322 Mon Sep 17 00:00:00 2001 From: MichaelKatsoulis Date: Thu, 22 Sep 2022 15:47:01 +0300 Subject: [PATCH 7/8] Fix lint errors --- libbeat/common/kubernetes/metadata/resource.go | 12 ++++++------ .../pkg/composable/providers/kubernetes/pod.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/libbeat/common/kubernetes/metadata/resource.go b/libbeat/common/kubernetes/metadata/resource.go index a166c87d3bb..38d16563141 100644 --- a/libbeat/common/kubernetes/metadata/resource.go +++ b/libbeat/common/kubernetes/metadata/resource.go @@ -108,7 +108,7 @@ func (r *Resource) GenerateK8s(kind string, obj kubernetes.Resource, options ... if accessor.GetNamespace() != "" { // TODO make this namespace.name in 8.0 - safemapstr.Put(meta, "namespace", accessor.GetNamespace()) + _ = safemapstr.Put(meta, "namespace", accessor.GetNamespace()) } // Add controller metadata if present @@ -122,18 +122,18 @@ func (r *Resource) GenerateK8s(kind string, obj kubernetes.Resource, options ... "StatefulSet", "DaemonSet", "Job": - safemapstr.Put(meta, strings.ToLower(ref.Kind)+".name", ref.Name) + _ = safemapstr.Put(meta, strings.ToLower(ref.Kind)+".name", ref.Name) } } } } if len(labelMap) != 0 { - safemapstr.Put(meta, "labels", labelMap) + _ = safemapstr.Put(meta, "labels", labelMap) } if len(annotationsMap) != 0 { - safemapstr.Put(meta, "annotations", annotationsMap) + _ = safemapstr.Put(meta, "annotations", annotationsMap) } for _, option := range options { @@ -156,7 +156,7 @@ func generateMapSubset(input map[string]string, keys []string, dedot bool) commo dedotKey := common.DeDot(key) _, _ = output.Put(dedotKey, value) } else { - safemapstr.Put(output, key, value) + _ = safemapstr.Put(output, key, value) } } } @@ -175,7 +175,7 @@ func GenerateMap(input map[string]string, dedot bool) common.MapStr { label := common.DeDot(k) _, _ = output.Put(label, v) } else { - safemapstr.Put(output, k, v) + _ = safemapstr.Put(output, k, v) } } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index 78a549256c4..0e074fa088c 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -394,7 +394,7 @@ func podNamespaceAnnotations(pod *kubernetes.Pod, watcher kubernetes.Watcher) co annotations := common.MapStr{} for k, v := range namespace.GetAnnotations() { - safemapstr.Put(annotations, k, v) + _ = safemapstr.Put(annotations, k, v) } return annotations } From 2388dd6865fa6fd723b14786f68543df62dc3d2a Mon Sep 17 00:00:00 2001 From: MichaelKatsoulis Date: Thu, 22 Sep 2022 15:50:56 +0300 Subject: [PATCH 8/8] Fix lint errors --- libbeat/autodiscover/providers/kubernetes/pod.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index d75e09def96..360d61d1f7e 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -458,7 +458,7 @@ func (p *pod) podEvent(flag string, pod *kubernetes.Pod, ports common.MapStr, in func podAnnotations(pod *kubernetes.Pod) common.MapStr { annotations := common.MapStr{} for k, v := range pod.GetObjectMeta().GetAnnotations() { - safemapstr.Put(annotations, k, v) + _ = safemapstr.Put(annotations, k, v) } return annotations } @@ -481,7 +481,7 @@ func podNamespaceAnnotations(pod *kubernetes.Pod, watcher kubernetes.Watcher) co annotations := common.MapStr{} for k, v := range namespace.GetAnnotations() { - safemapstr.Put(annotations, k, v) + _ = safemapstr.Put(annotations, k, v) } return annotations }