-
Notifications
You must be signed in to change notification settings - Fork 708
/
aggregator.go
246 lines (212 loc) · 7.9 KB
/
aggregator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package license
import (
"context"
"fmt"
"regexp"
"strconv"
"strings"
apmv1 "github.com/elastic/cloud-on-k8s/pkg/apis/apm/v1"
esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
entv1 "github.com/elastic/cloud-on-k8s/pkg/apis/enterprisesearch/v1"
kbv1 "github.com/elastic/cloud-on-k8s/pkg/apis/kibana/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/apmserver"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/nodespec"
essettings "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/settings"
"github.com/elastic/cloud-on-k8s/pkg/controller/enterprisesearch"
"github.com/elastic/cloud-on-k8s/pkg/controller/kibana"
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)
// Aggregator aggregates the total of resources of all Elastic managed components
type Aggregator struct {
	// client is the Kubernetes client used to list the Elasticsearch, Kibana,
	// APM Server and Enterprise Search resources whose memory is aggregated.
	client k8s.Client
}
// aggregate is a function computing the total memory of one kind of managed
// component, or an error if that total cannot be computed.
type aggregate func() (resource.Quantity, error)
// AggregateMemory returns the sum of the memory of every Elastic managed
// component: Elasticsearch, Kibana, APM Server and Enterprise Search.
// The first per-component failure aborts the aggregation and is returned as is,
// together with a zero quantity.
func (a Aggregator) AggregateMemory() (resource.Quantity, error) {
	// Per-component aggregators, evaluated in this order.
	steps := []aggregate{
		a.aggregateElasticsearchMemory,
		a.aggregateKibanaMemory,
		a.aggregateApmServerMemory,
		a.aggregateEnterpriseSearchMemory,
	}

	var sum resource.Quantity
	for i := range steps {
		part, err := steps[i]()
		if err != nil {
			return resource.Quantity{}, err
		}
		sum.Add(part)
	}
	return sum, nil
}
// aggregateElasticsearchMemory lists all Elasticsearch resources and sums, over
// every NodeSet, the memory of the Elasticsearch container multiplied by the
// NodeSet count. The per-node memory is resolved by containerMemLimits, with the
// Java options environment variable and the default limits as fallbacks.
func (a Aggregator) aggregateElasticsearchMemory() (resource.Quantity, error) {
	const errMsg = "failed to aggregate Elasticsearch memory"

	var clusters esv1.ElasticsearchList
	if err := a.client.List(context.Background(), &clusters); err != nil {
		return resource.Quantity{}, errors.Wrap(err, errMsg)
	}

	var sum resource.Quantity
	for _, cluster := range clusters.Items {
		for _, ns := range cluster.Spec.NodeSets {
			perNode, err := containerMemLimits(
				ns.PodTemplate.Spec.Containers,
				esv1.ElasticsearchContainerName,
				essettings.EnvEsJavaOpts, memFromJavaOpts,
				nodespec.DefaultMemoryLimits,
			)
			if err != nil {
				return resource.Quantity{}, errors.Wrap(err, errMsg)
			}

			sum.Add(multiply(perNode, ns.Count))
			log.V(1).Info("Collecting", "namespace", cluster.Namespace, "es_name", cluster.Name,
				"memory", perNode.String(), "count", ns.Count)
		}
	}
	return sum, nil
}
// aggregateEnterpriseSearchMemory lists all Enterprise Search resources and
// sums the memory of the Enterprise Search container multiplied by the resource
// count. The per-instance memory is resolved by containerMemLimits, with the
// Java options environment variable and the default limits as fallbacks.
func (a Aggregator) aggregateEnterpriseSearchMemory() (resource.Quantity, error) {
	const errMsg = "failed to aggregate Enterprise Search memory"

	var deployments entv1.EnterpriseSearchList
	if err := a.client.List(context.Background(), &deployments); err != nil {
		return resource.Quantity{}, errors.Wrap(err, errMsg)
	}

	var sum resource.Quantity
	for _, deployment := range deployments.Items {
		perInstance, err := containerMemLimits(
			deployment.Spec.PodTemplate.Spec.Containers,
			entv1.EnterpriseSearchContainerName,
			enterprisesearch.EnvJavaOpts, memFromJavaOpts,
			enterprisesearch.DefaultMemoryLimits,
		)
		if err != nil {
			return resource.Quantity{}, errors.Wrap(err, errMsg)
		}

		sum.Add(multiply(perInstance, deployment.Spec.Count))
		log.V(1).Info("Collecting", "namespace", deployment.Namespace, "ent_name", deployment.Name,
			"memory", perInstance.String(), "count", deployment.Spec.Count)
	}
	return sum, nil
}
// aggregateKibanaMemory lists all Kibana resources and sums the memory of the
// Kibana container multiplied by the resource count. The per-instance memory is
// resolved by containerMemLimits, with the Node options environment variable
// and the default limits as fallbacks.
func (a Aggregator) aggregateKibanaMemory() (resource.Quantity, error) {
	const errMsg = "failed to aggregate Kibana memory"

	var kibanas kbv1.KibanaList
	if err := a.client.List(context.Background(), &kibanas); err != nil {
		return resource.Quantity{}, errors.Wrap(err, errMsg)
	}

	var sum resource.Quantity
	for _, instance := range kibanas.Items {
		perInstance, err := containerMemLimits(
			instance.Spec.PodTemplate.Spec.Containers,
			kbv1.KibanaContainerName,
			kibana.EnvNodeOptions, memFromNodeOptions,
			kibana.DefaultMemoryLimits,
		)
		if err != nil {
			return resource.Quantity{}, errors.Wrap(err, errMsg)
		}

		sum.Add(multiply(perInstance, instance.Spec.Count))
		log.V(1).Info("Collecting", "namespace", instance.Namespace, "kibana_name", instance.Name,
			"memory", perInstance.String(), "count", instance.Spec.Count)
	}
	return sum, nil
}
// aggregateApmServerMemory lists all APM Server resources and sums the memory
// of the APM Server container multiplied by the resource count. The
// per-instance memory is resolved by containerMemLimits from the container
// limits, falling back directly to the default limits.
func (a Aggregator) aggregateApmServerMemory() (resource.Quantity, error) {
	const errMsg = "failed to aggregate APM Server memory"

	var servers apmv1.ApmServerList
	if err := a.client.List(context.Background(), &servers); err != nil {
		return resource.Quantity{}, errors.Wrap(err, errMsg)
	}

	var sum resource.Quantity
	for _, server := range servers.Items {
		perInstance, err := containerMemLimits(
			server.Spec.PodTemplate.Spec.Containers,
			apmv1.ApmServerContainerName,
			"", nil, // no fallback with limits defined in an env var
			apmserver.DefaultMemoryLimits,
		)
		if err != nil {
			return resource.Quantity{}, errors.Wrap(err, errMsg)
		}

		sum.Add(multiply(perInstance, server.Spec.Count))
		log.V(1).Info("Collecting", "namespace", server.Namespace, "as_name", server.Name,
			"memory", perInstance.String(), "count", server.Spec.Count)
	}
	return sum, nil
}
// containerMemLimits resolves the memory of the container named containerName
// among containers, trying in order:
//  1. the memory limit declared in the container resources;
//  2. if that limit is zero and envLookup is not nil, the value derived by
//     envLookup from the container environment variable named envVarName;
//  3. defaultLimit, if the result is still zero (including when no container
//     matches containerName).
//
// An error returned by envLookup is propagated as is with a zero quantity.
func containerMemLimits(
	containers []corev1.Container,
	containerName string,
	envVarName string,
	envLookup func(envVar string) (resource.Quantity, error),
	defaultLimit resource.Quantity,
) (resource.Quantity, error) {
	var limit resource.Quantity
	for _, c := range containers {
		if c.Name != containerName {
			continue
		}

		limit = *c.Resources.Limits.Memory()

		// the environment variable is only consulted when no memory limit is
		// defined at the container level and a lookup function is provided
		if envLookup == nil || !limit.IsZero() {
			continue
		}

		for _, env := range c.Env {
			if env.Name != envVarName {
				continue
			}
			fromEnv, err := envLookup(env.Value)
			if err != nil {
				return resource.Quantity{}, err
			}
			limit = fromEnv
		}
	}

	// nothing found in the spec nor in the environment: use the default limits
	if limit.IsZero() {
		return defaultLimit, nil
	}
	return limit, nil
}
// maxHeapSizeRe is the pattern to extract the max Java heap size (-Xmx<size>[g|G|m|M|k|K] in binary units).
// Group 1 captures the numeric size and group 2 the optional unit suffix.
// The option must be followed by whitespace or by the end of the string, so that an
// invalid suffix such as `-Xmx64GB` is rejected; `\s.*` (rather than `\s.+`) is used
// so that an option followed only by trailing whitespace still matches.
var maxHeapSizeRe = regexp.MustCompile(`-Xmx([0-9]+)([gGmMkK]?)(?:\s.*|$)`)
// memFromJavaOpts derives a container memory quantity from a Java options string.
// It reads the maximum heap size (-Xmx) using maxHeapSizeRe, doubles it (giving twice
// the JVM heap to the container is a common practice) and parses the result as a
// resource.Quantity. A g/G, m/M or k/K suffix is interpreted in binary units
// (Gi, Mi, Ki); without a suffix the doubled number is parsed as is.
// For example "-Xmx1g" yields the quantity "2Gi".
// An error is returned if no max heap size can be extracted or parsed.
func memFromJavaOpts(javaOpts string) (resource.Quantity, error) {
	groups := maxHeapSizeRe.FindStringSubmatch(javaOpts)
	if len(groups) != 3 {
		return resource.Quantity{}, errors.Errorf("cannot extract max jvm heap size from %s", javaOpts)
	}
	rawSize, rawUnit := groups[1], groups[2]

	heapSize, err := strconv.Atoi(rawSize)
	if err != nil {
		return resource.Quantity{}, err
	}

	// map [g|G|m|M|k|K] onto the binary suffixes [Gi|Mi|Ki]; no unit stays empty
	unit := ""
	if rawUnit != "" {
		unit = strings.ToUpper(rawUnit) + "i"
	}

	// the container gets twice the heap size
	doubled := fmt.Sprintf("%d%s", heapSize*2, unit)
	return resource.ParseQuantity(doubled)
}
// nodeHeapSizeRe is the pattern to extract the max heap size of the node memory (--max-old-space-size=<mb_size>).
// Group 1 captures the size and requires at least one digit, so that an option without
// a value is reported as "not found" instead of yielding an empty size.
// Node.js accepts both dashes and underscores between the words of an option, so
// --max_old_space_size=<mb_size> is matched as well.
var nodeHeapSizeRe = regexp.MustCompile("--max[-_]old[-_]space[-_]size=([0-9]+)")
// memFromNodeOptions derives a memory quantity from a Node options string: the size
// captured by nodeHeapSizeRe is suffixed with "M" and parsed as a resource.Quantity.
// An error is returned if the pattern does not match or if the size cannot be parsed.
func memFromNodeOptions(nodeOpts string) (resource.Quantity, error) {
	groups := nodeHeapSizeRe.FindStringSubmatch(nodeOpts)
	if len(groups) == 2 {
		size := groups[1]
		return resource.ParseQuantity(size + "M")
	}
	return resource.Quantity{}, errors.Errorf("cannot extract max node heap size from %s", nodeOpts)
}
// multiply returns a new resource.Quantity holding the integer value of q
// (as given by q.Value()) times v. The input quantity is not modified.
func multiply(q resource.Quantity, v int32) resource.Quantity {
	product := resource.Quantity{}
	product.Set(q.Value() * int64(v))
	return product
}