Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/k8sattribute] support attribute k8s.deployment.uid #14003

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/k8sattributeprocessor-support-deployment-uid.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sattributesprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Support adding attribute `k8s.deployment.uid`."

# One or more tracking issues related to the change
issues: [14003]

fatsheep9146 marked this conversation as resolved.
Show resolved Hide resolved
# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
34 changes: 18 additions & 16 deletions processor/k8sattributesprocessor/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ import (

// fakeClient is used as a replacement for WatchClient in test cases.
type fakeClient struct {
Pods map[kube.PodIdentifier]*kube.Pod
Rules kube.ExtractionRules
Filters kube.Filters
Associations []kube.Association
Informer cache.SharedInformer
NamespaceInformer cache.SharedInformer
Namespaces map[string]*kube.Namespace
StopCh chan struct{}
Pods map[kube.PodIdentifier]*kube.Pod
Rules kube.ExtractionRules
Filters kube.Filters
Associations []kube.Association
Informer cache.SharedInformer
NamespaceInformer cache.SharedInformer
ReplicaSetInformer cache.SharedInformer
Namespaces map[string]*kube.Namespace
StopCh chan struct{}
}

func selectors() (labels.Selector, fields.Selector) {
Expand All @@ -43,18 +44,19 @@ func selectors() (labels.Selector, fields.Selector) {
}

// newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type
func newFakeClient(_ *zap.Logger, apiCfg k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, exclude kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace) (kube.Client, error) {
func newFakeClient(_ *zap.Logger, apiCfg k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, exclude kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet) (kube.Client, error) {
cs := fake.NewSimpleClientset()

ls, fs := selectors()
return &fakeClient{
Pods: map[kube.PodIdentifier]*kube.Pod{},
Rules: rules,
Filters: filters,
Associations: associations,
Informer: kube.NewFakeInformer(cs, "", ls, fs),
NamespaceInformer: kube.NewFakeInformer(cs, "", ls, fs),
StopCh: make(chan struct{}),
Pods: map[kube.PodIdentifier]*kube.Pod{},
Rules: rules,
Filters: filters,
Associations: associations,
Informer: kube.NewFakeInformer(cs, "", ls, fs),
NamespaceInformer: kube.NewFakeInformer(cs, "", ls, fs),
ReplicaSetInformer: kube.NewFakeInformer(cs, "", ls, fs),
StopCh: make(chan struct{}),
}, nil
}

Expand Down
3 changes: 3 additions & 0 deletions processor/k8sattributesprocessor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func TestE2E(t *testing.T) {
"k8s.node.name": newExpectedValue(exist, ""),
"k8s.namespace.name": newExpectedValue(equal, "default"),
"k8s.deployment.name": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-deployment"),
"k8s.deployment.uid": newExpectedValue(exist, ""),
"k8s.replicaset.name": newExpectedValue(regex, "telemetrygen-"+testID+"-traces-deployment-[a-z0-9]*"),
"k8s.replicaset.uid": newExpectedValue(exist, ""),
"k8s.annotations.workload": newExpectedValue(equal, "deployment"),
Expand Down Expand Up @@ -244,6 +245,7 @@ func TestE2E(t *testing.T) {
"k8s.node.name": newExpectedValue(exist, ""),
"k8s.namespace.name": newExpectedValue(equal, "default"),
"k8s.deployment.name": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-deployment"),
"k8s.deployment.uid": newExpectedValue(exist, ""),
"k8s.replicaset.name": newExpectedValue(regex, "telemetrygen-"+testID+"-metrics-deployment-[a-z0-9]*"),
"k8s.replicaset.uid": newExpectedValue(exist, ""),
"k8s.annotations.workload": newExpectedValue(equal, "deployment"),
Expand Down Expand Up @@ -329,6 +331,7 @@ func TestE2E(t *testing.T) {
"k8s.node.name": newExpectedValue(exist, ""),
"k8s.namespace.name": newExpectedValue(equal, "default"),
"k8s.deployment.name": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-deployment"),
"k8s.deployment.uid": newExpectedValue(exist, ""),
"k8s.replicaset.name": newExpectedValue(regex, "telemetrygen-"+testID+"-logs-deployment-[a-z0-9]*"),
"k8s.replicaset.uid": newExpectedValue(exist, ""),
"k8s.annotations.workload": newExpectedValue(equal, "deployment"),
Expand Down
133 changes: 116 additions & 17 deletions processor/k8sattributesprocessor/internal/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/zap"
apps_v1 "k8s.io/api/apps/v1"
api_v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -36,16 +37,17 @@ import (

// WatchClient is the main interface provided by this package to a kubernetes cluster.
type WatchClient struct {
m sync.RWMutex
deleteMut sync.Mutex
logger *zap.Logger
kc kubernetes.Interface
informer cache.SharedInformer
namespaceInformer cache.SharedInformer
replicasetRegex *regexp.Regexp
cronJobRegex *regexp.Regexp
deleteQueue []deleteRequest
stopCh chan struct{}
m sync.RWMutex
deleteMut sync.Mutex
logger *zap.Logger
kc kubernetes.Interface
informer cache.SharedInformer
namespaceInformer cache.SharedInformer
replicasetInformer cache.SharedInformer
replicasetRegex *regexp.Regexp
cronJobRegex *regexp.Regexp
deleteQueue []deleteRequest
stopCh chan struct{}

// A map containing Pod related data, used to associate them with resources.
// Key can be either an IP address or Pod UID
Expand All @@ -58,6 +60,10 @@ type WatchClient struct {
// A map containing Namespace related data, used to associate them with resources.
// Key is namespace name
Namespaces map[string]*Namespace

// A map containing ReplicaSets related data, used to associate them with resources.
// Key is replicaset uid
ReplicaSets map[string]*ReplicaSet
}

// Extract replicaset name from the pod name. Pod name is created using
Expand All @@ -69,7 +75,7 @@ var rRegex = regexp.MustCompile(`^(.*)-[0-9a-zA-Z]+$`)
var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`)

// New initializes a new k8s Client.
func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace) (Client, error) {
func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace, newReplicaSetInformer InformerProviderReplicaSet) (Client, error) {
c := &WatchClient{
logger: logger,
Rules: rules,
Expand All @@ -84,6 +90,7 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,

c.Pods = map[PodIdentifier]*Pod{}
c.Namespaces = map[string]*Namespace{}
c.ReplicaSets = map[string]*ReplicaSet{}
if newClientSet == nil {
newClientSet = k8sconfig.MakeClient
}
Expand Down Expand Up @@ -117,6 +124,14 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,
} else {
c.namespaceInformer = NewNoOpInformer(c.kc)
}

if rules.DeploymentName || rules.DeploymentUID {
if newReplicaSetInformer == nil {
newReplicaSetInformer = newReplicaSetSharedInformer
}
c.replicasetInformer = newReplicaSetInformer(c.kc, c.Filters.Namespace)
}

return c, err
}

Expand All @@ -141,6 +156,18 @@ func (c *WatchClient) Start() {
c.logger.Error("error adding event handler to namespace informer", zap.Error(err))
}
go c.namespaceInformer.Run(c.stopCh)

if c.Rules.DeploymentName || c.Rules.DeploymentUID {
_, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleReplicaSetAdd,
UpdateFunc: c.handleReplicaSetUpdate,
DeleteFunc: c.handleReplicaSetDelete,
})
if err != nil {
c.logger.Error("error adding event handler to replicaset informer", zap.Error(err))
}
go c.replicasetInformer.Run(c.stopCh)
}
}

// Stop signals the the k8s watcher/informer to stop watching for new events.
Expand Down Expand Up @@ -312,7 +339,8 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
c.Rules.DaemonSetUID || c.Rules.DaemonSetName ||
c.Rules.JobUID || c.Rules.JobName ||
c.Rules.StatefulSetUID || c.Rules.StatefulSetName ||
c.Rules.Deployment || c.Rules.CronJobName {
c.Rules.DeploymentName || c.Rules.DeploymentUID ||
c.Rules.CronJobName {
for _, ref := range pod.OwnerReferences {
switch ref.Kind {
case "ReplicaSet":
Expand All @@ -322,11 +350,18 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
if c.Rules.ReplicaSetName {
tags[conventions.AttributeK8SReplicaSetName] = ref.Name
}
if c.Rules.Deployment {
// format: [deployment-name]-[Random-String-For-ReplicaSet]
parts := c.replicasetRegex.FindStringSubmatch(ref.Name)
if len(parts) == 2 {
tags[conventions.AttributeK8SDeploymentName] = parts[1]
if c.Rules.DeploymentName {
if replicaset, ok := c.getReplicaSet(string(ref.UID)); ok {
if replicaset.Deployment.Name != "" {
tags[conventions.AttributeK8SDeploymentName] = replicaset.Deployment.Name
}
}
}
if c.Rules.DeploymentUID {
if replicaset, ok := c.getReplicaSet(string(ref.UID)); ok {
if replicaset.Deployment.Name != "" {
tags[conventions.AttributeK8SDeploymentUID] = replicaset.Deployment.UID
}
}
}
case "DaemonSet":
Expand Down Expand Up @@ -661,3 +696,67 @@ func needContainerAttributes(rules ExtractionRules) bool {
rules.ContainerImageTag ||
rules.ContainerID
}

func (c *WatchClient) handleReplicaSetAdd(obj interface{}) {
observability.RecordReplicaSetAdded()
if replicaset, ok := obj.(*apps_v1.ReplicaSet); ok {
c.addOrUpdateReplicaSet(replicaset)
} else {
c.logger.Error("object received was not of type apps_v1.ReplicaSet", zap.Any("received", obj))
}
}

func (c *WatchClient) handleReplicaSetUpdate(old, new interface{}) {
observability.RecordReplicaSetUpdated()
if replicaset, ok := new.(*apps_v1.ReplicaSet); ok {
c.addOrUpdateReplicaSet(replicaset)
} else {
c.logger.Error("object received was not of type apps_v1.ReplicaSet", zap.Any("received", new))
}
}

func (c *WatchClient) handleReplicaSetDelete(obj interface{}) {
observability.RecordReplicaSetDeleted()
if replicaset, ok := obj.(*apps_v1.ReplicaSet); ok {
c.m.Lock()
key := string(replicaset.UID)
delete(c.ReplicaSets, key)
c.m.Unlock()
} else {
c.logger.Error("object received was not of type apps_v1.ReplicaSet", zap.Any("received", obj))
}
}

func (c *WatchClient) addOrUpdateReplicaSet(replicaset *apps_v1.ReplicaSet) {
newReplicaSet := &ReplicaSet{
Name: replicaset.Name,
Namespace: replicaset.Namespace,
UID: string(replicaset.UID),
}

for _, ownerReference := range replicaset.OwnerReferences {
if ownerReference.Kind == "Deployment" && ownerReference.Controller != nil && *ownerReference.Controller {
newReplicaSet.Deployment = Deployment{
Name: ownerReference.Name,
UID: string(ownerReference.UID),
}
break
}
}

c.m.Lock()
if replicaset.UID != "" {
c.ReplicaSets[string(replicaset.UID)] = newReplicaSet
}
c.m.Unlock()
}

func (c *WatchClient) getReplicaSet(uid string) (*ReplicaSet, bool) {
c.m.RLock()
replicaset, ok := c.ReplicaSets[uid]
c.m.RUnlock()
if ok {
return replicaset, ok
}
return nil, false
}
Loading