From 49438fe2f7098e228e72b87ae349c915aac1df3d Mon Sep 17 00:00:00 2001 From: xu0o0 Date: Tue, 31 Oct 2023 11:41:00 +0800 Subject: [PATCH] [processor/k8sattribute] Support adding labels and annotations from node (#28570) **Description:** support adding labels and annotations from the node as additional resource attributes on telemetry processed through the `k8sattributes` processor. **Link to tracking Issue:** Resolve #22620 --------- Co-authored-by: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> --- .chloggen/feat-22620.yaml | 27 +++ .../workflows/configs/e2e-kind-config.yaml | 5 + processor/k8sattributesprocessor/README.md | 17 +- .../k8sattributesprocessor/client_test.go | 8 + processor/k8sattributesprocessor/config.go | 6 +- processor/k8sattributesprocessor/e2e_test.go | 9 + .../internal/kube/client.go | 109 ++++++++++++ .../internal/kube/client_test.go | 165 ++++++++++++++++++ .../internal/kube/informer.go | 28 +++ .../internal/kube/kube.go | 22 ++- .../internal/observability/observability.go | 42 +++++ .../observability/observability_test.go | 12 ++ processor/k8sattributesprocessor/options.go | 6 +- .../k8sattributesprocessor/options_test.go | 74 +++++++- processor/k8sattributesprocessor/processor.go | 18 ++ .../testdata/e2e/collector/clusterrole.yaml | 4 +- .../testdata/e2e/collector/configmap.yaml | 2 + 17 files changed, 536 insertions(+), 18 deletions(-) create mode 100755 .chloggen/feat-22620.yaml diff --git a/.chloggen/feat-22620.yaml b/.chloggen/feat-22620.yaml new file mode 100755 index 000000000000..fa61d10709f2 --- /dev/null +++ b/.chloggen/feat-22620.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/k8sattribute + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: support adding labels and annotations from node + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [22620] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.github/workflows/configs/e2e-kind-config.yaml b/.github/workflows/configs/e2e-kind-config.yaml index a20a0dcb0813..21eecccabfc1 100644 --- a/.github/workflows/configs/e2e-kind-config.yaml +++ b/.github/workflows/configs/e2e-kind-config.yaml @@ -4,3 +4,8 @@ kubeadmConfigPatches: - | kind: KubeletConfiguration serverTLSBootstrap: true +nodes: + - role: control-plane + labels: + # used in k8sattributesprocessor e2e test + foo: too diff --git a/processor/k8sattributesprocessor/README.md b/processor/k8sattributesprocessor/README.md index 7193bd6d38f1..1b6b79311b60 100644 --- a/processor/k8sattributesprocessor/README.md +++ b/processor/k8sattributesprocessor/README.md @@ -88,12 +88,12 @@ Additional container level attributes can be extracted provided that certain res instance. If it's not set, the latest container instance will be used: - container.id (not added by default, has to be specified in `metadata`) -The k8sattributesprocessor can also set resource attributes from k8s labels and annotations of pods and namespaces. -The config for associating the data passing through the processor (spans, metrics and logs) with specific Pod/Namespace annotations/labels is configured via "annotations" and "labels" keys. -This config represents a list of annotations/labels that are extracted from pods/namespaces and added to spans, metrics and logs. +The k8sattributesprocessor can also set resource attributes from k8s labels and annotations of pods, namespaces and nodes. +The config for associating the data passing through the processor (spans, metrics and logs) with specific Pod/Namespace/Node annotations/labels is configured via "annotations" and "labels" keys. +This config represents a list of annotations/labels that are extracted from pods/namespaces/nodes and added to spans, metrics and logs. Each item is specified as a config of tag_name (representing the tag name to tag the spans with), key (representing the key used to extract value) and from (representing the kubernetes object used to extract the value). -The "from" field has only two possible values "pod" and "namespace" and defaults to "pod" if none is specified. +The "from" field has only three possible values "pod", "namespace" and "node" and defaults to "pod" if none is specified. A few examples to use this config are as follows: @@ -106,6 +106,10 @@ annotations: key: annotation-two regex: field=(?P.+) from: namespace + - tag_name: a3 # extracts value of annotation from nodes with key `annotation-three` with regexp and inserts it as a tag with key `a3` + key: annotation-three + regex: field=(?P.+) + from: node labels: - tag_name: l1 # extracts value of label from namespaces with key `label1` and inserts it as a tag with key `l1` @@ -115,6 +119,9 @@ labels: key: label2 regex: field=(?P.+) from: pod + - tag_name: l3 # extracts value of label from nodes with key `label3` and inserts it as a tag with key `l3` + key: label3 + from: node ``` ### Config example @@ -147,7 +154,7 @@ k8sattributes/2: ## Role-based access control -The k8sattributesprocessor needs `get`, `watch` and `list` permissions on both `pods` and `namespaces` resources, for all namespaces and pods included in the configured filters. Additionally, when using `k8s.deployment.uid` or `k8s.deployment.name` the processor also needs `get`, `watch` and `list` permissions for `replicaset` resources. +The k8sattributesprocessor needs `get`, `watch` and `list` permissions on both `pods` and `namespaces` resources, for all namespaces and pods included in the configured filters. Additionally, when using `k8s.deployment.uid` or `k8s.deployment.name` the processor also needs `get`, `watch` and `list` permissions for `replicaset` resources. When extracting metadatas from `node`, the processor needs `get`, `watch` and `list` permissions for `node` resources. Here is an example of a `ClusterRole` to give a `ServiceAccount` the necessary permissions for all pods and namespaces in the cluster (replace `` with a namespace where collector is deployed): diff --git a/processor/k8sattributesprocessor/client_test.go b/processor/k8sattributesprocessor/client_test.go index 17300db25833..0ab0d830d28f 100644 --- a/processor/k8sattributesprocessor/client_test.go +++ b/processor/k8sattributesprocessor/client_test.go @@ -23,7 +23,9 @@ type fakeClient struct { Informer cache.SharedInformer NamespaceInformer cache.SharedInformer ReplicaSetInformer cache.SharedInformer + NodeInformer cache.SharedInformer Namespaces map[string]*kube.Namespace + Nodes map[string]*kube.Node StopCh chan struct{} } @@ -44,6 +46,7 @@ func newFakeClient(_ *zap.Logger, _ k8sconfig.APIConfig, rules kube.ExtractionRu Associations: associations, Informer: kube.NewFakeInformer(cs, "", ls, fs), NamespaceInformer: kube.NewFakeInformer(cs, "", ls, fs), + NodeInformer: kube.NewFakeInformer(cs, "", ls, fs), ReplicaSetInformer: kube.NewFakeInformer(cs, "", ls, fs), StopCh: make(chan struct{}), }, nil @@ -61,6 +64,11 @@ func (f *fakeClient) GetNamespace(namespace string) (*kube.Namespace, bool) { return ns, ok } +func (f *fakeClient) GetNode(nodeName string) (*kube.Node, bool) { + node, ok := f.Nodes[nodeName] + return node, ok +} + // Start is a noop for FakeClient. func (f *fakeClient) Start() { if f.Informer != nil { diff --git a/processor/k8sattributesprocessor/config.go b/processor/k8sattributesprocessor/config.go index 5a2b16656756..2361d3280cfa 100644 --- a/processor/k8sattributesprocessor/config.go +++ b/processor/k8sattributesprocessor/config.go @@ -57,9 +57,9 @@ func (cfg *Config) Validate() error { } switch f.From { - case "", kube.MetadataFromPod, kube.MetadataFromNamespace: + case "", kube.MetadataFromPod, kube.MetadataFromNamespace, kube.MetadataFromNode: default: - return fmt.Errorf("%s is not a valid choice for From. Must be one of: pod, namespace", f.From) + return fmt.Errorf("%s is not a valid choice for From. Must be one of: pod, namespace, node", f.From) } if f.Regex != "" { @@ -117,7 +117,7 @@ func (cfg *Config) Validate() error { // ExtractConfig section allows specifying extraction rules to extract // data from k8s pod specs. type ExtractConfig struct { - // Metadata allows to extract pod/namespace metadata from a list of metadata fields. + // Metadata allows to extract pod/namespace/node metadata from a list of metadata fields. // The field accepts a list of strings. // // Metadata fields supported right now are, diff --git a/processor/k8sattributesprocessor/e2e_test.go b/processor/k8sattributesprocessor/e2e_test.go index 11c7479f6e37..962ee19583a0 100644 --- a/processor/k8sattributesprocessor/e2e_test.go +++ b/processor/k8sattributesprocessor/e2e_test.go @@ -107,6 +107,7 @@ func TestE2E(t *testing.T) { "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), "container.image.tag": newExpectedValue(equal, "latest"), "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), }, }, { @@ -129,6 +130,7 @@ func TestE2E(t *testing.T) { "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), "container.image.tag": newExpectedValue(equal, "latest"), "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), }, }, { @@ -175,6 +177,7 @@ func TestE2E(t *testing.T) { "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), "container.image.tag": newExpectedValue(equal, "latest"), "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), }, }, { @@ -197,6 +200,7 @@ func TestE2E(t *testing.T) { "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), "container.image.tag": newExpectedValue(equal, "latest"), "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), }, }, { @@ -219,6 +223,7 @@ func TestE2E(t *testing.T) { "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), "container.image.tag": newExpectedValue(equal, "latest"), "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), }, }, { @@ -265,6 +270,7 @@ func TestE2E(t *testing.T) { "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), "container.image.tag": newExpectedValue(equal, "latest"), "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), }, }, { @@ -287,6 +293,7 @@ func TestE2E(t *testing.T) { "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), "container.image.tag": newExpectedValue(equal, "latest"), "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), }, }, { @@ -333,6 +340,7 @@ func TestE2E(t *testing.T) { "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), "container.image.tag": newExpectedValue(equal, "latest"), "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), }, }, { @@ -355,6 +363,7 @@ func TestE2E(t *testing.T) { "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), "container.image.tag": newExpectedValue(equal, "latest"), "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), }, }, } diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index 47ab6255848e..70e46ed4049a 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -42,6 +42,7 @@ type WatchClient struct { kc kubernetes.Interface informer cache.SharedInformer namespaceInformer cache.SharedInformer + nodeInformer cache.SharedInformer replicasetInformer cache.SharedInformer replicasetRegex *regexp.Regexp cronJobRegex *regexp.Regexp @@ -60,6 +61,10 @@ type WatchClient struct { // Key is namespace name Namespaces map[string]*Namespace + // A map containing Node related data, used to associate them with resources. + // Key is node name + Nodes map[string]*Node + // A map containing ReplicaSets related data, used to associate them with resources. // Key is replicaset uid ReplicaSets map[string]*ReplicaSet @@ -89,6 +94,7 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, c.Pods = map[PodIdentifier]*Pod{} c.Namespaces = map[string]*Namespace{} + c.Nodes = map[string]*Node{} c.ReplicaSets = map[string]*ReplicaSet{} if newClientSet == nil { newClientSet = k8sconfig.MakeClient @@ -162,6 +168,10 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, } } + if c.extractNodeLabelsAnnotations() { + c.nodeInformer = newNodeSharedInformer(c.kc, c.Filters.Node) + } + return c, err } @@ -198,6 +208,18 @@ func (c *WatchClient) Start() { } go c.replicasetInformer.Run(c.stopCh) } + + if c.nodeInformer != nil { + _, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.handleNodeAdd, + UpdateFunc: c.handleNodeUpdate, + DeleteFunc: c.handleNodeDelete, + }) + if err != nil { + c.logger.Error("error adding event handler to node informer", zap.Error(err)) + } + go c.nodeInformer.Run(c.stopCh) + } } // Stop signals the the k8s watcher/informer to stop watching for new events. @@ -273,6 +295,37 @@ func (c *WatchClient) handleNamespaceDelete(obj interface{}) { } } +func (c *WatchClient) handleNodeAdd(obj interface{}) { + observability.RecordNodeAdded() + if node, ok := obj.(*api_v1.Node); ok { + c.addOrUpdateNode(node) + } else { + c.logger.Error("object received was not of type api_v1.Node", zap.Any("received", obj)) + } +} + +func (c *WatchClient) handleNodeUpdate(_, newNode interface{}) { + observability.RecordNodeUpdated() + if node, ok := newNode.(*api_v1.Node); ok { + c.addOrUpdateNode(node) + } else { + c.logger.Error("object received was not of type api_v1.Node", zap.Any("received", newNode)) + } +} + +func (c *WatchClient) handleNodeDelete(obj interface{}) { + observability.RecordNodeDeleted() + if node, ok := ignoreDeletedFinalStateUnknown(obj).(*api_v1.Node); ok { + c.m.Lock() + if n, ok := c.Nodes[node.Name]; ok { + delete(c.Nodes, n.Name) + } + c.m.Unlock() + } else { + c.logger.Error("object received was not of type api_v1.Node", zap.Any("received", obj)) + } +} + func (c *WatchClient) deleteLoop(interval time.Duration, gracePeriod time.Duration) { // This loop runs after N seconds and deletes pods from cache. // It iterates over the delete queue and deletes all that aren't @@ -339,6 +392,17 @@ func (c *WatchClient) GetNamespace(namespace string) (*Namespace, bool) { return nil, false } +// GetNode takes a node name and returns the node object the node name is associated with. +func (c *WatchClient) GetNode(nodeName string) (*Node, bool) { + c.m.RLock() + node, ok := c.Nodes[nodeName] + c.m.RUnlock() + if ok { + return node, ok + } + return nil, false +} + func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string { tags := map[string]string{} if c.Rules.PodName { @@ -614,10 +678,25 @@ func (c *WatchClient) extractNamespaceAttributes(namespace *api_v1.Namespace) ma return tags } +func (c *WatchClient) extractNodeAttributes(node *api_v1.Node) map[string]string { + tags := map[string]string{} + + for _, r := range c.Rules.Labels { + r.extractFromNodeMetadata(node.Labels, tags, "k8s.node.labels.%s") + } + + for _, r := range c.Rules.Annotations { + r.extractFromNodeMetadata(node.Annotations, tags, "k8s.node.annotations.%s") + } + + return tags +} + func (c *WatchClient) podFromAPI(pod *api_v1.Pod) *Pod { newPod := &Pod{ Name: pod.Name, Namespace: pod.GetNamespace(), + NodeName: pod.Spec.NodeName, Address: pod.Status.PodIP, HostNetwork: pod.Spec.HostNetwork, PodUID: string(pod.UID), @@ -832,6 +911,36 @@ func (c *WatchClient) extractNamespaceLabelsAnnotations() bool { return false } +func (c *WatchClient) extractNodeLabelsAnnotations() bool { + for _, r := range c.Rules.Labels { + if r.From == MetadataFromNode { + return true + } + } + + for _, r := range c.Rules.Annotations { + if r.From == MetadataFromNode { + return true + } + } + + return false +} + +func (c *WatchClient) addOrUpdateNode(node *api_v1.Node) { + newNode := &Node{ + Name: node.Name, + NodeUID: string(node.UID), + } + newNode.Attributes = c.extractNodeAttributes(node) + + c.m.Lock() + if node.Name != "" { + c.Nodes[node.Name] = newNode + } + c.m.Unlock() +} + func needContainerAttributes(rules ExtractionRules) bool { return rules.ContainerImageName || rules.ContainerName || diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index 480dea7c5903..38feedde29e4 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -116,6 +116,33 @@ func namespaceAddAndUpdateTest(t *testing.T, c *WatchClient, handler func(obj in assert.Equal(t, "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", got.NamespaceUID) } +func nodeAddAndUpdateTest(t *testing.T, c *WatchClient, handler func(obj interface{})) { + assert.Equal(t, 0, len(c.Nodes)) + + node := &api_v1.Node{} + handler(node) + assert.Equal(t, 0, len(c.Nodes)) + + node = &api_v1.Node{} + node.Name = "nodeA" + handler(node) + assert.Equal(t, 1, len(c.Nodes)) + got, ok := c.GetNode("nodeA") + assert.True(t, ok) + assert.Equal(t, "nodeA", got.Name) + assert.Equal(t, "", got.NodeUID) + + node = &api_v1.Node{} + node.Name = "nodeB" + node.UID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + handler(node) + assert.Equal(t, 2, len(c.Nodes)) + got, ok = c.GetNode("nodeB") + assert.True(t, ok) + assert.Equal(t, "nodeB", got.Name) + assert.Equal(t, "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", got.NodeUID) +} + func TestDefaultClientset(t *testing.T) { c, err := New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, nil, nil, nil, nil) assert.Error(t, err) @@ -193,6 +220,11 @@ func TestNamespaceAdd(t *testing.T) { namespaceAddAndUpdateTest(t, c, c.handleNamespaceAdd) } +func TestNodeAdd(t *testing.T) { + c, _ := newTestClient(t) + nodeAddAndUpdateTest(t, c, c.handleNodeAdd) +} + func TestReplicaSetHandler(t *testing.T) { c, _ := newTestClient(t) assert.Equal(t, len(c.ReplicaSets), 0) @@ -392,6 +424,14 @@ func TestNamespaceUpdate(t *testing.T) { }) } +func TestNodeUpdate(t *testing.T) { + c, _ := newTestClient(t) + nodeAddAndUpdateTest(t, c, func(obj interface{}) { + // first argument (old node) is not used right now + c.handleNodeUpdate(&api_v1.Node{}, obj) + }) +} + func TestPodDelete(t *testing.T) { c, _ := newTestClient(t) podAddAndUpdateTest(t, c, c.handlePodAdd) @@ -493,6 +533,41 @@ func TestNamespaceDelete(t *testing.T) { assert.Equal(t, 0, len(c.Namespaces)) } +func TestNodeDelete(t *testing.T) { + c, _ := newTestClient(t) + nodeAddAndUpdateTest(t, c, c.handleNodeAdd) + assert.Equal(t, 2, len(c.Nodes)) + assert.Equal(t, "nodeA", c.Nodes["nodeA"].Name) + + // delete empty node + c.handleNodeDelete(&api_v1.Node{}) + + // delete non-existent node + node := &api_v1.Node{} + node.Name = "nodeC" + c.handleNodeDelete(node) + assert.Equal(t, 2, len(c.Nodes)) + got := c.Nodes["nodeA"] + assert.Equal(t, "nodeA", got.Name) + // delete non-existent namespace when DeletedFinalStateUnknown + c.handleNodeDelete(cache.DeletedFinalStateUnknown{Obj: node}) + assert.Equal(t, 2, len(c.Nodes)) + got = c.Nodes["nodeA"] + assert.Equal(t, "nodeA", got.Name) + + // delete node A + node.Name = "nodeA" + c.handleNodeDelete(node) + assert.Equal(t, 1, len(c.Nodes)) + got = c.Nodes["nodeB"] + assert.Equal(t, "nodeB", got.Name) + + // delete node B when DeletedFinalStateUnknown + node.Name = "nodeB" + c.handleNodeDelete(cache.DeletedFinalStateUnknown{Obj: node}) + assert.Equal(t, 0, len(c.Nodes)) +} + func TestDeleteQueue(t *testing.T) { c, _ := newTestClient(t) podAddAndUpdateTest(t, c, c.handlePodAdd) @@ -1270,6 +1345,96 @@ func TestNamespaceExtractionRules(t *testing.T) { } } +func TestNodeExtractionRules(t *testing.T) { + c, _ := newTestClientWithRulesAndFilters(t, Filters{}) + + node := &api_v1.Node{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: "k8s-node-example", + UID: "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + CreationTimestamp: meta_v1.Now(), + Labels: map[string]string{ + "label1": "lv1", + }, + Annotations: map[string]string{ + "annotation1": "av1", + }, + }, + } + + testCases := []struct { + name string + rules ExtractionRules + attributes map[string]string + }{{ + name: "no-rules", + rules: ExtractionRules{}, + attributes: nil, + }, { + name: "labels", + rules: ExtractionRules{ + Annotations: []FieldExtractionRule{{ + Name: "a1", + Key: "annotation1", + From: MetadataFromNode, + }, + }, + Labels: []FieldExtractionRule{{ + Name: "l1", + Key: "label1", + From: MetadataFromNode, + }, + }, + }, + attributes: map[string]string{ + "l1": "lv1", + "a1": "av1", + }, + }, + { + name: "all-labels", + rules: ExtractionRules{ + Labels: []FieldExtractionRule{{ + KeyRegex: regexp.MustCompile("^(?:la.*)$"), + From: MetadataFromNode, + }, + }, + }, + attributes: map[string]string{ + "k8s.node.labels.label1": "lv1", + }, + }, + { + name: "all-annotations", + rules: ExtractionRules{ + Annotations: []FieldExtractionRule{{ + KeyRegex: regexp.MustCompile("^(?:an.*)$"), + From: MetadataFromNode, + }, + }, + }, + attributes: map[string]string{ + "k8s.node.annotations.annotation1": "av1", + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + c.Rules = tc.rules + c.handleNodeAdd(node) + n, ok := c.GetNode(node.Name) + require.True(t, ok) + + assert.Equal(t, len(tc.attributes), len(n.Attributes)) + for k, v := range tc.attributes { + got, ok := n.Attributes[k] + assert.True(t, ok) + assert.Equal(t, v, got) + } + }) + } +} + func TestFilters(t *testing.T) { testCases := []struct { name string diff --git a/processor/k8sattributesprocessor/internal/kube/informer.go b/processor/k8sattributesprocessor/internal/kube/informer.go index db5cf220851c..73b355f491b3 100644 --- a/processor/k8sattributesprocessor/internal/kube/informer.go +++ b/processor/k8sattributesprocessor/internal/kube/informer.go @@ -34,6 +34,34 @@ type InformerProviderNamespace func( client kubernetes.Interface, ) cache.SharedInformer +// InformerProviderNode defines a function type that returns a new SharedInformer. It is used to +// allow passing custom shared informers to the watch client for fetching node objects. +type InformerProviderNode func( + client kubernetes.Interface, +) cache.SharedInformer + +func newNodeSharedInformer(client kubernetes.Interface, nodeName string) cache.SharedInformer { + informer := cache.NewSharedInformer( + &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + if nodeName != "" { + opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", nodeName).String() + } + return client.CoreV1().Nodes().List(context.Background(), opts) + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + if nodeName != "" { + opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", nodeName).String() + } + return client.CoreV1().Nodes().Watch(context.Background(), opts) + }, + }, + &api_v1.Node{}, + watchSyncPeriod, + ) + return informer +} + // InformerProviderReplicaSet defines a function type that returns a new SharedInformer. It is used to // allow passing custom shared informers to the watch client. type InformerProviderReplicaSet func( diff --git a/processor/k8sattributesprocessor/internal/kube/kube.go b/processor/k8sattributesprocessor/internal/kube/kube.go index 6b046b599536..47e95e98da33 100644 --- a/processor/k8sattributesprocessor/internal/kube/kube.go +++ b/processor/k8sattributesprocessor/internal/kube/kube.go @@ -26,7 +26,9 @@ const ( // MetadataFromPod is used to specify to extract metadata/labels/annotations from pod MetadataFromPod = "pod" // MetadataFromNamespace is used to specify to extract metadata/labels/annotations from namespace - MetadataFromNamespace = "namespace" + MetadataFromNamespace = "namespace" + // MetadataFromNode is used to specify to extract metadata/labels/annotations from node + MetadataFromNode = "node" PodIdentifierMaxLength = 4 ResourceSource = "resource_attribute" @@ -88,6 +90,7 @@ var ( type Client interface { GetPod(PodIdentifier) (*Pod, bool) GetNamespace(string) (*Namespace, bool) + GetNode(string) (*Node, bool) Start() Stop() } @@ -108,6 +111,7 @@ type Pod struct { StartTime *metav1.Time Ignore bool Namespace string + NodeName string HostNetwork bool // Containers specifies all containers in this pod. @@ -148,6 +152,13 @@ type Namespace struct { DeletedAt time.Time } +// Node represents a kubernetes node. +type Node struct { + Name string + NodeUID string + Attributes map[string]string +} + type deleteRequest struct { // id is identifier (IP address or Pod UID) of pod to remove from pods map id PodIdentifier @@ -247,9 +258,10 @@ type FieldExtractionRule struct { // Full value is extracted when no regexp is provided. Regex *regexp.Regexp // From determines the kubernetes object the field should be retrieved from. - // Currently only two values are supported, + // Currently only three values are supported, // - pod // - namespace + // - node From string } @@ -266,6 +278,12 @@ func (r *FieldExtractionRule) extractFromNamespaceMetadata(metadata map[string]s } } +func (r *FieldExtractionRule) extractFromNodeMetadata(metadata map[string]string, tags map[string]string, formatter string) { + if r.From == MetadataFromNode { + r.extractFromMetadata(metadata, tags, formatter) + } +} + func (r *FieldExtractionRule) extractFromMetadata(metadata map[string]string, tags map[string]string, formatter string) { if r.KeyRegex != nil { for k, v := range metadata { diff --git a/processor/k8sattributesprocessor/internal/observability/observability.go b/processor/k8sattributesprocessor/internal/observability/observability.go index 1bb486788b43..749b801f03f4 100644 --- a/processor/k8sattributesprocessor/internal/observability/observability.go +++ b/processor/k8sattributesprocessor/internal/observability/observability.go @@ -23,6 +23,9 @@ func init() { viewNamespacesAdded, viewNamespacesUpdated, viewNamespacesDeleted, + viewNodesAdded, + viewNodesUpdated, + viewNodesDeleted, ) } @@ -35,6 +38,9 @@ var ( mNamespacesUpdated = stats.Int64("otelsvc/k8s/namespace_updated", "Number of namespace update events received", "1") mNamespacesAdded = stats.Int64("otelsvc/k8s/namespace_added", "Number of namespace add events received", "1") mNamespacesDeleted = stats.Int64("otelsvc/k8s/namespace_deleted", "Number of namespace delete events received", "1") + mNodesUpdated = stats.Int64("otelsvc/k8s/node_updated", "Number of node update events received", "1") + mNodesAdded = stats.Int64("otelsvc/k8s/node_added", "Number of node add events received", "1") + mNodesDeleted = stats.Int64("otelsvc/k8s/node_deleted", "Number of node delete events received", "1") mReplicaSetsUpdated = stats.Int64("otelsvc/k8s/replicaset_updated", "Number of ReplicaSet update events received", "1") mReplicaSetsAdded = stats.Int64("otelsvc/k8s/replicaset_added", "Number of ReplicaSet add events received", "1") mReplicaSetsDeleted = stats.Int64("otelsvc/k8s/replicaset_deleted", "Number of ReplicaSet delete events received", "1") @@ -96,6 +102,27 @@ var viewNamespacesDeleted = &view.View{ Aggregation: view.Sum(), } +var viewNodesUpdated = &view.View{ + Name: mNodesUpdated.Name(), + Description: mNodesUpdated.Description(), + Measure: mNodesUpdated, + Aggregation: view.Sum(), +} + +var viewNodesAdded = &view.View{ + Name: mNodesAdded.Name(), + Description: mNodesAdded.Description(), + Measure: mNodesAdded, + Aggregation: view.Sum(), +} + +var viewNodesDeleted = &view.View{ + Name: mNodesDeleted.Name(), + Description: mNodesDeleted.Description(), + Measure: mNodesDeleted, + Aggregation: view.Sum(), +} + // RecordPodUpdated increments the metric that records pod update events received. func RecordPodUpdated() { stats.Record(context.Background(), mPodsUpdated.M(int64(1))) @@ -136,6 +163,21 @@ func RecordNamespaceDeleted() { stats.Record(context.Background(), mNamespacesDeleted.M(int64(1))) } +// RecordNodeUpdated increments the metric that records node update events received. +func RecordNodeUpdated() { + stats.Record(context.Background(), mNodesUpdated.M(int64(1))) +} + +// RecordNodeAdded increments the metric that records node add events receiver. +func RecordNodeAdded() { + stats.Record(context.Background(), mNodesAdded.M(int64(1))) +} + +// RecordNodeDeleted increments the metric that records node events deleted. +func RecordNodeDeleted() { + stats.Record(context.Background(), mNodesDeleted.M(int64(1))) +} + // RecordReplicaSetUpdated increments the metric that records ReplicaSet update events received. func RecordReplicaSetUpdated() { stats.Record(context.Background(), mReplicaSetsUpdated.M(int64(1))) diff --git a/processor/k8sattributesprocessor/internal/observability/observability_test.go b/processor/k8sattributesprocessor/internal/observability/observability_test.go index ff8d26e29288..70567a7b9827 100644 --- a/processor/k8sattributesprocessor/internal/observability/observability_test.go +++ b/processor/k8sattributesprocessor/internal/observability/observability_test.go @@ -88,6 +88,18 @@ func TestMetrics(t *testing.T) { "otelsvc/k8s/namespace_deleted", RecordNamespaceDeleted, }, + { + "otelsvc/k8s/node_added", + RecordNodeAdded, + }, + { + "otelsvc/k8s/node_updated", + RecordNodeUpdated, + }, + { + "otelsvc/k8s/node_deleted", + RecordNodeDeleted, + }, } var ( diff --git a/processor/k8sattributesprocessor/options.go b/processor/k8sattributesprocessor/options.go index 0a9b00a4512e..ab4b4695ab39 100644 --- a/processor/k8sattributesprocessor/options.go +++ b/processor/k8sattributesprocessor/options.go @@ -212,11 +212,7 @@ func extractFieldRules(fieldType string, fields ...FieldExtractConfig) ([]kube.F if name == "" && a.Key != "" { // name for KeyRegex case is set at extraction time/runtime, skipped here - if a.From == kube.MetadataFromPod { - name = fmt.Sprintf("k8s.pod.%s.%s", fieldType, a.Key) - } else if a.From == kube.MetadataFromNamespace { - name = fmt.Sprintf("k8s.namespace.%s.%s", fieldType, a.Key) - } + name = fmt.Sprintf("k8s.%v.%v.%v", a.From, fieldType, a.Key) } var r *regexp.Regexp diff --git a/processor/k8sattributesprocessor/options_test.go b/processor/k8sattributesprocessor/options_test.go index 40460bb738e9..a87874597c7c 100644 --- a/processor/k8sattributesprocessor/options_test.go +++ b/processor/k8sattributesprocessor/options_test.go @@ -122,6 +122,24 @@ func TestWithExtractAnnotations(t *testing.T) { }, "", }, + { + "basic-node", + []FieldExtractConfig{ + { + TagName: "tag1", + Key: "key1", + From: kube.MetadataFromNode, + }, + }, + []kube.FieldExtractionRule{ + { + Name: "tag1", + Key: "key1", + From: kube.MetadataFromNode, + }, + }, + "", + }, { "basic-pod-keyregex", []FieldExtractConfig{ @@ -158,6 +176,24 @@ func TestWithExtractAnnotations(t *testing.T) { }, "", }, + { + "basic-node-keyregex", + []FieldExtractConfig{ + { + TagName: "tag1", + KeyRegex: "key*", + From: kube.MetadataFromNode, + }, + }, + []kube.FieldExtractionRule{ + { + Name: "tag1", + KeyRegex: regexp.MustCompile("^(?:key*)$"), + From: kube.MetadataFromNode, + }, + }, + "", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -228,6 +264,24 @@ func TestWithExtractLabels(t *testing.T) { }, "", }, + { + "basic-node", + []FieldExtractConfig{ + { + TagName: "tag1", + Key: "key1", + From: kube.MetadataFromNode, + }, + }, + []kube.FieldExtractionRule{ + { + Name: "tag1", + Key: "key1", + From: kube.MetadataFromNode, + }, + }, + "", + }, { "basic-pod-keyregex", []FieldExtractConfig{ @@ -247,7 +301,7 @@ func TestWithExtractLabels(t *testing.T) { "", }, { - "basic-namespace", + "basic-namespace-keyregex", []FieldExtractConfig{ { TagName: "tag1", @@ -264,6 +318,24 @@ func TestWithExtractLabels(t *testing.T) { }, "", }, + { + "basic-node-keyregex", + []FieldExtractConfig{ + { + TagName: "tag1", + KeyRegex: "key*", + From: kube.MetadataFromNode, + }, + }, + []kube.FieldExtractionRule{ + { + Name: "tag1", + KeyRegex: regexp.MustCompile("^(?:key*)$"), + From: kube.MetadataFromNode, + }, + }, + "", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index f5cc5e3d3b4d..ce0d095e92ab 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -141,6 +141,16 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco } } } + + nodeName := stringAttributeFromMap(resource.Attributes(), conventions.AttributeK8SNodeName) + if nodeName != "" { + attrsToAdd := kp.getAttributesForPodsNode(nodeName) + for key, val := range attrsToAdd { + if _, found := resource.Attributes().Get(key); !found { + resource.Attributes().PutStr(key, val) + } + } + } } // addContainerAttributes looks if pod has any container identifiers and adds additional container attributes @@ -215,6 +225,14 @@ func (kp *kubernetesprocessor) getAttributesForPodsNamespace(namespace string) m return ns.Attributes } +func (kp *kubernetesprocessor) getAttributesForPodsNode(nodeName string) map[string]string { + node, ok := kp.kc.GetNode(nodeName) + if !ok { + return nil + } + return node.Attributes +} + // intFromAttribute extracts int value from an attribute stored as string or int func intFromAttribute(val pcommon.Value) (int, error) { switch val.Type() { diff --git a/processor/k8sattributesprocessor/testdata/e2e/collector/clusterrole.yaml b/processor/k8sattributesprocessor/testdata/e2e/collector/clusterrole.yaml index 097362a941bf..5bc4bfb348ab 100644 --- a/processor/k8sattributesprocessor/testdata/e2e/collector/clusterrole.yaml +++ b/processor/k8sattributesprocessor/testdata/e2e/collector/clusterrole.yaml @@ -4,8 +4,8 @@ metadata: name: {{ .Name }} rules: - apiGroups: [""] - resources: ["pods", "namespaces"] + resources: ["pods", "namespaces", "nodes"] verbs: ["get", "watch", "list"] - apiGroups: ["apps"] resources: ["replicasets"] - verbs: ["get", "watch", "list"] \ No newline at end of file + verbs: ["get", "watch", "list"] diff --git a/processor/k8sattributesprocessor/testdata/e2e/collector/configmap.yaml b/processor/k8sattributesprocessor/testdata/e2e/collector/configmap.yaml index c8347529582e..e82cead3fdec 100644 --- a/processor/k8sattributesprocessor/testdata/e2e/collector/configmap.yaml +++ b/processor/k8sattributesprocessor/testdata/e2e/collector/configmap.yaml @@ -23,6 +23,8 @@ data: - from: pod key: app tag_name: k8s.labels.app + - from: node + key: foo metadata: - k8s.pod.name - k8s.pod.start_time