diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9f1c0efd2d7..3c91025edc7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -29,6 +29,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Libbeat: Do not overwrite agent.*, ecs.version, and host.name. {pull}14407[14407] - Libbeat: Cleanup the x-pack licenser code to use the new license endpoint and the new format. {pull}15091[15091] - Users can now specify `monitoring.cloud.*` to override `monitoring.elasticsearch.*` settings. {issue}14399[14399] {pull}15254[15254] +- Refactor metadata generator to support adding metadata across resources {pull}14875[14875] - Update to ECS 1.4.0. {pull}14844[14844] *Auditbeat* diff --git a/libbeat/autodiscover/providers/kubernetes/config.go b/libbeat/autodiscover/providers/kubernetes/config.go index 625a0ee1751..f443439c57f 100644 --- a/libbeat/autodiscover/providers/kubernetes/config.go +++ b/libbeat/autodiscover/providers/kubernetes/config.go @@ -23,6 +23,8 @@ import ( "fmt" "time" + "github.com/elastic/beats/libbeat/common/kubernetes/metadata" + "github.com/elastic/beats/libbeat/autodiscover/template" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" @@ -48,6 +50,8 @@ type Config struct { Builders []*common.Config `config:"builders"` Appenders []*common.Config `config:"appenders"` Templates template.MapperSettings `config:"templates"` + + AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"` } func defaultConfig() *Config { diff --git a/libbeat/autodiscover/providers/kubernetes/node.go b/libbeat/autodiscover/providers/kubernetes/node.go index 5c039bc8952..894133f3d12 100644 --- a/libbeat/autodiscover/providers/kubernetes/node.go +++ b/libbeat/autodiscover/providers/kubernetes/node.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" "github.com/elastic/beats/libbeat/common/kubernetes" + "github.com/elastic/beats/libbeat/common/kubernetes/metadata" "github.com/elastic/beats/libbeat/common/safemapstr" "github.com/elastic/beats/libbeat/logp" ) @@ -36,7 +37,7 @@ import ( type node struct { uuid uuid.UUID config *Config - metagen kubernetes.MetaGenerator + metagen metadata.MetaGen logger *logp.Logger publish func(bus.Event) watcher kubernetes.Watcher @@ -44,15 +45,10 @@ type node struct { // NewNodeEventer creates an eventer that can discover and process node objects func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, publish func(event bus.Event)) (Eventer, error) { - metagen, err := kubernetes.NewMetaGenerator(cfg) - if err != nil { - return nil, err - } - logger := logp.NewLogger("autodiscover.node") config := defaultConfig() - err = cfg.Unpack(&config) + err := cfg.Unpack(&config) if err != nil { return nil, err } @@ -70,7 +66,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{ SyncTimeout: config.SyncPeriod, Node: config.Node, - }) + }, nil) if err != nil { return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) @@ -80,7 +76,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu config: config, uuid: uuid, publish: publish, - metagen: metagen, + metagen: metadata.NewNodeMetadataGenerator(cfg, watcher.Store()), logger: logger, watcher: watcher, } @@ -172,11 +168,7 @@ func (n *node) emit(node *kubernetes.Node, flag string) { } eventID := fmt.Sprint(node.GetObjectMeta().GetUID()) - meta := n.metagen.ResourceMetadata(node) - - // TODO: Refactor metagen to make sure that this is seamless - meta.Put("node.name", node.Name) - meta.Put("node.uid", string(node.GetObjectMeta().GetUID())) + meta := n.metagen.Generate(node) kubemeta := meta.Clone() // Pass annotations to all events so that it can be used in templating and by annotation builders. diff --git a/libbeat/autodiscover/providers/kubernetes/node_test.go b/libbeat/autodiscover/providers/kubernetes/node_test.go index 99741b42b45..aa4cd3476a9 100644 --- a/libbeat/autodiscover/providers/kubernetes/node_test.go +++ b/libbeat/autodiscover/providers/kubernetes/node_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + "github.com/elastic/beats/libbeat/common/kubernetes/metadata" + "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" @@ -114,6 +116,11 @@ func TestEmitEvent_Node(t *testing.T) { nodeIP := "192.168.0.1" uid := "005f3b90-4b9d-12f8-acf0-31020a840133" UUID, err := uuid.NewV4() + + typeMeta := metav1.TypeMeta{ + Kind: "Node", + APIVersion: "v1", + } if err != nil { t.Fatal(err) } @@ -134,6 +141,7 @@ func TestEmitEvent_Node(t *testing.T) { Labels: map[string]string{}, Annotations: map[string]string{}, }, + TypeMeta: typeMeta, Status: v1.NodeStatus{ Addresses: []v1.NodeAddress{ { @@ -180,7 +188,8 @@ func TestEmitEvent_Node(t *testing.T) { Labels: map[string]string{}, Annotations: map[string]string{}, }, - Status: v1.NodeStatus{}, + TypeMeta: typeMeta, + Status: v1.NodeStatus{}, }, Expected: nil, }, @@ -194,6 +203,7 @@ func TestEmitEvent_Node(t *testing.T) { Labels: map[string]string{}, Annotations: map[string]string{}, }, + TypeMeta: typeMeta, Status: v1.NodeStatus{ Addresses: []v1.NodeAddress{}, Conditions: []v1.NodeCondition{ @@ -236,11 +246,7 @@ func TestEmitEvent_Node(t *testing.T) { t.Fatal(err) } - metaGen, err := kubernetes.NewMetaGenerator(common.NewConfig()) - if err != nil { - t.Fatal(err) - } - + metaGen := metadata.NewNodeMetadataGenerator(common.NewConfig(), nil) p := &Provider{ config: defaultConfig(), bus: bus.New("test"), diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index 4d6ef20fc3a..ee71f0a7673 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -25,34 +25,31 @@ import ( k8s "k8s.io/client-go/kubernetes" "github.com/elastic/beats/libbeat/autodiscover/builder" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" "github.com/elastic/beats/libbeat/common/kubernetes" + "github.com/elastic/beats/libbeat/common/kubernetes/metadata" "github.com/elastic/beats/libbeat/common/safemapstr" "github.com/elastic/beats/libbeat/logp" ) type pod struct { - uuid uuid.UUID - config *Config - metagen kubernetes.MetaGenerator - logger *logp.Logger - publish func(bus.Event) - watcher kubernetes.Watcher + uuid uuid.UUID + config *Config + metagen metadata.MetaGen + logger *logp.Logger + publish func(bus.Event) + watcher kubernetes.Watcher + nodeWatcher kubernetes.Watcher + namespaceWatcher kubernetes.Watcher } // NewPodEventer creates an eventer that can discover and process pod objects func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, publish func(event bus.Event)) (Eventer, error) { - metagen, err := kubernetes.NewMetaGenerator(cfg) - if err != nil { - return nil, err - } - logger := logp.NewLogger("autodiscover.pod") config := defaultConfig() - err = cfg.Unpack(&config) + err := cfg.Unpack(&config) if err != nil { return nil, err } @@ -71,18 +68,52 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub SyncTimeout: config.SyncPeriod, Node: config.Node, Namespace: config.Namespace, - }) + }, nil) if err != nil { return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Pod{}, err) } + var nodeMeta, namespaceMeta metadata.MetaGen + var nodeWatcher, namespaceWatcher kubernetes.Watcher + metaConf := config.AddResourceMetadata + if metaConf != nil { + if metaConf.Node != nil && metaConf.Node.Enabled() { + options := kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + Node: config.Node, + } + if config.Namespace != "" { + options.Namespace = config.Namespace + } + nodeWatcher, err = kubernetes.NewWatcher(client, &kubernetes.Node{}, options, nil) + if err != nil { + return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) + } + + nodeMeta = metadata.NewNodeMetadataGenerator(metaConf.Node, nodeWatcher.Store()) + } + + if metaConf.Namespace != nil && metaConf.Namespace.Enabled() { + namespaceWatcher, err = kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + }, nil) + if err != nil { + return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) + } + + namespaceMeta = metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store()) + } + } + p := &pod{ - config: config, - uuid: uuid, - publish: publish, - metagen: metagen, - logger: logger, - watcher: watcher, + config: config, + uuid: uuid, + publish: publish, + metagen: metadata.NewPodMetadataGenerator(cfg, watcher.Store(), nodeMeta, namespaceMeta), + logger: logger, + watcher: watcher, + nodeWatcher: nodeWatcher, + namespaceWatcher: namespaceWatcher, } watcher.AddEventHandler(p) @@ -168,12 +199,33 @@ func (p *pod) GenerateHints(event bus.Event) bus.Event { // Start starts the eventer func (p *pod) Start() error { + if p.nodeWatcher != nil { + err := p.nodeWatcher.Start() + if err != nil { + return err + } + } + + if p.namespaceWatcher != nil { + if err := p.namespaceWatcher.Start(); err != nil { + return err + } + } + return p.watcher.Start() } // Stop stops the eventer func (p *pod) Stop() { p.watcher.Stop() + + if p.namespaceWatcher != nil { + p.namespaceWatcher.Stop() + } + + if p.nodeWatcher != nil { + p.nodeWatcher.Stop() + } } func (p *pod) emit(pod *kubernetes.Pod, flag string) { @@ -231,7 +283,8 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet "image": c.Image, "runtime": runtimes[c.Name], } - meta := p.metagen.ContainerMetadata(pod, c.Name, c.Image) + meta := p.metagen.Generate(pod, metadata.WithFields("container.name", c.Name), + metadata.WithFields("container.image", c.Image)) // Information that can be used in discovering a workload kubemeta := meta.Clone() diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index b3f3b6e2f18..a5672c7e55d 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + "github.com/elastic/beats/libbeat/common/kubernetes/metadata" + "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" @@ -175,6 +177,11 @@ func TestEmitEvent(t *testing.T) { t.Fatal(err) } + typeMeta := metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + } + tests := []struct { Message string Flag string @@ -192,6 +199,7 @@ func TestEmitEvent(t *testing.T) { Labels: map[string]string{}, Annotations: map[string]string{}, }, + TypeMeta: typeMeta, Status: v1.PodStatus{ PodIP: podIP, ContainerStatuses: []kubernetes.PodContainerStatus{ @@ -264,6 +272,7 @@ func TestEmitEvent(t *testing.T) { Labels: map[string]string{}, Annotations: map[string]string{}, }, + TypeMeta: typeMeta, Status: v1.PodStatus{ ContainerStatuses: []kubernetes.PodContainerStatus{ { @@ -295,6 +304,7 @@ func TestEmitEvent(t *testing.T) { Labels: map[string]string{}, Annotations: map[string]string{}, }, + TypeMeta: typeMeta, Status: v1.PodStatus{ PodIP: podIP, ContainerStatuses: []kubernetes.PodContainerStatus{ @@ -326,6 +336,7 @@ func TestEmitEvent(t *testing.T) { Labels: map[string]string{}, Annotations: map[string]string{}, }, + TypeMeta: typeMeta, Status: v1.PodStatus{ ContainerStatuses: []kubernetes.PodContainerStatus{ { @@ -393,6 +404,7 @@ func TestEmitEvent(t *testing.T) { Labels: map[string]string{}, Annotations: map[string]string{}, }, + TypeMeta: typeMeta, Status: v1.PodStatus{ PodIP: podIP, ContainerStatuses: []kubernetes.PodContainerStatus{ @@ -459,11 +471,7 @@ func TestEmitEvent(t *testing.T) { t.Fatal(err) } - metaGen, err := kubernetes.NewMetaGenerator(common.NewConfig()) - if err != nil { - t.Fatal(err) - } - + metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil) p := &Provider{ config: defaultConfig(), bus: bus.New("test"), diff --git a/libbeat/autodiscover/providers/kubernetes/service.go b/libbeat/autodiscover/providers/kubernetes/service.go index 3dbe379943f..f840f9c0969 100644 --- a/libbeat/autodiscover/providers/kubernetes/service.go +++ b/libbeat/autodiscover/providers/kubernetes/service.go @@ -21,6 +21,8 @@ import ( "fmt" "time" + "github.com/elastic/beats/libbeat/common/kubernetes/metadata" + "github.com/gofrs/uuid" k8s "k8s.io/client-go/kubernetes" @@ -33,44 +35,59 @@ import ( ) type service struct { - uuid uuid.UUID - config *Config - metagen kubernetes.MetaGenerator - logger *logp.Logger - publish func(bus.Event) - watcher kubernetes.Watcher + uuid uuid.UUID + config *Config + metagen metadata.MetaGen + logger *logp.Logger + publish func(bus.Event) + watcher kubernetes.Watcher + namespaceWatcher kubernetes.Watcher } // NewServiceEventer creates an eventer that can discover and process service objects func NewServiceEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, publish func(event bus.Event)) (Eventer, error) { - metagen, err := kubernetes.NewMetaGenerator(cfg) - if err != nil { - return nil, err - } - logger := logp.NewLogger("autodiscover.service") config := defaultConfig() - err = cfg.Unpack(&config) + err := cfg.Unpack(&config) if err != nil { return nil, err } watcher, err := kubernetes.NewWatcher(client, &kubernetes.Service{}, kubernetes.WatchOptions{ SyncTimeout: config.SyncPeriod, - }) + Namespace: config.Namespace, + }, nil) if err != nil { return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Service{}, err) } + var namespaceMeta metadata.MetaGen + var namespaceWatcher kubernetes.Watcher + metaConf := config.AddResourceMetadata + if metaConf != nil { + if metaConf.Namespace != nil && metaConf.Namespace.Enabled() { + namespaceWatcher, err = kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + Namespace: config.Namespace, + }, nil) + if err != nil { + return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) + } + + namespaceMeta = metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store()) + } + } + p := &service{ - config: config, - uuid: uuid, - publish: publish, - metagen: metagen, - logger: logger, - watcher: watcher, + config: config, + uuid: uuid, + publish: publish, + metagen: metadata.NewServiceMetadataGenerator(cfg, watcher.Store(), namespaceMeta), + namespaceWatcher: namespaceWatcher, + logger: logger, + watcher: watcher, } watcher.AddEventHandler(p) @@ -79,7 +96,7 @@ func NewServiceEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, // OnAdd ensures processing of service objects that are newly created func (s *service) OnAdd(obj interface{}) { - s.logger.Debugf("Watcher Node add: %+v", obj) + s.logger.Debugf("Watcher service add: %+v", obj) s.emit(obj.(*kubernetes.Service), "start") } @@ -90,7 +107,7 @@ func (s *service) OnUpdate(obj interface{}) { if svc.GetObjectMeta().GetDeletionTimestamp() != nil { time.AfterFunc(s.config.CleanupTimeout, func() { s.emit(svc, "stop") }) } else { - s.logger.Debugf("Watcher Node update: %+v", obj) + s.logger.Debugf("Watcher service update: %+v", obj) s.emit(svc, "stop") s.emit(svc, "start") } @@ -98,7 +115,7 @@ func (s *service) OnUpdate(obj interface{}) { // OnDelete ensures processing of service objects that are deleted func (s *service) OnDelete(obj interface{}) { - s.logger.Debugf("Watcher Node delete: %+v", obj) + s.logger.Debugf("Watcher service delete: %+v", obj) time.AfterFunc(s.config.CleanupTimeout, func() { s.emit(obj.(*kubernetes.Service), "stop") }) } @@ -138,12 +155,21 @@ func (s *service) GenerateHints(event bus.Event) bus.Event { // Start starts the eventer func (s *service) Start() error { + if s.namespaceWatcher != nil { + if err := s.namespaceWatcher.Start(); err != nil { + return err + } + } return s.watcher.Start() } // Stop stops the eventer func (s *service) Stop() { s.watcher.Stop() + + if s.namespaceWatcher != nil { + s.namespaceWatcher.Stop() + } } func (s *service) emit(svc *kubernetes.Service, flag string) { @@ -155,11 +181,7 @@ func (s *service) emit(svc *kubernetes.Service, flag string) { } eventID := fmt.Sprint(svc.GetObjectMeta().GetUID()) - meta := s.metagen.ResourceMetadata(svc) - - // TODO: Refactor metagen to make sure that this is seamless - meta.Put("service.name", svc.Name) - meta.Put("service.uid", string(svc.GetObjectMeta().GetUID())) + meta := s.metagen.Generate(svc) kubemeta := meta.Clone() // Pass annotations to all events so that it can be used in templating and by annotation builders. diff --git a/libbeat/autodiscover/providers/kubernetes/service_test.go b/libbeat/autodiscover/providers/kubernetes/service_test.go index e598a4d6507..04ab7ec725b 100644 --- a/libbeat/autodiscover/providers/kubernetes/service_test.go +++ b/libbeat/autodiscover/providers/kubernetes/service_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + "github.com/elastic/beats/libbeat/common/kubernetes/metadata" + "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" @@ -119,6 +121,11 @@ func TestEmitEvent_Service(t *testing.T) { t.Fatal(err) } + typeMeta := metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + } + tests := []struct { Message string Flag string @@ -136,6 +143,7 @@ func TestEmitEvent_Service(t *testing.T) { Labels: map[string]string{}, Annotations: map[string]string{}, }, + TypeMeta: typeMeta, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{ { @@ -183,6 +191,7 @@ func TestEmitEvent_Service(t *testing.T) { Labels: map[string]string{}, Annotations: map[string]string{}, }, + TypeMeta: typeMeta, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{ { @@ -205,6 +214,7 @@ func TestEmitEvent_Service(t *testing.T) { Labels: map[string]string{}, Annotations: map[string]string{}, }, + TypeMeta: typeMeta, Spec: v1.ServiceSpec{ ClusterIP: clusterIP, }, @@ -222,6 +232,7 @@ func TestEmitEvent_Service(t *testing.T) { Labels: map[string]string{}, Annotations: map[string]string{}, }, + TypeMeta: typeMeta, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{ { @@ -266,10 +277,7 @@ func TestEmitEvent_Service(t *testing.T) { t.Fatal(err) } - metaGen, err := kubernetes.NewMetaGenerator(common.NewConfig()) - if err != nil { - t.Fatal(err) - } + metaGen := metadata.NewServiceMetadataGenerator(common.NewConfig(), nil, nil) p := &Provider{ config: defaultConfig(), diff --git a/libbeat/common/kubernetes/eventhandler.go b/libbeat/common/kubernetes/eventhandler.go index 228b3a76021..eebe33d4d76 100644 --- a/libbeat/common/kubernetes/eventhandler.go +++ b/libbeat/common/kubernetes/eventhandler.go @@ -69,6 +69,25 @@ func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) { } } +// NoOpEventHandlerFuncs ensures that watcher reconciliation can happen even without the required funcs +type NoOpEventHandlerFuncs struct { +} + +// OnAdd does a no-op on an add event +func (n NoOpEventHandlerFuncs) OnAdd(obj interface{}) { + +} + +// OnUpdate does a no-op on an update event +func (n NoOpEventHandlerFuncs) OnUpdate(obj interface{}) { + +} + +// OnDelete does a no-op on a delete event +func (n NoOpEventHandlerFuncs) OnDelete(obj interface{}) { + +} + // FilteringResourceEventHandler applies the provided filter to all events coming // in, ensuring the appropriate nested handler method is invoked. An object // that starts passing the filter after an update is considered an add, and an diff --git a/libbeat/common/kubernetes/informer.go b/libbeat/common/kubernetes/informer.go new file mode 100644 index 00000000000..0e7f4a9f225 --- /dev/null +++ b/libbeat/common/kubernetes/informer.go @@ -0,0 +1,159 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package kubernetes + +import ( + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +func nodeSelector(options *metav1.ListOptions, opt WatchOptions) { + if opt.Node != "" { + options.FieldSelector = "spec.nodeName=" + opt.Node + } +} + +func nameSelector(options *metav1.ListOptions, name string) { + if name != "" { + options.FieldSelector = "metadata.name=" + name + } +} + +// NewInformer creates an informer for a given resource +func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (cache.SharedInformer, string, error) { + var objType string + + var listwatch *cache.ListWatch + switch resource.(type) { + case *Pod: + p := client.CoreV1().Pods(opts.Namespace) + listwatch = &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + nodeSelector(&options, opts) + return p.List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + nodeSelector(&options, opts) + return p.Watch(options) + }, + } + + objType = "pod" + case *Event: + e := client.CoreV1().Events(opts.Namespace) + listwatch = &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return e.List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return e.Watch(options) + }, + } + + objType = "event" + case *Node: + n := client.CoreV1().Nodes() + listwatch = &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + nameSelector(&options, opts.Node) + return n.List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + nameSelector(&options, opts.Node) + return n.Watch(options) + }, + } + + objType = "node" + case *Namespace: + ns := client.CoreV1().Namespaces() + listwatch = &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + nameSelector(&options, opts.Namespace) + return ns.List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + nameSelector(&options, opts.Namespace) + return ns.Watch(options) + }, + } + + objType = "namespace" + case *Deployment: + d := client.AppsV1().Deployments(opts.Namespace) + listwatch = &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return d.List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return d.Watch(options) + }, + } + + objType = "deployment" + case *ReplicaSet: + rs := client.AppsV1().ReplicaSets(opts.Namespace) + listwatch = &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return rs.List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return rs.Watch(options) + }, + } + + objType = "replicaset" + case *StatefulSet: + ss := client.AppsV1().StatefulSets(opts.Namespace) + listwatch = &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return ss.List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return ss.Watch(options) + }, + } + + objType = "statefulset" + case *Service: + svc := client.CoreV1().Services(opts.Namespace) + listwatch = &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return svc.List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return svc.Watch(options) + }, + } + + objType = "service" + default: + return nil, "", fmt.Errorf("unsupported resource type for watching %T", resource) + } + + if indexers != nil { + return cache.NewSharedIndexInformer(listwatch, resource, opts.SyncTimeout, indexers), objType, nil + } + + return cache.NewSharedInformer(listwatch, resource, opts.SyncTimeout), objType, nil +} diff --git a/libbeat/common/kubernetes/metadata.go b/libbeat/common/kubernetes/metadata.go deleted file mode 100644 index 34be4480cb9..00000000000 --- a/libbeat/common/kubernetes/metadata.go +++ /dev/null @@ -1,179 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package kubernetes - -import ( - "strings" - - "k8s.io/apimachinery/pkg/api/meta" - - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/safemapstr" -) - -// MetaGenerator builds metadata objects for pods and containers -type MetaGenerator interface { - // ResourceMetadata generates metadata for the given kubernetes object taking to account certain filters - ResourceMetadata(obj Resource) common.MapStr - - // PodMetadata generates metadata for the given pod taking to account certain filters - PodMetadata(pod *Pod) common.MapStr - - // Containermetadata generates metadata for the given container of a pod - ContainerMetadata(pod *Pod, container string, image string) common.MapStr -} - -// MetaGeneratorConfig settings -type MetaGeneratorConfig struct { - IncludeLabels []string `config:"include_labels"` - ExcludeLabels []string `config:"exclude_labels"` - IncludeAnnotations []string `config:"include_annotations"` - - LabelsDedot bool `config:"labels.dedot"` - AnnotationsDedot bool `config:"annotations.dedot"` - - // Undocumented settings, to be deprecated in favor of `drop_fields` processor: - IncludeCreatorMetadata bool `config:"include_creator_metadata"` -} - -type metaGenerator = MetaGeneratorConfig - -// DefaultMetaGeneratorConfig initializes and returns a new MetaGeneratorConfig with default values -func DefaultMetaGeneratorConfig() MetaGeneratorConfig { - return MetaGeneratorConfig{ - IncludeCreatorMetadata: true, - LabelsDedot: true, - AnnotationsDedot: true, - } -} - -// NewMetaGenerator initializes and returns a new kubernetes metadata generator -func NewMetaGenerator(cfg *common.Config) (MetaGenerator, error) { - generator := DefaultMetaGeneratorConfig() - - err := cfg.Unpack(&generator) - return &generator, err -} - -// NewMetaGeneratorFromConfig initializes and returns a new kubernetes metadata generator -func NewMetaGeneratorFromConfig(cfg *MetaGeneratorConfig) MetaGenerator { - return cfg -} - -// ResourceMetadata generates metadata for the given kubernetes object taking to account certain filters -func (g *metaGenerator) ResourceMetadata(obj Resource) common.MapStr { - accessor, err := meta.Accessor(obj) - if err != nil { - return nil - } - - labelMap := common.MapStr{} - if len(g.IncludeLabels) == 0 { - for k, v := range accessor.GetLabels() { - if g.LabelsDedot { - label := common.DeDot(k) - labelMap.Put(label, v) - } else { - safemapstr.Put(labelMap, k, v) - } - } - } else { - labelMap = generateMapSubset(accessor.GetLabels(), g.IncludeLabels, g.LabelsDedot) - } - - // Exclude any labels that are present in the exclude_labels config - for _, label := range g.ExcludeLabels { - labelMap.Delete(label) - } - - annotationsMap := generateMapSubset(accessor.GetAnnotations(), g.IncludeAnnotations, g.AnnotationsDedot) - meta := common.MapStr{} - if accessor.GetNamespace() != "" { - meta["namespace"] = accessor.GetNamespace() - } - - // Add controller metadata if present - if g.IncludeCreatorMetadata { - for _, ref := range accessor.GetOwnerReferences() { - if ref.Controller != nil && *ref.Controller { - switch ref.Kind { - // TODO grow this list as we keep adding more `state_*` metricsets - case "Deployment", - "ReplicaSet", - "StatefulSet": - safemapstr.Put(meta, strings.ToLower(ref.Kind)+".name", ref.Name) - } - } - } - } - - if len(labelMap) != 0 { - meta["labels"] = labelMap - } - - if len(annotationsMap) != 0 { - meta["annotations"] = annotationsMap - } - - return meta -} - -// PodMetadata generates metadata for the given pod taking to account certain filters -func (g *metaGenerator) PodMetadata(pod *Pod) common.MapStr { - podMeta := g.ResourceMetadata(pod) - - safemapstr.Put(podMeta, "pod.uid", string(pod.GetObjectMeta().GetUID())) - safemapstr.Put(podMeta, "pod.name", pod.GetObjectMeta().GetName()) - safemapstr.Put(podMeta, "node.name", pod.Spec.NodeName) - - return podMeta -} - -// Containermetadata generates metadata for the given container of a pod -func (g *metaGenerator) ContainerMetadata(pod *Pod, container string, image string) common.MapStr { - podMeta := g.PodMetadata(pod) - - // Add container details - podMeta["container"] = common.MapStr{ - "name": container, - "image": image, - } - - return podMeta -} - -func generateMapSubset(input map[string]string, keys []string, dedot bool) common.MapStr { - output := common.MapStr{} - if input == nil { - return output - } - - for _, key := range keys { - value, ok := input[key] - if ok { - if dedot { - dedotKey := common.DeDot(key) - output.Put(dedotKey, value) - } else { - safemapstr.Put(output, key, value) - } - } - } - - return output -} diff --git a/libbeat/common/kubernetes/metadata/config.go b/libbeat/common/kubernetes/metadata/config.go new file mode 100644 index 00000000000..78abe6d8e53 --- /dev/null +++ b/libbeat/common/kubernetes/metadata/config.go @@ -0,0 +1,52 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package metadata + +import "github.com/elastic/beats/libbeat/common" + +// Config declares supported configuration for metadata generation +type Config struct { + IncludeLabels []string `config:"include_labels"` + ExcludeLabels []string `config:"exclude_labels"` + IncludeAnnotations []string `config:"include_annotations"` + + LabelsDedot bool `config:"labels.dedot"` + AnnotationsDedot bool `config:"annotations.dedot"` + + // Undocumented settings, to be deprecated in favor of `drop_fields` processor: + IncludeCreatorMetadata bool `config:"include_creator_metadata"` +} + +// AddResourceMetadataConfig allows adding config for enriching additional resources +type AddResourceMetadataConfig struct { + Node *common.Config `config:"node"` + Namespace *common.Config `config:"namespace"` +} + +func defaultConfig() Config { + return Config{ + IncludeCreatorMetadata: true, + LabelsDedot: true, + AnnotationsDedot: true, + } +} + +// Unmarshal unpacks a Config into the metagen Config +func (c *Config) Unmarshal(cfg *common.Config) error { + return cfg.Unpack(c) +} diff --git a/libbeat/common/kubernetes/metadata/metadata.go b/libbeat/common/kubernetes/metadata/metadata.go new file mode 100644 index 00000000000..b2205c91dd9 --- /dev/null +++ b/libbeat/common/kubernetes/metadata/metadata.go @@ -0,0 +1,42 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package metadata + +import ( + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/kubernetes" + "github.com/elastic/beats/libbeat/common/safemapstr" +) + +// MetaGen allows creation of metadata from either Kubernetes resources or their Resource names. +type MetaGen interface { + // Generate generates metadata for a given resource + Generate(kubernetes.Resource, ...FieldOptions) common.MapStr + // GenerateFromName generates metadata for a given resource based on it's name + GenerateFromName(string, ...FieldOptions) common.MapStr +} + +// FieldOptions allows additional enrichment to be done on top of existing metadata +type FieldOptions func(common.MapStr) + +// WithFields FieldOption allows adding specific fields into the generated metadata +func WithFields(key string, value interface{}) FieldOptions { + return func(meta common.MapStr) { + safemapstr.Put(meta, key, value) + } +} diff --git a/libbeat/common/kubernetes/metadata/namespace.go b/libbeat/common/kubernetes/metadata/namespace.go new file mode 100644 index 00000000000..7eb2d12eb5a --- /dev/null +++ b/libbeat/common/kubernetes/metadata/namespace.go @@ -0,0 +1,96 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package metadata + +import ( + "k8s.io/client-go/tools/cache" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/kubernetes" +) + +const resource = "namespace" + +type namespace struct { + store cache.Store + resource *Resource +} + +// NewNamespaceMetadataGenerator creates a metagen for namespace resources +func NewNamespaceMetadataGenerator(cfg *common.Config, namespaces cache.Store) MetaGen { + return &namespace{ + resource: NewResourceMetadataGenerator(cfg), + store: namespaces, + } +} + +// Generate generates namespace metadata from a resource object +func (n *namespace) Generate(obj kubernetes.Resource, opts ...FieldOptions) common.MapStr { + _, ok := obj.(*kubernetes.Namespace) + if !ok { + return nil + } + + meta := n.resource.Generate(resource, obj, opts...) + // TODO: remove this call when moving to 8.0 + meta = flattenMetadata(meta) + + // TODO: Add extra fields in here if need be + return meta +} + +// GenerateFromName generates pod metadata from a namespace name +func (n *namespace) GenerateFromName(name string, opts ...FieldOptions) common.MapStr { + if n.store == nil { + return nil + } + + if obj, ok, _ := n.store.GetByKey(name); ok { + no, ok := obj.(*kubernetes.Namespace) + if !ok { + return nil + } + + return n.Generate(no, opts...) + } else { + return nil + } +} + +func flattenMetadata(in common.MapStr) common.MapStr { + out := common.MapStr{} + rawFields, err := in.GetValue(resource) + if err != nil { + return nil + } + + fields, ok := rawFields.(common.MapStr) + if !ok { + return nil + } + + for k, v := range fields { + if k == "name" { + out[resource] = v + } else { + out[resource+"_"+k] = v + } + } + + return out +} diff --git a/libbeat/common/kubernetes/metadata/namespace_test.go b/libbeat/common/kubernetes/metadata/namespace_test.go new file mode 100644 index 00000000000..4011a2f0053 --- /dev/null +++ b/libbeat/common/kubernetes/metadata/namespace_test.go @@ -0,0 +1,147 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package metadata + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/kubernetes" +) + +func TestNamespace_Generate(t *testing.T) { + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + name := "obj" + tests := []struct { + input kubernetes.Resource + output common.MapStr + name string + }{ + { + name: "test simple object", + input: &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{}, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + }, + // Use this for 8.0 + /*output: common.MapStr{ + "namespace": common.MapStr{ + "name": "obj", + "uid": uid, + "labels": common.MapStr{ + "foo": "bar", + }, + }, + },*/ + output: common.MapStr{ + "namespace": "obj", + "namespace_uid": uid, + "namespace_labels": common.MapStr{ + "foo": "bar", + }, + }, + }, + } + + cfg := common.NewConfig() + metagen := NewNamespaceMetadataGenerator(cfg, nil) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.output, metagen.Generate(test.input)) + }) + } +} + +func TestNamespace_GenerateFromName(t *testing.T) { + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + name := "obj" + tests := []struct { + input kubernetes.Resource + output common.MapStr + name string + }{ + { + name: "test simple object", + input: &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{}, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + }, + // Use this for 8.0 + /* + output: common.MapStr{ + "namespace": common.MapStr{ + "name": "obj", + "uid": uid, + "labels": common.MapStr{ + "foo": "bar", + }, + }, + },*/ + output: common.MapStr{ + "namespace": "obj", + "namespace_uid": uid, + "namespace_labels": common.MapStr{ + "foo": "bar", + }, + }, + }, + } + + for _, test := range tests { + cfg := common.NewConfig() + namespaces := cache.NewStore(cache.MetaNamespaceKeyFunc) + namespaces.Add(test.input) + metagen := NewNamespaceMetadataGenerator(cfg, namespaces) + + accessor, err := meta.Accessor(test.input) + require.Nil(t, err) + + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.output, metagen.GenerateFromName(fmt.Sprint(accessor.GetName()))) + }) + } +} diff --git a/libbeat/common/kubernetes/metadata/node.go b/libbeat/common/kubernetes/metadata/node.go new file mode 100644 index 00000000000..309e7489a8c --- /dev/null +++ b/libbeat/common/kubernetes/metadata/node.go @@ -0,0 +1,68 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package metadata + +import ( + "k8s.io/client-go/tools/cache" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/kubernetes" +) + +type node struct { + store cache.Store + resource *Resource +} + +// NewNodeMetadataGenerator creates a metagen for service resources +func NewNodeMetadataGenerator(cfg *common.Config, nodes cache.Store) MetaGen { + return &node{ + resource: NewResourceMetadataGenerator(cfg), + store: nodes, + } +} + +// Generate generates service metadata from a resource object +func (n *node) Generate(obj kubernetes.Resource, opts ...FieldOptions) common.MapStr { + _, ok := obj.(*kubernetes.Node) + if !ok { + return nil + } + + meta := n.resource.Generate("node", obj, opts...) + // TODO: Add extra fields in here if need be + return meta +} + +// GenerateFromName generates pod metadata from a service name +func (n *node) GenerateFromName(name string, opts ...FieldOptions) common.MapStr { + if n.store == nil { + return nil + } + + if obj, ok, _ := n.store.GetByKey(name); ok { + no, ok := obj.(*kubernetes.Node) + if !ok { + return nil + } + + return n.Generate(no, opts...) + } + + return nil +} diff --git a/libbeat/common/kubernetes/metadata/node_test.go b/libbeat/common/kubernetes/metadata/node_test.go new file mode 100644 index 00000000000..9d198361ab9 --- /dev/null +++ b/libbeat/common/kubernetes/metadata/node_test.go @@ -0,0 +1,130 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package metadata + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/kubernetes" +) + +func TestNode_Generate(t *testing.T) { + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + name := "obj" + tests := []struct { + input kubernetes.Resource + output common.MapStr + name string + }{ + { + name: "test simple object", + input: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{}, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Node", + APIVersion: "v1", + }, + }, + output: common.MapStr{ + "node": common.MapStr{ + "name": "obj", + "uid": uid, + "labels": common.MapStr{ + "foo": "bar", + }, + }, + }, + }, + } + + cfg := common.NewConfig() + metagen := NewNodeMetadataGenerator(cfg, nil) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.output, metagen.Generate(test.input)) + }) + } +} + +func TestNode_GenerateFromName(t *testing.T) { + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + name := "obj" + tests := []struct { + input kubernetes.Resource + output common.MapStr + name string + }{ + { + name: "test simple object", + input: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{}, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Node", + APIVersion: "v1", + }, + }, + output: common.MapStr{ + "node": common.MapStr{ + "name": "obj", + "uid": uid, + "labels": common.MapStr{ + "foo": "bar", + }, + }, + }, + }, + } + + for _, test := range tests { + cfg := common.NewConfig() + nodes := cache.NewStore(cache.MetaNamespaceKeyFunc) + nodes.Add(test.input) + metagen := NewNodeMetadataGenerator(cfg, nodes) + + accessor, err := meta.Accessor(test.input) + require.Nil(t, err) + + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.output, metagen.GenerateFromName(fmt.Sprint(accessor.GetName()))) + }) + } +} diff --git a/libbeat/common/kubernetes/metadata/pod.go b/libbeat/common/kubernetes/metadata/pod.go new file mode 100644 index 00000000000..b42fa005cc0 --- /dev/null +++ b/libbeat/common/kubernetes/metadata/pod.go @@ -0,0 +1,91 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package metadata + +import ( + "k8s.io/client-go/tools/cache" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/kubernetes" +) + +type pod struct { + store cache.Store + node MetaGen + namespace MetaGen + resource *Resource +} + +// NewPodMetadataGenerator creates a metagen for pod resources +func NewPodMetadataGenerator(cfg *common.Config, pods cache.Store, node MetaGen, namespace MetaGen) MetaGen { + return &pod{ + resource: NewResourceMetadataGenerator(cfg), + store: pods, + node: node, + namespace: namespace, + } +} + +// Generate generates pod metadata from a resource object +func (p *pod) Generate(obj kubernetes.Resource, opts ...FieldOptions) common.MapStr { + po, ok := obj.(*kubernetes.Pod) + if !ok { + return nil + } + + out := p.resource.Generate("pod", obj, opts...) + + if p.node != nil { + meta := p.node.GenerateFromName(po.Spec.NodeName) + if meta != nil { + out.Put("node", meta["node"]) + } else { + out.Put("node.name", po.Spec.NodeName) + } + } else { + out.Put("node.name", po.Spec.NodeName) + } + + if p.namespace != nil { + meta := p.namespace.GenerateFromName(po.GetNamespace()) + if meta != nil { + // Use this in 8.0 + //out.Put("namespace", meta["namespace"]) + out.DeepUpdate(meta) + } + } + return out +} + +// GenerateFromName generates pod metadata from a pod name +func (p *pod) GenerateFromName(name string, opts ...FieldOptions) common.MapStr { + if p.store == nil { + return nil + } + + if obj, ok, _ := p.store.GetByKey(name); ok { + po, ok := obj.(*kubernetes.Pod) + if !ok { + return nil + } + + return p.Generate(po, opts...) + } + + return nil +} diff --git a/libbeat/common/kubernetes/metadata/pod_test.go b/libbeat/common/kubernetes/metadata/pod_test.go new file mode 100644 index 00000000000..2c42958a759 --- /dev/null +++ b/libbeat/common/kubernetes/metadata/pod_test.go @@ -0,0 +1,345 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package metadata + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/kubernetes" +) + +func TestPod_Generate(t *testing.T) { + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + namespace := "default" + name := "obj" + boolean := true + tests := []struct { + input kubernetes.Resource + output common.MapStr + name string + }{ + { + name: "test simple object", + input: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{}, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + Spec: v1.PodSpec{ + NodeName: "testnode", + }, + }, + output: common.MapStr{ + "pod": common.MapStr{ + "name": "obj", + "uid": uid, + "labels": common.MapStr{ + "foo": "bar", + }, + }, + "namespace": "default", + "node": common.MapStr{ + "name": "testnode", + }, + }, + }, + { + name: "test object with owner reference", + input: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{}, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps", + Kind: "Deployment", + Name: "owner", + UID: "005f3b90-4b9d-12f8-acf0-31020a840144", + Controller: &boolean, + }, + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + Spec: v1.PodSpec{ + NodeName: "testnode", + }, + }, + output: common.MapStr{ + "pod": common.MapStr{ + "name": "obj", + "uid": uid, + "labels": common.MapStr{ + "foo": "bar", + }, + }, + "namespace": "default", + "deployment": common.MapStr{ + "name": "owner", + }, + "node": common.MapStr{ + "name": "testnode", + }, + }, + }, + } + + cfg := common.NewConfig() + metagen := NewPodMetadataGenerator(cfg, nil, nil, nil) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.output, metagen.Generate(test.input)) + }) + } +} + +func TestPod_GenerateFromName(t *testing.T) { + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + namespace := "default" + name := "obj" + boolean := true + tests := []struct { + input kubernetes.Resource + output common.MapStr + name string + }{ + { + name: "test simple object", + input: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{}, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + Spec: v1.PodSpec{ + NodeName: "testnode", + }, + }, + output: common.MapStr{ + "pod": common.MapStr{ + "name": "obj", + "uid": uid, + "labels": common.MapStr{ + "foo": "bar", + }, + }, + "namespace": "default", + "node": common.MapStr{ + "name": "testnode", + }, + }, + }, + { + name: "test object with owner reference", + input: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{}, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps", + Kind: "Deployment", + Name: "owner", + UID: "005f3b90-4b9d-12f8-acf0-31020a840144", + Controller: &boolean, + }, + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + Spec: v1.PodSpec{ + NodeName: "testnode", + }, + }, + output: common.MapStr{ + "pod": common.MapStr{ + "name": "obj", + "uid": uid, + "labels": common.MapStr{ + "foo": "bar", + }, + }, + "namespace": "default", + "deployment": common.MapStr{ + "name": "owner", + }, + "node": common.MapStr{ + "name": "testnode", + }, + }, + }, + } + + for _, test := range tests { + cfg := common.NewConfig() + pods := cache.NewStore(cache.MetaNamespaceKeyFunc) + pods.Add(test.input) + metagen := NewPodMetadataGenerator(cfg, pods, nil, nil) + + accessor, err := meta.Accessor(test.input) + require.Nil(t, err) + + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.output, metagen.GenerateFromName(fmt.Sprint(accessor.GetNamespace(), "/", accessor.GetName()))) + }) + } +} + +func TestPod_GenerateWithNodeNamespace(t *testing.T) { + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + namespace := "default" + name := "obj" + tests := []struct { + input kubernetes.Resource + node kubernetes.Resource + namespace kubernetes.Resource + output common.MapStr + name string + }{ + { + name: "test simple object", + input: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{}, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + Spec: v1.PodSpec{ + NodeName: "testnode", + }, + }, + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testnode", + UID: types.UID(uid), + Labels: map[string]string{ + "nodekey": "nodevalue", + }, + Annotations: map[string]string{}, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Node", + APIVersion: "v1", + }, + }, + namespace: &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + UID: types.UID(uid), + Labels: map[string]string{ + "nskey": "nsvalue", + }, + Annotations: map[string]string{}, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + }, + output: common.MapStr{ + "pod": common.MapStr{ + "name": "obj", + "uid": uid, + "labels": common.MapStr{ + "foo": "bar", + }, + }, + "namespace": "default", + "namespace_uid": uid, + "namespace_labels": common.MapStr{ + "nskey": "nsvalue", + }, + "node": common.MapStr{ + "name": "testnode", + "uid": uid, + "labels": common.MapStr{ + "nodekey": "nodevalue", + }, + }, + }, + }, + } + + for _, test := range tests { + cfg := common.NewConfig() + pods := cache.NewStore(cache.MetaNamespaceKeyFunc) + pods.Add(test.input) + + nodes := cache.NewStore(cache.MetaNamespaceKeyFunc) + nodes.Add(test.node) + nodeMeta := NewNodeMetadataGenerator(cfg, nodes) + + namespaces := cache.NewStore(cache.MetaNamespaceKeyFunc) + namespaces.Add(test.namespace) + nsMeta := NewNamespaceMetadataGenerator(cfg, namespaces) + + metagen := NewPodMetadataGenerator(cfg, pods, nodeMeta, nsMeta) + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.output, metagen.Generate(test.input)) + }) + } +} diff --git a/libbeat/common/kubernetes/metadata/resource.go b/libbeat/common/kubernetes/metadata/resource.go new file mode 100644 index 00000000000..eb00455ffab --- /dev/null +++ b/libbeat/common/kubernetes/metadata/resource.go @@ -0,0 +1,134 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package metadata + +import ( + "strings" + + "k8s.io/apimachinery/pkg/api/meta" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/kubernetes" + "github.com/elastic/beats/libbeat/common/safemapstr" +) + +// Resource generates metadata for any kubernetes resource +type Resource struct { + config *Config +} + +// NewResourceMetadataGenerator creates a metadata generator for a generic resource +func NewResourceMetadataGenerator(cfg *common.Config) *Resource { + config := defaultConfig() + config.Unmarshal(cfg) + + return &Resource{ + config: &config, + } +} + +// Generate takes a kind and an object and creates metadata for the same +func (r *Resource) Generate(kind string, obj kubernetes.Resource, options ...FieldOptions) common.MapStr { + accessor, err := meta.Accessor(obj) + if err != nil { + return nil + } + + labelMap := common.MapStr{} + if len(r.config.IncludeLabels) == 0 { + for k, v := range accessor.GetLabels() { + if r.config.LabelsDedot { + label := common.DeDot(k) + labelMap.Put(label, v) + } else { + safemapstr.Put(labelMap, k, v) + } + } + } else { + labelMap = generateMapSubset(accessor.GetLabels(), r.config.IncludeLabels, r.config.LabelsDedot) + } + + // Exclude any labels that are present in the exclude_labels config + for _, label := range r.config.ExcludeLabels { + labelMap.Delete(label) + } + + annotationsMap := generateMapSubset(accessor.GetAnnotations(), r.config.IncludeAnnotations, r.config.AnnotationsDedot) + + meta := common.MapStr{ + strings.ToLower(kind): common.MapStr{ + "name": accessor.GetName(), + "uid": string(accessor.GetUID()), + }, + } + + if accessor.GetNamespace() != "" { + // TODO make this namespace.name in 8.0 + safemapstr.Put(meta, "namespace", accessor.GetNamespace()) + } + + // Add controller metadata if present + if r.config.IncludeCreatorMetadata { + for _, ref := range accessor.GetOwnerReferences() { + if ref.Controller != nil && *ref.Controller { + switch ref.Kind { + // TODO grow this list as we keep adding more `state_*` metricsets + case "Deployment", + "ReplicaSet", + "StatefulSet": + safemapstr.Put(meta, strings.ToLower(ref.Kind)+".name", ref.Name) + } + } + } + } + + if len(labelMap) != 0 { + safemapstr.Put(meta, strings.ToLower(kind)+".labels", labelMap) + } + + if len(annotationsMap) != 0 { + safemapstr.Put(meta, strings.ToLower(kind)+".annotations", annotationsMap) + } + + for _, option := range options { + option(meta) + } + + return meta +} + +func generateMapSubset(input map[string]string, keys []string, dedot bool) common.MapStr { + output := common.MapStr{} + if input == nil { + return output + } + + for _, key := range keys { + value, ok := input[key] + if ok { + if dedot { + dedotKey := common.DeDot(key) + output.Put(dedotKey, value) + } else { + safemapstr.Put(output, key, value) + } + } + } + + return output +} diff --git a/libbeat/common/kubernetes/metadata/resource_test.go b/libbeat/common/kubernetes/metadata/resource_test.go new file mode 100644 index 00000000000..dbf168644af --- /dev/null +++ b/libbeat/common/kubernetes/metadata/resource_test.go @@ -0,0 +1,121 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package metadata + +import ( + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/kubernetes" +) + +func TestResource_Generate(t *testing.T) { + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + namespace := "default" + name := "obj" + boolean := true + tests := []struct { + input kubernetes.Resource + output common.MapStr + name string + }{ + { + name: "test simple object", + input: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{}, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + }, + output: common.MapStr{ + "pod": common.MapStr{ + "name": "obj", + "uid": uid, + "labels": common.MapStr{ + "foo": "bar", + }, + }, + "namespace": "default", + }, + }, + { + name: "test object with owner reference", + input: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{}, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps", + Kind: "Deployment", + Name: "owner", + UID: "005f3b90-4b9d-12f8-acf0-31020a840144", + Controller: &boolean, + }, + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + }, + output: common.MapStr{ + "pod": common.MapStr{ + "name": "obj", + "uid": uid, + "labels": common.MapStr{ + "foo": "bar", + }, + }, + "namespace": "default", + "deployment": common.MapStr{ + "name": "owner", + }, + }, + }, + } + + cfg := defaultConfig() + metagen := &Resource{ + config: &cfg, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.output, metagen.Generate("pod", test.input)) + }) + } +} diff --git a/libbeat/common/kubernetes/metadata/service.go b/libbeat/common/kubernetes/metadata/service.go new file mode 100644 index 00000000000..7c2fab68fd3 --- /dev/null +++ b/libbeat/common/kubernetes/metadata/service.go @@ -0,0 +1,79 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package metadata + +import ( + "k8s.io/client-go/tools/cache" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/kubernetes" +) + +type service struct { + store cache.Store + namespace MetaGen + resource *Resource +} + +// NewServiceMetadataGenerator creates a metagen for service resources +func NewServiceMetadataGenerator(cfg *common.Config, services cache.Store, namespace MetaGen) MetaGen { + return &service{ + resource: NewResourceMetadataGenerator(cfg), + store: services, + namespace: namespace, + } +} + +// Generate generates service metadata from a resource object +func (s *service) Generate(obj kubernetes.Resource, opts ...FieldOptions) common.MapStr { + svc, ok := obj.(*kubernetes.Service) + if !ok { + return nil + } + + out := s.resource.Generate("service", obj, opts...) + + if s.namespace != nil { + meta := s.namespace.GenerateFromName(svc.GetNamespace()) + if meta != nil { + // Use this in 8.0 + //out.Put("namespace", meta["namespace"]) + out.DeepUpdate(meta) + } + } + + return out +} + +// GenerateFromName generates pod metadata from a service name +func (s *service) GenerateFromName(name string, opts ...FieldOptions) common.MapStr { + if s.store == nil { + return nil + } + + if obj, ok, _ := s.store.GetByKey(name); ok { + svc, ok := obj.(*kubernetes.Service) + if !ok { + return nil + } + + return s.Generate(svc, opts...) + } + + return nil +} diff --git a/libbeat/common/kubernetes/metadata/service_test.go b/libbeat/common/kubernetes/metadata/service_test.go new file mode 100644 index 00000000000..dfda4e9db27 --- /dev/null +++ b/libbeat/common/kubernetes/metadata/service_test.go @@ -0,0 +1,292 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package metadata + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/kubernetes" +) + +func TestService_Generate(t *testing.T) { + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + namespace := "default" + name := "obj" + boolean := true + tests := []struct { + input kubernetes.Resource + output common.MapStr + name string + }{ + { + name: "test simple object", + input: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{}, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + }, + output: common.MapStr{ + "service": common.MapStr{ + "name": "obj", + "uid": uid, + "labels": common.MapStr{ + "foo": "bar", + }, + }, + "namespace": "default", + }, + }, + { + name: "test object with owner reference", + input: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{}, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps", + Kind: "Deployment", + Name: "owner", + UID: "005f3b90-4b9d-12f8-acf0-31020a840144", + Controller: &boolean, + }, + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + }, + output: common.MapStr{ + "service": common.MapStr{ + "name": "obj", + "uid": uid, + "labels": common.MapStr{ + "foo": "bar", + }, + }, + "namespace": "default", + "deployment": common.MapStr{ + "name": "owner", + }, + }, + }, + } + + cfg := common.NewConfig() + metagen := NewServiceMetadataGenerator(cfg, nil, nil) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.output, metagen.Generate(test.input)) + }) + } +} + +func TestService_GenerateFromName(t *testing.T) { + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + namespace := "default" + name := "obj" + boolean := true + tests := []struct { + input kubernetes.Resource + output common.MapStr + name string + }{ + { + name: "test simple object", + input: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{}, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + }, + output: common.MapStr{ + "service": common.MapStr{ + "name": "obj", + "uid": uid, + "labels": common.MapStr{ + "foo": "bar", + }, + }, + "namespace": "default", + }, + }, + { + name: "test object with owner reference", + input: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{}, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps", + Kind: "Deployment", + Name: "owner", + UID: "005f3b90-4b9d-12f8-acf0-31020a840144", + Controller: &boolean, + }, + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + }, + output: common.MapStr{ + "service": common.MapStr{ + "name": "obj", + "uid": uid, + "labels": common.MapStr{ + "foo": "bar", + }, + }, + "namespace": "default", + "deployment": common.MapStr{ + "name": "owner", + }, + }, + }, + } + + for _, test := range tests { + cfg := common.NewConfig() + services := cache.NewStore(cache.MetaNamespaceKeyFunc) + services.Add(test.input) + metagen := NewServiceMetadataGenerator(cfg, services, nil) + + accessor, err := meta.Accessor(test.input) + require.Nil(t, err) + + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.output, metagen.GenerateFromName(fmt.Sprint(accessor.GetNamespace(), "/", accessor.GetName()))) + }) + } +} + +func TestService_GenerateWithNamespace(t *testing.T) { + uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + namespace := "default" + name := "obj" + tests := []struct { + input kubernetes.Resource + namespace kubernetes.Resource + output common.MapStr + name string + }{ + { + name: "test simple object", + input: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{}, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + }, + namespace: &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + UID: types.UID(uid), + Labels: map[string]string{ + "nskey": "nsvalue", + }, + Annotations: map[string]string{}, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + }, + output: common.MapStr{ + "service": common.MapStr{ + "name": "obj", + "uid": uid, + "labels": common.MapStr{ + "foo": "bar", + }, + }, + "namespace": "default", + "namespace_uid": uid, + "namespace_labels": common.MapStr{ + "nskey": "nsvalue", + }, + }, + }, + } + + for _, test := range tests { + cfg := common.NewConfig() + services := cache.NewStore(cache.MetaNamespaceKeyFunc) + services.Add(test.input) + + namespaces := cache.NewStore(cache.MetaNamespaceKeyFunc) + namespaces.Add(test.namespace) + nsMeta := NewNamespaceMetadataGenerator(cfg, namespaces) + + metagen := NewServiceMetadataGenerator(cfg, services, nsMeta) + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.output, metagen.Generate(test.input)) + }) + } +} diff --git a/libbeat/common/kubernetes/metadata_test.go b/libbeat/common/kubernetes/metadata_test.go deleted file mode 100644 index 241f366bab5..00000000000 --- a/libbeat/common/kubernetes/metadata_test.go +++ /dev/null @@ -1,295 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package kubernetes - -import ( - "testing" - - "github.com/stretchr/testify/assert" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - - "github.com/elastic/beats/libbeat/common" -) - -func TestPodMetadata(t *testing.T) { - UID := "005f3b90-4b9d-12f8-acf0-31020a840133" - Deployment := "Deployment" - test := "test" - ReplicaSet := "ReplicaSet" - StatefulSet := "StatefulSet" - True := true - False := false - tests := []struct { - name string - pod *Pod - meta common.MapStr - }{ - { - name: "standalone Pod", - pod: &Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{"a.key": "foo", "a": "bar"}, - UID: types.UID(UID), - Namespace: test, - }, - Spec: v1.PodSpec{ - NodeName: test, - }, - }, - meta: common.MapStr{ - "pod": common.MapStr{ - "name": "", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, - "node": common.MapStr{"name": "test"}, - "namespace": "test", - "labels": common.MapStr{"a": common.MapStr{"value": "bar", "key": "foo"}}, - }, - }, - { - name: "Deployment + Replicaset owned Pod", - pod: &Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{"a.key": "foo", "a": "bar"}, - UID: types.UID(UID), - OwnerReferences: []metav1.OwnerReference{ - { - Kind: Deployment, - Name: test, - Controller: &True, - }, - { - Kind: ReplicaSet, - Name: ReplicaSet, - Controller: &False, - }, - }, - }, - Spec: v1.PodSpec{ - NodeName: test, - }, - }, - meta: common.MapStr{ - "pod": common.MapStr{ - "name": "", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, - "node": common.MapStr{"name": "test"}, - "labels": common.MapStr{"a": common.MapStr{"value": "bar", "key": "foo"}}, - "deployment": common.MapStr{"name": "test"}, - }, - }, - { - name: "StatefulSet + Deployment owned Pod", - pod: &Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{"a.key": "foo", "a": "bar"}, - UID: types.UID(UID), - OwnerReferences: []metav1.OwnerReference{ - { - Kind: Deployment, - Name: test, - Controller: &False, - }, - { - Kind: ReplicaSet, - Name: ReplicaSet, - Controller: &True, - }, - { - Kind: StatefulSet, - Name: StatefulSet, - Controller: &True, - }, - }, - }, - Spec: v1.PodSpec{ - NodeName: test, - }, - }, - meta: common.MapStr{ - "pod": common.MapStr{ - "name": "", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, - "node": common.MapStr{"name": "test"}, - "labels": common.MapStr{"a": common.MapStr{"value": "bar", "key": "foo"}}, - "replicaset": common.MapStr{"name": "ReplicaSet"}, - "statefulset": common.MapStr{"name": "StatefulSet"}, - }, - }, - { - name: "empty owner reference Pod", - pod: &Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{"a.key": "foo", "a": "bar"}, - UID: types.UID(UID), - OwnerReferences: []metav1.OwnerReference{{}}, - Namespace: test, - }, - Spec: v1.PodSpec{ - NodeName: test, - }, - }, - meta: common.MapStr{ - "pod": common.MapStr{ - "name": "", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, - "node": common.MapStr{"name": "test"}, - "namespace": "test", - "labels": common.MapStr{"a": common.MapStr{"value": "bar", "key": "foo"}}, - }, - }, - } - - for _, test := range tests { - config, err := common.NewConfigFrom(map[string]interface{}{ - "labels.dedot": false, - "annotations.dedot": false, - "include_annotations": []string{"b", "b.key"}, - }) - - metaGen, err := NewMetaGenerator(config) - if err != nil { - t.Fatalf("case %q failed: %s", test.name, err.Error()) - } - assert.Equal(t, metaGen.PodMetadata(test.pod), test.meta, "test failed for case %q", test.name) - } -} - -func TestPodMetadataDeDot(t *testing.T) { - UID := "005f3b90-4b9d-12f8-acf0-31020a840133" - Deployment := "Deployment" - test := "test" - ReplicaSet := "ReplicaSet" - StatefulSet := "StatefulSet" - True := true - False := false - tests := []struct { - pod *Pod - meta common.MapStr - }{ - { - pod: &Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{"a.key": "foo", "a": "bar"}, - UID: types.UID(UID), - Namespace: test, - Annotations: map[string]string{"b.key": "foo", "b": "bar"}, - }, - Spec: v1.PodSpec{ - NodeName: test, - }, - }, - meta: common.MapStr{ - "pod": common.MapStr{ - "name": "", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, - "node": common.MapStr{"name": "test"}, - "namespace": "test", - "labels": common.MapStr{"a": "bar", "a_key": "foo"}, - "annotations": common.MapStr{"b": "bar", "b_key": "foo"}, - }, - }, - { - pod: &Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{"a.key": "foo", "a": "bar"}, - UID: types.UID(UID), - OwnerReferences: []metav1.OwnerReference{ - { - Kind: Deployment, - Name: test, - Controller: &True, - }, - { - Kind: ReplicaSet, - Name: ReplicaSet, - Controller: &False, - }, - }, - }, - Spec: v1.PodSpec{ - NodeName: test, - }, - }, - meta: common.MapStr{ - "pod": common.MapStr{ - "name": "", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, - "node": common.MapStr{"name": "test"}, - "labels": common.MapStr{"a": "bar", "a_key": "foo"}, - "deployment": common.MapStr{"name": "test"}, - }, - }, - { - pod: &Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{"a.key": "foo", "a": "bar"}, - UID: types.UID(UID), - OwnerReferences: []metav1.OwnerReference{ - { - Kind: Deployment, - Name: test, - Controller: &False, - }, - { - Kind: ReplicaSet, - Name: ReplicaSet, - Controller: &True, - }, - { - Kind: StatefulSet, - Name: StatefulSet, - Controller: &True, - }, - }, - }, - Spec: v1.PodSpec{ - NodeName: test, - }, - }, - meta: common.MapStr{ - "pod": common.MapStr{ - "name": "", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, - "node": common.MapStr{"name": "test"}, - "labels": common.MapStr{"a": "bar", "a_key": "foo"}, - "replicaset": common.MapStr{"name": "ReplicaSet"}, - "statefulset": common.MapStr{"name": "StatefulSet"}, - }, - }, - } - - for _, test := range tests { - config, err := common.NewConfigFrom(map[string]interface{}{ - "include_annotations": []string{"b", "b.key"}, - }) - metaGen, err := NewMetaGenerator(config) - if err != nil { - t.Fatal(err) - } - assert.Equal(t, metaGen.PodMetadata(test.pod), test.meta) - } -} diff --git a/libbeat/common/kubernetes/types.go b/libbeat/common/kubernetes/types.go index 9b98d22c399..cd600e9f60c 100644 --- a/libbeat/common/kubernetes/types.go +++ b/libbeat/common/kubernetes/types.go @@ -46,6 +46,9 @@ type PodStatus = v1.PodStatus // Node data type Node = v1.Node +// Namespace data +type Namespace = v1.Namespace + // Container data type Container = v1.Container diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index 899574184cb..79f2b862579 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -23,11 +23,9 @@ import ( "time" "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -55,6 +53,9 @@ type Watcher interface { // AddEventHandler add event handlers for corresponding event type watched AddEventHandler(ResourceEventHandler) + + // Store returns the store object for the watcher + Store() cache.Store } // WatchOptions controls watch behaviors @@ -83,121 +84,17 @@ type watcher struct { logger *logp.Logger } -func nodeSelector(options *metav1.ListOptions, opt WatchOptions) { - if opt.Node != "" { - options.FieldSelector = "spec.nodeName=" + opt.Node - } -} - -func nameSelector(options *metav1.ListOptions, opt WatchOptions) { - if opt.Node != "" { - options.FieldSelector = "metadata.name=" + opt.Node - } -} - // NewWatcher initializes the watcher client to provide a events handler for // resource from the cluster (filtered to the given node) -func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOptions) (Watcher, error) { - var informer cache.SharedInformer +func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) { var store cache.Store var queue workqueue.RateLimitingInterface - var objType string - - var listwatch *cache.ListWatch - switch resource.(type) { - case *Pod: - p := client.CoreV1().Pods(opts.Namespace) - listwatch = &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - nodeSelector(&options, opts) - return p.List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - nodeSelector(&options, opts) - return p.Watch(options) - }, - } - objType = "pod" - case *Event: - e := client.CoreV1().Events(opts.Namespace) - listwatch = &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return e.List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return e.Watch(options) - }, - } - - objType = "event" - case *Node: - n := client.CoreV1().Nodes() - listwatch = &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - nameSelector(&options, opts) - return n.List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - nameSelector(&options, opts) - return n.Watch(options) - }, - } - - objType = "node" - case *Deployment: - d := client.AppsV1().Deployments(opts.Namespace) - listwatch = &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return d.List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return d.Watch(options) - }, - } - - objType = "deployment" - case *ReplicaSet: - rs := client.AppsV1().ReplicaSets(opts.Namespace) - listwatch = &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return rs.List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return rs.Watch(options) - }, - } - - objType = "replicaset" - case *StatefulSet: - ss := client.AppsV1().StatefulSets(opts.Namespace) - listwatch = &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return ss.List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return ss.Watch(options) - }, - } - - objType = "statefulset" - case *Service: - svc := client.CoreV1().Services(opts.Namespace) - listwatch = &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return svc.List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return svc.Watch(options) - }, - } - - objType = "service" - default: - return nil, fmt.Errorf("unsupported resource type for watching %T", resource) + informer, objType, err := NewInformer(client, resource, opts, indexers) + if err != nil { + return nil, err } - informer = cache.NewSharedInformer(listwatch, resource, opts.SyncTimeout) store = informer.GetStore() queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), objType) ctx, cancel := context.WithCancel(context.Background()) @@ -210,6 +107,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption ctx: ctx, stop: cancel, logger: logp.NewLogger("kubernetes"), + handler: NoOpEventHandlerFuncs{}, } w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -233,23 +131,16 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption return w, nil } -// enqueue takes the most recent object that was received, figures out the namespace/name of the object -// and adds it to the work queue for processing. -func (w *watcher) enqueue(obj interface{}, state string) { - // DeletionHandlingMetaNamespaceKeyFunc that we get a key only if the resource's state is not Unknown. - key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - if err != nil { - return - } - - w.queue.Add(&item{key, state}) -} - // AddEventHandler adds a resource handler to process each request that is coming into the watcher func (w *watcher) AddEventHandler(h ResourceEventHandler) { w.handler = h } +// Store returns the store object for the resource that is being watched +func (w *watcher) Store() cache.Store { + return w.store +} + // Start watching pods func (w *watcher) Start() error { go w.informer.Run(w.ctx.Done()) @@ -271,6 +162,23 @@ func (w *watcher) Start() error { return nil } +func (w *watcher) Stop() { + w.queue.ShutDown() + w.stop() +} + +// enqueue takes the most recent object that was received, figures out the namespace/name of the object +// and adds it to the work queue for processing. +func (w *watcher) enqueue(obj interface{}, state string) { + // DeletionHandlingMetaNamespaceKeyFunc that we get a key only if the resource's state is not Unknown. + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + return + } + + w.queue.Add(&item{key, state}) +} + // process gets the top of the work queue and processes the object that is received. func (w *watcher) process(ctx context.Context) bool { keyObj, quit := w.queue.Get() @@ -318,8 +226,3 @@ func (w *watcher) process(ctx context.Context) bool { return true } - -func (w *watcher) Stop() { - w.queue.ShutDown() - w.stop() -} diff --git a/libbeat/docs/shared-autodiscover.asciidoc b/libbeat/docs/shared-autodiscover.asciidoc index 616400efcc1..9e7087e72be 100644 --- a/libbeat/docs/shared-autodiscover.asciidoc +++ b/libbeat/docs/shared-autodiscover.asciidoc @@ -227,6 +227,19 @@ running configuration for a container, 60s by default. either take `node` or `cluster` as values. `node` scope allows discovery of resources in the specified node. `cluster` scope allows cluster wide discovery. Only `pod` and `node` resources can be discovered at node scope. +`add_resource_metadata`:: (Optional) Specify resources against which additional enrichment needs to be done one. +`add_resource_metadata` can be done for `node` or `namespace`. Example: + +["source","yaml",subs="attributes"] +------------------------------------------------------------------------------------- + add_resource_metadata: + namespace: + enabled: true + include_labels: ["namespacelabel1"] + node: + enabled: true + include_labels: ["nodelabel2"] +------------------------------------------------------------------------------------- include::../../{beatname_lc}/docs/autodiscover-kubernetes-config.asciidoc[] diff --git a/libbeat/processors/add_kubernetes_metadata/indexers.go b/libbeat/processors/add_kubernetes_metadata/indexers.go index 23328e64541..3d6dd601fcb 100644 --- a/libbeat/processors/add_kubernetes_metadata/indexers.go +++ b/libbeat/processors/add_kubernetes_metadata/indexers.go @@ -21,6 +21,8 @@ import ( "fmt" "sync" + "github.com/elastic/beats/libbeat/common/kubernetes/metadata" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/kubernetes" "github.com/elastic/beats/libbeat/logp" @@ -58,10 +60,10 @@ type Indexers struct { } // IndexerConstructor builds a new indexer from its settings -type IndexerConstructor func(config common.Config, metaGen kubernetes.MetaGenerator) (Indexer, error) +type IndexerConstructor func(config common.Config, metaGen metadata.MetaGen) (Indexer, error) // NewIndexers builds indexers object -func NewIndexers(configs PluginConfig, metaGen kubernetes.MetaGenerator) *Indexers { +func NewIndexers(configs PluginConfig, metaGen metadata.MetaGen) *Indexers { indexers := []Indexer{} for _, pluginConfigs := range configs { for name, pluginConfig := range pluginConfigs { @@ -125,17 +127,17 @@ func (i *Indexers) Empty() bool { // PodNameIndexer implements default indexer based on pod name type PodNameIndexer struct { - metaGen kubernetes.MetaGenerator + metaGen metadata.MetaGen } // NewPodNameIndexer initializes and returns a PodNameIndexer -func NewPodNameIndexer(_ common.Config, metaGen kubernetes.MetaGenerator) (Indexer, error) { +func NewPodNameIndexer(_ common.Config, metaGen metadata.MetaGen) (Indexer, error) { return &PodNameIndexer{metaGen: metaGen}, nil } // GetMetadata returns metadata for the given pod, if it matches the index func (p *PodNameIndexer) GetMetadata(pod *kubernetes.Pod) []MetadataIndex { - data := p.metaGen.PodMetadata(pod) + data := p.metaGen.Generate(pod) return []MetadataIndex{ { Index: fmt.Sprintf("%s/%s", pod.GetObjectMeta().GetNamespace(), pod.GetObjectMeta().GetName()), @@ -151,17 +153,17 @@ func (p *PodNameIndexer) GetIndexes(pod *kubernetes.Pod) []string { // PodUIDIndexer indexes pods based on the pod UID type PodUIDIndexer struct { - metaGen kubernetes.MetaGenerator + metaGen metadata.MetaGen } // NewPodUIDIndexer initializes and returns a PodUIDIndexer -func NewPodUIDIndexer(_ common.Config, metaGen kubernetes.MetaGenerator) (Indexer, error) { +func NewPodUIDIndexer(_ common.Config, metaGen metadata.MetaGen) (Indexer, error) { return &PodUIDIndexer{metaGen: metaGen}, nil } // GetMetadata returns the composed metadata from PodNameIndexer and the pod UID func (p *PodUIDIndexer) GetMetadata(pod *kubernetes.Pod) []MetadataIndex { - data := p.metaGen.PodMetadata(pod) + data := p.metaGen.Generate(pod) return []MetadataIndex{ { Index: string(pod.GetObjectMeta().GetUID()), @@ -177,29 +179,30 @@ func (p *PodUIDIndexer) GetIndexes(pod *kubernetes.Pod) []string { // ContainerIndexer indexes pods based on all their containers IDs type ContainerIndexer struct { - metaGen kubernetes.MetaGenerator + metaGen metadata.MetaGen } // NewContainerIndexer initializes and returns a ContainerIndexer -func NewContainerIndexer(_ common.Config, metaGen kubernetes.MetaGenerator) (Indexer, error) { +func NewContainerIndexer(_ common.Config, metaGen metadata.MetaGen) (Indexer, error) { return &ContainerIndexer{metaGen: metaGen}, nil } // GetMetadata returns the composed metadata list from all registered indexers func (c *ContainerIndexer) GetMetadata(pod *kubernetes.Pod) []MetadataIndex { - var metadata []MetadataIndex + var m []MetadataIndex for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) { cID := kubernetes.ContainerID(status) if cID == "" { continue } - metadata = append(metadata, MetadataIndex{ + m = append(m, MetadataIndex{ Index: cID, - Data: c.metaGen.ContainerMetadata(pod, status.Name, status.Image), + Data: c.metaGen.Generate(pod, metadata.WithFields("container.name", status.Name), + metadata.WithFields("container.image", status.Image)), }) } - return metadata + return m } // GetIndexes returns the indexes for the given Pod @@ -217,41 +220,42 @@ func (c *ContainerIndexer) GetIndexes(pod *kubernetes.Pod) []string { // IPPortIndexer indexes pods based on all their host:port combinations type IPPortIndexer struct { - metaGen kubernetes.MetaGenerator + metaGen metadata.MetaGen } // NewIPPortIndexer creates and returns a new indexer for pod IP & ports -func NewIPPortIndexer(_ common.Config, metaGen kubernetes.MetaGenerator) (Indexer, error) { +func NewIPPortIndexer(_ common.Config, metaGen metadata.MetaGen) (Indexer, error) { return &IPPortIndexer{metaGen: metaGen}, nil } // GetMetadata returns metadata for the given pod, if it matches the index func (h *IPPortIndexer) GetMetadata(pod *kubernetes.Pod) []MetadataIndex { - var metadata []MetadataIndex + var m []MetadataIndex if pod.Status.PodIP == "" { - return metadata + return m } // Add pod IP - metadata = append(metadata, MetadataIndex{ + m = append(m, MetadataIndex{ Index: pod.Status.PodIP, - Data: h.metaGen.PodMetadata(pod), + Data: h.metaGen.Generate(pod), }) for _, container := range pod.Spec.Containers { for _, port := range container.Ports { if port.ContainerPort != 0 { - metadata = append(metadata, MetadataIndex{ + m = append(m, MetadataIndex{ Index: fmt.Sprintf("%s:%d", pod.Status.PodIP, port.ContainerPort), - Data: h.metaGen.ContainerMetadata(pod, container.Name, container.Image), + Data: h.metaGen.Generate(pod, metadata.WithFields("container.name", container.Name), + metadata.WithFields("container.image", container.Image)), }) } } } - return metadata + return m } // GetIndexes returns the indexes for the given Pod diff --git a/libbeat/processors/add_kubernetes_metadata/indexers_test.go b/libbeat/processors/add_kubernetes_metadata/indexers_test.go index b1995d0750b..0d74cece012 100644 --- a/libbeat/processors/add_kubernetes_metadata/indexers_test.go +++ b/libbeat/processors/add_kubernetes_metadata/indexers_test.go @@ -21,6 +21,8 @@ import ( "fmt" "testing" + "github.com/elastic/beats/libbeat/common/kubernetes/metadata" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -30,7 +32,7 @@ import ( "github.com/elastic/beats/libbeat/common/kubernetes" ) -var metagen, _ = kubernetes.NewMetaGenerator(common.NewConfig()) +var metagen = metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil) func TestPodIndexer(t *testing.T) { var testConfig = common.NewConfig() @@ -64,11 +66,11 @@ func TestPodIndexer(t *testing.T) { "pod": common.MapStr{ "name": "testpod", "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "labels": common.MapStr{ + "labelkey": "labelvalue", + }, }, "namespace": "testns", - "labels": common.MapStr{ - "labelkey": "labelvalue", - }, "node": common.MapStr{ "name": "testnode", }, @@ -84,8 +86,7 @@ func TestPodIndexer(t *testing.T) { func TestPodUIDIndexer(t *testing.T) { var testConfig = common.NewConfig() - metaGenWithPodUID, err := kubernetes.NewMetaGenerator(common.NewConfig()) - assert.Nil(t, err) + metaGenWithPodUID := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil) podUIDIndexer, err := NewPodUIDIndexer(*testConfig, metaGenWithPodUID) assert.Nil(t, err) @@ -116,11 +117,11 @@ func TestPodUIDIndexer(t *testing.T) { "pod": common.MapStr{ "name": "testpod", "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "labels": common.MapStr{ + "labelkey": "labelvalue", + }, }, "namespace": "testns", - "labels": common.MapStr{ - "labelkey": "labelvalue", - }, "node": common.MapStr{ "name": "testnode", }, @@ -172,11 +173,11 @@ func TestContainerIndexer(t *testing.T) { "pod": common.MapStr{ "name": "testpod", "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "labels": common.MapStr{ + "labelkey": "labelvalue", + }, }, "namespace": "testns", - "labels": common.MapStr{ - "labelkey": "labelvalue", - }, "node": common.MapStr{ "name": "testnode", }, @@ -249,14 +250,14 @@ func TestFilteredGenMeta(t *testing.T) { indexers := podIndexer.GetMetadata(&pod) assert.Equal(t, len(indexers), 1) - rawLabels, _ := indexers[0].Data["labels"] + rawLabels, _ := indexers[0].Data.GetValue("pod.labels") assert.NotNil(t, rawLabels) labelMap, ok := rawLabels.(common.MapStr) assert.Equal(t, ok, true) assert.Equal(t, len(labelMap), 2) - rawAnnotations := indexers[0].Data["annotations"] + rawAnnotations, _ := indexers[0].Data.GetValue("pod.annotations") assert.Nil(t, rawAnnotations) config, err := common.NewConfigFrom(map[string]interface{}{ @@ -265,8 +266,7 @@ func TestFilteredGenMeta(t *testing.T) { }) assert.Nil(t, err) - filteredGen, err := kubernetes.NewMetaGenerator(config) - assert.Nil(t, err) + filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil) podIndexer, err = NewPodNameIndexer(*testConfig, filteredGen) assert.Nil(t, err) @@ -274,7 +274,7 @@ func TestFilteredGenMeta(t *testing.T) { indexers = podIndexer.GetMetadata(&pod) assert.Equal(t, len(indexers), 1) - rawLabels, _ = indexers[0].Data["labels"] + rawLabels, _ = indexers[0].Data.GetValue("pod.labels") assert.NotNil(t, rawLabels) labelMap, ok = rawLabels.(common.MapStr) @@ -284,7 +284,7 @@ func TestFilteredGenMeta(t *testing.T) { ok, _ = labelMap.HasKey("foo") assert.Equal(t, ok, true) - rawAnnotations = indexers[0].Data["annotations"] + rawAnnotations, _ = indexers[0].Data.GetValue("pod.annotations") assert.NotNil(t, rawAnnotations) annotationsMap, ok := rawAnnotations.(common.MapStr) @@ -303,8 +303,7 @@ func TestFilteredGenMetaExclusion(t *testing.T) { }) assert.Nil(t, err) - filteredGen, err := kubernetes.NewMetaGenerator(config) - assert.Nil(t, err) + filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil) podIndexer, err := NewPodNameIndexer(*testConfig, filteredGen) assert.Nil(t, err) @@ -332,7 +331,7 @@ func TestFilteredGenMetaExclusion(t *testing.T) { indexers := podIndexer.GetMetadata(&pod) assert.Equal(t, len(indexers), 1) - rawLabels, _ := indexers[0].Data["labels"] + rawLabels, _ := indexers[0].Data.GetValue("pod.labels") assert.NotNil(t, rawLabels) labelMap, ok := rawLabels.(common.MapStr) @@ -393,11 +392,11 @@ func TestIpPortIndexer(t *testing.T) { "pod": common.MapStr{ "name": "testpod", "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "labels": common.MapStr{ + "labelkey": "labelvalue", + }, }, "namespace": "testns", - "labels": common.MapStr{ - "labelkey": "labelvalue", - }, "node": common.MapStr{ "name": "testnode", }, diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 00ecf06d98d..5d02056871e 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -24,6 +24,8 @@ import ( "os" "time" + "github.com/elastic/beats/libbeat/common/kubernetes/metadata" + k8sclient "k8s.io/client-go/kubernetes" "github.com/elastic/beats/libbeat/beat" @@ -96,11 +98,6 @@ func New(cfg *common.Config) (processors.Processor, error) { Indexing.RUnlock() } - metaGen, err := kubernetes.NewMetaGenerator(cfg) - if err != nil { - return nil, err - } - processor := &kubernetesAnnotator{ cache: newCache(config.CleanupTimeout), kubernetesAvailable: false, @@ -122,8 +119,6 @@ func New(cfg *common.Config) (processors.Processor, error) { return processor, nil } - processor.indexers = NewIndexers(config.Indexers, metaGen) - matchers := NewMatchers(config.Matchers) if matchers.Empty() { @@ -141,12 +136,14 @@ func New(cfg *common.Config) (processors.Processor, error) { SyncTimeout: config.SyncPeriod, Node: config.Host, Namespace: config.Namespace, - }) + }, nil) if err != nil { logp.Err("kubernetes: Couldn't create watcher for %T", &kubernetes.Pod{}) return nil, err } + metaGen := metadata.NewPodMetadataGenerator(cfg, watcher.Store(), nil, nil) + processor.indexers = NewIndexers(config.Indexers, metaGen) processor.watcher = watcher processor.kubernetesAvailable = true diff --git a/metricbeat/module/kubernetes/event/event.go b/metricbeat/module/kubernetes/event/event.go index 2ef97eedd24..81f6b237697 100644 --- a/metricbeat/module/kubernetes/event/event.go +++ b/metricbeat/module/kubernetes/event/event.go @@ -74,7 +74,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { Namespace: config.Namespace, } - watcher, err := kubernetes.NewWatcher(client, &kubernetes.Event{}, watchOptions) + watcher, err := kubernetes.NewWatcher(client, &kubernetes.Event{}, watchOptions, nil) if err != nil { return nil, fmt.Errorf("fail to init kubernetes watcher: %s", err.Error()) } diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index 2f208cd804b..82b936c194c 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -22,6 +22,8 @@ import ( "sync" "time" + "github.com/elastic/beats/libbeat/common/kubernetes/metadata" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" @@ -94,7 +96,7 @@ func GetWatcher(base mb.BaseMetricSet, resource kubernetes.Resource, nodeScope b logp.Debug("kubernetes", "Initializing a new Kubernetes watcher using host: %v", config.Host) - return kubernetes.NewWatcher(client, resource, options) + return kubernetes.NewWatcher(client, resource, options, nil) } // NewResourceMetadataEnricher returns an Enricher configured for kubernetes resource events @@ -114,13 +116,16 @@ func NewResourceMetadataEnricher( return &nilEnricher{} } - metaConfig := kubernetes.DefaultMetaGeneratorConfig() + metaConfig := metadata.Config{} if err := base.Module().UnpackConfig(&metaConfig); err != nil { logp.Err("Error initializing Kubernetes metadata enricher: %s", err) return &nilEnricher{} } - metaGen := kubernetes.NewMetaGeneratorFromConfig(&metaConfig) + cfg, _ := common.NewConfigFrom(&metaConfig) + + metaGen := metadata.NewResourceMetadataGenerator(cfg) + podMetaGen := metadata.NewPodMetadataGenerator(cfg, nil, nil, nil) enricher := buildMetadataEnricher(watcher, // update func(m map[string]common.MapStr, r kubernetes.Resource) { @@ -129,7 +134,7 @@ func NewResourceMetadataEnricher( switch r := r.(type) { case *kubernetes.Pod: - m[id] = metaGen.PodMetadata(r) + m[id] = podMetaGen.Generate(r) case *kubernetes.Node: // Report node allocatable resources to PerfMetrics cache @@ -145,10 +150,18 @@ func NewResourceMetadataEnricher( } } - m[id] = metaGen.ResourceMetadata(r) + m[id] = metaGen.Generate("node", r) + case *kubernetes.Deployment: + m[id] = metaGen.Generate("deployment", r) + case *kubernetes.StatefulSet: + m[id] = metaGen.Generate("statefulset", r) + case *kubernetes.Namespace: + m[id] = metaGen.Generate("namespace", r) + case *kubernetes.ReplicaSet: + m[id] = metaGen.Generate("replicaset", r) default: - m[id] = metaGen.ResourceMetadata(r) + m[id] = metaGen.Generate(r.GetObjectKind().GroupVersionKind().Kind, r) } }, // delete @@ -188,18 +201,20 @@ func NewContainerMetadataEnricher( return &nilEnricher{} } - metaConfig := kubernetes.DefaultMetaGeneratorConfig() + metaConfig := metadata.Config{} if err := base.Module().UnpackConfig(&metaConfig); err != nil { logp.Err("Error initializing Kubernetes metadata enricher: %s", err) return &nilEnricher{} } - metaGen := kubernetes.NewMetaGeneratorFromConfig(&metaConfig) + cfg, _ := common.NewConfigFrom(&metaConfig) + + metaGen := metadata.NewPodMetadataGenerator(cfg, nil, nil, nil) enricher := buildMetadataEnricher(watcher, // update func(m map[string]common.MapStr, r kubernetes.Resource) { pod := r.(*kubernetes.Pod) - meta := metaGen.PodMetadata(pod) + meta := metaGen.Generate(pod) for _, container := range append(pod.Spec.Containers, pod.Spec.InitContainers...) { cuid := ContainerUID(pod.GetObjectMeta().GetNamespace(), pod.GetObjectMeta().GetName(), container.Name) diff --git a/metricbeat/module/kubernetes/util/kubernetes_test.go b/metricbeat/module/kubernetes/util/kubernetes_test.go index b4a6e6af76a..210baa3801c 100644 --- a/metricbeat/module/kubernetes/util/kubernetes_test.go +++ b/metricbeat/module/kubernetes/util/kubernetes_test.go @@ -20,6 +20,8 @@ package util import ( "testing" + "k8s.io/client-go/tools/cache" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" @@ -147,6 +149,11 @@ func (m *mockWatcher) Start() error { func (m *mockWatcher) Stop() { } + func (m *mockWatcher) AddEventHandler(r kubernetes.ResourceEventHandler) { m.handler = r } + +func (m *mockWatcher) Store() cache.Store { + return nil +}