Skip to content

Commit

Permalink
Resource aggregation for licensing count (elastic#2277)
Browse files Browse the repository at this point in the history
Start a reporter in a goroutine that repeatedly (every 2 minutes) aggregates the total memory of
all Elastic managed components by the operator and reports it in the form of a licensing information in a config map.

Example of the content of the config map:

  > kubectl -n elastic-system get cm elastic-licensing -o json | jq .data
  {
    "eck_license_level": "enterprise",
    "enterprise_resource_units": "1",
    "timestamp": "2019-12-20T18:18:31+01:00",
    "total_managed_memory": "12.88GB"
  }

Notes:
- "Enterprise resource units" is an Elastic unit calculated with the total memory under management divided by 64GB.
- The ECK license level can be basic, trial-enterprise or entreprise.
  • Loading branch information
thbkrkr authored and mjmbischoff committed Jan 13, 2020
1 parent 9f76d85 commit ea652ce
Show file tree
Hide file tree
Showing 17 changed files with 1,051 additions and 33 deletions.
68 changes: 68 additions & 0 deletions cmd/licensing-info/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package main

import (
"encoding/json"
"fmt"
"log"

eckscheme "github.com/elastic/cloud-on-k8s/pkg/controller/common/scheme"
"github.com/elastic/cloud-on-k8s/pkg/license"
"k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp" // auth on gke
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)

// Simple program that returns the licensing information, including the total memory of all Elastic managed components by
// the operator and its equivalent in "Enterprise Resource Units".
//
// The main objective of its existence is to show a use of the ResourceReporter and also to propose an alternative to
// immediately retrieve the licensing information.
//
// Example of use:
//
// > go run cmd/licensing-info/main.go
// {
// "timestamp": "2019-12-17T11:56:02+01:00",
// "license_level": "basic",
// "memory": "5.37GB",
// "enterprise_resource_units": "1"
// }
//

func main() {
licensingInfo, err := license.NewResourceReporter(newK8sClient()).Get()
if err != nil {
log.Fatal(err, "Failed to get licensing info")
}

bytes, err := json.Marshal(licensingInfo)
if err != nil {
log.Fatal(err, "Failed to marshal licensing info")
}

fmt.Print(string(bytes))
}

func newK8sClient() client.Client {
cfg, err := config.GetConfig()
if err != nil {
log.Fatal(err, "Failed to get a Kubernetes config")
}

err = eckscheme.SetupScheme()
if err != nil {
log.Fatal(err, "Failed to set up the ECK scheme")
}

c, err := client.New(cfg, client.Options{Scheme: scheme.Scheme})
if err != nil {
log.Fatal(err, "Failed to create a new Kubernetes client")
}

return c
}
11 changes: 9 additions & 2 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ import (
"strings"
"time"

"k8s.io/client-go/rest"

// allow gcp authentication
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -45,6 +44,7 @@ import (
"github.com/elastic/cloud-on-k8s/pkg/controller/webhook"
"github.com/elastic/cloud-on-k8s/pkg/dev"
"github.com/elastic/cloud-on-k8s/pkg/dev/portforward"
licensing "github.com/elastic/cloud-on-k8s/pkg/license"
"github.com/elastic/cloud-on-k8s/pkg/utils/net"
)

Expand Down Expand Up @@ -321,6 +321,13 @@ func execute() {
log.Error(err, "unable to create controller", "controller", "LicenseTrial")
os.Exit(1)
}

go func() {
time.Sleep(10 * time.Second) // wait some arbitrary time for the manager to start
mgr.GetCache().WaitForCacheSync(nil) // wait until k8s client cache is initialized
r := licensing.NewResourceReporter(mgr.GetClient())
r.Start(operatorNamespace, licensing.ResourceReporterFrequency)
}()
}

log.Info("Starting the manager", "uuid", operatorInfo.OperatorUUID,
Expand Down
19 changes: 11 additions & 8 deletions pkg/controller/apmserver/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@ const (
ConfigVolumePath = ApmBaseDir + "/config"
)

var DefaultResources = corev1.ResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceMemory: resource.MustParse("512Mi"),
},
Limits: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceMemory: resource.MustParse("512Mi"),
},
}
var (
DefaultMemoryLimits = resource.MustParse("512Mi")
DefaultResources = corev1.ResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceMemory: DefaultMemoryLimits,
},
Limits: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceMemory: DefaultMemoryLimits,
},
}
)

