diff --git a/pkg/controller/elasticsearch/driver/bootstrap.go b/pkg/controller/elasticsearch/driver/bootstrap.go new file mode 100644 index 0000000000..ba492f90d0 --- /dev/null +++ b/pkg/controller/elasticsearch/driver/bootstrap.go @@ -0,0 +1,50 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package driver + +import ( + "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" +) + +const ( + // ClusterUUIDAnnotationName used to store the cluster UUID as an annotation when cluster has been bootstrapped. + ClusterUUIDAnnotationName = "elasticsearch.k8s.elastic.co/cluster-uuid" +) + +// AnnotatedForBootstrap returns true if the cluster has been annotated with the UUID already. +func AnnotatedForBootstrap(cluster v1alpha1.Elasticsearch) bool { + _, bootstrapped := cluster.Annotations[ClusterUUIDAnnotationName] + return bootstrapped +} + +func ReconcileClusterUUID(c k8s.Client, cluster *v1alpha1.Elasticsearch, observedState observer.State) error { + if AnnotatedForBootstrap(*cluster) { + // already annotated, nothing to do. + return nil + } + if clusterIsBootstrapped(observedState) { + // cluster bootstrapped but not annotated yet + return annotateWithUUID(cluster, observedState, c) + } + // cluster not bootstrapped yet + return nil +} + +// clusterIsBootstrapped returns true if the cluster has formed and has a UUID. +func clusterIsBootstrapped(observedState observer.State) bool { + return observedState.ClusterState != nil && len(observedState.ClusterState.ClusterUUID) > 0 +} + +// annotateWithUUID annotates the cluster with its UUID, to mark it as "bootstrapped". +func annotateWithUUID(cluster *v1alpha1.Elasticsearch, observedState observer.State, c k8s.Client) error { + log.Info("Annotating bootstrapped cluster with its UUID", "namespace", cluster.Namespace, "es_name", cluster.Name) + if cluster.Annotations == nil { + cluster.Annotations = make(map[string]string) + } + cluster.Annotations[ClusterUUIDAnnotationName] = observedState.ClusterState.ClusterUUID + return c.Update(cluster) +} diff --git a/pkg/controller/elasticsearch/driver/bootstrap_test.go b/pkg/controller/elasticsearch/driver/bootstrap_test.go new file mode 100644 index 0000000000..451f6d40c4 --- /dev/null +++ b/pkg/controller/elasticsearch/driver/bootstrap_test.go @@ -0,0 +1,92 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package driver + +import ( + "testing" + + "github.com/stretchr/testify/require" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/scheme" +) + +func bootstrappedES() *v1alpha1.Elasticsearch { + return &v1alpha1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Name: "cluster", Annotations: map[string]string{ClusterUUIDAnnotationName: "uuid"}}} +} + +func notBootstrappedES() *v1alpha1.Elasticsearch { + return &v1alpha1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Name: "cluster"}} +} + +func TestAnnotatedForBootstrap(t *testing.T) { + require.True(t, AnnotatedForBootstrap(*bootstrappedES())) + require.False(t, AnnotatedForBootstrap(*notBootstrappedES())) +} + +func Test_annotateWithUUID(t *testing.T) { + require.NoError(t, v1alpha1.AddToScheme(scheme.Scheme)) + + cluster := notBootstrappedES() + observedState := observer.State{ClusterState: &client.ClusterState{ClusterUUID: "cluster-uuid"}} + k8sClient := k8s.WrapClient(fake.NewFakeClient(cluster)) + + err := annotateWithUUID(cluster, observedState, k8sClient) + require.NoError(t, err) + require.True(t, AnnotatedForBootstrap(*cluster)) + + var retrieved v1alpha1.Elasticsearch + err = k8sClient.Get(k8s.ExtractNamespacedName(cluster), &retrieved) + require.NoError(t, err) + require.True(t, AnnotatedForBootstrap(retrieved)) +} + +func TestReconcileClusterUUID(t *testing.T) { + require.NoError(t, v1alpha1.AddToScheme(scheme.Scheme)) + tests := []struct { + name string + c k8s.Client + cluster *v1alpha1.Elasticsearch + observedState observer.State + wantCluster *v1alpha1.Elasticsearch + }{ + { + name: "already annotated", + cluster: bootstrappedES(), + wantCluster: bootstrappedES(), + }, + { + name: "not annotated, but not bootstrapped yet (cluster state empty)", + cluster: notBootstrappedES(), + observedState: observer.State{ClusterState: nil}, + wantCluster: notBootstrappedES(), + }, + { + name: "not annotated, but not bootstrapped yet (cluster UUID empty)", + cluster: notBootstrappedES(), + observedState: observer.State{ClusterState: &client.ClusterState{ClusterUUID: ""}}, + wantCluster: notBootstrappedES(), + }, + { + name: "not annotated, but bootstrapped", + c: k8s.WrapClient(fake.NewFakeClient(notBootstrappedES())), + cluster: notBootstrappedES(), + observedState: observer.State{ClusterState: &client.ClusterState{ClusterUUID: "uuid"}}, + wantCluster: bootstrappedES(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := ReconcileClusterUUID(tt.c, tt.cluster, tt.observedState) + require.NoError(t, err) + require.Equal(t, tt.wantCluster, tt.cluster) + }) + } +} diff --git a/pkg/controller/elasticsearch/driver/downscale.go b/pkg/controller/elasticsearch/driver/downscale.go index e753f1d792..687d7985df 100644 --- a/pkg/controller/elasticsearch/driver/downscale.go +++ b/pkg/controller/elasticsearch/driver/downscale.go @@ -26,14 +26,6 @@ func HandleDownscale( ) *reconciler.Results { results := &reconciler.Results{} - canProceed, err := noOnGoingDeletion(downscaleCtx, actualStatefulSets) - if err != nil { - return results.WithError(err) - } - if !canProceed { - return results.WithResult(defaultRequeue) - } - // compute the list of StatefulSet downscales to perform downscales := 
calculateDownscales(expectedStatefulSets, actualStatefulSets) leavingNodes := leavingNodeNames(downscales) @@ -64,19 +56,6 @@ func HandleDownscale( return results } -// noOnGoingDeletion returns true if some pods deletion or creation may still be in progress -func noOnGoingDeletion(downscaleCtx downscaleContext, actualStatefulSets sset.StatefulSetList) (bool, error) { - // Pods we have may not match replicas specified in the StatefulSets spec. - // This can happen if, for example, replicas were recently downscaled to remove a node, - // but the node isn't completely terminated yet, and may still be part of the cluster. - // Moving on with downscaling more nodes may lead to complications when dealing with - // Elasticsearch shards allocation excludes (should not be cleared if the ghost node isn't removed yet) - // or zen settings (must consider terminating masters that are still there). - // Let's retry once expected pods are there. - // PodReconciliationDone also matches any pod not created yet, for which we'll also requeue. - return actualStatefulSets.PodReconciliationDone(downscaleCtx.k8sClient, downscaleCtx.es) -} - // calculateDownscales compares expected and actual StatefulSets to return a list of ssetDownscale. // We also include StatefulSets removal (0 replicas) in those downscales. func calculateDownscales(expectedStatefulSets sset.StatefulSetList, actualStatefulSets sset.StatefulSetList) []ssetDownscale { diff --git a/pkg/controller/elasticsearch/driver/downscale_test.go b/pkg/controller/elasticsearch/driver/downscale_test.go index afc146e221..364c50b2a9 100644 --- a/pkg/controller/elasticsearch/driver/downscale_test.go +++ b/pkg/controller/elasticsearch/driver/downscale_test.go @@ -182,15 +182,6 @@ func TestHandleDownscale(t *testing.T) { require.NoError(t, err) require.Equal(t, expectedAfterDownscale, actual.Items) - // running the downscale again should requeue since some pods are not terminated yet - results = HandleDownscale(downscaleCtx, requestedStatefulSets, actual.Items) - require.False(t, results.HasError()) - require.Equal(t, requeueResults, results) - // no StatefulSet should have been updated - err = k8sClient.List(&client.ListOptions{}, &actual) - require.NoError(t, err) - require.Equal(t, expectedAfterDownscale, actual.Items) - // simulate pods deletion that would be done by the StatefulSet controller require.NoError(t, k8sClient.Delete(&podsSsetMaster3Replicas[2])) require.NoError(t, k8sClient.Delete(&podsSsetData4Replicas[3])) diff --git a/pkg/controller/elasticsearch/driver/driver.go b/pkg/controller/elasticsearch/driver/driver.go index e99e911795..1c42c71ce2 100644 --- a/pkg/controller/elasticsearch/driver/driver.go +++ b/pkg/controller/elasticsearch/driver/driver.go @@ -230,6 +230,12 @@ func (d *defaultDriver) Reconcile() *reconciler.Results { return results.WithError(err) } + // set an annotation with the ClusterUUID, if bootstrapped + if err := ReconcileClusterUUID(d.Client, &d.ES, observedState); err != nil { + return results.WithError(err) + } + + // reconcile StatefulSets and nodes configuration res = d.reconcileNodeSpecs(esReachable, esClient, d.ReconcileState, observedState, *resourcesState, keystoreResources) if results.WithResults(res).HasError() { return results diff --git a/pkg/controller/elasticsearch/driver/expectations.go b/pkg/controller/elasticsearch/driver/expectations.go new file mode 100644 index 0000000000..2757889403 --- /dev/null +++ b/pkg/controller/elasticsearch/driver/expectations.go @@ -0,0 +1,36 @@ +// Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package driver + +import ( + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset" +) + +func (d *defaultDriver) expectationsMet(actualStatefulSets sset.StatefulSetList) (bool, error) { + if !d.Expectations.GenerationExpected(actualStatefulSets.ObjectMetas()...) { + // Our cache of StatefulSets is out of date compared to previous reconciliation operations. + // Continuing with the reconciliation at this point may lead to: + // - errors on rejected sset updates (conflict since cached resource out of date): that's ok + // - calling ES orchestration settings (zen1/zen2/allocation excludes) with wrong assumptions: that's not ok + // Hence we choose to abort the reconciliation early: will run again later with an updated cache. + log.V(1).Info("StatefulSet cache out-of-date, re-queueing", "namespace", d.ES.Namespace, "es_name", d.ES.Name) + return false, nil + } + + podsReconciled, err := actualStatefulSets.PodReconciliationDone(d.Client) + if err != nil { + return false, err + } + if !podsReconciled { + // Pods we have in the cache do not match StatefulSets we have in the cache. + // This can happen if some pods have been scheduled for creation/deletion/upgrade + // but the operation has not happened or been observed yet. + // Continuing with nodes reconciliation at this point would be dangerous, since + // we may update ES orchestration settings (zen1/zen2/allocation excludes) with + // wrong assumptions (especially on master-eligible and ES version mismatches). + return false, nil + } + return true, nil +} diff --git a/pkg/controller/elasticsearch/driver/expectations_test.go b/pkg/controller/elasticsearch/driver/expectations_test.go new file mode 100644 index 0000000000..9401076a38 --- /dev/null +++ b/pkg/controller/elasticsearch/driver/expectations_test.go @@ -0,0 +1,61 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package driver + +import ( + "testing" + + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/elastic/cloud-on-k8s/pkg/controller/common" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" + + "github.com/stretchr/testify/require" +) + +func Test_defaultDriver_expectationsMet(t *testing.T) { + d := &defaultDriver{DefaultDriverParameters{ + Expectations: reconciler.NewExpectations(), + Client: k8s.WrapClient(fake.NewFakeClient()), + }} + + // no expectations set + met, err := d.expectationsMet(sset.StatefulSetList{}) + require.NoError(t, err) + require.True(t, met) + + // a sset generation is expected + statefulSet := sset.TestSset{Name: "sset"}.Build() + statefulSet.Generation = 123 + d.Expectations.ExpectGeneration(statefulSet.ObjectMeta) + // but not met yet + statefulSet.Generation = 122 + met, err = d.expectationsMet(sset.StatefulSetList{statefulSet}) + require.NoError(t, err) + require.False(t, met) + // met now + statefulSet.Generation = 123 + met, err = d.expectationsMet(sset.StatefulSetList{statefulSet}) + require.NoError(t, err) + require.True(t, met) + + // we expect some sset replicas to exist + // but corresponding pod does not exist + statefulSet.Spec.Replicas = common.Int32(1) + // expectations should not be met: we miss a pod + met, err = d.expectationsMet(sset.StatefulSetList{statefulSet}) + require.NoError(t, err) + require.False(t, met) + + // add the missing pod + pod := sset.TestPod{Name: "sset-0", StatefulSetName: statefulSet.Name}.Build() + d.Client = k8s.WrapClient(fake.NewFakeClient(&pod)) + // expectations should be met + met, err = d.expectationsMet(sset.StatefulSetList{statefulSet}) + require.NoError(t, err) + require.True(t, met) +} diff --git a/pkg/controller/elasticsearch/driver/nodes.go b/pkg/controller/elasticsearch/driver/nodes.go index 4098463762..d3ae8754da 100644 --- a/pkg/controller/elasticsearch/driver/nodes.go +++ b/pkg/controller/elasticsearch/driver/nodes.go @@ -5,7 +5,6 @@ package driver import ( - "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1" "github.com/elastic/cloud-on-k8s/pkg/controller/common/keystore" "github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler" esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client" @@ -33,23 +32,32 @@ func (d *defaultDriver) reconcileNodeSpecs( return results.WithError(err) } - if !d.Expectations.GenerationExpected(actualStatefulSets.ObjectMetas()...) { - // Our cache of StatefulSets is out of date compared to previous reconciliation operations. - // Continuing with the reconciliation at this point may lead to: - // - errors on rejected sset updates (conflict since cached resource out of date): that's ok - // - calling ES orchestration settings (zen1/zen2/allocation excludes) with wrong assumptions: that's not ok - // Hence we choose to abort the reconciliation early: will run again later with an updated cache. 
- log.V(1).Info("StatefulSet cache out-of-date, re-queueing", "namespace", d.ES.Namespace, "es_name", d.ES.Name) + // check if actual StatefulSets and corresponding pods match our expectations before applying any change + ok, err := d.expectationsMet(actualStatefulSets) + if err != nil { + return results.WithError(err) + } + if !ok { return results.WithResult(defaultRequeue) } - expectedResources, err := expectedResources(d.Client, d.ES, observedState, keystoreResources) + expectedResources, err := nodespec.BuildExpectedResources(d.ES, keystoreResources) if err != nil { return results.WithError(err) } + esState := NewMemoizingESState(esClient) + // Phase 1: apply expected StatefulSets resources and scale up. - if err := HandleUpscaleAndSpecChanges(d.Client, d.ES, d.Scheme(), expectedResources, actualStatefulSets); err != nil { + upscaleCtx := upscaleCtx{ + k8sClient: d.K8sClient(), + es: d.ES, + scheme: d.Scheme(), + observedState: observedState, + esState: esState, + upscaleStateBuilder: &upscaleStateBuilder{}, + } + if err := HandleUpscaleAndSpecChanges(upscaleCtx, actualStatefulSets, expectedResources); err != nil { return results.WithError(err) } @@ -97,7 +105,7 @@ func (d *defaultDriver) reconcileNodeSpecs( // Phase 3: handle rolling upgrades. // Control nodes restart (upgrade) by manually decrementing rollingUpdate.Partition. - rollingUpgradesRes := d.handleRollingUpgrades(esClient, actualStatefulSets) + rollingUpgradesRes := d.handleRollingUpgrades(esClient, esState, actualStatefulSets) results.WithResults(rollingUpgradesRes) if rollingUpgradesRes.HasError() { return results @@ -108,26 +116,3 @@ func (d *defaultDriver) reconcileNodeSpecs( // - grow and shrink return results } - -func expectedResources( - k8sClient k8s.Client, - es v1alpha1.Elasticsearch, - observedState observer.State, - keystoreResources *keystore.Resources, -) (nodespec.ResourcesList, error) { - resources, err := nodespec.BuildExpectedResources(es, keystoreResources) - if err != nil { - return nil, err - } - - // patch configs to consider zen1 minimum master nodes - if err := zen1.SetupMinimumMasterNodesConfig(resources); err != nil { - return nil, err - } - // patch configs to consider zen2 initial master nodes - if err := zen2.SetupInitialMasterNodes(es, observedState, k8sClient, resources); err != nil { - return nil, err - } - - return resources, nil -} diff --git a/pkg/controller/elasticsearch/driver/nodes_test.go b/pkg/controller/elasticsearch/driver/nodes_test.go deleted file mode 100644 index 5883a9a812..0000000000 --- a/pkg/controller/elasticsearch/driver/nodes_test.go +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. 
- -package driver - -import ( - "testing" - - "github.com/stretchr/testify/require" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - - "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1" - "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer" - "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/settings" - "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" -) - -func Test_expectedResources_zen2(t *testing.T) { - es := v1alpha1.Elasticsearch{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ns", - Name: "cluster", - }, - Spec: v1alpha1.ElasticsearchSpec{ - Version: "7.2.0", - Nodes: []v1alpha1.NodeSpec{ - { - Name: "masters", - NodeCount: 3, - }, - { - Name: "data", - NodeCount: 3, - }, - }, - }, - } - - resources, err := expectedResources(k8s.WrapClient(fake.NewFakeClient()), es, observer.State{}, nil) - require.NoError(t, err) - - // 2 StatefulSets should be created - require.Equal(t, 2, len(resources.StatefulSets())) - // master nodes config should be patched to account for zen2 initial master nodes - require.NotEmpty(t, resources[0].Config.HasKeys([]string{settings.ClusterInitialMasterNodes})) - // zen1 m_m_n specific config should not be set - require.Empty(t, resources[0].Config.HasKeys([]string{settings.DiscoveryZenMinimumMasterNodes})) -} - -func Test_expectedResources_zen1(t *testing.T) { - es := v1alpha1.Elasticsearch{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ns", - Name: "cluster", - }, - Spec: v1alpha1.ElasticsearchSpec{ - Version: "6.8.0", - Nodes: []v1alpha1.NodeSpec{ - { - Name: "masters", - NodeCount: 3, - }, - { - Name: "data", - NodeCount: 3, - }, - }, - }, - } - - resources, err := expectedResources(k8s.WrapClient(fake.NewFakeClient()), es, observer.State{}, nil) - require.NoError(t, err) - - // 2 StatefulSets should be created - require.Equal(t, 2, len(resources.StatefulSets())) - // master nodes config should be patched to account for zen1 minimum_master_nodes - require.NotEmpty(t, resources[0].Config.HasKeys([]string{settings.DiscoveryZenMinimumMasterNodes})) - // zen2 config should not be set - require.Empty(t, resources[0].Config.HasKeys([]string{settings.ClusterInitialMasterNodes})) -} diff --git a/pkg/controller/elasticsearch/driver/upgrade.go b/pkg/controller/elasticsearch/driver/upgrade.go index 6025d32c8e..7270d0b131 100644 --- a/pkg/controller/elasticsearch/driver/upgrade.go +++ b/pkg/controller/elasticsearch/driver/upgrade.go @@ -23,13 +23,11 @@ import ( func (d *defaultDriver) handleRollingUpgrades( esClient esclient.Client, + esState ESState, statefulSets sset.StatefulSetList, ) *reconciler.Results { results := &reconciler.Results{} - // We need an up-to-date ES state, but avoid requesting information we may not need. - esState := NewMemoizingESState(esClient) - // Maybe upgrade some of the nodes. res := newRollingUpgrade(d, esClient, esState, statefulSets).run() results.WithResults(res) @@ -280,11 +278,11 @@ func (d *defaultDriver) MaybeEnableShardsAllocation( } // Make sure all pods scheduled for upgrade have been upgraded. 
- scheduledUpgradesDone, err := sset.ScheduledUpgradesDone(d.Client, statefulSets) + done, err := statefulSets.PodReconciliationDone(d.Client) if err != nil { return results.WithError(err) } - if !scheduledUpgradesDone { + if !done { log.V(1).Info( "Rolling upgrade not over yet, some pods don't have the updated revision, keeping shard allocations disabled", "namespace", d.ES.Namespace, diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 46223868db..291d5202c6 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -11,50 +11,116 @@ import ( "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1" "github.com/elastic/cloud-on-k8s/pkg/controller/common" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/nodespec" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/settings" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/version/zen1" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/version/zen2" "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" ) +type upscaleCtx struct { + k8sClient k8s.Client + es v1alpha1.Elasticsearch + scheme *runtime.Scheme + observedState observer.State + esState ESState + upscaleStateBuilder *upscaleStateBuilder +} + // HandleUpscaleAndSpecChanges reconciles expected NodeSpec resources. // It does: // - create any new StatefulSets // - update existing StatefulSets specification, to be used for future pods rotation // - upscale StatefulSet for which we expect more replicas +// - limit master node creation to one at a time // It does not: // - perform any StatefulSet downscale (left for downscale phase) // - perform any pod upgrade (left for rolling upgrade phase) func HandleUpscaleAndSpecChanges( - k8sClient k8s.Client, - es v1alpha1.Elasticsearch, - scheme *runtime.Scheme, - expectedResources nodespec.ResourcesList, + ctx upscaleCtx, actualStatefulSets sset.StatefulSetList, + expectedResources nodespec.ResourcesList, ) error { - // TODO: there is a split brain possibility here if going from 1 to 3 masters or 3 to 7. - // We should add only one master node at a time for safety. - // See https://github.com/elastic/cloud-on-k8s/issues/1281. 
- - for _, nodeSpecRes := range expectedResources { - // always reconcile config (will apply to new & recreated pods) - if err := settings.ReconcileConfig(k8sClient, es, nodeSpecRes.StatefulSet.Name, nodeSpecRes.Config); err != nil { + // adjust expected replicas to control nodes creation and deletion + adjusted, err := adjustResources(ctx, actualStatefulSets, expectedResources) + if err != nil { + return err + } + // reconcile all resources + for _, res := range adjusted { + if err := settings.ReconcileConfig(ctx.k8sClient, ctx.es, res.StatefulSet.Name, res.Config); err != nil { return err } - if _, err := common.ReconcileService(k8sClient, scheme, &nodeSpecRes.HeadlessService, &es); err != nil { + if _, err := common.ReconcileService(ctx.k8sClient, ctx.scheme, &res.HeadlessService, &ctx.es); err != nil { return err } - ssetToApply := *nodeSpecRes.StatefulSet.DeepCopy() - actual, alreadyExists := actualStatefulSets.GetByName(ssetToApply.Name) - if alreadyExists { - ssetToApply = adaptForExistingStatefulSet(actual, ssetToApply) + if err := sset.ReconcileStatefulSet(ctx.k8sClient, ctx.scheme, ctx.es, res.StatefulSet); err != nil { + return err } - if err := sset.ReconcileStatefulSet(k8sClient, scheme, es, ssetToApply); err != nil { + } + return nil +} + +func adjustResources( + ctx upscaleCtx, + actualStatefulSets sset.StatefulSetList, + expectedResources nodespec.ResourcesList, +) (nodespec.ResourcesList, error) { + adjustedResources := make(nodespec.ResourcesList, 0, len(expectedResources)) + for _, nodeSpecRes := range expectedResources { + adjustedSset, err := adjustStatefulSetReplicas(ctx, actualStatefulSets, *nodeSpecRes.StatefulSet.DeepCopy()) + if err != nil { + return nil, err + } + nodeSpecRes.StatefulSet = adjustedSset + adjustedResources = append(adjustedResources, nodeSpecRes) + } + // adapt resources configuration to match adjusted replicas + if err := adjustZenConfig(ctx.es, adjustedResources); err != nil { + return nil, err + } + return adjustedResources, nil +} + +func adjustZenConfig(es v1alpha1.Elasticsearch, resources nodespec.ResourcesList) error { + // patch configs to consider zen1 minimum master nodes + if err := zen1.SetupMinimumMasterNodesConfig(resources); err != nil { + return err + } + // patch configs to consider zen2 initial master nodes if cluster is not bootstrapped yet + if !AnnotatedForBootstrap(es) { + if err := zen2.SetupInitialMasterNodes(resources); err != nil { return err } } return nil } +func adjustStatefulSetReplicas( + ctx upscaleCtx, + actualStatefulSets sset.StatefulSetList, + expected appsv1.StatefulSet, +) (appsv1.StatefulSet, error) { + actual, alreadyExists := actualStatefulSets.GetByName(expected.Name) + if alreadyExists { + expected = adaptForExistingStatefulSet(actual, expected) + } + if alreadyExists && isReplicaIncrease(actual, expected) { + upscaleState, err := ctx.upscaleStateBuilder.InitOnce(ctx.k8sClient, ctx.es, ctx.esState) + if err != nil { + return appsv1.StatefulSet{}, err + } + expected = upscaleState.limitMasterNodesCreation(actualStatefulSets, expected) + } + return expected, nil +} + +// isReplicaIncrease returns true if expected replicas are higher than actual replicas. +func isReplicaIncrease(actual appsv1.StatefulSet, expected appsv1.StatefulSet) bool { + return sset.GetReplicas(expected) > sset.GetReplicas(actual) +} + // adaptForExistingStatefulSet modifies ssetToApply to account for the existing StatefulSet. // It avoids triggering downscales (done later), and makes sure new pods are created with the newest revision. 
func adaptForExistingStatefulSet(actualSset appsv1.StatefulSet, ssetToApply appsv1.StatefulSet) appsv1.StatefulSet { diff --git a/pkg/controller/elasticsearch/driver/upscale_state.go b/pkg/controller/elasticsearch/driver/upscale_state.go new file mode 100644 index 0000000000..326e824b7e --- /dev/null +++ b/pkg/controller/elasticsearch/driver/upscale_state.go @@ -0,0 +1,141 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package driver + +import ( + "sync" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + + "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1" + "github.com/elastic/cloud-on-k8s/pkg/controller/common" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/nodespec" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" +) + +type upscaleStateBuilder struct { + once *sync.Once + upscaleState *upscaleState +} + +func (o *upscaleStateBuilder) InitOnce(c k8s.Client, es v1alpha1.Elasticsearch, esState ESState) (*upscaleState, error) { + if o.once == nil { + o.once = &sync.Once{} + } + var err error + o.once.Do(func() { + o.upscaleState, err = newUpscaleState(c, es, esState) + }) + return o.upscaleState, err +} + +type upscaleState struct { + isBootstrapped bool + allowMasterCreation bool +} + +func newUpscaleState(c k8s.Client, es v1alpha1.Elasticsearch, esState ESState) (*upscaleState, error) { + state := &upscaleState{ + isBootstrapped: AnnotatedForBootstrap(es), + allowMasterCreation: true, + } + if !state.isBootstrapped { + return state, nil + } + // is there a master node creation in progress already? + masters, err := sset.GetActualMastersForCluster(c, es) + if err != nil { + return nil, err + } + for _, masterNodePod := range masters { + isJoining, err := isMasterNodeJoining(masterNodePod, esState) + if err != nil { + return nil, err + } + if isJoining { + state.recordMasterNodeCreation() + } + } + return state, nil +} + +func isMasterNodeJoining(pod corev1.Pod, esState ESState) (bool, error) { + // Consider a master node to be in the process of joining the cluster if either: + + // - Pending (pod not started yet) + if pod.Status.Phase == corev1.PodPending { + return true, nil + } + + // - Running but not Ready (ES process still starting) + if pod.Status.Phase == corev1.PodRunning && !k8s.IsPodReady(pod) { + return true, nil + } + + // - Running & Ready but not part of the cluster + if pod.Status.Phase == corev1.PodRunning && k8s.IsPodReady(pod) { + // This does a synchronous request to Elasticsearch. + // Relying instead on a previous (out of date) observed ES state would risk a mismatch + // if a node was removed then re-added into the cluster. + inCluster, err := esState.NodesInCluster([]string{pod.Name}) + if err != nil { + return false, err + } + if !inCluster { + return true, nil + } + } + + // Otherwise, consider the pod is not in the process of joining the cluster. + // It's either already running (and has joined), or in an error state. 
+ return false, nil +} + +func (s *upscaleState) recordMasterNodeCreation() { + // if the cluster is already formed, don't allow more master nodes to be created + if s.isBootstrapped { + s.allowMasterCreation = false + } +} + +func (s *upscaleState) canAddMasterNode() bool { + return s.allowMasterCreation +} + +func (s *upscaleState) limitMasterNodesCreation( + actualStatefulSets sset.StatefulSetList, + ssetToApply appsv1.StatefulSet, +) appsv1.StatefulSet { + if !label.IsMasterNodeSet(ssetToApply) { + return ssetToApply + } + + targetReplicas := sset.GetReplicas(ssetToApply) + actual, alreadyExists := actualStatefulSets.GetByName(ssetToApply.Name) + actualReplicas := int32(0) + if alreadyExists { + actualReplicas = sset.GetReplicas(actual) + } + + nodespec.UpdateReplicas(&ssetToApply, common.Int32(actualReplicas)) + for rep := actualReplicas + 1; rep <= targetReplicas; rep++ { + if !s.canAddMasterNode() { + ssetLogger(ssetToApply).Info( + "Limiting master nodes creation to one at a time", + "target", targetReplicas, + "current", sset.GetReplicas(ssetToApply), + ) + break + } + // allow one more master node to be created + nodespec.UpdateReplicas(&ssetToApply, common.Int32(rep)) + s.recordMasterNodeCreation() + } + + return ssetToApply +} diff --git a/pkg/controller/elasticsearch/driver/upscale_state_test.go b/pkg/controller/elasticsearch/driver/upscale_state_test.go new file mode 100644 index 0000000000..8aaed69642 --- /dev/null +++ b/pkg/controller/elasticsearch/driver/upscale_state_test.go @@ -0,0 +1,274 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package driver + +import ( + "reflect" + "testing" + + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" + + "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func Test_upscaleState_limitMasterNodesCreation(t *testing.T) { + tests := []struct { + name string + state *upscaleState + actualStatefulSets sset.StatefulSetList + ssetToApply appsv1.StatefulSet + wantSset appsv1.StatefulSet + wantState *upscaleState + }{ + { + name: "no change on the sset spec", + state: &upscaleState{allowMasterCreation: true, isBootstrapped: true}, + actualStatefulSets: sset.StatefulSetList{sset.TestSset{Name: "sset", Replicas: 3, Master: true}.Build()}, + ssetToApply: sset.TestSset{Name: "sset", Replicas: 3, Master: true}.Build(), + wantSset: sset.TestSset{Name: "sset", Replicas: 3, Master: true}.Build(), + wantState: &upscaleState{allowMasterCreation: true, isBootstrapped: true}, + }, + { + name: "spec change (same replicas)", + state: &upscaleState{allowMasterCreation: true, isBootstrapped: true}, + actualStatefulSets: sset.StatefulSetList{sset.TestSset{Name: "sset", Version: "6.8.0", Replicas: 3, Master: true}.Build()}, + ssetToApply: sset.TestSset{Name: "sset", Version: "7.2.0", Replicas: 3, Master: true}.Build(), + wantSset: sset.TestSset{Name: "sset", Version: "7.2.0", Replicas: 3, Master: true}.Build(), + wantState: &upscaleState{allowMasterCreation: true, isBootstrapped: true}, + }, + { + name: "upscale data nodes from 1 to 3: should go through", + 
state: &upscaleState{allowMasterCreation: true, isBootstrapped: true}, + actualStatefulSets: sset.StatefulSetList{sset.TestSset{Name: "sset", Replicas: 1, Master: false}.Build()}, + ssetToApply: sset.TestSset{Name: "sset", Replicas: 3, Master: false}.Build(), + wantSset: sset.TestSset{Name: "sset", Replicas: 3, Master: false}.Build(), + wantState: &upscaleState{allowMasterCreation: true, isBootstrapped: true}, + }, + { + name: "upscale master nodes from 1 to 3: should limit to 2", + state: &upscaleState{allowMasterCreation: true, isBootstrapped: true}, + actualStatefulSets: sset.StatefulSetList{sset.TestSset{Name: "sset", Replicas: 1, Master: true}.Build()}, + ssetToApply: sset.TestSset{Name: "sset", Replicas: 3, Master: true}.Build(), + wantSset: sset.TestSset{Name: "sset", Replicas: 2, Master: true}.Build(), + wantState: &upscaleState{allowMasterCreation: false, isBootstrapped: true}, + }, + { + name: "upscale master nodes from 1 to 3 when cluster not yet bootstrapped: should go through", + state: &upscaleState{allowMasterCreation: true, isBootstrapped: false}, + actualStatefulSets: sset.StatefulSetList{sset.TestSset{Name: "sset", Replicas: 1, Master: true}.Build()}, + ssetToApply: sset.TestSset{Name: "sset", Replicas: 3, Master: true}.Build(), + wantSset: sset.TestSset{Name: "sset", Replicas: 3, Master: true}.Build(), + wantState: &upscaleState{allowMasterCreation: true, isBootstrapped: false}, + }, + { + name: "new StatefulSet with 5 master nodes, cluster isn't bootstrapped yet: should go through", + state: &upscaleState{allowMasterCreation: true, isBootstrapped: false}, + actualStatefulSets: sset.StatefulSetList{}, + ssetToApply: sset.TestSset{Name: "sset", Replicas: 3, Master: true}.Build(), + wantSset: sset.TestSset{Name: "sset", Replicas: 3, Master: true}.Build(), + wantState: &upscaleState{allowMasterCreation: true, isBootstrapped: false}, + }, + { + name: "new StatefulSet with 5 master nodes, cluster already bootstrapped: should limit to 1", + state: &upscaleState{allowMasterCreation: true, isBootstrapped: true}, + actualStatefulSets: sset.StatefulSetList{}, + ssetToApply: sset.TestSset{Name: "sset", Replicas: 3, Master: true}.Build(), + wantSset: sset.TestSset{Name: "sset", Replicas: 1, Master: true}.Build(), + wantState: &upscaleState{allowMasterCreation: false, isBootstrapped: true}, + }, + { + name: "scale up from 3 to 5, nodespec changed to master: should limit to 4 (one new master)", + state: &upscaleState{allowMasterCreation: true, isBootstrapped: true}, + // no master on existing StatefulSet + actualStatefulSets: sset.StatefulSetList{sset.TestSset{Name: "sset", Replicas: 3, Master: false}.Build()}, + // turned into masters on newer StatefulSet + ssetToApply: sset.TestSset{Name: "sset", Replicas: 5, Master: true}.Build(), + wantSset: sset.TestSset{Name: "sset", Replicas: 4, Master: true}.Build(), + wantState: &upscaleState{allowMasterCreation: false, isBootstrapped: true}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotSset := tt.state.limitMasterNodesCreation(tt.actualStatefulSets, tt.ssetToApply) + // StatefulSet should be adapted + require.Equal(t, gotSset, tt.wantSset) + // upscaleState should be mutated accordingly + require.Equal(t, tt.wantState, tt.state) + }) + } +} + +type fakeESState struct { + ESState +} + +func (f *fakeESState) NodesInCluster(nodeNames []string) (bool, error) { + if nodeNames[0] == "inCluster" { + return true, nil + } + return false, nil +} + +func Test_isMasterNodeJoining(t *testing.T) { + tests := []struct { 
+ name string + pod v1.Pod + esState ESState + want bool + }{ + { + name: "pod pending", + pod: v1.Pod{Status: v1.PodStatus{Phase: v1.PodPending}}, + want: true, + }, + { + name: "pod running but not ready", + pod: v1.Pod{Status: v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.ContainersReady, + Status: corev1.ConditionFalse, + }, + { + Type: corev1.PodReady, + Status: corev1.ConditionFalse, + }, + }}}, + want: true, + }, + { + name: "pod running and ready but not in the cluster yet", + pod: v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "notInCluster", + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.ContainersReady, + Status: corev1.ConditionTrue, + }, + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }}}, + esState: &fakeESState{}, + want: true, + }, + { + name: "pod running and ready and in the cluster", + pod: v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "inCluster", + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.ContainersReady, + Status: corev1.ConditionTrue, + }, + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }}}, + esState: &fakeESState{}, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := isMasterNodeJoining(tt.pod, tt.esState) + require.NoError(t, err) + if got != tt.want { + t.Errorf("isMasterNodeJoining() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_newUpscaleState(t *testing.T) { + type args struct { + c k8s.Client + es v1alpha1.Elasticsearch + esState ESState + } + tests := []struct { + name string + args args + want *upscaleState + }{ + { + name: "cluster not bootstrapped", + args: args{ + es: *notBootstrappedES(), + }, + want: &upscaleState{allowMasterCreation: true, isBootstrapped: false}, + }, + { + name: "bootstrapped, no master node joining", + args: args{ + c: k8s.WrapClient(fake.NewFakeClient()), + es: *bootstrappedES(), + }, + want: &upscaleState{allowMasterCreation: true, isBootstrapped: true}, + }, + { + name: "bootstrapped, a master node is pending", + args: args{ + c: k8s.WrapClient(fake.NewFakeClient(sset.TestPod{ClusterName: "cluster", Master: true, Status: corev1.PodStatus{Phase: corev1.PodPending}}.BuildPtr())), + es: *bootstrappedES(), + }, + want: &upscaleState{allowMasterCreation: false, isBootstrapped: true}, + }, + { + name: "bootstrapped, a data node is pending", + args: args{ + c: k8s.WrapClient(fake.NewFakeClient(sset.TestPod{ClusterName: "cluster", Master: false, Data: true, Status: corev1.PodStatus{Phase: corev1.PodPending}}.BuildPtr())), + es: *bootstrappedES(), + }, + want: &upscaleState{allowMasterCreation: true, isBootstrapped: true}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := newUpscaleState(tt.args.c, tt.args.es, tt.args.esState) + require.NoError(t, err) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("newUpscaleState() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_upscaleStateBuilder_InitOnce(t *testing.T) { + b := &upscaleStateBuilder{} + s, err := b.InitOnce(k8s.WrapClient(fake.NewFakeClient()), *notBootstrappedES(), &fakeESState{}) + require.NoError(t, err) + require.Equal(t, &upscaleState{isBootstrapped: false, allowMasterCreation: true}, s) + // running InitOnce again should not build the state again + // run it with arguments that should normally modify the state + s, err = 
b.InitOnce(k8s.WrapClient(fake.NewFakeClient()), *bootstrappedES(), &fakeESState{}) + require.NoError(t, err) + require.Equal(t, &upscaleState{isBootstrapped: false, allowMasterCreation: true}, s) + // double checking this would indeed modify the state on first init + b = &upscaleStateBuilder{} + s, err = b.InitOnce(k8s.WrapClient(fake.NewFakeClient()), *bootstrappedES(), &fakeESState{}) + require.NoError(t, err) + require.Equal(t, &upscaleState{isBootstrapped: true, allowMasterCreation: true}, s) + +} diff --git a/pkg/controller/elasticsearch/driver/upscale_test.go b/pkg/controller/elasticsearch/driver/upscale_test.go index e7d6b1c8f9..5be013b1e4 100644 --- a/pkg/controller/elasticsearch/driver/upscale_test.go +++ b/pkg/controller/elasticsearch/driver/upscale_test.go @@ -5,6 +5,8 @@ package driver import ( + "reflect" + "sync" "testing" "github.com/stretchr/testify/require" @@ -19,15 +21,30 @@ import ( "github.com/elastic/cloud-on-k8s/pkg/controller/common" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/name" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/nodespec" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/settings" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset" "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" ) +var onceDone = &sync.Once{} + +func init() { + onceDone.Do(func() {}) +} + func TestHandleUpscaleAndSpecChanges(t *testing.T) { require.NoError(t, v1alpha1.AddToScheme(scheme.Scheme)) k8sClient := k8s.WrapClient(fake.NewFakeClient()) es := v1alpha1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "es"}} + ctx := upscaleCtx{ + k8sClient: k8sClient, + es: es, + scheme: scheme.Scheme, + observedState: observer.State{}, + esState: nil, + upscaleStateBuilder: &upscaleStateBuilder{}, + } expectedResources := nodespec.ResourcesList{ { StatefulSet: appsv1.StatefulSet{ @@ -81,7 +98,7 @@ func TestHandleUpscaleAndSpecChanges(t *testing.T) { // when no StatefulSets already exists actualStatefulSets := sset.StatefulSetList{} - err := HandleUpscaleAndSpecChanges(k8sClient, es, scheme.Scheme, expectedResources, actualStatefulSets) + err := HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResources) require.NoError(t, err) // StatefulSets should be created with their expected replicas var sset1 appsv1.StatefulSet @@ -100,7 +117,7 @@ func TestHandleUpscaleAndSpecChanges(t *testing.T) { // upscale data nodes actualStatefulSets = sset.StatefulSetList{sset1, sset2} expectedResources[1].StatefulSet.Spec.Replicas = common.Int32(10) - err = HandleUpscaleAndSpecChanges(k8sClient, es, scheme.Scheme, expectedResources, actualStatefulSets) + err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResources) require.NoError(t, err) require.NoError(t, k8sClient.Get(types.NamespacedName{Namespace: "ns", Name: "sset2"}, &sset2)) require.Equal(t, common.Int32(10), sset2.Spec.Replicas) @@ -110,7 +127,7 @@ func TestHandleUpscaleAndSpecChanges(t *testing.T) { // apply a spec change actualStatefulSets = sset.StatefulSetList{sset1, sset2} expectedResources[1].StatefulSet.Spec.Template.Labels = map[string]string{"a": "b"} - err = HandleUpscaleAndSpecChanges(k8sClient, es, scheme.Scheme, expectedResources, actualStatefulSets) + err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResources) require.NoError(t, err) require.NoError(t, k8sClient.Get(types.NamespacedName{Namespace: "ns", Name: "sset2"}, &sset2)) 
require.Equal(t, "b", sset2.Spec.Template.Labels["a"]) @@ -121,7 +138,7 @@ func TestHandleUpscaleAndSpecChanges(t *testing.T) { actualStatefulSets = sset.StatefulSetList{sset1, sset2} expectedResources[1].StatefulSet.Spec.Replicas = common.Int32(2) expectedResources[1].StatefulSet.Spec.Template.Labels = map[string]string{"a": "c"} - err = HandleUpscaleAndSpecChanges(k8sClient, es, scheme.Scheme, expectedResources, actualStatefulSets) + err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResources) require.NoError(t, err) require.NoError(t, k8sClient.Get(types.NamespacedName{Namespace: "ns", Name: "sset2"}, &sset2)) // spec should be updated @@ -131,3 +148,238 @@ func TestHandleUpscaleAndSpecChanges(t *testing.T) { // partition should be updated to 10 so current pods don't get rotated automatically require.Equal(t, common.Int32(10), sset2.Spec.UpdateStrategy.RollingUpdate.Partition) } + +func Test_adaptForExistingStatefulSet(t *testing.T) { + tests := []struct { + name string + actualSset appsv1.StatefulSet + ssetToApply appsv1.StatefulSet + want appsv1.StatefulSet + }{ + { + name: "nothing to do", + actualSset: sset.TestSset{Replicas: 3, Partition: 3}.Build(), + ssetToApply: sset.TestSset{Replicas: 3}.Build(), + want: sset.TestSset{Replicas: 3, Partition: 3}.Build(), + }, + { + name: "upscale: set Partition to the actual replicas", + actualSset: sset.TestSset{Replicas: 3, Partition: 1}.Build(), + ssetToApply: sset.TestSset{Replicas: 10}.Build(), + want: sset.TestSset{Replicas: 10, Partition: 3}.Build(), + }, + { + name: "downscale: set replicas to the actual replicas", + actualSset: sset.TestSset{Replicas: 3, Partition: 1}.Build(), + ssetToApply: sset.TestSset{Replicas: 1}.Build(), + want: sset.TestSset{Replicas: 3, Partition: 3}.Build(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := adaptForExistingStatefulSet(tt.actualSset, tt.ssetToApply); !reflect.DeepEqual(got, tt.want) { + t.Errorf("adaptForExistingStatefulSet() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_isReplicaIncrease(t *testing.T) { + tests := []struct { + name string + actual appsv1.StatefulSet + expected appsv1.StatefulSet + want bool + }{ + { + name: "increase", + actual: sset.TestSset{Replicas: 3}.Build(), + expected: sset.TestSset{Replicas: 5}.Build(), + want: true, + }, + { + name: "decrease", + actual: sset.TestSset{Replicas: 5}.Build(), + expected: sset.TestSset{Replicas: 3}.Build(), + want: false, + }, + { + name: "same value", + actual: sset.TestSset{Replicas: 3}.Build(), + expected: sset.TestSset{Replicas: 3}.Build(), + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isReplicaIncrease(tt.actual, tt.expected); got != tt.want { + t.Errorf("isReplicaIncrease() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_adjustStatefulSetReplicas(t *testing.T) { + type args struct { + ctx upscaleCtx + actualStatefulSets sset.StatefulSetList + expected appsv1.StatefulSet + } + tests := []struct { + name string + args args + want appsv1.StatefulSet + }{ + { + name: "new StatefulSet to create", + args: args{ + actualStatefulSets: sset.StatefulSetList{}, + expected: sset.TestSset{Name: "new-sset", Replicas: 3}.Build(), + }, + want: sset.TestSset{Name: "new-sset", Replicas: 3}.Build(), + }, + { + name: "same StatefulSet already exists", + args: args{ + actualStatefulSets: sset.StatefulSetList{sset.TestSset{Name: "sset", Replicas: 3}.Build()}, + expected: sset.TestSset{Name: "sset", Replicas: 3}.Build(), + 
}, + want: sset.TestSset{Name: "sset", Replicas: 3, Partition: 3}.Build(), + }, + { + name: "downscale case", + args: args{ + actualStatefulSets: sset.StatefulSetList{sset.TestSset{Name: "sset", Replicas: 3, Partition: 2}.Build()}, + expected: sset.TestSset{Name: "sset", Replicas: 1}.Build(), + }, + want: sset.TestSset{Name: "sset", Replicas: 3, Partition: 3}.Build(), + }, + { + name: "upscale case: data nodes", + args: args{ + ctx: upscaleCtx{ + upscaleStateBuilder: &upscaleStateBuilder{ + once: onceDone, + upscaleState: &upscaleState{isBootstrapped: true, allowMasterCreation: false}, + }, + }, + actualStatefulSets: sset.StatefulSetList{sset.TestSset{Name: "sset", Replicas: 3, Partition: 2, Master: false, Data: true}.Build()}, + expected: sset.TestSset{Name: "sset", Replicas: 5, Master: false, Data: true}.Build(), + }, + want: sset.TestSset{Name: "sset", Replicas: 5, Partition: 3, Master: false, Data: true}.Build(), + }, + { + name: "upscale case: master nodes - one by one", + args: args{ + ctx: upscaleCtx{ + upscaleStateBuilder: &upscaleStateBuilder{ + once: onceDone, + upscaleState: &upscaleState{isBootstrapped: true, allowMasterCreation: true}, + }, + }, + actualStatefulSets: sset.StatefulSetList{sset.TestSset{Name: "sset", Replicas: 3, Partition: 2, Master: true, Data: true}.Build()}, + expected: sset.TestSset{Name: "sset", Replicas: 5, Master: true, Data: true}.Build(), + }, + want: sset.TestSset{Name: "sset", Replicas: 4, Partition: 3, Master: true, Data: true}.Build(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := adjustStatefulSetReplicas(tt.args.ctx, tt.args.actualStatefulSets, tt.args.expected) + require.NoError(t, err) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("adjustStatefulSetReplicas() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_adjustZenConfig(t *testing.T) { + bootstrappedES := v1alpha1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ClusterUUIDAnnotationName: "uuid"}}} + notBootstrappedES := v1alpha1.Elasticsearch{} + + tests := []struct { + name string + es v1alpha1.Elasticsearch + resources nodespec.ResourcesList + wantMinimumMasterNodesSet bool + wantInitialMasterNodesSet bool + }{ + { + name: "adjust zen1 minimum_master_nodes", + es: bootstrappedES, + resources: nodespec.ResourcesList{ + { + StatefulSet: sset.TestSset{Version: "6.8.0", Replicas: 3, Master: true, Data: true}.Build(), + Config: settings.NewCanonicalConfig(), + }, + }, + wantMinimumMasterNodesSet: true, + wantInitialMasterNodesSet: false, + }, + { + name: "adjust zen2 initial master nodes when cluster is not bootstrapped yet", + es: notBootstrappedES, + resources: nodespec.ResourcesList{ + { + StatefulSet: sset.TestSset{Version: "7.2.0", Replicas: 3, Master: true, Data: true}.Build(), + Config: settings.NewCanonicalConfig(), + }, + }, + wantMinimumMasterNodesSet: false, + wantInitialMasterNodesSet: true, + }, + { + name: "don't adjust zen2 initial master nodes when cluster is already bootstrapped", + es: bootstrappedES, + resources: nodespec.ResourcesList{ + { + StatefulSet: sset.TestSset{Version: "7.2.0", Replicas: 3, Master: true, Data: true}.Build(), + Config: settings.NewCanonicalConfig(), + }, + }, + wantMinimumMasterNodesSet: false, + wantInitialMasterNodesSet: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := adjustZenConfig(tt.es, tt.resources) + require.NoError(t, err) + for _, res := range tt.resources { + hasMinimumMasterNodes := 
len(res.Config.HasKeys([]string{settings.DiscoveryZenMinimumMasterNodes})) > 0 + require.Equal(t, tt.wantMinimumMasterNodesSet, hasMinimumMasterNodes) + hasInitialMasterNodes := len(res.Config.HasKeys([]string{settings.ClusterInitialMasterNodes})) > 0 + require.Equal(t, tt.wantInitialMasterNodesSet, hasInitialMasterNodes) + } + }) + } +} + +func Test_adjustResources(t *testing.T) { + actualSset := sset.StatefulSetList{sset.TestSset{Version: "6.8.0", Replicas: 3, Master: true, Partition: 2}.Build()} + resources := nodespec.ResourcesList{ + { + // upscale to 10 replicas + StatefulSet: sset.TestSset{Version: "6.8.0", Replicas: 10, Master: true, Partition: 2}.Build(), + Config: settings.NewCanonicalConfig(), + }, + } + upscaleCtx := upscaleCtx{ + upscaleStateBuilder: &upscaleStateBuilder{ + once: onceDone, + upscaleState: &upscaleState{isBootstrapped: true, allowMasterCreation: true}, + }, + } + adjusted, err := adjustResources(upscaleCtx, actualSset, resources) + require.NoError(t, err) + + // should have added one more master + expectedSset := sset.TestSset{Version: "6.8.0", Replicas: 4, Master: true, Partition: 3}.Build() + require.Equal(t, expectedSset, adjusted.StatefulSets()[0]) + // and set minimum master nodes + require.NotEmpty(t, adjusted[0].Config.HasKeys([]string{settings.DiscoveryZenMinimumMasterNodes})) + + // original sset should be kept unmodified + require.Equal(t, sset.TestSset{Version: "6.8.0", Replicas: 10, Master: true, Partition: 2}.Build(), resources[0].StatefulSet) +} diff --git a/pkg/controller/elasticsearch/sset/fixtures.go b/pkg/controller/elasticsearch/sset/fixtures.go index d43759e919..646fb3648f 100644 --- a/pkg/controller/elasticsearch/sset/fixtures.go +++ b/pkg/controller/elasticsearch/sset/fixtures.go @@ -5,12 +5,11 @@ package sset import ( + "github.com/elastic/cloud-on-k8s/pkg/controller/common/hash" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/elastic/cloud-on-k8s/pkg/controller/common/hash" - "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label" ) type TestSset struct { @@ -55,6 +54,11 @@ func (t TestSset) Build() appsv1.StatefulSet { return statefulSet } +func (t TestSset) BuildPtr() *appsv1.StatefulSet { + built := t.Build() + return &built +} + type TestPod struct { Namespace string Name string @@ -64,6 +68,7 @@ type TestPod struct { Revision string Master bool Data bool + Status corev1.PodStatus } func (t TestPod) Build() corev1.Pod { @@ -81,6 +86,7 @@ func (t TestPod) Build() corev1.Pod { Name: t.Name, Labels: labels, }, + Status: t.Status, } } diff --git a/pkg/controller/elasticsearch/sset/list.go b/pkg/controller/elasticsearch/sset/list.go index c61c02f054..a6df1a0d6b 100644 --- a/pkg/controller/elasticsearch/sset/list.go +++ b/pkg/controller/elasticsearch/sset/list.go @@ -12,11 +12,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" - "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1" "github.com/elastic/cloud-on-k8s/pkg/controller/common/version" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label" "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" - "github.com/elastic/cloud-on-k8s/pkg/utils/stringsutil" ) var log = logf.Log.WithName("statefulset") @@ -86,29 +84,19 @@ func (l StatefulSetList) GetActualPods(c k8s.Client) ([]corev1.Pod, error) { } // PodReconciliationDone returns true if actual existing 
pods match what is specified in the StatefulSetList. -// It may return false if there are pods in the process of being created (but not created yet) -// or terminated (but not removed yet). -func (l StatefulSetList) PodReconciliationDone(c k8s.Client, es v1alpha1.Elasticsearch) (bool, error) { - // pods we expect to be there based on StatefulSets spec - expectedPods := l.PodNames() - // pods that are there for this cluster - actualRawPods, err := GetActualPodsForCluster(c, es) - if err != nil { - return false, err - } - actualPods := k8s.PodNames(actualRawPods) - - // check if they match - match := len(expectedPods) == len(actualPods) && stringsutil.StringsInSlice(expectedPods, actualPods) - if !match { - log.V(1).Info( - "Pod reconciliation is not done yet", - "namespace", es.Namespace, "es_name", es.Name, - "expected_pods", expectedPods, "actual_pods", actualPods, - ) +// It may return false if there are pods in the process of being: +// - created (but not there in our resources cache) +// - removed (but still there in our resources cache) +// - upgraded (pod revision should match statefulset revision, but doesn't in our resources cache) +// Status of the pods (running, error, etc.) is ignored. +func (l StatefulSetList) PodReconciliationDone(c k8s.Client) (bool, error) { + for _, s := range l { + done, err := PodReconciliationDoneForSset(c, s) + if err != nil || !done { + return done, err + } } - - return match, nil + return true, nil } // DeepCopy returns a copy of the StatefulSetList with no reference to the original StatefulSetList. diff --git a/pkg/controller/elasticsearch/sset/list_test.go b/pkg/controller/elasticsearch/sset/list_test.go index 30836d864f..d49bafab39 100644 --- a/pkg/controller/elasticsearch/sset/list_test.go +++ b/pkg/controller/elasticsearch/sset/list_test.go @@ -14,8 +14,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1" - "github.com/elastic/cloud-on-k8s/pkg/controller/common" "github.com/elastic/cloud-on-k8s/pkg/controller/common/version" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label" "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" @@ -91,41 +89,7 @@ func TestStatefulSetList_GetExistingPods(t *testing.T) { } func TestStatefulSetList_PodReconciliationDone(t *testing.T) { - es := v1alpha1.Elasticsearch{ - ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"}, - } - statefulSet1 := appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "sset1"}, - Spec: appsv1.StatefulSetSpec{ - Replicas: common.Int32(2), - }, - } - statefulSet2 := appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "sset2"}, - Spec: appsv1.StatefulSetSpec{ - Replicas: common.Int32(1), - }, - } - sset1Pod0 := corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Namespace: es.Namespace, Name: "sset1-0", Labels: map[string]string{ - label.ClusterNameLabelName: es.Name, - }}, - } - sset1Pod1 := corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Namespace: es.Namespace, Name: "sset1-1", Labels: map[string]string{ - label.ClusterNameLabelName: es.Name, - }}, - } - sset1Pod2 := corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Namespace: es.Namespace, Name: "sset1-2", Labels: map[string]string{ - label.ClusterNameLabelName: es.Name, - }}, - } - sset2Pod0 := corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Namespace: es.Namespace, Name: "sset2-0", Labels: map[string]string{ - label.ClusterNameLabelName: es.Name, - }}, - } + // more 
detailed cases covered in PodReconciliationDoneForSset(), called by the function we test here tests := []struct { name string l StatefulSetList @@ -141,37 +105,43 @@ func TestStatefulSetList_PodReconciliationDone(t *testing.T) { { name: "some pods, no sset", l: nil, - c: k8s.WrapClient(fake.NewFakeClient(&sset1Pod0)), - want: false, + c: k8s.WrapClient(fake.NewFakeClient( + TestPod{Namespace: "ns", Name: "sset-0", StatefulSetName: "sset", Revision: "current-rev"}.BuildPtr(), + )), + want: true, }, { name: "some statefulSets, no pod", - l: StatefulSetList{statefulSet1, statefulSet2}, - c: k8s.WrapClient(fake.NewFakeClient(&statefulSet1, &statefulSet2)), + l: StatefulSetList{TestSset{Name: "sset1", Replicas: 3}.Build()}, + c: k8s.WrapClient(fake.NewFakeClient(TestSset{Name: "sset1", Replicas: 3}.BuildPtr())), want: false, }, { - name: "missing sset1Pod1", - l: StatefulSetList{statefulSet1, statefulSet2}, - c: k8s.WrapClient(fake.NewFakeClient(&statefulSet1, &statefulSet2, &sset1Pod0, &sset2Pod0)), - want: false, + name: "sset has its pods", + l: StatefulSetList{ + TestSset{Name: "sset1", Replicas: 2, Status: appsv1.StatefulSetStatus{CurrentRevision: "current-rev"}}.Build(), + }, + c: k8s.WrapClient(fake.NewFakeClient( + TestPod{Namespace: "ns", Name: "sset1-0", StatefulSetName: "sset2", Revision: "current-rev"}.BuildPtr(), + TestPod{Namespace: "ns", Name: "sset1-1", StatefulSetName: "sset2", Revision: "current-rev"}.BuildPtr(), + )), + want: true, }, { - name: "additional pod sset1Pod2 that shouldn't be there", - l: StatefulSetList{statefulSet1, statefulSet2}, - c: k8s.WrapClient(fake.NewFakeClient(&statefulSet1, &statefulSet2, &sset1Pod0, &sset1Pod1, &sset1Pod2, &sset2Pod0)), + name: "sset is missing a pod", + l: StatefulSetList{ + TestSset{Name: "sset1", Replicas: 2, Status: appsv1.StatefulSetStatus{CurrentRevision: "current-rev"}}.Build(), + }, + c: k8s.WrapClient(fake.NewFakeClient( + TestPod{Namespace: "ns", Name: "sset1-0", StatefulSetName: "sset2", Revision: "current-rev"}.BuildPtr(), + )), want: false, }, - { - name: "pods match sset spec", - l: StatefulSetList{statefulSet1, statefulSet2}, - c: k8s.WrapClient(fake.NewFakeClient(&statefulSet1, &statefulSet2, &sset1Pod0, &sset1Pod1, &sset2Pod0)), - want: true, - }, + // TODO: test more than one StatefulSet once https://github.com/kubernetes-sigs/controller-runtime/pull/311 is available } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := tt.l.PodReconciliationDone(tt.c, es) + got, err := tt.l.PodReconciliationDone(tt.c) require.NoError(t, err) require.Equal(t, tt.want, got) }) diff --git a/pkg/controller/elasticsearch/sset/pod.go b/pkg/controller/elasticsearch/sset/pod.go index 916cd6d2c4..73c69c8b3b 100644 --- a/pkg/controller/elasticsearch/sset/pod.go +++ b/pkg/controller/elasticsearch/sset/pod.go @@ -17,6 +17,7 @@ import ( "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label" "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" + "github.com/elastic/cloud-on-k8s/pkg/utils/stringsutil" ) // PodName returns the name of the pod with the given ordinal for this StatefulSet. @@ -76,29 +77,57 @@ func GetActualMastersForCluster(c k8s.Client, es v1alpha1.Elasticsearch) ([]core return label.FilterMasterNodePods(pods), nil } -// ScheduledUpgradesDone returns true if all pods scheduled for upgrade have been upgraded. 
-// This is done by checking the revision of pods whose ordinal is higher or equal than the StatefulSet -// rollingUpdate.Partition index. -func ScheduledUpgradesDone(c k8s.Client, statefulSets StatefulSetList) (bool, error) { - for _, s := range statefulSets { - if s.Status.UpdateRevision == "" { - // no upgrade scheduled - continue +func PodReconciliationDoneForSset(c k8s.Client, statefulSet appsv1.StatefulSet) (bool, error) { + // check all expected pods are there: no more, no less + actualPods, err := GetActualPodsForStatefulSet(c, statefulSet) + if err != nil { + return false, err + } + actualPodNames := k8s.PodNames(actualPods) + expectedPodNames := PodNames(statefulSet) + if !(len(actualPodNames) == len(expectedPodNames) && stringsutil.StringsInSlice(expectedPodNames, actualPodNames)) { + log.V(1).Info( + "Some pods still need to be created/deleted", + "namespace", statefulSet.Namespace, "statefulset_name", statefulSet.Name, + "expected_pods", expectedPodNames, "actual_pods", actualPodNames, + ) + return false, nil + } + + // check pods revision match what is specified in the StatefulSet + done, err := scheduledUpgradeDone(c, statefulSet) + if err != nil { + return false, err + } + if !done { + log.V(1).Info( + "Some pods still need to be upgraded", + "namespace", statefulSet.Namespace, "statefulset_name", statefulSet.Name, + ) + return false, nil + } + + return true, nil +} + +func scheduledUpgradeDone(c k8s.Client, statefulSet appsv1.StatefulSet) (bool, error) { + if statefulSet.Status.UpdateRevision == "" { + // no upgrade scheduled + return true, nil + } + partition := GetPartition(statefulSet) + for i := GetReplicas(statefulSet) - 1; i >= partition; i-- { + var pod corev1.Pod + err := c.Get(types.NamespacedName{Namespace: statefulSet.Namespace, Name: PodName(statefulSet.Name, i)}, &pod) + if errors.IsNotFound(err) { + // pod probably being terminated + return false, nil + } + if err != nil { + return false, err } - partition := GetPartition(s) - for i := GetReplicas(s) - 1; i >= partition; i-- { - var pod corev1.Pod - err := c.Get(types.NamespacedName{Namespace: s.Namespace, Name: PodName(s.Name, i)}, &pod) - if errors.IsNotFound(err) { - // pod probably being terminated - return false, nil - } - if err != nil { - return false, err - } - if PodRevision(pod) != s.Status.UpdateRevision { - return false, nil - } + if PodRevision(pod) != statefulSet.Status.UpdateRevision { + return false, nil } } return true, nil diff --git a/pkg/controller/elasticsearch/sset/pod_test.go b/pkg/controller/elasticsearch/sset/pod_test.go index 1ca13d0535..9d1383ec67 100644 --- a/pkg/controller/elasticsearch/sset/pod_test.go +++ b/pkg/controller/elasticsearch/sset/pod_test.go @@ -36,138 +36,114 @@ func TestPodNames(t *testing.T) { ) } -func TestScheduledUpgradesDone(t *testing.T) { - ssetSample := func(name string, partition int32, currentRev string, updateRev string) appsv1.StatefulSet { - return appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ns", - Name: name, - }, - Spec: appsv1.StatefulSetSpec{ - Replicas: common.Int32(3), - UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ - Type: appsv1.RollingUpdateStatefulSetStrategyType, - RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{Partition: common.Int32(partition)}, - }, - }, +func Test_PodReconciliationDoneForSset(t *testing.T) { + ssetName := "sset" + ssetSample := func(replicas int32, partition int32, currentRev string, updateRev string) appsv1.StatefulSet { + return TestSset{ + Name: ssetName, + ClusterName: "cluster", + 
Replicas: replicas, + Partition: partition, Status: appsv1.StatefulSetStatus{ CurrentRevision: currentRev, UpdateRevision: updateRev, }, - } + }.Build() } podSample := func(name string, revision string) *corev1.Pod { - return &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ns", - Name: name, - Labels: map[string]string{ - appsv1.StatefulSetRevisionLabel: revision, - }, - }, - } + return TestPod{ + Namespace: "ns", + Name: name, + ClusterName: "cluster", + StatefulSetName: ssetName, + Revision: revision, + }.BuildPtr() } tests := []struct { - name string - c k8s.Client - statefulSets StatefulSetList - want bool + name string + c k8s.Client + statefulSet appsv1.StatefulSet + want bool }{ { - name: "no statefulset", - c: k8s.WrapClient(fake.NewFakeClient()), - statefulSets: nil, - want: true, - }, - { - name: "statefulset with no upgrade revision", - c: k8s.WrapClient(fake.NewFakeClient()), - statefulSets: StatefulSetList{ssetSample("sset", 1, "current-rev", "")}, - want: true, + name: "statefulset with a pod missing", + c: k8s.WrapClient(fake.NewFakeClient( + podSample("sset-0", "current-rev"), + podSample("sset-1", "current-rev"), + // missing sset-2 + )), + statefulSet: ssetSample(3, 1, "current-rev", ""), + want: false, }, { - name: "statefulset with one pod (sset-2) currently being restarted (missing)", + name: "statefulset with an additional pod", c: k8s.WrapClient(fake.NewFakeClient( podSample("sset-0", "current-rev"), podSample("sset-1", "current-rev"), + podSample("sset-2", "current-rev"), + // sset-3 still there from previous downscale + podSample("sset-3", "current-rev"), )), - statefulSets: StatefulSetList{ssetSample("sset", 2, "current-rev", "update-rev")}, - want: false, + statefulSet: ssetSample(3, 1, "current-rev", ""), + want: false, }, { - name: "statefulset with one pod upgraded, matching current partition", + name: "statefulset with all pods in the current revision, no upgrade revision", c: k8s.WrapClient(fake.NewFakeClient( podSample("sset-0", "current-rev"), podSample("sset-1", "current-rev"), - podSample("sset-2", "update-rev"), + podSample("sset-2", "current-rev"), )), - statefulSets: StatefulSetList{ssetSample("sset", 2, "current-rev", "update-rev")}, - want: true, + statefulSet: ssetSample(3, 1, "current-rev", ""), + want: true, }, { - name: "statefulset with one pod not upgraded yet", + name: "statefulset with one pod (sset-2) currently being restarted (missing)", c: k8s.WrapClient(fake.NewFakeClient( podSample("sset-0", "current-rev"), podSample("sset-1", "current-rev"), - podSample("sset-2", "current-rev"), )), - statefulSets: StatefulSetList{ssetSample("sset", 2, "current-rev", "update-rev")}, - want: false, + statefulSet: ssetSample(3, 2, "current-rev", "update-rev"), + want: false, }, { - name: "statefulset with all pods upgraded", + name: "statefulset with one pod upgraded, matching current partition", c: k8s.WrapClient(fake.NewFakeClient( - podSample("sset-0", "update-rev"), - podSample("sset-1", "update-rev"), + podSample("sset-0", "current-rev"), + podSample("sset-1", "current-rev"), podSample("sset-2", "update-rev"), )), - statefulSets: StatefulSetList{ssetSample("sset", 0, "current-rev", "update-rev")}, - want: true, + statefulSet: ssetSample(3, 2, "current-rev", "update-rev"), + want: true, }, { - name: "multiple statefulsets with all pods upgraded", + name: "statefulset with one pod not upgraded yet", c: k8s.WrapClient(fake.NewFakeClient( - podSample("sset-0", "update-rev"), - podSample("sset-1", "update-rev"), - podSample("sset-2", 
"update-rev"), - podSample("sset2-0", "update-rev"), - podSample("sset2-1", "update-rev"), - podSample("sset2-2", "update-rev"), + podSample("sset-0", "current-rev"), + podSample("sset-1", "current-rev"), + podSample("sset-2", "current-rev"), )), - statefulSets: StatefulSetList{ - ssetSample("sset", 0, "current-rev", "update-rev"), - ssetSample("sset2", 0, "current-rev", "update-rev"), - }, - want: true, + statefulSet: ssetSample(3, 2, "current-rev", "update-rev"), + want: false, }, { - name: "multiple statefulsets with some pods not upgraded yet", + name: "statefulset with all pods upgraded", c: k8s.WrapClient(fake.NewFakeClient( podSample("sset-0", "update-rev"), podSample("sset-1", "update-rev"), podSample("sset-2", "update-rev"), - podSample("sset2-0", "update-rev"), - podSample("sset2-1", "update-rev"), - podSample("sset2-2", "current-rev"), // not upgraded yet - podSample("sset3-0", "update-rev"), - podSample("sset3-1", "update-rev"), - podSample("sset3-2", "update-rev"), )), - statefulSets: StatefulSetList{ - ssetSample("sset", 0, "current-rev", "update-rev"), - ssetSample("sset2", 0, "current-rev", "update-rev"), - ssetSample("sset3", 0, "current-rev", "update-rev"), - }, - want: false, + statefulSet: ssetSample(3, 0, "current-rev", "update-rev"), + want: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := ScheduledUpgradesDone(tt.c, tt.statefulSets) + got, err := PodReconciliationDoneForSset(tt.c, tt.statefulSet) require.NoError(t, err) if got != tt.want { - t.Errorf("ScheduledUpgradesDone() got = %v, want %v", got, tt.want) + t.Errorf("PodReconciliationDoneForSset() got = %v, want %v", got, tt.want) } }) } diff --git a/pkg/controller/elasticsearch/version/zen2/initial_master_nodes.go b/pkg/controller/elasticsearch/version/zen2/initial_master_nodes.go index 343d8f6f33..0091aef411 100644 --- a/pkg/controller/elasticsearch/version/zen2/initial_master_nodes.go +++ b/pkg/controller/elasticsearch/version/zen2/initial_master_nodes.go @@ -5,67 +5,16 @@ package zen2 import ( - "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/nodespec" - "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/settings" - "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" ) -const ( - // ClusterUUIDAnnotationName used to store the cluster UUID as an annotation when cluster has been bootstrapped. - ClusterUUIDAnnotationName = "elasticsearch.k8s.elastic.co/cluster-uuid" -) - -// annotatedForBootstrap returns true if the cluster has been annotated with the UUID already. -func annotatedForBootstrap(cluster v1alpha1.Elasticsearch) bool { - _, bootstrapped := cluster.Annotations[ClusterUUIDAnnotationName] - return bootstrapped -} - -// clusterIsBootstrapped returns true if the cluster has formed and has a UUID. -func clusterIsBootstrapped(observedState observer.State) bool { - return observedState.ClusterState != nil && len(observedState.ClusterState.ClusterUUID) > 0 -} - -// annotateWithUUID annotates the cluster with its UUID, to mark it as "bootstrapped". 
-func annotateWithUUID(cluster v1alpha1.Elasticsearch, observedState observer.State, c k8s.Client) error { - log.Info("Annotating bootstrapped cluster with its UUID", "namespace", cluster.Namespace, "es_name", cluster.Name) - if cluster.Annotations == nil { - cluster.Annotations = make(map[string]string) - } - cluster.Annotations[ClusterUUIDAnnotationName] = observedState.ClusterState.ClusterUUID - if err := c.Update(&cluster); err != nil { - return err - } - return nil -} - // SetupInitialMasterNodes modifies the ES config of the given resources to setup // cluster initial master nodes. -// It also saves the cluster UUID as an annotation to ensure that it's not set -// if the cluster has already been bootstrapped. func SetupInitialMasterNodes( - cluster v1alpha1.Elasticsearch, - observedState observer.State, - c k8s.Client, nodeSpecResources nodespec.ResourcesList, ) error { - if annotatedForBootstrap(cluster) { - // Cluster already bootstrapped, nothing to do. - return nil - } - - if clusterIsBootstrapped(observedState) { - // Cluster is not annotated for bootstrap, but should be. - if err := annotateWithUUID(cluster, observedState, c); err != nil { - return err - } - return nil - } - - // Cluster is not bootstrapped yet, set initial_master_nodes setting in each master node config. masters := nodeSpecResources.MasterNodesNames() if len(masters) == 0 { return nil diff --git a/pkg/controller/elasticsearch/version/zen2/initial_master_nodes_test.go b/pkg/controller/elasticsearch/version/zen2/initial_master_nodes_test.go index 978a8d939e..a1e662b192 100644 --- a/pkg/controller/elasticsearch/version/zen2/initial_master_nodes_test.go +++ b/pkg/controller/elasticsearch/version/zen2/initial_master_nodes_test.go @@ -7,111 +7,15 @@ package zen2 import ( "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes/scheme" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1" settings2 "github.com/elastic/cloud-on-k8s/pkg/controller/common/settings" - "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/nodespec" - "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/settings" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset" - "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" ) -const ( - defaultClusterUUID = "jiMyMA1hQ-WMPK3vEStZuw" -) - -func setupScheme(t *testing.T) *runtime.Scheme { - sc := scheme.Scheme - if err := v1alpha1.SchemeBuilder.AddToScheme(sc); err != nil { - assert.Fail(t, "failed to add Es types") - } - return sc -} - -var esNN = types.NamespacedName{ - Namespace: "ns1", - Name: "foo", -} - -func newElasticsearch() v1alpha1.Elasticsearch { - return v1alpha1.Elasticsearch{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: esNN.Namespace, - Name: esNN.Name, - }, - } -} - -func withAnnotation(es v1alpha1.Elasticsearch, name, value string) v1alpha1.Elasticsearch { - if es.Annotations == nil { - es.Annotations = make(map[string]string) - } - es.Annotations[name] = value - return es -} - -func TestSetupInitialMasterNodes_AlreadyBootstrapped(t *testing.T) { - s := setupScheme(t) - tests := []struct { - name string - es v1alpha1.Elasticsearch - observedState observer.State - 
nodeSpecResources nodespec.ResourcesList - expected []settings.CanonicalConfig - expectedEs v1alpha1.Elasticsearch - }{ - { - name: "cluster already annotated for bootstrap: no changes", - es: withAnnotation(newElasticsearch(), ClusterUUIDAnnotationName, defaultClusterUUID), - nodeSpecResources: nodespec.ResourcesList{ - {StatefulSet: sset.TestSset{Name: "data", Version: "7.1.0", Replicas: 3, Master: false, Data: true}.Build(), Config: settings.NewCanonicalConfig()}, - }, - expected: []settings.CanonicalConfig{settings.NewCanonicalConfig()}, - expectedEs: withAnnotation(newElasticsearch(), ClusterUUIDAnnotationName, defaultClusterUUID), - }, - { - name: "cluster bootstrapped but not annotated: should be annotated", - es: newElasticsearch(), - observedState: observer.State{ClusterState: &client.ClusterState{ClusterUUID: defaultClusterUUID}}, - nodeSpecResources: nodespec.ResourcesList{ - {StatefulSet: sset.TestSset{Name: "data", Version: "7.1.0", Replicas: 3, Master: false, Data: true}.Build(), Config: settings.NewCanonicalConfig()}, - }, - expected: []settings.CanonicalConfig{settings.NewCanonicalConfig()}, - expectedEs: withAnnotation(newElasticsearch(), ClusterUUIDAnnotationName, defaultClusterUUID), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - client := k8s.WrapClient(fake.NewFakeClientWithScheme(s, &tt.es)) - err := SetupInitialMasterNodes(tt.es, tt.observedState, client, tt.nodeSpecResources) - require.NoError(t, err) - // check if the ES resource was annotated - var es v1alpha1.Elasticsearch - err = client.Get(esNN, &es) - assert.NoError(t, err) - require.Equal(t, tt.expectedEs, es) - // check if nodespec config were modified - for i := 0; i < len(tt.nodeSpecResources); i++ { - expected, err := tt.expected[i].Render() - require.NoError(t, err) - actual, err := tt.nodeSpecResources[i].Config.Render() - require.NoError(t, err) - require.Equal(t, expected, actual) - } - }) - } -} - -func TestSetupInitialMasterNodes_NotBootstrapped(t *testing.T) { +func TestSetupInitialMasterNodes(t *testing.T) { tests := []struct { name string nodeSpecResources nodespec.ResourcesList @@ -165,7 +69,7 @@ func TestSetupInitialMasterNodes_NotBootstrapped(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := SetupInitialMasterNodes(v1alpha1.Elasticsearch{}, observer.State{}, k8s.WrapClient(fake.NewFakeClient()), tt.nodeSpecResources) + err := SetupInitialMasterNodes(tt.nodeSpecResources) require.NoError(t, err) for i := 0; i < len(tt.nodeSpecResources); i++ { expected, err := tt.expected[i].Render() diff --git a/pkg/controller/elasticsearch/version/zen2/voting_exclusions.go b/pkg/controller/elasticsearch/version/zen2/voting_exclusions.go index c1060f6104..25c2446014 100644 --- a/pkg/controller/elasticsearch/version/zen2/voting_exclusions.go +++ b/pkg/controller/elasticsearch/version/zen2/voting_exclusions.go @@ -38,7 +38,7 @@ func AddToVotingConfigExclusions(c k8s.Client, esClient client.Client, es v1alph } // canClearVotingConfigExclusions returns true if it is safe to clear voting config exclusions. -func canClearVotingConfigExclusions(c k8s.Client, es v1alpha1.Elasticsearch, actualStatefulSets sset.StatefulSetList) (bool, error) { +func canClearVotingConfigExclusions(c k8s.Client, actualStatefulSets sset.StatefulSetList) (bool, error) { // Voting config exclusions are set before master nodes are removed on sset downscale. 
// They can be cleared when: // - nodes are effectively removed @@ -47,7 +47,7 @@ func canClearVotingConfigExclusions(c k8s.Client, es v1alpha1.Elasticsearch, act // - expected nodes to remove are not removed yet // PodReconciliationDone returns false if there are some pods not created yet: we don't really // care about those here, but it's still fine to requeue and retry later for the sake of simplicity. - return actualStatefulSets.PodReconciliationDone(c, es) + return actualStatefulSets.PodReconciliationDone(c) } // ClearVotingConfigExclusions resets the voting config exclusions if all excluded nodes are properly removed. @@ -62,7 +62,7 @@ func ClearVotingConfigExclusions(es v1alpha1.Elasticsearch, c k8s.Client, esClie return false, nil } - canClear, err := canClearVotingConfigExclusions(c, es, actualStatefulSets) + canClear, err := canClearVotingConfigExclusions(c, actualStatefulSets) if err != nil { return false, err }
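Editor's note on the new TestPod.Status field added in fixtures.go above: it lets test pods carry an arbitrary corev1.PodStatus, which is useful because PodReconciliationDoneForSset deliberately ignores pod status. A minimal sketch of such a usage; the test itself is hypothetical and not part of this diff, only TestPod and its Status field come from the change:

package sset_test

import (
	"testing"

	"github.com/stretchr/testify/require"
	corev1 "k8s.io/api/core/v1"

	"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
)

// Test_testPodStatus is an illustrative example: it builds a Pending pod via the
// new TestPod.Status field, e.g. to exercise callers that must ignore pod phase
// (as PodReconciliationDoneForSset does).
func Test_testPodStatus(t *testing.T) {
	pod := sset.TestPod{
		Namespace:       "ns",
		Name:            "sset-0",
		ClusterName:     "cluster",
		StatefulSetName: "sset",
		Revision:        "current-rev",
		Status:          corev1.PodStatus{Phase: corev1.PodPending},
	}.Build()
	require.Equal(t, corev1.PodPending, pod.Status.Phase)
}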
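Editor's note on the ordinal loop in scheduledUpgradeDone: it only inspects pods at or above the rolling-update partition. A minimal sketch of that boundary, assuming GetReplicas and GetPartition return int32 ordinals as the loop above suggests; the program is illustrative, only the sset helpers come from this package:

package main

import (
	"fmt"

	"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
)

func main() {
	// With Replicas=5 and Partition=3, only ordinals 4 and 3 must run the
	// StatefulSet's UpdateRevision; ordinals 0-2 may stay on CurrentRevision.
	statefulSet := sset.TestSset{Name: "sset", Replicas: 5, Partition: 3}.Build()
	for i := sset.GetReplicas(statefulSet) - 1; i >= sset.GetPartition(statefulSet); i-- {
		fmt.Println(sset.PodName(statefulSet.Name, i)) // prints "sset-4", then "sset-3"
	}
}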
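Editor's note on the zen2 change: with the bootstrap/UUID logic removed, SetupInitialMasterNodes only mutates the node configs, so deciding whether it should run at all now belongs to the caller. A minimal sketch of that caller-side guard, assuming the caller already knows whether the cluster has formed; seedInitialMasterNodes and the alreadyBootstrapped flag are illustrative stand-ins, not part of this diff:

package driverexample // hypothetical package, for illustration only

import (
	"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/nodespec"
	"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/version/zen2"
)

// seedInitialMasterNodes only seeds cluster.initial_master_nodes while the
// cluster has not formed yet: a bootstrapped cluster must never see that
// setting again. The bootstrapped check itself now lives outside zen2.
func seedInitialMasterNodes(alreadyBootstrapped bool, resources nodespec.ResourcesList) error {
	if alreadyBootstrapped {
		return nil
	}
	return zen2.SetupInitialMasterNodes(resources)
}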