diff --git a/cmd/licensing-info/main.go b/cmd/licensing-info/main.go new file mode 100644 index 0000000000..92f7dc6979 --- /dev/null +++ b/cmd/licensing-info/main.go @@ -0,0 +1,68 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package main + +import ( + "encoding/json" + "fmt" + "log" + + eckscheme "github.com/elastic/cloud-on-k8s/pkg/controller/common/scheme" + "github.com/elastic/cloud-on-k8s/pkg/license" + "k8s.io/client-go/kubernetes/scheme" + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" // auth on gke + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" +) + +// Simple program that prints the licensing information, including the total memory of all Elastic components managed by +// the operator and its equivalent in "Enterprise Resource Units". +// +// Its main purpose is to demonstrate a use of the ResourceReporter and to provide an alternative way to retrieve the +// licensing information immediately. +// +// Example of use: +// +// > go run cmd/licensing-info/main.go +// { +// "timestamp": "2019-12-17T11:56:02+01:00", +// "eck_license_level": "basic", +// "total_managed_memory": "5.37GB", +// "enterprise_resource_units": "1" +// } +// + +func main() { + licensingInfo, err := license.NewResourceReporter(newK8sClient()).Get() + if err != nil { + log.Fatal(err, "Failed to get licensing info") + } + + bytes, err := json.Marshal(licensingInfo) + if err != nil { + log.Fatal(err, "Failed to marshal licensing info") + } + + fmt.Print(string(bytes)) +} + +func newK8sClient() client.Client { + cfg, err := config.GetConfig() + if err != nil { + log.Fatal(err, "Failed to get a Kubernetes config") + } + + err = eckscheme.SetupScheme() + if err != nil { + log.Fatal(err, "Failed to set up the ECK scheme") + } + + c, err := client.New(cfg, client.Options{Scheme: scheme.Scheme}) + if err != nil { + log.Fatal(err, "Failed to create a new Kubernetes client") + } + + return c +} diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 3f3135754b..d925677888 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -12,8 +12,6 @@ import ( "strings" "time" - "k8s.io/client-go/rest" - // allow gcp authentication _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" @@ -21,6 +19,7 @@ import ( "github.com/spf13/viper" "k8s.io/client-go/kubernetes" clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -45,6 +44,7 @@ import ( "github.com/elastic/cloud-on-k8s/pkg/controller/webhook" "github.com/elastic/cloud-on-k8s/pkg/dev" "github.com/elastic/cloud-on-k8s/pkg/dev/portforward" + licensing "github.com/elastic/cloud-on-k8s/pkg/license" "github.com/elastic/cloud-on-k8s/pkg/utils/net" ) @@ -321,6 +321,13 @@ func execute() { log.Error(err, "unable to create controller", "controller", "LicenseTrial") os.Exit(1) } + + go func() { + time.Sleep(10 * time.Second) // wait some arbitrary time for the manager to start + mgr.GetCache().WaitForCacheSync(nil) // wait until k8s client cache is initialized + r := licensing.NewResourceReporter(mgr.GetClient()) + r.Start(operatorNamespace, licensing.ResourceReporterFrequency) + }() } log.Info("Starting the manager", "uuid", operatorInfo.OperatorUUID, diff --git
a/pkg/controller/apmserver/pod.go b/pkg/controller/apmserver/pod.go index 7343497998..1dd22222a1 100644 --- a/pkg/controller/apmserver/pod.go +++ b/pkg/controller/apmserver/pod.go @@ -31,14 +31,17 @@ const ( ConfigVolumePath = ApmBaseDir + "/config" ) -var DefaultResources = corev1.ResourceRequirements{ - Requests: map[corev1.ResourceName]resource.Quantity{ - corev1.ResourceMemory: resource.MustParse("512Mi"), - }, - Limits: map[corev1.ResourceName]resource.Quantity{ - corev1.ResourceMemory: resource.MustParse("512Mi"), - }, -} +var ( + DefaultMemoryLimits = resource.MustParse("512Mi") + DefaultResources = corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceMemory: DefaultMemoryLimits, + }, + Limits: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceMemory: DefaultMemoryLimits, + }, + } +) func readinessProbe(tls bool) corev1.Probe { scheme := corev1.URISchemeHTTP diff --git a/pkg/controller/common/license/check.go b/pkg/controller/common/license/check.go index ae04934bab..48d9586c6f 100644 --- a/pkg/controller/common/license/check.go +++ b/pkg/controller/common/license/check.go @@ -5,6 +5,7 @@ package license import ( + "sort" "time" "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" @@ -14,6 +15,7 @@ import ( ) type Checker interface { + CurrentEnterpriseLicense() (*EnterpriseLicense, error) EnterpriseFeaturesEnabled() (bool, error) Valid(l EnterpriseLicense) (bool, error) } @@ -45,27 +47,49 @@ func (lc *checker) publicKeyFor(l EnterpriseLicense) ([]byte, error) { }, &signatureSec) } -// EnterpriseFeaturesEnabled returns true if a valid enterprise license is installed. -func (lc *checker) EnterpriseFeaturesEnabled() (bool, error) { +// CurrentEnterpriseLicense returns the currently valid Enterprise license if installed. +func (lc *checker) CurrentEnterpriseLicense() (*EnterpriseLicense, error) { licenses, err := EnterpriseLicenses(lc.k8sClient) if err != nil { - return false, errors.Wrap(err, "failed to list enterprise licenses") + return nil, errors.Wrap(err, "failed to list enterprise licenses") } + sort.Slice(licenses, func(i, j int) bool { + t1, t2 := EnterpriseLicenseTypeOrder[licenses[i].License.Type], EnterpriseLicenseTypeOrder[licenses[j].License.Type] + if t1 != t2 { // sort by type (first the most features) + return t1 > t2 + } + // and by expiry date (first which expires last) + return licenses[i].License.ExpiryDateInMillis > licenses[j].License.ExpiryDateInMillis + }) + + // pick the first valid Enterprise license in the sorted slice for _, l := range licenses { valid, err := lc.Valid(l) if err != nil { - return false, err + return nil, err } if valid { - return true, nil + return &l, nil } } - return false, nil + return nil, nil +} + +// EnterpriseFeaturesEnabled returns true if a valid enterprise license is installed. +func (lc *checker) EnterpriseFeaturesEnabled() (bool, error) { + license, err := lc.CurrentEnterpriseLicense() + if err != nil { + return false, err + } + return license != nil, nil } // Valid returns true if the given Enterprise license is valid or an error if any. 
func (lc *checker) Valid(l EnterpriseLicense) (bool, error) { + if l.IsTrial() { + return true, nil + } pk, err := lc.publicKeyFor(l) if err != nil { return false, errors.Wrap(err, "while loading signature secret") @@ -83,6 +107,10 @@ func (lc *checker) Valid(l EnterpriseLicense) (bool, error) { type MockChecker struct{} +func (MockChecker) CurrentEnterpriseLicense() (*EnterpriseLicense, error) { + return &EnterpriseLicense{}, nil +} + func (MockChecker) EnterpriseFeaturesEnabled() (bool, error) { return true, nil } diff --git a/pkg/controller/common/license/check_test.go b/pkg/controller/common/license/check_test.go index 46ffb84aec..b2af7952e4 100644 --- a/pkg/controller/common/license/check_test.go +++ b/pkg/controller/common/license/check_test.go @@ -82,3 +82,103 @@ func TestChecker_EnterpriseFeaturesEnabled(t *testing.T) { }) } } + +func Test_CurrentEnterpriseLicense(t *testing.T) { + privKey, err := x509.ParsePKCS1PrivateKey(privateKeyFixture) + require.NoError(t, err) + + validLicenseFixture := licenseFixtureV3 + validLicenseFixture.License.ExpiryDateInMillis = chrono.ToMillis(time.Now().Add(1 * time.Hour)) + signatureBytes, err := NewSigner(privKey).Sign(validLicenseFixture) + require.NoError(t, err) + validLicense := asRuntimeObjects(validLicenseFixture, signatureBytes) + + validTrialLicenseFixture := trialLicenseFixture + validTrialLicenseFixture.License.ExpiryDateInMillis = chrono.ToMillis(time.Now().Add(1 * time.Hour)) + trialSignatureBytes, err := NewSigner(privKey).Sign(validTrialLicenseFixture) + require.NoError(t, err) + validTrialLicense := asRuntimeObjects(validTrialLicenseFixture, trialSignatureBytes) + + type fields struct { + initialObjects []runtime.Object + operatorNamespace string + publicKey []byte + } + + tests := []struct { + name string + fields fields + want bool + wantErr bool + wantType OperatorLicenseType + }{ + { + name: "get valid enterprise license: OK", + fields: fields{ + initialObjects: validLicense, + operatorNamespace: "test-system", + publicKey: publicKeyBytesFixture(t), + }, + want: true, + wantType: LicenseTypeEnterprise, + wantErr: false, + }, + { + name: "get valid trial enterprise license: OK", + fields: fields{ + initialObjects: validTrialLicense, + operatorNamespace: "test-system", + publicKey: publicKeyBytesFixture(t), + }, + want: true, + wantType: LicenseTypeEnterpriseTrial, + wantErr: false, + }, + { + name: "get valid enterprise license among two licenses: OK", + fields: fields{ + initialObjects: append(validLicense, validTrialLicense...), + operatorNamespace: "test-system", + publicKey: publicKeyBytesFixture(t), + }, + want: true, + wantType: LicenseTypeEnterprise, + wantErr: false, + }, + { + name: "no license: OK", + fields: fields{ + initialObjects: []runtime.Object{}, + operatorNamespace: "test-system", + }, + want: false, + wantErr: false, + }, + { + name: "invalid public key: FAIL", + fields: fields{ + initialObjects: validLicense, + operatorNamespace: "test-system", + publicKey: []byte("not a public key"), + }, + want: false, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + lc := &checker{ + k8sClient: k8s.WrappedFakeClient(tt.fields.initialObjects...), + operatorNamespace: tt.fields.operatorNamespace, + publicKey: tt.fields.publicKey, + } + got, err := lc.CurrentEnterpriseLicense() + if (err != nil) != tt.wantErr { + t.Errorf("Checker.CurrentEnterpriseLicense() err = %v, wantErr %v", err, tt.wantErr) + } + if tt.want != (got != nil) { + t.Errorf("Checker.CurrentEnterpriseLicense() = 
%v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/controller/common/license/fixtures_test.go b/pkg/controller/common/license/fixtures_test.go index ecf9aab1ce..b27d007ec9 100644 --- a/pkg/controller/common/license/fixtures_test.go +++ b/pkg/controller/common/license/fixtures_test.go @@ -8,6 +8,7 @@ import ( "crypto/rsa" "crypto/x509" "encoding/json" + "fmt" "testing" "github.com/elastic/cloud-on-k8s/pkg/controller/common" @@ -48,6 +49,12 @@ var ( } ) +var trialLicenseFixture = EnterpriseLicense{ + License: LicenseSpec{ + Type: LicenseTypeEnterpriseTrial, + }, +} + func withSignature(l EnterpriseLicense, sig []byte) EnterpriseLicense { l.License.Signature = string(sig) return l @@ -58,11 +65,12 @@ func asRuntimeObjects(l EnterpriseLicense, sig []byte) []runtime.Object { if err != nil { panic(err) } + return []runtime.Object{ &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Namespace: "test-system", - Name: "test-license", + Name: fmt.Sprintf("test-%s-license", string(l.License.Type)), Labels: map[string]string{ common.TypeLabelName: Type, LicenseLabelScope: string(LicenseScopeOperator), diff --git a/pkg/controller/common/license/model.go b/pkg/controller/common/license/model.go index 300239ee24..2e051b5714 100644 --- a/pkg/controller/common/license/model.go +++ b/pkg/controller/common/license/model.go @@ -45,6 +45,12 @@ type LicenseSpec struct { Version int // not marshalled but part of the signature } +// EnterpriseLicenseTypeOrder license types mapped to ints in increasing order of feature sets for sorting purposes. +var EnterpriseLicenseTypeOrder = map[OperatorLicenseType]int{ + LicenseTypeEnterpriseTrial: 0, + LicenseTypeEnterprise: 1, +} + // StartTime is the date as of which this license is valid. func (l EnterpriseLicense) StartTime() time.Time { return time.Unix(0, l.License.StartDateInMillis*int64(time.Millisecond)) diff --git a/pkg/controller/elasticsearch/nodespec/defaults.go b/pkg/controller/elasticsearch/nodespec/defaults.go index 1ebb15d20f..04c2394e47 100644 --- a/pkg/controller/elasticsearch/nodespec/defaults.go +++ b/pkg/controller/elasticsearch/nodespec/defaults.go @@ -35,6 +35,7 @@ var ( {Name: "transport", ContainerPort: network.TransportPort, Protocol: corev1.ProtocolTCP}, } + DefaultMemoryLimits = resource.MustParse("2Gi") // DefaultResources for the Elasticsearch container. The JVM default heap size is 1Gi, so we // request at least 2Gi for the container to make sure ES can work properly. // Not applying this minimum default would make ES randomly crash (OOM) on small machines. @@ -42,10 +43,10 @@ var ( // No CPU requirement is set by default. 
DefaultResources = corev1.ResourceRequirements{ Requests: map[corev1.ResourceName]resource.Quantity{ - corev1.ResourceMemory: resource.MustParse("2Gi"), + corev1.ResourceMemory: DefaultMemoryLimits, }, Limits: map[corev1.ResourceName]resource.Quantity{ - corev1.ResourceMemory: resource.MustParse("2Gi"), + corev1.ResourceMemory: DefaultMemoryLimits, }, } ) diff --git a/pkg/controller/kibana/config/settings.go b/pkg/controller/kibana/config/settings.go index fb63ca930f..31287cb7d1 100644 --- a/pkg/controller/kibana/config/settings.go +++ b/pkg/controller/kibana/config/settings.go @@ -17,8 +17,13 @@ import ( "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" ) -// Kibana configuration settings file -const SettingsFilename = "kibana.yml" +const ( + // Kibana configuration settings file + SettingsFilename = "kibana.yml" + // Environment variable name for the Node options that can be used to increase the Kibana maximum memory limit + EnvNodeOpts = "NODE_OPTS" +) + // CanonicalConfig contains configuration for Kibana ("kibana.yml"), // as a hierarchical key-value configuration. diff --git a/pkg/controller/kibana/pod/pod.go b/pkg/controller/kibana/pod/pod.go index 2599fe9643..2f5e21fa81 100644 --- a/pkg/controller/kibana/pod/pod.go +++ b/pkg/controller/kibana/pod/pod.go @@ -30,14 +30,17 @@ var ports = []corev1.ContainerPort{ {Name: "http", ContainerPort: int32(HTTPPort), Protocol: corev1.ProtocolTCP}, } -var DefaultResources = corev1.ResourceRequirements{ - Requests: map[corev1.ResourceName]resource.Quantity{ - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - Limits: map[corev1.ResourceName]resource.Quantity{ - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, -} +var ( + DefaultMemoryLimits = resource.MustParse("1Gi") + DefaultResources = corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceMemory: DefaultMemoryLimits, + }, + Limits: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceMemory: DefaultMemoryLimits, + }, + } +) // readinessProbe is the readiness probe for the Kibana container func readinessProbe(useTLS bool) corev1.Probe { diff --git a/pkg/controller/license/license_controller.go b/pkg/controller/license/license_controller.go index 0375c64f6e..9d152e0bdd 100644 --- a/pkg/controller/license/license_controller.go +++ b/pkg/controller/license/license_controller.go @@ -153,7 +153,7 @@ type ReconcileLicenses struct { checker license.Checker } -// findLicense tries to find the best license available. +// findLicense tries to find the best Elastic stack license available. func findLicense(c k8s.Client, checker license.Checker, minVersion *version.Version) (esclient.License, string, bool) { licenseList, errs := license.EnterpriseLicensesOrErrors(c) if len(errs) > 0 { @@ -220,9 +220,6 @@ func (r *ReconcileLicenses) reconcileClusterLicense(cluster esv1.Elasticsearch) return noResult, true, err } matchingSpec, parent, found := findLicense(r, r.checker, minVersion) - if err != nil { - return noResult, true, err - } if !found { // no license, delete cluster level licenses to revert to basic log.V(1).Info("No enterprise license found. Attempting to remove cluster license secret", "namespace", cluster.Namespace, "es_name", cluster.Name) diff --git a/pkg/license/aggregator.go b/pkg/license/aggregator.go new file mode 100644 index 0000000000..5accd53888 --- /dev/null +++ b/pkg/license/aggregator.go @@ -0,0 +1,215 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package license + +import ( + "fmt" + "regexp" + "strconv" + "strings" + + apmv1 "github.com/elastic/cloud-on-k8s/pkg/apis/apm/v1" + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" + kbv1 "github.com/elastic/cloud-on-k8s/pkg/apis/kibana/v1" + "github.com/elastic/cloud-on-k8s/pkg/controller/apmserver" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/nodespec" + essettings "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/settings" + kbconfig "github.com/elastic/cloud-on-k8s/pkg/controller/kibana/config" + kbpod "github.com/elastic/cloud-on-k8s/pkg/controller/kibana/pod" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" +) + +// Aggregator aggregates the total of resources of all Elastic managed components +type Aggregator struct { + client k8s.Client +} + +type aggregate func() (resource.Quantity, error) + +// AggregateMemory aggregates the total memory of all Elastic managed components +func (a Aggregator) AggregateMemory() (resource.Quantity, error) { + var totalMemory resource.Quantity + + for _, f := range []aggregate{ + a.aggregateElasticsearchMemory, + a.aggregateKibanaMemory, + a.aggregateApmServerMemory, + } { + memory, err := f() + if err != nil { + return resource.Quantity{}, err + } + totalMemory.Add(memory) + } + + return totalMemory, nil +} + +func (a Aggregator) aggregateElasticsearchMemory() (resource.Quantity, error) { + var esList esv1.ElasticsearchList + err := a.client.List(&esList) + if err != nil { + return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Elasticsearch memory") + } + + var total resource.Quantity + for _, es := range esList.Items { + for _, nodeSet := range es.Spec.NodeSets { + mem, err := containerMemLimits( + nodeSet.PodTemplate.Spec.Containers, + esv1.ElasticsearchContainerName, + essettings.EnvEsJavaOpts, memFromJavaOpts, + nodespec.DefaultMemoryLimits, + ) + if err != nil { + return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Elasticsearch memory") + } + + total.Add(multiply(mem, nodeSet.Count)) + log.V(1).Info("Collecting", "namespace", es.Namespace, "es_name", es.Name, + "memory", mem.String(), "count", nodeSet.Count) + } + } + + return total, nil +} + +func (a Aggregator) aggregateKibanaMemory() (resource.Quantity, error) { + var kbList kbv1.KibanaList + err := a.client.List(&kbList) + if err != nil { + return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Kibana memory") + } + + var total resource.Quantity + for _, kb := range kbList.Items { + mem, err := containerMemLimits( + kb.Spec.PodTemplate.Spec.Containers, + kbv1.KibanaContainerName, + kbconfig.EnvNodeOpts, memFromNodeOptions, + kbpod.DefaultMemoryLimits, + ) + if err != nil { + return resource.Quantity{}, errors.Wrap(err, "failed to aggregate Kibana memory") + } + + total.Add(multiply(mem, kb.Spec.Count)) + log.V(1).Info("Collecting", "namespace", kb.Namespace, "kibana_name", kb.Name, + "memory", mem.String(), "count", kb.Spec.Count) + } + + return total, nil +} + +func (a Aggregator) aggregateApmServerMemory() (resource.Quantity, error) { + var asList apmv1.ApmServerList + err := a.client.List(&asList) + if err != nil { + return resource.Quantity{}, errors.Wrap(err, "failed to aggregate APM Server memory") + } + + var total resource.Quantity + 
for _, as := range asList.Items { + mem, err := containerMemLimits( + as.Spec.PodTemplate.Spec.Containers, + apmv1.ApmServerContainerName, + "", nil, // no fallback with limits defined in an env var + apmserver.DefaultMemoryLimits, + ) + if err != nil { + return resource.Quantity{}, errors.Wrap(err, "failed to aggregate APM Server memory") + } + + total.Add(multiply(mem, as.Spec.Count)) + log.V(1).Info("Collecting", "namespace", as.Namespace, "as_name", as.Name, + "memory", mem.String(), "count", as.Spec.Count) + } + + return total, nil +} + +// containerMemLimits reads the container memory limits from the resource specification with fallback +// on the environment variable and on the default limits +func containerMemLimits( + containers []corev1.Container, + containerName string, + envVarName string, + envLookup func(envVar string) (resource.Quantity, error), + defaultLimit resource.Quantity, +) (resource.Quantity, error) { + var mem resource.Quantity + for _, container := range containers { + if container.Name == containerName { + mem = *container.Resources.Limits.Memory() + + // if memory is defined at the container level, maybe fallback to the environment variable + if envLookup != nil && mem.IsZero() { + for _, envVar := range container.Env { + if envVar.Name == envVarName { + var err error + mem, err = envLookup(envVar.Value) + if err != nil { + return resource.Quantity{}, err + } + } + } + } + } + } + + // if still no memory found, fallback to the default limits + if mem.IsZero() { + mem = defaultLimit + } + + return mem, nil +} + +// maxHeapSizeRe is the pattern to extract the max Java heap size (-Xmx[g|G|m|M|k|K] in binary units) +var maxHeapSizeRe = regexp.MustCompile(`-Xmx([0-9]+)([gGmMkK]?)(?:\s.+|$)`) + +// memFromJavaOpts extracts the maximum Java heap size from a Java options string, multiplies the value by 2 +// (giving twice the JVM memory to the container is a common thing people do) +// and converts it to a resource.Quantity +func memFromJavaOpts(javaOpts string) (resource.Quantity, error) { + match := maxHeapSizeRe.FindStringSubmatch(javaOpts) + if len(match) != 3 { + return resource.Quantity{}, fmt.Errorf("cannot extract max jvm heap size from %s", javaOpts) + } + value, err := strconv.Atoi(match[1]) + if err != nil { + return resource.Quantity{}, err + } + suffix := match[2] + if suffix != "" { + // capitalize the suffix and add `i` to have a surjection of [g|G|m|M|k|K] in [Gi|Mi|Ki] + suffix = strings.ToUpper(match[2]) + "i" + } + // multiply by 2 and convert it to a quantity using the suffix + return resource.ParseQuantity(fmt.Sprintf("%d%s", value*2, suffix)) +} + +// nodeHeapSizeRe is the pattern to extract the max heap size of the node memory (--max-old-space-size=) +var nodeHeapSizeRe = regexp.MustCompile("--max-old-space-size=([0-9]*)") + +// memFromNodeOptions extracts the Node heap size from a Node options string and converts it to a resource.Quantity +func memFromNodeOptions(nodeOpts string) (resource.Quantity, error) { + match := nodeHeapSizeRe.FindStringSubmatch(nodeOpts) + if len(match) != 2 { + return resource.Quantity{}, fmt.Errorf("cannot extract max node heap size from %s", nodeOpts) + } + + return resource.ParseQuantity(match[1] + "M") +} + +// multiply multiplies a resource.Quantity by a value +func multiply(q resource.Quantity, v int32) resource.Quantity { + var result resource.Quantity + result.Set(q.Value() * int64(v)) + return result +} diff --git a/pkg/license/aggregator_test.go b/pkg/license/aggregator_test.go new file mode 100644 index 
0000000000..ebdf5617c0 --- /dev/null +++ b/pkg/license/aggregator_test.go @@ -0,0 +1,125 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package license + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/api/resource" +) + +func TestMemFromJavaOpts(t *testing.T) { + tests := []struct { + name string + actual string + expected resource.Quantity + isErr bool + }{ + { + name: "in k", + actual: "-Xms1k -Xmx8388608k", + expected: resource.MustParse("16777216Ki"), + }, + { + name: "in K", + actual: "-Xmx1024K", + expected: resource.MustParse("2048Ki"), + }, + { + name: "in m", + actual: "-Xmx512m -Xms256m", + expected: resource.MustParse("1024Mi"), + }, + { + name: "in M", + actual: "-Xmx256M", + expected: resource.MustParse("512Mi"), + }, + { + name: "in g", + actual: "-Xmx64g", + expected: resource.MustParse("128Gi"), + }, + { + name: "in G", + actual: "-Xmx64G", + expected: resource.MustParse("128Gi"), + }, + { + name: "without unit", + actual: "-Xmx1048576", + expected: resource.MustParse("2Mi"), + }, + { + name: "without value", + actual: "-XmxM", + isErr: true, + }, + { + name: "with an invalid Xmx", + actual: "-XMX1k", + isErr: true, + }, + { + name: "with an invalid unit", + actual: "-Xmx64GB", + isErr: true, + }, + { + name: "without xmx", + actual: "-Xms1k", + expected: resource.MustParse("16777216k"), + isErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := memFromJavaOpts(tt.actual) + if tt.isErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + if !got.Equal(tt.expected) { + t.Errorf("memFromJavaOpts(%s) = %v, want %s", tt.actual, got.String(), tt.expected.String()) + } + } + }) + } +} + +func TestMemFromNodeOpts(t *testing.T) { + tests := []struct { + name string + actual string + expected string + isErr bool + }{ + { + name: "with max-old-space-size option", + actual: "--max-old-space-size=2048", + expected: "2048M", + }, + { + name: "empty options", + actual: "", + isErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + q, err := memFromNodeOptions(tt.actual) + if tt.isErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + got := resource.MustParse(tt.expected) + if !got.Equal(q) { + t.Errorf("memFromNodeOptions(%s) = %v, want %s", tt.actual, got, tt.expected) + } + } + }) + } +} diff --git a/pkg/license/license.go b/pkg/license/license.go new file mode 100644 index 0000000000..699f9b3f08 --- /dev/null +++ b/pkg/license/license.go @@ -0,0 +1,130 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package license + +import ( + "encoding/json" + "fmt" + "math" + "time" + + "github.com/elastic/cloud-on-k8s/pkg/controller/common" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/license" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + // defaultOperatorLicenseLevel is the default license level when no operator license is installed + defaultOperatorLicenseLevel = "basic" + // licensingCfgMapName is the name of the config map used to store licensing information + licensingCfgMapName = "elastic-licensing" + // Type represents the Elastic usage type used to mark the config map that stores licensing information + Type = "elastic-usage" +) + +// LicensingInfo represents information about the operator license including the total memory of all Elastic managed +// components +type LicensingInfo struct { + Timestamp string `json:"timestamp"` + EckLicenseLevel string `json:"eck_license_level"` + TotalManagedMemory string `json:"total_managed_memory"` + EnterpriseResourceUnits string `json:"enterprise_resource_units"` +} + +// LicensingResolver resolves the licensing information of the operator +type LicensingResolver struct { + operatorNs string + client k8s.Client +} + +// ToInfo returns licensing information given the total memory of all Elastic managed components +func (r LicensingResolver) ToInfo(totalMemory resource.Quantity) (LicensingInfo, error) { + eru := inEnterpriseResourceUnits(totalMemory) + memoryInGB := inGB(totalMemory) + licenseLevel, err := r.getOperatorLicenseLevel() + if err != nil { + return LicensingInfo{}, err + } + + return LicensingInfo{ + Timestamp: time.Now().Format(time.RFC3339), + EckLicenseLevel: licenseLevel, + TotalManagedMemory: memoryInGB, + EnterpriseResourceUnits: eru, + }, nil +} + +// Save updates or creates licensing information in a config map +func (r LicensingResolver) Save(info LicensingInfo, operatorNs string) error { + data, err := info.toMap() + if err != nil { + return err + } + + log.V(1).Info("Saving", "namespace", operatorNs, "configmap_name", licensingCfgMapName, "license_info", info) + cm := corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: operatorNs, + Name: licensingCfgMapName, + Labels: map[string]string{ + common.TypeLabelName: Type, + }, + }, + Data: data, + } + err = r.client.Update(&cm) + if apierrors.IsNotFound(err) { + return r.client.Create(&cm) + } + return err +} + +// getOperatorLicenseLevel gets the level of the operator license. +// If no license is found, the defaultOperatorLicenseLevel is returned. 
+func (r LicensingResolver) getOperatorLicenseLevel() (string, error) { + checker := license.NewLicenseChecker(r.client, r.operatorNs) + lic, err := checker.CurrentEnterpriseLicense() + if err != nil { + return "", err + } + + if lic == nil { + return defaultOperatorLicenseLevel, nil + } + + return string(lic.License.Type), nil +} + +// inGB converts a resource.Quantity to a value in gigabytes +func inGB(q resource.Quantity) string { + // divide the value (in bytes) by 1 billion (1GB) + return fmt.Sprintf("%0.2fGB", float32(q.Value())/1000000000) +} + +// inEnterpriseResourceUnits converts a resource.Quantity to Elastic Enterprise resource units +func inEnterpriseResourceUnits(q resource.Quantity) string { + // divide the value (in bytes) by 64 billion (64GB) + eru := float64(q.Value()) / 64000000000 + // round up to the nearest integer + return fmt.Sprintf("%d", int64(math.Ceil(eru))) +} + +// toMap converts a LicensingInfo to a map of strings, used to populate the data of a config map +func (i LicensingInfo) toMap() (map[string]string, error) { + bytes, err := json.Marshal(&i) + if err != nil { + return nil, err + } + var m map[string]string + err = json.Unmarshal(bytes, &m) + if err != nil { + return nil, err + } + return m, nil +} diff --git a/pkg/license/license_test.go b/pkg/license/license_test.go new file mode 100644 index 0000000000..25958420d1 --- /dev/null +++ b/pkg/license/license_test.go @@ -0,0 +1,25 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package license + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestToMap(t *testing.T) { + i := LicensingInfo{} + data, err := i.toMap() + assert.NoError(t, err) + assert.Equal(t, 4, len(data)) + assert.Equal(t, "", data["eck_license_level"]) + + i = LicensingInfo{EckLicenseLevel: "basic"} + data, err = i.toMap() + assert.NoError(t, err) + assert.Equal(t, 4, len(data)) + assert.Equal(t, "basic", data["eck_license_level"]) +} diff --git a/pkg/license/reporter.go b/pkg/license/reporter.go new file mode 100644 index 0000000000..bbe9e8adca --- /dev/null +++ b/pkg/license/reporter.go @@ -0,0 +1,75 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License.
+ +package license + +import ( + "time" + + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +// ResourceReporterFrequency defines the reporting frequency of the resource reporter +const ResourceReporterFrequency = 2 * time.Minute + +var log = logf.Log.WithName("resource") + +// ResourceReporter aggregates resources of all Elastic components managed by the operator +// and reports them in a config map in the form of licensing information +type ResourceReporter struct { + aggregator Aggregator + licensingResolver LicensingResolver +} + +// NewResourceReporter returns a new ResourceReporter +func NewResourceReporter(client client.Client) ResourceReporter { + c := k8s.WrapClient(client) + return ResourceReporter{ + aggregator: Aggregator{ + client: c, + }, + licensingResolver: LicensingResolver{ + client: c, + }, + } +} + +// Start starts to report the licensing information repeatedly at regular intervals +func (r ResourceReporter) Start(operatorNs string, refreshPeriod time.Duration) { + // report once as soon as possible to not wait the first tick + err := r.Report(operatorNs) + if err != nil { + log.Error(err, "Failed to report licensing information") + } + + ticker := time.NewTicker(refreshPeriod) + for range ticker.C { + err := r.Report(operatorNs) + if err != nil { + log.Error(err, "Failed to report licensing information") + } + } +} + +// Report reports the licensing information in a config map +func (r ResourceReporter) Report(operatorNs string) error { + licensingInfo, err := r.Get() + if err != nil { + return err + } + + return r.licensingResolver.Save(licensingInfo, operatorNs) +} + +// Get aggregates managed resources and returns the licensing information +func (r ResourceReporter) Get() (LicensingInfo, error) { + totalMemory, err := r.aggregator.AggregateMemory() + if err != nil { + return LicensingInfo{}, err + } + + return r.licensingResolver.ToInfo(totalMemory) +} diff --git a/pkg/license/reporter_test.go b/pkg/license/reporter_test.go new file mode 100644 index 0000000000..0cc7b665bd --- /dev/null +++ b/pkg/license/reporter_test.go @@ -0,0 +1,222 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package license + +import ( + "context" + "testing" + "time" + + apmv1 "github.com/elastic/cloud-on-k8s/pkg/apis/apm/v1" + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" + kbv1 "github.com/elastic/cloud-on-k8s/pkg/apis/kibana/v1" + essettings "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/settings" + kbconfig "github.com/elastic/cloud-on-k8s/pkg/controller/kibana/config" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +func Test_Get(t *testing.T) { + es := esv1.Elasticsearch{ + Spec: esv1.ElasticsearchSpec{ + NodeSets: []esv1.NodeSet{{ + Count: 10, + }}, + }, + } + licensingInfo, err := NewResourceReporter(k8s.FakeClient(&es)).Get() + assert.NoError(t, err) + assert.Equal(t, "21.47GB", licensingInfo.TotalManagedMemory) + assert.Equal(t, "1", licensingInfo.EnterpriseResourceUnits) + + es = esv1.Elasticsearch{ + Spec: esv1.ElasticsearchSpec{ + NodeSets: []esv1.NodeSet{{ + Count: 100, + PodTemplate: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: esv1.ElasticsearchContainerName, + Resources: corev1.ResourceRequirements{ + Limits: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceMemory: resource.MustParse("6Gi"), + }, + }, + }, + }, + }, + }, + }}, + }, + } + licensingInfo, err = NewResourceReporter(k8s.FakeClient(&es)).Get() + assert.NoError(t, err) + assert.Equal(t, "644.25GB", licensingInfo.TotalManagedMemory) + assert.Equal(t, "11", licensingInfo.EnterpriseResourceUnits) + + es = esv1.Elasticsearch{ + Spec: esv1.ElasticsearchSpec{ + NodeSets: []esv1.NodeSet{{ + Count: 10, + PodTemplate: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: esv1.ElasticsearchContainerName, + Env: []corev1.EnvVar{{ + Name: "ES_JAVA_OPTS", Value: "-Xms8G -Xmx8G", + }}, + }, + }, + }, + }, + }}, + }, + } + licensingInfo, err = NewResourceReporter(k8s.FakeClient(&es)).Get() + assert.NoError(t, err) + assert.Equal(t, "171.80GB", licensingInfo.TotalManagedMemory) + assert.Equal(t, "3", licensingInfo.EnterpriseResourceUnits) + + es = esv1.Elasticsearch{ + Spec: esv1.ElasticsearchSpec{ + NodeSets: []esv1.NodeSet{{ + Count: 10, + PodTemplate: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: esv1.ElasticsearchContainerName, + Env: []corev1.EnvVar{{ + Name: essettings.EnvEsJavaOpts, Value: "-Xms8G -Xmx8G", + }}, + }, + }, + }, + }, + }}, + }, + } + licensingInfo, err = NewResourceReporter(k8s.FakeClient(&es)).Get() + assert.NoError(t, err) + assert.Equal(t, "171.80GB", licensingInfo.TotalManagedMemory) + assert.Equal(t, "3", licensingInfo.EnterpriseResourceUnits) + + kb := kbv1.Kibana{ + Spec: kbv1.KibanaSpec{ + Count: 100, + }, + } + licensingInfo, err = NewResourceReporter(k8s.FakeClient(&kb)).Get() + assert.NoError(t, err) + assert.Equal(t, "107.37GB", licensingInfo.TotalManagedMemory) + assert.Equal(t, "2", licensingInfo.EnterpriseResourceUnits) + + kb = kbv1.Kibana{ + Spec: kbv1.KibanaSpec{ + Count: 100, + PodTemplate: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: kbv1.KibanaContainerName, + Resources: corev1.ResourceRequirements{ + Limits: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + }, + }, + }, + }, + }, + }, + } + licensingInfo, err = 
NewResourceReporter(k8s.FakeClient(&kb)).Get() + assert.NoError(t, err) + assert.Equal(t, "214.75GB", licensingInfo.TotalManagedMemory) + assert.Equal(t, "4", licensingInfo.EnterpriseResourceUnits) + + kb = kbv1.Kibana{ + Spec: kbv1.KibanaSpec{ + Count: 100, + PodTemplate: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: kbv1.KibanaContainerName, + Env: []corev1.EnvVar{{ + Name: kbconfig.EnvNodeOpts, Value: "--max-old-space-size=2048", + }}, + }, + }, + }, + }, + }, + } + licensingInfo, err = NewResourceReporter(k8s.FakeClient(&kb)).Get() + assert.NoError(t, err) + assert.Equal(t, "204.80GB", licensingInfo.TotalManagedMemory) + assert.Equal(t, "4", licensingInfo.EnterpriseResourceUnits) +} + +func Test_Start(t *testing.T) { + es := esv1.Elasticsearch{ + ObjectMeta: metav1.ObjectMeta{ + Name: "es-test", + }, + Spec: esv1.ElasticsearchSpec{NodeSets: []esv1.NodeSet{{Count: 40}}}} + kb := kbv1.Kibana{Spec: kbv1.KibanaSpec{Count: 2}} + apm := apmv1.ApmServer{Spec: apmv1.ApmServerSpec{Count: 2}} + k8sClient := k8s.FakeClient(&es, &kb, &apm) + operatorNs := "test-system" + refreshPeriod := 1 * time.Second + waitFor := 10 * refreshPeriod + tick := refreshPeriod / 2 + + // start the resource reporter + go NewResourceReporter(k8sClient).Start(operatorNs, refreshPeriod) + + // check that the licensing config map exists + assert.Eventually(t, func() bool { + var cm corev1.ConfigMap + err := k8sClient.Get(context.Background(), types.NamespacedName{ + Namespace: operatorNs, + Name: licensingCfgMapName, + }, &cm) + if err != nil { + return false + } + return cm.Data["timestamp"] != "" && + cm.Data["eck_license_level"] == defaultOperatorLicenseLevel && + cm.Data["enterprise_resource_units"] == "2" && + cm.Data["total_managed_memory"] == "89.12GB" + }, waitFor, tick) + + // increase the Elasticsearch nodes count + es.Spec.NodeSets[0].Count = 80 + err := k8sClient.Update(context.Background(), &es) + assert.NoError(t, err) + + // check that the licensing config map has been updated + assert.Eventually(t, func() bool { + var cm corev1.ConfigMap + err := k8sClient.Get(context.Background(), types.NamespacedName{ + Namespace: operatorNs, + Name: licensingCfgMapName, + }, &cm) + if err != nil { + return false + } + return cm.Data["timestamp"] != "" && + cm.Data["eck_license_level"] == defaultOperatorLicenseLevel && + cm.Data["enterprise_resource_units"] == "3" && + cm.Data["total_managed_memory"] == "175.02GB" + }, waitFor, tick) +}
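For reference, the GB and ERU arithmetic implemented in pkg/license/license.go (inGB and inEnterpriseResourceUnits) can be reproduced in a few lines. The following is a standalone sketch, not part of the patch, assuming the same constants: memory reported as bytes divided by 1 billion, and one Enterprise Resource Unit per 64GB, rounded up. The numbers match the 10-node, 2Gi-default Elasticsearch case exercised in reporter_test.go.

package main

import (
	"fmt"
	"math"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// 10 Elasticsearch nodes at the 2Gi default memory limit.
	total := resource.MustParse("20Gi") // 21474836480 bytes

	// Memory is reported in decimal gigabytes: bytes / 1 billion.
	gb := float64(total.Value()) / 1000000000

	// One Enterprise Resource Unit (ERU) covers 64GB; partial units round up.
	eru := int64(math.Ceil(float64(total.Value()) / 64000000000))

	fmt.Printf("total_managed_memory=%.2fGB enterprise_resource_units=%d\n", gb, eru)
	// prints: total_managed_memory=21.47GB enterprise_resource_units=1
}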
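The environment-variable fallbacks used by the aggregator can be illustrated with a short test-style snippet. This is a sketch that assumes it sits next to aggregator.go in the pkg/license package (memFromJavaOpts and memFromNodeOptions are unexported) and feeds the parsers the same values used in the reporter tests: the JVM -Xmx value is doubled and converted to a binary suffix, while the Node.js --max-old-space-size value is taken as-is in decimal megabytes.

package license

import "testing"

func TestHeapParsingExample(t *testing.T) {
	// ES_JAVA_OPTS fallback: -Xmx8G is doubled and becomes 16Gi.
	jvm, err := memFromJavaOpts("-Xms8G -Xmx8G")
	if err != nil {
		t.Fatal(err)
	}
	if jvm.String() != "16Gi" {
		t.Errorf("expected 16Gi, got %s", jvm.String())
	}

	// NODE_OPTS fallback: --max-old-space-size=2048 becomes 2048M.
	node, err := memFromNodeOptions("--max-old-space-size=2048")
	if err != nil {
		t.Fatal(err)
	}
	if node.String() != "2048M" {
		t.Errorf("expected 2048M, got %s", node.String())
	}
}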
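Once the reporter is running, the published figures can be read back from the elastic-licensing config map by any client. Below is a minimal sketch using the controller-runtime client; the "elastic-system" namespace is an assumption and should be replaced with the namespace the operator actually runs in.

package main

import (
	"context"
	"fmt"
	"log"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/config"
)

func main() {
	cfg, err := config.GetConfig()
	if err != nil {
		log.Fatal(err)
	}
	c, err := client.New(cfg, client.Options{}) // the default scheme already knows core/v1
	if err != nil {
		log.Fatal(err)
	}

	var cm corev1.ConfigMap
	// "elastic-system" is an assumed operator namespace; "elastic-licensing" is licensingCfgMapName.
	if err := c.Get(context.Background(), types.NamespacedName{Namespace: "elastic-system", Name: "elastic-licensing"}, &cm); err != nil {
		log.Fatal(err)
	}

	fmt.Println(cm.Data["eck_license_level"], cm.Data["total_managed_memory"], cm.Data["enterprise_resource_units"])
}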