func readinessProbe(tls bool) corev1.Probe {
scheme := corev1.URISchemeHTTP
Expand Down
40 changes: 34 additions & 6 deletions pkg/controller/common/license/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package license

import (
"sort"
"time"

"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
Expand All @@ -14,6 +15,7 @@ import (
)

type Checker interface {
CurrentEnterpriseLicense() (*EnterpriseLicense, error)
EnterpriseFeaturesEnabled() (bool, error)
Valid(l EnterpriseLicense) (bool, error)
}
Expand Down Expand Up @@ -45,27 +47,49 @@ func (lc *checker) publicKeyFor(l EnterpriseLicense) ([]byte, error) {
}, &signatureSec)
}

// EnterpriseFeaturesEnabled returns true if a valid enterprise license is installed.
func (lc *checker) EnterpriseFeaturesEnabled() (bool, error) {
// CurrentEnterpriseLicense returns the currently valid Enterprise license if installed.
func (lc *checker) CurrentEnterpriseLicense() (*EnterpriseLicense, error) {
licenses, err := EnterpriseLicenses(lc.k8sClient)
if err != nil {
return false, errors.Wrap(err, "failed to list enterprise licenses")
return nil, errors.Wrap(err, "failed to list enterprise licenses")
}

sort.Slice(licenses, func(i, j int) bool {
t1, t2 := EnterpriseLicenseTypeOrder[licenses[i].License.Type], EnterpriseLicenseTypeOrder[licenses[j].License.Type]
if t1 != t2 { // sort by type (first the most features)
return t1 > t2
}
// and by expiry date (first which expires last)
return licenses[i].License.ExpiryDateInMillis > licenses[j].License.ExpiryDateInMillis
})

// pick the first valid Enterprise license in the sorted slice
for _, l := range licenses {
valid, err := lc.Valid(l)
if err != nil {
return false, err
return nil, err
}
if valid {
return true, nil
return &l, nil
}
}
return false, nil
return nil, nil
}

// EnterpriseFeaturesEnabled returns true if a valid enterprise license is installed.
func (lc *checker) EnterpriseFeaturesEnabled() (bool, error) {
license, err := lc.CurrentEnterpriseLicense()
if err != nil {
return false, err
}
return license != nil, nil
}

// Valid returns true if the given Enterprise license is valid or an error if any.
func (lc *checker) Valid(l EnterpriseLicense) (bool, error) {
if l.IsTrial() {
return true, nil
}
pk, err := lc.publicKeyFor(l)
if err != nil {
return false, errors.Wrap(err, "while loading signature secret")
Expand All @@ -83,6 +107,10 @@ func (lc *checker) Valid(l EnterpriseLicense) (bool, error) {

type MockChecker struct{}

func (MockChecker) CurrentEnterpriseLicense() (*EnterpriseLicense, error) {
return &EnterpriseLicense{}, nil
}

func (MockChecker) EnterpriseFeaturesEnabled() (bool, error) {
return true, nil
}
Expand Down
100 changes: 100 additions & 0 deletions pkg/controller/common/license/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,103 @@ func TestChecker_EnterpriseFeaturesEnabled(t *testing.T) {
})
}
}

func Test_CurrentEnterpriseLicense(t *testing.T) {
privKey, err := x509.ParsePKCS1PrivateKey(privateKeyFixture)
require.NoError(t, err)

validLicenseFixture := licenseFixtureV3
validLicenseFixture.License.ExpiryDateInMillis = chrono.ToMillis(time.Now().Add(1 * time.Hour))
signatureBytes, err := NewSigner(privKey).Sign(validLicenseFixture)
require.NoError(t, err)
validLicense := asRuntimeObjects(validLicenseFixture, signatureBytes)

validTrialLicenseFixture := trialLicenseFixture
validTrialLicenseFixture.License.ExpiryDateInMillis = chrono.ToMillis(time.Now().Add(1 * time.Hour))
trialSignatureBytes, err := NewSigner(privKey).Sign(validTrialLicenseFixture)
require.NoError(t, err)
validTrialLicense := asRuntimeObjects(validTrialLicenseFixture, trialSignatureBytes)

type fields struct {
initialObjects []runtime.Object
operatorNamespace string
publicKey []byte
}

tests := []struct {
name string
fields fields
want bool
wantErr bool
wantType OperatorLicenseType
}{
{
name: "get valid enterprise license: OK",
fields: fields{
initialObjects: validLicense,
operatorNamespace: "test-system",
publicKey: publicKeyBytesFixture(t),
},
want: true,
wantType: LicenseTypeEnterprise,
wantErr: false,
},
{
name: "get valid trial enterprise license: OK",
fields: fields{
initialObjects: validTrialLicense,
operatorNamespace: "test-system",
publicKey: publicKeyBytesFixture(t),
},
want: true,
wantType: LicenseTypeEnterpriseTrial,
wantErr: false,
},
{
name: "get valid enterprise license among two licenses: OK",
fields: fields{
initialObjects: append(validLicense, validTrialLicense...),
operatorNamespace: "test-system",
publicKey: publicKeyBytesFixture(t),
},
want: true,
wantType: LicenseTypeEnterprise,
wantErr: false,
},
{
name: "no license: OK",
fields: fields{
initialObjects: []runtime.Object{},
operatorNamespace: "test-system",
},
want: false,
wantErr: false,
},
{
name: "invalid public key: FAIL",
fields: fields{
initialObjects: validLicense,
operatorNamespace: "test-system",
publicKey: []byte("not a public key"),
},
want: false,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
lc := &checker{
k8sClient: k8s.WrappedFakeClient(tt.fields.initialObjects...),
operatorNamespace: tt.fields.operatorNamespace,
publicKey: tt.fields.publicKey,
}
got, err := lc.CurrentEnterpriseLicense()
if (err != nil) != tt.wantErr {
t.Errorf("Checker.CurrentEnterpriseLicense() err = %v, wantErr %v", err, tt.wantErr)
}
if tt.want != (got != nil) {
t.Errorf("Checker.CurrentEnterpriseLicense() = %v, want %v", got, tt.want)
}
})
}
}
10 changes: 9 additions & 1 deletion pkg/controller/common/license/fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"crypto/rsa"
"crypto/x509"
"encoding/json"
"fmt"
"testing"

