Skip to content

Commit

Permalink
[receiver/k8s_cluster] Do not store unused data in the k8s API cache (o…
Browse files Browse the repository at this point in the history
…pen-telemetry#23417)

This change removes unused k8s informer data from the cache to reduce RAM utilization.

Tried it on a cluster with 40 nodes and 1000 pods, and it gave up to 30% reduction in RAM usage.
  • Loading branch information
dmitryax authored Jun 16, 2023
1 parent 260f34f commit 5cfcdeb
Show file tree
Hide file tree
Showing 13 changed files with 589 additions and 26 deletions.
15 changes: 15 additions & 0 deletions .chloggen/k8scluster-dont-store-unused-data-cache.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receiver/k8s_cluster

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Do not store unused data in the k8s API cache to reduce RAM usage

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23417]
31 changes: 31 additions & 0 deletions receiver/k8sclusterreceiver/informer_transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package k8sclusterreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver"

import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/jobs"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/node"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicaset"
)

// transformObject transforms the k8s object by removing the data that is not utilized by the receiver.
// Only highly utilized objects are transformed here while others are kept as is.
func transformObject(object interface{}) (interface{}, error) {
switch o := object.(type) {
case *corev1.Pod:
return pod.Transform(o), nil
case *corev1.Node:
return node.Transform(o), nil
case *appsv1.ReplicaSet:
return replicaset.Transform(o), nil
case *batchv1.Job:
return jobs.Transform(o), nil
}
return object, nil
}
87 changes: 87 additions & 0 deletions receiver/k8sclusterreceiver/informer_transform_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package k8sclusterreceiver

import (
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils"
)

func TestTransformObject(t *testing.T) {
i := 1
intPtr := &i
tests := []struct {
name string
object interface{}
want interface{}
same bool
}{
{
name: "pod",
object: testutils.NewPodWithContainer(
"1",
testutils.NewPodSpecWithContainer("container-name"),
testutils.NewPodStatusWithContainer("container-name", "container-id"),
),
want: func() *corev1.Pod {
pod := testutils.NewPodWithContainer(
"1",
testutils.NewPodSpecWithContainer("container-name"),
testutils.NewPodStatusWithContainer("container-name", "container-id"),
)
pod.Spec.Containers[0].Image = ""
pod.Status.ContainerStatuses[0].State = corev1.ContainerState{}
return pod
}(),
same: false,
},
{
name: "node",
object: testutils.NewNode("1"),
want: testutils.NewNode("1"),
same: false,
},
{
name: "replicaset",
object: testutils.NewReplicaSet("1"),
want: testutils.NewReplicaSet("1"),
same: false,
},
{
name: "job",
object: testutils.NewJob("1"),
want: testutils.NewJob("1"),
same: false,
},
{
// This is a case where we don't transform the object.
name: "hpa",
object: testutils.NewHPA("1"),
want: testutils.NewHPA("1"),
same: true,
},
{
name: "invalid_type",
object: intPtr,
want: intPtr,
same: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := transformObject(tt.object)
assert.NoError(t, err)
assert.Equal(t, tt.want, got)
if tt.same {
assert.Same(t, tt.object, got)
} else {
assert.NotSame(t, tt.object, got)
}
})
}
}
16 changes: 0 additions & 16 deletions receiver/k8sclusterreceiver/internal/collection/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,6 @@ func TestDataCollectorSyncMetadata(t *testing.T) {
},
},
},
{
name: "Empty container id skips container resource",
metadataStore: &metadata.Store{},
resource: testutils.NewPodWithContainer(
"0",
testutils.NewPodSpecWithContainer("container-name"),
testutils.NewPodStatusWithContainer("container-name", ""),
),
want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{
experimentalmetricmetadata.ResourceID("test-pod-0-uid"): {
ResourceIDKey: "k8s.pod.uid",
ResourceID: "test-pod-0-uid",
Metadata: commonPodMetadata,
},
},
},
{
name: "Pod with Owner Reference",
metadataStore: &metadata.Store{},
Expand Down
23 changes: 23 additions & 0 deletions receiver/k8sclusterreceiver/internal/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
Expand Down Expand Up @@ -51,6 +52,28 @@ var podsSuccessfulMetric = &metricspb.MetricDescriptor{
Type: metricspb.MetricDescriptor_GAUGE_INT64,
}

// Transform transforms the job to remove the fields that we don't use to reduce RAM utilization.
// IMPORTANT: Make sure to update this function when using a new job fields.
func Transform(job *batchv1.Job) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: job.ObjectMeta.Name,
Namespace: job.ObjectMeta.Namespace,
UID: job.ObjectMeta.UID,
Labels: job.ObjectMeta.Labels,
},
Spec: batchv1.JobSpec{
Completions: job.Spec.Completions,
Parallelism: job.Spec.Parallelism,
},
Status: batchv1.JobStatus{
Active: job.Status.Active,
Succeeded: job.Status.Succeeded,
Failed: job.Status.Failed,
},
}
}

