Skip to content

Commit

Permalink
Account for Logstash memory in resource aggregator (elastic#7853)
Browse files Browse the repository at this point in the history
Logstash resources should be counted towards the ECK managed memory and ERU usage.
  • Loading branch information
pebrc authored May 28, 2024
1 parent f4e48cc commit 5be7250
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 5 deletions.
10 changes: 7 additions & 3 deletions pkg/controller/logstash/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,20 @@ const (

// VersionLabelName is a label used to track the version of a Logstash Pod.
VersionLabelName = "logstash.k8s.elastic.co/version"

// EnvJavaOpts is the documented environment variable to set JVM options for Logstash.
EnvJavaOpts = "LS_JAVA_OPTS"
)

var (
DefaultResources = corev1.ResourceRequirements{
DefaultMemoryLimit = resource.MustParse("2Gi")
DefaultResources = corev1.ResourceRequirements{
Limits: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceMemory: resource.MustParse("2Gi"),
corev1.ResourceMemory: DefaultMemoryLimit,
corev1.ResourceCPU: resource.MustParse("2000m"),
},
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceMemory: resource.MustParse("2Gi"),
corev1.ResourceMemory: DefaultMemoryLimit,
corev1.ResourceCPU: resource.MustParse("1000m"),
},
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/license/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
esv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/elasticsearch/v1"
entv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/enterprisesearch/v1"
kbv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/kibana/v1"
lsv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/apmserver"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/nodespec"
essettings "github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/settings"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/enterprisesearch"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/kibana"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash"
"github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s"
ulog "github.com/elastic/cloud-on-k8s/v2/pkg/utils/log"
)
Expand All @@ -44,6 +46,7 @@ func (a Aggregator) AggregateMemory(ctx context.Context) (resource.Quantity, err
a.aggregateKibanaMemory,
a.aggregateApmServerMemory,
a.aggregateEnterpriseSearchMemory,
a.aggregateLogstashMemory,
} {
memory, err := f(ctx)
if err != nil {
Expand Down Expand Up @@ -138,6 +141,33 @@ func (a Aggregator) aggregateKibanaMemory(ctx context.Context) (resource.Quantit
return total, nil
}

func (a Aggregator) aggregateLogstashMemory(ctx context.Context) (resource.Quantity, error) {
var lsList lsv1alpha1.LogstashList
err := a.client.List(context.Background(), &lsList)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Logstash memory")
}

var total resource.Quantity
for _, ls := range lsList.Items {
mem, err := containerMemLimits(
ls.Spec.PodTemplate.Spec.Containers,
lsv1alpha1.LogstashContainerName,
logstash.EnvJavaOpts, memFromJavaOpts,
logstash.DefaultMemoryLimit,
)
if err != nil {
return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Logstash memory")
}

total.Add(multiply(mem, ls.Spec.Count))
ulog.FromContext(ctx).V(1).Info("Collecting", "namespace", ls.Namespace, "logstash_name", ls.Name,
"memory", mem.String(), "count", ls.Spec.Count)
}

return total, nil
}

func (a Aggregator) aggregateApmServerMemory(ctx context.Context) (resource.Quantity, error) {
var asList apmv1.ApmServerList
err := a.client.List(context.Background(), &asList)
Expand Down
4 changes: 3 additions & 1 deletion pkg/license/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
esv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/elasticsearch/v1"
entv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/enterprisesearch/v1"
kbv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/kibana/v1"
lsv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1"
"github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s"
)

Expand Down Expand Up @@ -141,7 +142,7 @@ func TestAggregator(t *testing.T) {

val, err := aggregator.AggregateMemory(context.Background())
require.NoError(t, err)
require.Equal(t, 325.9073486328125, inGiB(val))
require.Equal(t, 329.9073486328125, inGiB(val))
}

func readObjects(t *testing.T, filePath string) []client.Object {
Expand All @@ -152,6 +153,7 @@ func readObjects(t *testing.T, filePath string) []client.Object {
scheme.AddKnownTypes(kbv1.GroupVersion, &kbv1.Kibana{}, &kbv1.KibanaList{})
scheme.AddKnownTypes(apmv1.GroupVersion, &apmv1.ApmServer{}, &apmv1.ApmServerList{})
scheme.AddKnownTypes(entv1.GroupVersion, &entv1.EnterpriseSearch{}, &entv1.EnterpriseSearchList{})
scheme.AddKnownTypes(lsv1alpha1.GroupVersion, &lsv1alpha1.Logstash{}, &lsv1alpha1.LogstashList{})
decoder := serializer.NewCodecFactory(scheme).UniversalDeserializer()

f, err := os.Open(filePath)
Expand Down
22 changes: 21 additions & 1 deletion pkg/license/testdata/stack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,24 @@ spec:
memory: 8Gi
env:
- name: JAVA_OPTS
value: -Xms7500m -Xmx7500m
value: -Xms7500m -Xmx7500m
---
apiVersion: logstash.k8s.elastic.co/v1alpha1
kind: Logstash
metadata:
name: logstash-sample
spec:
count: 1
podTemplate:
spec:
containers:
- name: logstash
resources:
requests:
cpu: 3
memory: 1Gi
limits:
memory: 4Gi
env:
- name: LS_JAVA_OPTS
value: -Xms3g -Xmx3g

0 comments on commit 5be7250

Please sign in to comment.