"github.com/elastic/cloud-on-k8s/pkg/controller/common"
Expand Down Expand Up @@ -48,6 +49,12 @@ var (
}
)

var trialLicenseFixture = EnterpriseLicense{
License: LicenseSpec{
Type: LicenseTypeEnterpriseTrial,
},
}

func withSignature(l EnterpriseLicense, sig []byte) EnterpriseLicense {
l.License.Signature = string(sig)
return l
Expand All @@ -58,11 +65,12 @@ func asRuntimeObjects(l EnterpriseLicense, sig []byte) []runtime.Object {
if err != nil {
panic(err)
}

return []runtime.Object{
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-system",
Name: "test-license",
Name: fmt.Sprintf("test-%s-license", string(l.License.Type)),
Labels: map[string]string{
common.TypeLabelName: Type,
LicenseLabelScope: string(LicenseScopeOperator),
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/common/license/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ type LicenseSpec struct {
Version int // not marshalled but part of the signature
}

// EnterpriseLicenseTypeOrder license types mapped to ints in increasing order of feature sets for sorting purposes.
var EnterpriseLicenseTypeOrder = map[OperatorLicenseType]int{
LicenseTypeEnterpriseTrial: 0,
LicenseTypeEnterprise: 1,
}

// StartTime is the date as of which this license is valid.
func (l EnterpriseLicense) StartTime() time.Time {
return time.Unix(0, l.License.StartDateInMillis*int64(time.Millisecond))
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/elasticsearch/nodespec/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,18 @@ var (
{Name: "transport", ContainerPort: network.TransportPort, Protocol: corev1.ProtocolTCP},
}

DefaultMemoryLimits = resource.MustParse("2Gi")
// DefaultResources for the Elasticsearch container. The JVM default heap size is 1Gi, so we
// request at least 2Gi for the container to make sure ES can work properly.
// Not applying this minimum default would make ES randomly crash (OOM) on small machines.
// Similarly, we apply a default memory limit of 2Gi, to ensure the Pod isn't the first one to get evicted.
// No CPU requirement is set by default.
DefaultResources = corev1.ResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceMemory: resource.MustParse("2Gi"),
corev1.ResourceMemory: DefaultMemoryLimits,
},
Limits: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceMemory: resource.MustParse("2Gi"),
corev1.ResourceMemory: DefaultMemoryLimits,
},
}
)
Expand Down
Loading

0 comments on commit ea652ce

Please sign in to comment.