diff --git a/operators/config/crds/elasticsearch_v1alpha1_elasticsearch.yaml b/operators/config/crds/elasticsearch_v1alpha1_elasticsearch.yaml
index 14c27efa02..70c9306d9a 100644
--- a/operators/config/crds/elasticsearch_v1alpha1_elasticsearch.yaml
+++ b/operators/config/crds/elasticsearch_v1alpha1_elasticsearch.yaml
@@ -122,7 +122,7 @@ spec:
                 name:
                   description: Name is a logical name for this set of nodes. Used as a part
                     of the managed Elasticsearch node.name setting.
-                  maxLength: 12
+                  maxLength: 19
                   pattern: '[a-zA-Z0-9-]+'
                   type: string
                 nodeCount:
diff --git a/operators/config/samples/apm/apm_es_kibana.yaml b/operators/config/samples/apm/apm_es_kibana.yaml
index 371cb4ea15..d0a12c0557 100644
--- a/operators/config/samples/apm/apm_es_kibana.yaml
+++ b/operators/config/samples/apm/apm_es_kibana.yaml
@@ -7,7 +7,7 @@ metadata:
 spec:
   version: "7.1.0"
   nodes:
-  - name: all
+  - name: default
     nodeCount: 3
 ---
 apiVersion: apm.k8s.elastic.co/v1alpha1
diff --git a/operators/config/samples/elasticsearch/elasticsearch.yaml b/operators/config/samples/elasticsearch/elasticsearch.yaml
index 457197d319..2264698d39 100644
--- a/operators/config/samples/elasticsearch/elasticsearch.yaml
+++ b/operators/config/samples/elasticsearch/elasticsearch.yaml
@@ -6,7 +6,7 @@ metadata:
 spec:
   version: "7.1.0"
   nodes:
-  - name: all
+  - name: default
     config:
       # most Elasticsearch configuration parameters are possible to set, e.g:
      node.attr.attr_name: attr_value
diff --git a/operators/config/samples/elasticsearch/elasticsearch_local_volume.yaml b/operators/config/samples/elasticsearch/elasticsearch_local_volume.yaml
index b16f17010f..6ed411ac9d 100644
--- a/operators/config/samples/elasticsearch/elasticsearch_local_volume.yaml
+++ b/operators/config/samples/elasticsearch/elasticsearch_local_volume.yaml
@@ -7,7 +7,7 @@ metadata:
 spec:
   version: "7.1.0"
   nodes:
-  - name: all
+  - name: default
     nodeCount: 3
     volumeClaimTemplates:
     - metadata:
diff --git a/operators/config/samples/kibana/kibana_es.yaml b/operators/config/samples/kibana/kibana_es.yaml
index db0f8d5498..44caaf58dd 100644
--- a/operators/config/samples/kibana/kibana_es.yaml
+++ b/operators/config/samples/kibana/kibana_es.yaml
@@ -6,7 +6,7 @@ metadata:
 spec:
   version: "7.1.0"
   nodes:
-  - name: all
+  - name: default
     nodeCount: 1
 ---
 apiVersion: kibana.k8s.elastic.co/v1alpha1
diff --git a/operators/pkg/apis/elasticsearch/v1alpha1/elasticsearch_types.go b/operators/pkg/apis/elasticsearch/v1alpha1/elasticsearch_types.go
index 7ff6c3e6d1..913f53510e 100644
--- a/operators/pkg/apis/elasticsearch/v1alpha1/elasticsearch_types.go
+++ b/operators/pkg/apis/elasticsearch/v1alpha1/elasticsearch_types.go
@@ -66,7 +66,7 @@ func (es ElasticsearchSpec) NodeCount() int32 {
 type NodeSpec struct {
     // Name is a logical name for this set of nodes. Used as a part of the managed Elasticsearch node.name setting.
     // +kubebuilder:validation:Pattern=[a-zA-Z0-9-]+
-    // +kubebuilder:validation:MaxLength=12
+    // +kubebuilder:validation:MaxLength=19
     Name string `json:"name"`
 
     // Config represents Elasticsearch configuration.
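The hunks above relax the `NodeSpec.Name` constraint from 12 to 19 characters and rename the sample node spec from `all` to `default`, while keeping the `[a-zA-Z0-9-]+` pattern. As a rough standalone sketch (not part of the PR), the same constraints could be checked client-side like this; the helper name and the anchored regexp are assumptions made here for illustration — the CRD pattern itself is unanchored:

```go
package main

import (
	"fmt"
	"regexp"
)

// Mirrors the kubebuilder annotations on NodeSpec.Name in this diff.
const nodeSpecNameMaxLength = 19

// Anchored here for illustration; the CRD declares the pattern without anchors.
var nodeSpecNamePattern = regexp.MustCompile("^[a-zA-Z0-9-]+$")

// validateNodeSpecName is a hypothetical helper applying the same rules the
// API server enforces through the CRD validation schema.
func validateNodeSpecName(name string) error {
	if len(name) > nodeSpecNameMaxLength {
		return fmt.Errorf("name %q is longer than %d characters", name, nodeSpecNameMaxLength)
	}
	if !nodeSpecNamePattern.MatchString(name) {
		return fmt.Errorf("name %q does not match [a-zA-Z0-9-]+", name)
	}
	return nil
}

func main() {
	for _, name := range []string{"default", "all", "a-name-that-is-way-too-long"} {
		fmt.Println(name, "->", validateNodeSpecName(name))
	}
}
```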
diff --git a/operators/pkg/controller/elasticsearch/driver/default.go b/operators/pkg/controller/elasticsearch/driver/default.go
index 15f076de28..b162617920 100644
--- a/operators/pkg/controller/elasticsearch/driver/default.go
+++ b/operators/pkg/controller/elasticsearch/driver/default.go
@@ -8,11 +8,15 @@ import (
     "crypto/x509"
     "fmt"
 
+    appsv1 "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/types"
     controller "sigs.k8s.io/controller-runtime/pkg/reconcile"
 
+    "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/migration"
+    esvolume "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/volume"
+
     "github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1"
     "github.com/elastic/cloud-on-k8s/operators/pkg/controller/common"
     "github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/events"
@@ -39,7 +43,6 @@ import (
     "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/user"
     esversion "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/version"
     "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/version/version6"
-    esvolume "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/volume"
     "github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s"
 )
 
@@ -212,7 +215,6 @@ func (d *defaultDriver) Reconcile(
         return results.WithError(err)
     }
 
-    namespacedName := k8s.ExtractNamespacedName(&es)
     //
     //// There might be some ongoing creations and deletions our k8s client cache
     //// hasn't seen yet. In such case, requeue until we are in-sync.
@@ -323,68 +325,11 @@ func (d *defaultDriver) Reconcile(
         )
     }
 
-    actualStatefulSets, err := sset.RetrieveActualStatefulSets(d.Client, namespacedName)
-    if err != nil {
-        return results.WithError(err)
-    }
-
-    nodeSpecResources, err := nodespec.BuildExpectedResources(es, podTemplateSpecBuilder)
-    if err != nil {
-        return results.WithError(err)
-    }
-
-    // TODO: handle zen2 initial master nodes more cleanly
-    // should be empty once cluster is bootstraped
-    var initialMasters []string
-    // TODO: refactor/move
-    for _, res := range nodeSpecResources {
-        cfg, err := res.Config.Unpack()
-        if err != nil {
-            return results.WithError(err)
-        }
-        if cfg.Node.Master {
-            for i := 0; i < int(*res.StatefulSet.Spec.Replicas); i++ {
-                initialMasters = append(initialMasters, fmt.Sprintf("%s-%d", res.StatefulSet.Name, i))
-            }
-        }
-    }
-    for i := range nodeSpecResources {
-        if err := nodeSpecResources[i].Config.SetStrings(settings.ClusterInitialMasterNodes, initialMasters...); err != nil {
-            return results.WithError(err)
-        }
-    }
-
-    // create or update all expected ssets
-    // TODO: safe upgrades
-    for _, nodeSpec := range nodeSpecResources {
-        if err := settings.ReconcileConfig(d.Client, es, nodeSpec.StatefulSet.Name, nodeSpec.Config); err != nil {
-            return results.WithError(err)
-        }
-        if _, err := common.ReconcileService(d.Client, d.Scheme, &nodeSpec.HeadlessService, &es); err != nil {
-            return results.WithError(err)
-        }
-        if err := sset.ReconcileStatefulSet(d.Client, d.Scheme, es, nodeSpec.StatefulSet); err != nil {
-            return results.WithError(err)
-        }
-    }
-
-    // delete all unexpected ssets
-    for _, actual := range actualStatefulSets {
-        if _, shouldExist := nodeSpecResources.StatefulSets().GetByName(actual.Name); !shouldExist {
-            // TODO: safe node removal
-            if err := d.Client.Delete(&actual); err != nil {
-                return results.WithError(err)
-            }
-        }
+    res = d.reconcileNodeSpecs(es, podTemplateSpecBuilder,
+        esClient, observedState)
+    if results.WithResults(res).HasError() {
+        return results
     }
 
-    // TODO:
-    // - safe sset replacement
-    // - safe node removal (data migration)
-    // - safe node upgrade (rollingUpdate.Partition + shards allocation)
-    // - change budget
-    // - zen1, zen2
-
     //
     //// Call Zen1 setting updater before new masters are created to ensure that they immediately start with the
     //// correct value for minimum_master_nodes.
@@ -568,6 +513,166 @@ func removePodFromList(pods []corev1.Pod, pod corev1.Pod) []corev1.Pod {
     return pods
 }
 
+func (d *defaultDriver) reconcileNodeSpecs(
+    es v1alpha1.Elasticsearch,
+    podSpecBuilder esversion.PodTemplateSpecBuilder,
+    esClient esclient.Client,
+    observedState observer.State,
+) *reconciler.Results {
+    results := &reconciler.Results{}
+
+    actualStatefulSets, err := sset.RetrieveActualStatefulSets(d.Client, k8s.ExtractNamespacedName(&es))
+    if err != nil {
+        return results.WithError(err)
+    }
+
+    nodeSpecResources, err := nodespec.BuildExpectedResources(es, podSpecBuilder)
+    if err != nil {
+        return results.WithError(err)
+    }
+
+    // TODO: handle zen2 initial master nodes more cleanly
+    // should be empty once cluster is bootstrapped
+    var initialMasters []string
+    // TODO: refactor/move
+    for _, res := range nodeSpecResources {
+        cfg, err := res.Config.Unpack()
+        if err != nil {
+            return results.WithError(err)
+        }
+        if cfg.Node.Master {
+            for i := 0; i < int(*res.StatefulSet.Spec.Replicas); i++ {
+                initialMasters = append(initialMasters, fmt.Sprintf("%s-%d", res.StatefulSet.Name, i))
+            }
+        }
+    }
+    for i := range nodeSpecResources {
+        if err := nodeSpecResources[i].Config.SetStrings(settings.ClusterInitialMasterNodes, initialMasters...); err != nil {
+            return results.WithError(err)
+        }
+    }
+
+    // Phase 1: apply expected StatefulSets resources, but don't scale down.
+    // The goal is to:
+    // 1. scale sset up (e.g. go from 3 to 5 replicas).
+    // 2. apply configuration changes on the sset resource, to be used for future pods creation/recreation,
+    //    but do not rotate pods yet.
+    // 3. do **not** apply replicas scale down, otherwise nodes would be deleted before
+    //    we handle a clean deletion.
+    for _, nodeSpecRes := range nodeSpecResources {
+        // always reconcile config (will apply to new & recreated pods)
+        if err := settings.ReconcileConfig(d.Client, es, nodeSpecRes.StatefulSet.Name, nodeSpecRes.Config); err != nil {
+            return results.WithError(err)
+        }
+        if _, err := common.ReconcileService(d.Client, d.Scheme, &nodeSpecRes.HeadlessService, &es); err != nil {
+            return results.WithError(err)
+        }
+        ssetToApply := *nodeSpecRes.StatefulSet.DeepCopy()
+        actual, exists := actualStatefulSets.GetByName(ssetToApply.Name)
+        if exists && sset.Replicas(ssetToApply) < sset.Replicas(actual) {
+            // sset needs to be scaled down
+            // update the sset to use the new spec but don't scale replicas down for now
+            ssetToApply.Spec.Replicas = actual.Spec.Replicas
+        }
+        if err := sset.ReconcileStatefulSet(d.Client, d.Scheme, es, ssetToApply); err != nil {
+            return results.WithError(err)
+        }
+    }
+
+    // Phase 2: handle sset scale down.
+    // We want to safely remove nodes from the cluster, either because the sset requires fewer replicas,
+    // or because it should be removed entirely.
+    for i, actual := range actualStatefulSets {
+        expected, shouldExist := nodeSpecResources.StatefulSets().GetByName(actual.Name)
+        switch {
+        // stateful set removal
+        case !shouldExist:
+            target := int32(0)
+            removalResult := d.scaleStatefulSetDown(&actualStatefulSets[i], target, esClient, observedState)
+            results.WithResults(removalResult)
+            if removalResult.HasError() {
+                return results
+            }
+        // stateful set downscale
+        case actual.Spec.Replicas != nil && sset.Replicas(expected) < sset.Replicas(actual):
+            target := sset.Replicas(expected)
+            downscaleResult := d.scaleStatefulSetDown(&actualStatefulSets[i], target, esClient, observedState)
+            if downscaleResult.HasError() {
+                return results
+            }
+        }
+    }
+
+    // TODO:
+    // - safe node upgrade (rollingUpdate.Partition + shards allocation)
+    // - change budget
+    // - zen1, zen2
+    return results
+}
+
+func (d *defaultDriver) scaleStatefulSetDown(
+    statefulSet *appsv1.StatefulSet,
+    targetReplicas int32,
+    esClient esclient.Client,
+    observedState observer.State,
+) *reconciler.Results {
+    results := &reconciler.Results{}
+    logger := log.WithValues("statefulset", k8s.ExtractNamespacedName(statefulSet))
+
+    if sset.Replicas(*statefulSet) == 0 && targetReplicas == 0 {
+        // we don't expect any new replicas in this statefulset, remove it
+        logger.Info("Deleting statefulset")
+        if err := d.Client.Delete(statefulSet); err != nil {
+            return results.WithError(err)
+        }
+    }
+    // copy the current replicas, to be decremented with nodes to remove
+    initialReplicas := sset.Replicas(*statefulSet)
+    updatedReplicas := initialReplicas
+
+    // leaving nodes' names can be built from the StatefulSet name and ordinals
+    // nodes are ordered by highest ordinal first
+    var leavingNodes []string
+    for i := initialReplicas - 1; i > targetReplicas-1; i-- {
+        leavingNodes = append(leavingNodes, sset.PodName(statefulSet.Name, int(i)))
+    }
+
+    // TODO: don't remove last master/last data nodes?
+    // TODO: detect cases where data migration cannot happen since no nodes to host shards?
+
+    // migrate data away from these nodes before removing them
+    logger.V(1).Info("Migrating data away from nodes", "nodes", leavingNodes)
+    if err := migration.MigrateData(esClient, leavingNodes); err != nil {
+        return results.WithError(err)
+    }
+
+    for _, node := range leavingNodes {
+        if migration.IsMigratingData(observedState, node, leavingNodes) {
+            // data migration not over yet: schedule a requeue
+            logger.V(1).Info("Data migration not over yet, skipping node deletion", "node", node)
+            results.WithResult(defaultRequeue)
+            // no need to check other nodes since we remove them in order and this one isn't ready anyway
+            break
+        }
+        // data migration over: allow pod to be removed
+        updatedReplicas--
+    }
+
+    if updatedReplicas != initialReplicas {
+        // update cluster coordination settings to account for nodes deletion
+        // TODO: update zen1/zen2
+
+        // trigger deletion of nodes whose data migration is over
+        logger.V(1).Info("Scaling replicas down", "from", initialReplicas, "to", updatedReplicas)
+        statefulSet.Spec.Replicas = &updatedReplicas
+        if err := d.Client.Update(statefulSet); err != nil {
+            return results.WithError(err)
+        }
+    }
+
+    return nil
+}
+
 //
 //// calculateChanges calculates the changes we'd need to perform to go from the current cluster configuration to the
 //// desired one.
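The downscale path above always drains the highest ordinals first: going from `initialReplicas` to `targetReplicas`, the leaving node names are derived purely from the StatefulSet name and the ordinals in between. A minimal standalone sketch of that computation follows; the helper names and the `mycluster-es-default` StatefulSet name are made up for the example, only the loop mirrors `scaleStatefulSetDown`:

```go
package main

import "fmt"

// podName mirrors sset.PodName from this PR: StatefulSet pods are named <ssetName>-<ordinal>.
func podName(ssetName string, ordinal int) string {
	return fmt.Sprintf("%s-%d", ssetName, ordinal)
}

// leavingNodes reproduces the loop in scaleStatefulSetDown: when going from
// initialReplicas to targetReplicas, the pods with the highest ordinals leave first.
func leavingNodes(ssetName string, initialReplicas, targetReplicas int32) []string {
	var nodes []string
	for i := initialReplicas - 1; i > targetReplicas-1; i-- {
		nodes = append(nodes, podName(ssetName, int(i)))
	}
	return nodes
}

func main() {
	// Scaling a hypothetical "mycluster-es-default" StatefulSet from 5 to 3 replicas
	// drains mycluster-es-default-4 and mycluster-es-default-3, in that order.
	fmt.Println(leavingNodes("mycluster-es-default", 5, 3))
}
```

Deleting the StatefulSet entirely is just the `targetReplicas == 0` case of the same logic, which is why both switch cases funnel into `scaleStatefulSetDown`.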
diff --git a/operators/pkg/controller/elasticsearch/elasticsearch_controller.go b/operators/pkg/controller/elasticsearch/elasticsearch_controller.go
index 40b2589f3c..a863144ff6 100644
--- a/operators/pkg/controller/elasticsearch/elasticsearch_controller.go
+++ b/operators/pkg/controller/elasticsearch/elasticsearch_controller.go
@@ -9,6 +9,19 @@ import (
     "sync/atomic"
     "time"
 
+    appsv1 "k8s.io/api/apps/v1"
+    corev1 "k8s.io/api/core/v1"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
+    "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/types"
+    "k8s.io/client-go/tools/record"
+    "sigs.k8s.io/controller-runtime/pkg/controller"
+    "sigs.k8s.io/controller-runtime/pkg/handler"
+    "sigs.k8s.io/controller-runtime/pkg/manager"
+    "sigs.k8s.io/controller-runtime/pkg/reconcile"
+    logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
+    "sigs.k8s.io/controller-runtime/pkg/source"
+
     elasticsearchv1alpha1 "github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1"
     "github.com/elastic/cloud-on-k8s/operators/pkg/controller/common"
     "github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/certificates/http"
@@ -25,16 +38,6 @@ import (
     "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/settings"
     "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/validation"
     "github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s"
-    corev1 "k8s.io/api/core/v1"
-    apierrors "k8s.io/apimachinery/pkg/api/errors"
-    "k8s.io/apimachinery/pkg/runtime"
-    "k8s.io/client-go/tools/record"
-    "sigs.k8s.io/controller-runtime/pkg/controller"
-    "sigs.k8s.io/controller-runtime/pkg/handler"
-    "sigs.k8s.io/controller-runtime/pkg/manager"
-    "sigs.k8s.io/controller-runtime/pkg/reconcile"
-    logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
-    "sigs.k8s.io/controller-runtime/pkg/source"
 )
 
 const name = "elasticsearch-controller"
@@ -87,29 +90,36 @@ func addWatches(c controller.Controller, r *ReconcileElasticsearch) error {
         return err
     }
 
-    // Watch pods
-    if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, r.dynamicWatches.Pods); err != nil {
+    // Watch StatefulSets
+    if err := c.Watch(
+        &source.Kind{Type: &appsv1.StatefulSet{}}, &handler.EnqueueRequestForOwner{
+            IsController: true,
+            OwnerType:    &elasticsearchv1alpha1.Elasticsearch{},
+        },
+    ); err != nil {
         return err
     }
-    if err := r.dynamicWatches.Pods.AddHandlers(
-        // trigger reconciliation loop on ES pods owned by this controller
-        &watches.OwnerWatch{
-            EnqueueRequestForOwner: handler.EnqueueRequestForOwner{
-                IsController: true,
-                OwnerType:    &elasticsearchv1alpha1.Elasticsearch{},
-            },
-        },
-        // Reconcile pods expectations.
-        // This does not technically need to be part of a dynamic watch, since it will
-        // stay there forever (nothing dynamic here).
-        // Turns out our dynamic watch mechanism happens to be a pretty nice way to
-        // setup multiple "static" handlers for a single watch.
-        watches.NewExpectationsWatch(
-            "pods-expectations",
-            r.podsExpectations,
-            // retrieve cluster name from pod labels
-            label.ClusterFromResourceLabels,
-        )); err != nil {
+
+    // Watch pods belonging to ES clusters
+    if err := c.Watch(&source.Kind{Type: &corev1.Pod{}},
+        &handler.EnqueueRequestsFromMapFunc{
+            ToRequests: handler.ToRequestsFunc(
+                func(object handler.MapObject) []reconcile.Request {
+                    labels := object.Meta.GetLabels()
+                    clusterName, isSet := labels[label.ClusterNameLabelName]
+                    if !isSet {
+                        return nil
+                    }
+                    return []reconcile.Request{
+                        {
+                            NamespacedName: types.NamespacedName{
+                                Namespace: object.Meta.GetNamespace(),
+                                Name:      clusterName,
+                            },
+                        },
+                    }
+                }),
+        }); err != nil {
         return err
     }
diff --git a/operators/pkg/controller/elasticsearch/migration/migrate_data.go b/operators/pkg/controller/elasticsearch/migration/migrate_data.go
index 866e677e41..885c5a84e0 100644
--- a/operators/pkg/controller/elasticsearch/migration/migrate_data.go
+++ b/operators/pkg/controller/elasticsearch/migration/migrate_data.go
@@ -8,8 +8,6 @@ import (
     "context"
     "strings"
 
-    corev1 "k8s.io/api/core/v1"
-
     "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/client"
     "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/observer"
 )
@@ -66,16 +64,16 @@ func nodeIsMigratingData(nodeName string, shards []client.Shard, exclusions map[
 // IsMigratingData looks only at the presence of shards on a given node
 // and checks if there is at least one other copy of the shard in the cluster
 // that is started and not relocating.
-func IsMigratingData(state observer.State, pod corev1.Pod, exclusions []corev1.Pod) bool {
+func IsMigratingData(state observer.State, podName string, exclusions []string) bool {
     clusterState := state.ClusterState
-    if clusterState.IsEmpty() {
+    if clusterState == nil || clusterState.IsEmpty() {
         return true // we don't know if the request timed out or the cluster has not formed yet
     }
     excludedNodes := make(map[string]struct{}, len(exclusions))
-    for _, n := range exclusions {
-        excludedNodes[n.Name] = struct{}{}
+    for _, name := range exclusions {
+        excludedNodes[name] = struct{}{}
     }
-    return nodeIsMigratingData(pod.Name, clusterState.GetShards(), excludedNodes)
+    return nodeIsMigratingData(podName, clusterState.GetShards(), excludedNodes)
 }
 
 // AllocationSettings captures Elasticsearch API calls around allocation filtering.
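The elasticsearch_controller.go hunk above replaces the dynamic pod watch with a plain label-based mapping: a Pod event is routed to the Elasticsearch resource named by its cluster-name label, or dropped if the label is absent. The core of that mapping is small enough to show in isolation; the sketch below is illustrative only — the `request` type stands in for `reconcile.Request`, and the label key value is an assumption since the diff only references the `label.ClusterNameLabelName` constant:

```go
package main

import "fmt"

// clusterNameLabel stands in for label.ClusterNameLabelName; the exact string
// used by the operator is assumed here, not taken from the diff.
const clusterNameLabel = "elasticsearch.k8s.elastic.co/cluster-name"

// request is a simplified stand-in for reconcile.Request / types.NamespacedName.
type request struct {
	Namespace string
	Name      string
}

// mapPodToCluster mirrors the mapping function registered in addWatches:
// enqueue a reconciliation for the owning cluster, or nothing if the label is missing.
func mapPodToCluster(namespace string, labels map[string]string) []request {
	clusterName, isSet := labels[clusterNameLabel]
	if !isSet {
		return nil
	}
	return []request{{Namespace: namespace, Name: clusterName}}
}

func main() {
	fmt.Println(mapPodToCluster("default", map[string]string{clusterNameLabel: "mycluster"}))
	fmt.Println(mapPodToCluster("default", map[string]string{"app": "not-es"})) // no cluster label -> no reconcile
}
```

This pairs with the migrate_data.go change just above: since pods are no longer fetched for the watch, `IsMigratingData` can work on plain pod names instead of `corev1.Pod` objects, and it now treats a nil cluster state as "still migrating" to stay conservative.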
diff --git a/operators/pkg/controller/elasticsearch/migration/migrate_data_test.go b/operators/pkg/controller/elasticsearch/migration/migrate_data_test.go
index a02c627a06..b8f18aaec1 100644
--- a/operators/pkg/controller/elasticsearch/migration/migrate_data_test.go
+++ b/operators/pkg/controller/elasticsearch/migration/migrate_data_test.go
@@ -8,10 +8,10 @@ import (
     "context"
     "testing"
 
+    "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/client"
+    "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/observer"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
-
-    "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/client"
 )
 
 func TestEnoughRedundancy(t *testing.T) {
@@ -128,3 +128,53 @@ func TestMigrateData(t *testing.T) {
         assert.Contains(t, esClient.getAndReset(), tt.want)
     }
 }
+
+func TestIsMigratingData(t *testing.T) {
+    type args struct {
+        state      observer.State
+        podName    string
+        exclusions []string
+    }
+    tests := []struct {
+        name string
+        args args
+        want bool
+    }{
+        {
+            name: "cluster state is nil",
+            args: args{
+                state:      observer.State{ClusterState: nil},
+                podName:    "pod",
+                exclusions: nil,
+            },
+            want: true,
+        },
+        {
+            name: "cluster state is empty",
+            args: args{
+                state:      observer.State{ClusterState: &client.ClusterState{}},
+                podName:    "pod",
+                exclusions: nil,
+            },
+            want: true,
+        },
+        {
+            name: "no data migration in progress",
+            args: args{
+                state: observer.State{ClusterState: &client.ClusterState{
+                    ClusterName: "name",
+                }},
+                podName:    "pod",
+                exclusions: nil,
+            },
+            want: false,
+        },
+    }
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            if got := IsMigratingData(tt.args.state, tt.args.podName, tt.args.exclusions); got != tt.want {
+                t.Errorf("IsMigratingData() = %v, want %v", got, tt.want)
+            }
+        })
+    }
+}
diff --git a/operators/pkg/controller/elasticsearch/nodespec/resources.go b/operators/pkg/controller/elasticsearch/nodespec/resources.go
index 112afdc81d..847e98b73e 100644
--- a/operators/pkg/controller/elasticsearch/nodespec/resources.go
+++ b/operators/pkg/controller/elasticsearch/nodespec/resources.go
@@ -21,15 +21,14 @@ type Resources struct {
     StatefulSet     appsv1.StatefulSet
     HeadlessService corev1.Service
     Config          settings.CanonicalConfig
-    // TLS certs
 }
 
 type ResourcesList []Resources
 
 func (l ResourcesList) StatefulSets() sset.StatefulSetList {
     ssetList := make(sset.StatefulSetList, 0, len(l))
-    for _, nodeSpec := range l {
-        ssetList = append(ssetList, nodeSpec.StatefulSet)
+    for _, resource := range l {
+        ssetList = append(ssetList, resource.StatefulSet)
     }
     return ssetList
 }
diff --git a/operators/pkg/controller/elasticsearch/sset/getter.go b/operators/pkg/controller/elasticsearch/sset/getter.go
new file mode 100644
index 0000000000..6771adb7e0
--- /dev/null
+++ b/operators/pkg/controller/elasticsearch/sset/getter.go
@@ -0,0 +1,17 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package sset
+
+import (
+    appsv1 "k8s.io/api/apps/v1"
+)
+
+// Replicas returns the replicas configured for this StatefulSet, or 0 if nil.
+func Replicas(statefulSet appsv1.StatefulSet) int32 {
+    if statefulSet.Spec.Replicas != nil {
+        return *statefulSet.Spec.Replicas
+    }
+    return 0
+}
diff --git a/operators/pkg/controller/elasticsearch/sset/pod.go b/operators/pkg/controller/elasticsearch/sset/pod.go
new file mode 100644
index 0000000000..9af5d8074f
--- /dev/null
+++ b/operators/pkg/controller/elasticsearch/sset/pod.go
@@ -0,0 +1,11 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package sset
+
+func PodName(ssetName string, ordinal int) string {
+    return fmt.Sprintf("%s-%d", ssetName, ordinal)
+}
diff --git a/operators/pkg/controller/elasticsearch/sset/pod.go (continued imports)
+
+import "fmt"
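The two new `sset` helpers are the building blocks the driver leans on: `Replicas` makes nil `Spec.Replicas` compare as 0, and `PodName` encodes the `<ssetName>-<ordinal>` naming that zen2 initial masters and downscale draining both rely on. A small illustrative combination is sketched below; `expectedPodNames` and the `mycluster-es-default` name are hypothetical, only the two helpers mirror the new files:

```go
package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// podName and replicas mirror sset.PodName and sset.Replicas from the two new files above.
func podName(ssetName string, ordinal int) string {
	return fmt.Sprintf("%s-%d", ssetName, ordinal)
}

func replicas(statefulSet appsv1.StatefulSet) int32 {
	if statefulSet.Spec.Replicas != nil {
		return *statefulSet.Spec.Replicas
	}
	return 0
}

// expectedPodNames lists the pod names a StatefulSet will create, ordinals 0..replicas-1.
// This is the same naming logic the driver uses for zen2 initial masters and leaving nodes.
func expectedPodNames(statefulSet appsv1.StatefulSet) []string {
	names := make([]string, 0, replicas(statefulSet))
	for i := int32(0); i < replicas(statefulSet); i++ {
		names = append(names, podName(statefulSet.Name, int(i)))
	}
	return names
}

func main() {
	three := int32(3)
	statefulSet := appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{Name: "mycluster-es-default"},
		Spec:       appsv1.StatefulSetSpec{Replicas: &three},
	}
	// [mycluster-es-default-0 mycluster-es-default-1 mycluster-es-default-2]
	fmt.Println(expectedPodNames(statefulSet))
}
```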