func GetMetrics(j *batchv1.Job) []*agentmetricspb.ExportMetricsServiceRequest {
metrics := make([]*metricspb.Metric, 0, 5)
metrics = append(metrics, []*metricspb.Metric{
Expand Down
70 changes: 70 additions & 0 deletions receiver/k8sclusterreceiver/internal/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ import (
"testing"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils"
Expand Down Expand Up @@ -60,3 +64,69 @@ func TestJobMetrics(t *testing.T) {
testutils.AssertMetricsInt(t, actualResourceMetrics[0].Metrics[2], "k8s.job.successful_pods",
metricspb.MetricDescriptor_GAUGE_INT64, 3)
}

func TestTransform(t *testing.T) {
originalJob := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "my-job",
Namespace: "default",
UID: "my-job-uid",
Labels: map[string]string{
"app": "my-app",
},
},
Spec: batchv1.JobSpec{
Completions: func() *int32 { completions := int32(1); return &completions }(),
Parallelism: func() *int32 { parallelism := int32(1); return &parallelism }(),
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "my-app",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "my-container",
Image: "busybox",
Command: []string{"echo", "Hello, World!"},
ImagePullPolicy: corev1.PullAlways,
},
},
RestartPolicy: corev1.RestartPolicyNever,
},
},
},
Status: batchv1.JobStatus{
Active: 1,
Succeeded: 2,
Failed: 3,
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobComplete,
Status: corev1.ConditionTrue,
},
},
},
}
wantJob := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "my-job",
Namespace: "default",
UID: "my-job-uid",
Labels: map[string]string{
"app": "my-app",
},
},
Spec: batchv1.JobSpec{
Completions: func() *int32 { completions := int32(1); return &completions }(),
Parallelism: func() *int32 { parallelism := int32(1); return &parallelism }(),
},
Status: batchv1.JobStatus{
Active: 1,
Succeeded: 2,
Failed: 3,
},
}
assert.Equal(t, wantJob, Transform(originalJob))
}
23 changes: 23 additions & 0 deletions receiver/k8sclusterreceiver/internal/node/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/maps"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
Expand All @@ -34,6 +35,28 @@ var allocatableDesciption = map[string]string{
"storage": "How many bytes of storage remaining that the node can allocate to pods",
}

// Transform transforms the node to remove the fields that we don't use to reduce RAM utilization.
// IMPORTANT: Make sure to update this function when using a new node fields.
func Transform(node *corev1.Node) *corev1.Node {
newNode := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: node.ObjectMeta.Name,
UID: node.ObjectMeta.UID,
Labels: node.ObjectMeta.Labels,
},
Status: corev1.NodeStatus{
Allocatable: node.Status.Allocatable,
},
}
for _, c := range node.Status.Conditions {
newNode.Status.Conditions = append(newNode.Status.Conditions, corev1.NodeCondition{
Type: c.Type,
Status: c.Status,
})
}
return newNode
}

func GetMetrics(node *corev1.Node, nodeConditionTypesToReport, allocatableTypesToReport []string, logger *zap.Logger) []*agentmetricspb.ExportMetricsServiceRequest {
metrics := make([]*metricspb.Metric, 0, len(nodeConditionTypesToReport)+len(allocatableTypesToReport))
// Adding 'node condition type' metrics
Expand Down
63 changes: 63 additions & 0 deletions receiver/k8sclusterreceiver/internal/node/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"testing"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils"
Expand Down Expand Up @@ -140,3 +143,63 @@ func TestNodeConditionValue(t *testing.T) {
})
}
}

func TestTransform(t *testing.T) {
originalNode := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "my-node",
UID: "my-node-uid",
Labels: map[string]string{
"node-role": "worker",
},
},
Status: corev1.NodeStatus{
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
Capacity: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("8"),
corev1.ResourceMemory: resource.MustParse("16Gi"),
},
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("4"),
corev1.ResourceMemory: resource.MustParse("8Gi"),
},
Addresses: []corev1.NodeAddress{
{
Type: corev1.NodeHostName,
Address: "my-node-hostname",
},
{
Type: corev1.NodeInternalIP,
Address: "192.168.1.100",
},
},
},
}
wantNode := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "my-node",
UID: "my-node-uid",
Labels: map[string]string{
"node-role": "worker",
},
},
Status: corev1.NodeStatus{
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
},
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("4"),
corev1.ResourceMemory: resource.MustParse("8Gi"),
},
},
}
assert.Equal(t, wantNode, Transform(originalNode))
}
Loading

0 comments on commit 5cfcdeb

Please sign in to comment.