diff --git a/pkg/controller/elasticsearch/driver/expectations.go b/pkg/controller/elasticsearch/driver/expectations.go
new file mode 100644
index 00000000000..2757889403a
--- /dev/null
+++ b/pkg/controller/elasticsearch/driver/expectations.go
@@ -0,0 +1,36 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package driver
+
+import (
+	"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
+)
+
+func (d *defaultDriver) expectationsMet(actualStatefulSets sset.StatefulSetList) (bool, error) {
+	if !d.Expectations.GenerationExpected(actualStatefulSets.ObjectMetas()...) {
+		// Our cache of StatefulSets is out of date compared to previous reconciliation operations.
+		// Continuing with the reconciliation at this point may lead to:
+		// - errors on rejected sset updates (conflict since cached resource out of date): that's ok
+		// - calling ES orchestration settings (zen1/zen2/allocation excludes) with wrong assumptions: that's not ok
+		// Hence we choose to abort the reconciliation early: will run again later with an updated cache.
+		log.V(1).Info("StatefulSet cache out-of-date, re-queueing", "namespace", d.ES.Namespace, "es_name", d.ES.Name)
+		return false, nil
+	}
+
+	podsReconciled, err := actualStatefulSets.PodReconciliationDone(d.Client)
+	if err != nil {
+		return false, err
+	}
+	if !podsReconciled {
+		// Pods we have in the cache do not match StatefulSets we have in the cache.
+		// This can happen if some pods have been scheduled for creation/deletion/upgrade
+		// but the operation has not happened or been observed yet.
+		// Continuing with nodes reconciliation at this point would be dangerous, since
+		// we may update ES orchestration settings (zen1/zen2/allocation excludes) with
+		// wrong assumptions (especially on master-eligible and ES version mismatches).
+		return false, nil
+	}
+	return true, nil
+}
diff --git a/pkg/controller/elasticsearch/driver/expectations_test.go b/pkg/controller/elasticsearch/driver/expectations_test.go
new file mode 100644
index 00000000000..9401076a38a
--- /dev/null
+++ b/pkg/controller/elasticsearch/driver/expectations_test.go
@@ -0,0 +1,61 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package driver
+
+import (
+	"testing"
+
+	"sigs.k8s.io/controller-runtime/pkg/client/fake"
+
+	"github.com/elastic/cloud-on-k8s/pkg/controller/common"
+	"github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler"
+	"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
+	"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
+
+	"github.com/stretchr/testify/require"
+)
+
+func Test_defaultDriver_expectationsMet(t *testing.T) {
+	d := &defaultDriver{DefaultDriverParameters{
+		Expectations: reconciler.NewExpectations(),
+		Client:       k8s.WrapClient(fake.NewFakeClient()),
+	}}
+
+	// no expectations set
+	met, err := d.expectationsMet(sset.StatefulSetList{})
+	require.NoError(t, err)
+	require.True(t, met)
+
+	// a sset generation is expected
+	statefulSet := sset.TestSset{Name: "sset"}.Build()
+	statefulSet.Generation = 123
+	d.Expectations.ExpectGeneration(statefulSet.ObjectMeta)
+	// but not met yet
+	statefulSet.Generation = 122
+	met, err = d.expectationsMet(sset.StatefulSetList{statefulSet})
+	require.NoError(t, err)
+	require.False(t, met)
+	// met now
+	statefulSet.Generation = 123
+	met, err = d.expectationsMet(sset.StatefulSetList{statefulSet})
+	require.NoError(t, err)
+	require.True(t, met)
+
+	// we expect some sset replicas to exist
+	// but corresponding pod does not exist
+	statefulSet.Spec.Replicas = common.Int32(1)
+	// expectations should not be met: we miss a pod
+	met, err = d.expectationsMet(sset.StatefulSetList{statefulSet})
+	require.NoError(t, err)
+	require.False(t, met)
+
+	// add the missing pod
+	pod := sset.TestPod{Name: "sset-0", StatefulSetName: statefulSet.Name}.Build()
+	d.Client = k8s.WrapClient(fake.NewFakeClient(&pod))
+	// expectations should be met
+	met, err = d.expectationsMet(sset.StatefulSetList{statefulSet})
+	require.NoError(t, err)
+	require.True(t, met)
+}
diff --git a/pkg/controller/elasticsearch/driver/nodes.go b/pkg/controller/elasticsearch/driver/nodes.go
index 8864be76428..d3ae8754da8 100644
--- a/pkg/controller/elasticsearch/driver/nodes.go
+++ b/pkg/controller/elasticsearch/driver/nodes.go
@@ -50,12 +50,12 @@ func (d *defaultDriver) reconcileNodeSpecs(
 
 	// Phase 1: apply expected StatefulSets resources and scale up.
 	upscaleCtx := upscaleCtx{
-		k8sClient:     d.K8sClient(),
-		es:            d.ES,
-		scheme:        d.Scheme(),
-		observedState: observedState,
-		esState:       esState,
-		upscaleState:  &upscaleStateBuilder{},
+		k8sClient:           d.K8sClient(),
+		es:                  d.ES,
+		scheme:              d.Scheme(),
+		observedState:       observedState,
+		esState:             esState,
+		upscaleStateBuilder: &upscaleStateBuilder{},
 	}
 	if err := HandleUpscaleAndSpecChanges(upscaleCtx, actualStatefulSets, expectedResources); err != nil {
 		return results.WithError(err)
@@ -116,30 +116,3 @@ func (d *defaultDriver) reconcileNodeSpecs(
 	// - grow and shrink
 	return results
 }
-
-func (d *defaultDriver) expectationsMet(actualStatefulSets sset.StatefulSetList) (bool, error) {
-	if !d.Expectations.GenerationExpected(actualStatefulSets.ObjectMetas()...) {
-		// Our cache of StatefulSets is out of date compared to previous reconciliation operations.
-		// Continuing with the reconciliation at this point may lead to:
-		// - errors on rejected sset updates (conflict since cached resource out of date): that's ok
-		// - calling ES orchestration settings (zen1/zen2/allocation excludes) with wrong assumptions: that's not ok
-		// Hence we choose to abort the reconciliation early: will run again later with an updated cache.
-		log.V(1).Info("StatefulSet cache out-of-date, re-queueing", "namespace", d.ES.Namespace, "es_name", d.ES.Name)
-		return false, nil
-	}
-
-	podsReconciled, err := actualStatefulSets.PodReconciliationDone(d.Client)
-	if err != nil {
-		return false, err
-	}
-	if !podsReconciled {
-		// Pods we have in the cache do not match StatefulSets we have in the cache.
-		// This can happen if some pods have been scheduled for creation/deletion/upgrade
-		// but the operation has not happened or been observed yet.
-		// Continuing with nodes reconciliation at this point would be dangerous, since
-		// we may update ES orchestration settings (zen1/zen2/allocation excludes) with
-		// wrong assumptions (especially on master-eligible and ES version mismatches).
-		return false, nil
-	}
-	return true, nil
-}
diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go
index 761746cff2d..291d5202c67 100644
--- a/pkg/controller/elasticsearch/driver/upscale.go
+++ b/pkg/controller/elasticsearch/driver/upscale.go
@@ -20,12 +20,12 @@ import (
 )
 
 type upscaleCtx struct {
-	k8sClient     k8s.Client
-	es            v1alpha1.Elasticsearch
-	scheme        *runtime.Scheme
-	observedState observer.State
-	esState       ESState
-	upscaleState  *upscaleStateBuilder
+	k8sClient           k8s.Client
+	es                  v1alpha1.Elasticsearch
+	scheme              *runtime.Scheme
+	observedState       observer.State
+	esState             ESState
+	upscaleStateBuilder *upscaleStateBuilder
 }
 
 // HandleUpscaleAndSpecChanges reconciles expected NodeSpec resources.
@@ -43,7 +43,7 @@ func HandleUpscaleAndSpecChanges(
 	expectedResources nodespec.ResourcesList,
 ) error {
 	// adjust expected replicas to control nodes creation and deletion
-	adjusted, err := adjustReplicas(ctx, actualStatefulSets, expectedResources)
+	adjusted, err := adjustResources(ctx, actualStatefulSets, expectedResources)
 	if err != nil {
 		return err
 	}
@@ -62,14 +62,14 @@ func HandleUpscaleAndSpecChanges(
 	return nil
 }
 
-func adjustReplicas(
+func adjustResources(
 	ctx upscaleCtx,
 	actualStatefulSets sset.StatefulSetList,
 	expectedResources nodespec.ResourcesList,
 ) (nodespec.ResourcesList, error) {
 	adjustedResources := make(nodespec.ResourcesList, 0, len(expectedResources))
 	for _, nodeSpecRes := range expectedResources {
-		adjustedSset, err := adjustStatefulSetReplicas(ctx, actualStatefulSets, nodeSpecRes.StatefulSet)
+		adjustedSset, err := adjustStatefulSetReplicas(ctx, actualStatefulSets, *nodeSpecRes.StatefulSet.DeepCopy())
 		if err != nil {
 			return nil, err
 		}
@@ -77,19 +77,19 @@
 		adjustedResources = append(adjustedResources, nodeSpecRes)
 	}
 	// adapt resources configuration to match adjusted replicas
-	if err := adjustZenConfig(ctx, adjustedResources); err != nil {
+	if err := adjustZenConfig(ctx.es, adjustedResources); err != nil {
 		return nil, err
 	}
 	return adjustedResources, nil
 }
 
-func adjustZenConfig(ctx upscaleCtx, resources nodespec.ResourcesList) error {
+func adjustZenConfig(es v1alpha1.Elasticsearch, resources nodespec.ResourcesList) error {
 	// patch configs to consider zen1 minimum master nodes
 	if err := zen1.SetupMinimumMasterNodesConfig(resources); err != nil {
 		return err
 	}
 	// patch configs to consider zen2 initial master nodes if cluster is not bootstrapped yet
-	if !AnnotatedForBootstrap(ctx.es) {
+	if !AnnotatedForBootstrap(es) {
 		if err := zen2.SetupInitialMasterNodes(resources); err != nil {
 			return err
 		}
@@ -107,7 +107,7 @@ func adjustStatefulSetReplicas(
 		expected = adaptForExistingStatefulSet(actual, expected)
 	}
 	if alreadyExists && isReplicaIncrease(actual, expected) {
-		upscaleState, err := ctx.upscaleState.InitOnce(ctx.k8sClient, ctx.es, ctx.esState)
+		upscaleState, err := ctx.upscaleStateBuilder.InitOnce(ctx.k8sClient, ctx.es, ctx.esState)
 		if err != nil {
 			return appsv1.StatefulSet{}, err
 		}
diff --git a/pkg/controller/elasticsearch/driver/upscale_state.go b/pkg/controller/elasticsearch/driver/upscale_state.go
index a67e537cfc9..d1727025c50 100644
--- a/pkg/controller/elasticsearch/driver/upscale_state.go
+++ b/pkg/controller/elasticsearch/driver/upscale_state.go
@@ -1,3 +1,7 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
 package driver
 
 import (
@@ -64,20 +68,24 @@ func isMasterNodeJoining(pod corev1.Pod, esState ESState) (bool, error) {
 	if pod.Status.Phase == corev1.PodPending {
 		return true, nil
 	}
+
 	// - Running but not Ready (ES process still starting)
 	if pod.Status.Phase == corev1.PodRunning && !k8s.IsPodReady(pod) {
 		return true, nil
 	}
+
 	// - Running & Ready but not part of the cluster
-	// This does a synchronous request to Elasticsearch.
-	// Relying instead on a previous (out of date) observed ES state would risk a mismatch
-	// if a node was removed then re-added into the cluster.
-	inCluster, err := esState.NodesInCluster([]string{pod.Name})
-	if err != nil {
-		return false, err
-	}
-	if !inCluster {
-		return true, nil
+	if pod.Status.Phase == corev1.PodRunning && k8s.IsPodReady(pod) {
+		// This does a synchronous request to Elasticsearch.
+		// Relying instead on a previous (out of date) observed ES state would risk a mismatch
+		// if a node was removed then re-added into the cluster.
+		inCluster, err := esState.NodesInCluster([]string{pod.Name})
+		if err != nil {
+			return false, err
+		}
+		if !inCluster {
+			return true, nil
+		}
 	}
 	// Otherwise, consider the pod is not in the process of joining the cluster.
 	return false, nil
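Note (illustration, not part of the patch): upscaleStateBuilder itself is defined in upscale_state.go and its full definition is not shown in this diff; only the renamed upscaleCtx field and the InitOnce call site appear above. Based on the fields the tests below reference (once, upscaleState) and on that call signature, the builder can be pictured roughly as the following build-once holder; the exact implementation in the repository may differ:

type upscaleStateBuilder struct {
	once         sync.Once
	upscaleState *upscaleState
}

// InitOnce builds the upscale state at most once per reconciliation attempt and
// caches the result, so the lookups behind newUpscaleState (e.g. pending master
// pods, ES cluster membership) only run when a replica increase is actually detected.
func (b *upscaleStateBuilder) InitOnce(c k8s.Client, es v1alpha1.Elasticsearch, esState ESState) (*upscaleState, error) {
	var err error
	b.once.Do(func() {
		b.upscaleState, err = newUpscaleState(c, es, esState)
	})
	return b.upscaleState, err
}

This is also what makes the rename meaningful: the context now carries a builder that lazily produces the state, not the state itself.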
diff --git a/pkg/controller/elasticsearch/driver/upscale_state_test.go b/pkg/controller/elasticsearch/driver/upscale_state_test.go
index b8d13e9af64..2055b811477 100644
--- a/pkg/controller/elasticsearch/driver/upscale_state_test.go
+++ b/pkg/controller/elasticsearch/driver/upscale_state_test.go
@@ -5,11 +5,20 @@
 package driver
 
 import (
+	"reflect"
 	"testing"
 
+	"sigs.k8s.io/controller-runtime/pkg/client/fake"
+
+	"github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1"
 	"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
+	"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
+
+	"github.com/stretchr/testify/require"
 	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
 func Test_upscaleState_limitMasterNodesCreation(t *testing.T) {
@@ -98,3 +107,152 @@
 		})
 	}
 }
+
+type fakeESState struct {
+	ESState
+}
+
+func (f *fakeESState) NodesInCluster(nodeNames []string) (bool, error) {
+	if nodeNames[0] == "inCluster" {
+		return true, nil
+	}
+	return false, nil
+}
+
+func Test_isMasterNodeJoining(t *testing.T) {
+	tests := []struct {
+		name    string
+		pod     v1.Pod
+		esState ESState
+		want    bool
+	}{
+		{
+			name: "pod pending",
+			pod:  v1.Pod{Status: v1.PodStatus{Phase: v1.PodPending}},
+			want: true,
+		},
+		{
+			name: "pod running but not ready",
+			pod: v1.Pod{Status: v1.PodStatus{
+				Phase: v1.PodRunning,
+				Conditions: []corev1.PodCondition{
+					{
+						Type:   corev1.ContainersReady,
+						Status: corev1.ConditionFalse,
+					},
+					{
+						Type:   corev1.PodReady,
+						Status: corev1.ConditionFalse,
+					},
+				}}},
+			want: true,
+		},
+		{
+			name: "pod running and ready but not in the cluster yet",
+			pod: v1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "notInCluster",
+				},
+				Status: v1.PodStatus{
+					Phase: v1.PodRunning,
+					Conditions: []corev1.PodCondition{
+						{
+							Type:   corev1.ContainersReady,
+							Status: corev1.ConditionTrue,
+						},
+						{
+							Type:   corev1.PodReady,
+							Status: corev1.ConditionTrue,
+						},
+					}}},
+			esState: &fakeESState{},
+			want:    true,
+		},
+		{
+			name: "pod running and ready and in the cluster",
+			pod: v1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "inCluster",
+				},
+				Status: v1.PodStatus{
+					Phase: v1.PodRunning,
+					Conditions: []corev1.PodCondition{
+						{
+							Type:   corev1.ContainersReady,
+							Status: corev1.ConditionTrue,
+						},
+						{
+							Type:   corev1.PodReady,
+							Status: corev1.ConditionTrue,
+						},
+					}}},
+			esState: &fakeESState{},
+			want:    false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := isMasterNodeJoining(tt.pod, tt.esState)
+			require.NoError(t, err)
+			if got != tt.want {
+				t.Errorf("isMasterNodeJoining() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_newUpscaleState(t *testing.T) {
+	bootstrappedES := v1alpha1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Name: "cluster", Annotations: map[string]string{ClusterUUIDAnnotationName: "uuid"}}}
+	notBootstrappedES := v1alpha1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Name: "cluster"}}
+	type args struct {
+		c       k8s.Client
+		es      v1alpha1.Elasticsearch
+		esState ESState
+	}
+	tests := []struct {
+		name string
+		args args
+		want *upscaleState
+	}{
+		{
+			name: "cluster not bootstrapped",
+			args: args{
+				es: notBootstrappedES,
+			},
+			want: &upscaleState{allowMasterCreation: true, isBootstrapped: false},
+		},
+		{
+			name: "bootstrapped, no master node joining",
+			args: args{
+				c:  k8s.WrapClient(fake.NewFakeClient()),
+				es: bootstrappedES,
+			},
+			want: &upscaleState{allowMasterCreation: true, isBootstrapped: true},
+		},
+		{
+			name: "bootstrapped, a master node is pending",
+			args: args{
+				c:  k8s.WrapClient(fake.NewFakeClient(sset.TestPod{ClusterName: "cluster", Master: true, Status: corev1.PodStatus{Phase: corev1.PodPending}}.BuildPtr())),
+				es: bootstrappedES,
+			},
+			want: &upscaleState{allowMasterCreation: false, isBootstrapped: true},
+		},
+		{
+			name: "bootstrapped, a data node is pending",
+			args: args{
+				c:  k8s.WrapClient(fake.NewFakeClient(sset.TestPod{ClusterName: "cluster", Master: false, Data: true, Status: corev1.PodStatus{Phase: corev1.PodPending}}.BuildPtr())),
+				es: bootstrappedES,
+			},
+			want: &upscaleState{allowMasterCreation: true, isBootstrapped: true},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := newUpscaleState(tt.args.c, tt.args.es, tt.args.esState)
+			require.NoError(t, err)
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("newUpscaleState() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
diff --git a/pkg/controller/elasticsearch/driver/upscale_test.go b/pkg/controller/elasticsearch/driver/upscale_test.go
index 232b0c28fb1..847df4abb8a 100644
--- a/pkg/controller/elasticsearch/driver/upscale_test.go
+++ b/pkg/controller/elasticsearch/driver/upscale_test.go
@@ -5,6 +5,8 @@
 package driver
 
 import (
+	"reflect"
+	"sync"
 	"testing"
 
 	"github.com/stretchr/testify/require"
@@ -25,17 +27,23 @@ import (
 	"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
 )
 
+var onceDone = sync.Once{}
+
+func init() {
+	onceDone.Do(func() {})
+}
+
 func TestHandleUpscaleAndSpecChanges(t *testing.T) {
 	require.NoError(t, v1alpha1.AddToScheme(scheme.Scheme))
 	k8sClient := k8s.WrapClient(fake.NewFakeClient())
 	es := v1alpha1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "es"}}
 	ctx := upscaleCtx{
-		k8sClient:     k8sClient,
-		es:            es,
-		scheme:        scheme.Scheme,
-		observedState: observer.State{},
-		esState:       nil,
-		upscaleState:  &upscaleStateBuilder{},
+		k8sClient:           k8sClient,
+		es:                  es,
+		scheme:              scheme.Scheme,
+		observedState:       observer.State{},
+		esState:             nil,
+		upscaleStateBuilder: &upscaleStateBuilder{},
 	}
 	expectedResources := nodespec.ResourcesList{
 		{
@@ -140,3 +148,238 @@ func TestHandleUpscaleAndSpecChanges(t *testing.T) {
 	// partition should be updated to 10 so current pods don't get rotated automatically
 	require.Equal(t, common.Int32(10), sset2.Spec.UpdateStrategy.RollingUpdate.Partition)
 }
+
+func Test_adaptForExistingStatefulSet(t *testing.T) {
+	tests := []struct {
+		name        string
+		actualSset  appsv1.StatefulSet
+		ssetToApply appsv1.StatefulSet
+		want        appsv1.StatefulSet
+	}{
+		{
+			name:        "nothing to do",
+			actualSset:  sset.TestSset{Replicas: 3, Partition: 3}.Build(),
+			ssetToApply: sset.TestSset{Replicas: 3}.Build(),
+			want:        sset.TestSset{Replicas: 3, Partition: 3}.Build(),
+		},
+		{
+			name:        "upscale: set Partition to the actual replicas",
+			actualSset:  sset.TestSset{Replicas: 3, Partition: 1}.Build(),
+			ssetToApply: sset.TestSset{Replicas: 10}.Build(),
+			want:        sset.TestSset{Replicas: 10, Partition: 3}.Build(),
+		},
+		{
+			name:        "downscale: set replicas to the actual replicas",
+			actualSset:  sset.TestSset{Replicas: 3, Partition: 1}.Build(),
+			ssetToApply: sset.TestSset{Replicas: 1}.Build(),
+			want:        sset.TestSset{Replicas: 3, Partition: 3}.Build(),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := adaptForExistingStatefulSet(tt.actualSset, tt.ssetToApply); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("adaptForExistingStatefulSet() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_isReplicaIncrease(t *testing.T) {
+	tests := []struct {
+		name     string
+		actual   appsv1.StatefulSet
+		expected appsv1.StatefulSet
+		want     bool
+	}{
+		{
+			name:     "increase",
+			actual:   sset.TestSset{Replicas: 3}.Build(),
+			expected: sset.TestSset{Replicas: 5}.Build(),
+			want:     true,
+		},
+		{
+			name:     "decrease",
+			actual:   sset.TestSset{Replicas: 5}.Build(),
+			expected: sset.TestSset{Replicas: 3}.Build(),
+			want:     false,
+		},
+		{
+			name:     "same value",
+			actual:   sset.TestSset{Replicas: 3}.Build(),
+			expected: sset.TestSset{Replicas: 3}.Build(),
+			want:     false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := isReplicaIncrease(tt.actual, tt.expected); got != tt.want {
+				t.Errorf("isReplicaIncrease() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_adjustStatefulSetReplicas(t *testing.T) {
+	type args struct {
+		ctx                upscaleCtx
+		actualStatefulSets sset.StatefulSetList
+		expected           appsv1.StatefulSet
+	}
+	tests := []struct {
+		name string
+		args args
+		want appsv1.StatefulSet
+	}{
+		{
+			name: "new StatefulSet to create",
+			args: args{
+				actualStatefulSets: sset.StatefulSetList{},
+				expected:           sset.TestSset{Name: "new-sset", Replicas: 3}.Build(),
+			},
+			want: sset.TestSset{Name: "new-sset", Replicas: 3}.Build(),
+		},
+		{
+			name: "same StatefulSet already exists",
+			args: args{
+				actualStatefulSets: sset.StatefulSetList{sset.TestSset{Name: "sset", Replicas: 3}.Build()},
+				expected:           sset.TestSset{Name: "sset", Replicas: 3}.Build(),
+			},
+			want: sset.TestSset{Name: "sset", Replicas: 3, Partition: 3}.Build(),
+		},
+		{
+			name: "downscale case",
+			args: args{
+				actualStatefulSets: sset.StatefulSetList{sset.TestSset{Name: "sset", Replicas: 3, Partition: 2}.Build()},
+				expected:           sset.TestSset{Name: "sset", Replicas: 1}.Build(),
+			},
+			want: sset.TestSset{Name: "sset", Replicas: 3, Partition: 3}.Build(),
+		},
+		{
+			name: "upscale case: data nodes",
+			args: args{
+				ctx: upscaleCtx{
+					upscaleStateBuilder: &upscaleStateBuilder{
+						once:         onceDone,
+						upscaleState: &upscaleState{isBootstrapped: true, allowMasterCreation: false},
+					},
+				},
+				actualStatefulSets: sset.StatefulSetList{sset.TestSset{Name: "sset", Replicas: 3, Partition: 2, Master: false, Data: true}.Build()},
+				expected:           sset.TestSset{Name: "sset", Replicas: 5, Master: false, Data: true}.Build(),
+			},
+			want: sset.TestSset{Name: "sset", Replicas: 5, Partition: 3, Master: false, Data: true}.Build(),
+		},
+		{
+			name: "upscale case: master nodes - one by one",
+			args: args{
+				ctx: upscaleCtx{
+					upscaleStateBuilder: &upscaleStateBuilder{
+						once:         onceDone,
+						upscaleState: &upscaleState{isBootstrapped: true, allowMasterCreation: true},
+					},
+				},
+				actualStatefulSets: sset.StatefulSetList{sset.TestSset{Name: "sset", Replicas: 3, Partition: 2, Master: true, Data: true}.Build()},
+				expected:           sset.TestSset{Name: "sset", Replicas: 5, Master: true, Data: true}.Build(),
+			},
+			want: sset.TestSset{Name: "sset", Replicas: 4, Partition: 3, Master: true, Data: true}.Build(),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := adjustStatefulSetReplicas(tt.args.ctx, tt.args.actualStatefulSets, tt.args.expected)
+			require.NoError(t, err)
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("adjustStatefulSetReplicas() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_adjustZenConfig(t *testing.T) {
+	bootstrappedES := v1alpha1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ClusterUUIDAnnotationName: "uuid"}}}
+	notBootstrappedES := v1alpha1.Elasticsearch{}
+
+	tests := []struct {
+		name                      string
+		es                        v1alpha1.Elasticsearch
+		resources                 nodespec.ResourcesList
+		wantMinimumMasterNodesSet bool
+		wantInitialMasterNodesSet bool
+	}{
+		{
+			name: "adjust zen1 minimum_master_nodes",
+			es:   bootstrappedES,
+			resources: nodespec.ResourcesList{
+				{
+					StatefulSet: sset.TestSset{Version: "6.8.0", Replicas: 3, Master: true, Data: true}.Build(),
+					Config:      settings.NewCanonicalConfig(),
+				},
+			},
+			wantMinimumMasterNodesSet: true,
+			wantInitialMasterNodesSet: false,
+		},
+		{
+			name: "adjust zen2 initial master nodes when cluster is not bootstrapped yet",
+			es:   notBootstrappedES,
+			resources: nodespec.ResourcesList{
+				{
+					StatefulSet: sset.TestSset{Version: "7.2.0", Replicas: 3, Master: true, Data: true}.Build(),
+					Config:      settings.NewCanonicalConfig(),
+				},
+			},
+			wantMinimumMasterNodesSet: false,
+			wantInitialMasterNodesSet: true,
+		},
+		{
+			name: "don't adjust zen2 initial master nodes when cluster is already bootstrapped",
+			es:   bootstrappedES,
+			resources: nodespec.ResourcesList{
+				{
+					StatefulSet: sset.TestSset{Version: "7.2.0", Replicas: 3, Master: true, Data: true}.Build(),
+					Config:      settings.NewCanonicalConfig(),
+				},
+			},
+			wantMinimumMasterNodesSet: false,
+			wantInitialMasterNodesSet: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			err := adjustZenConfig(tt.es, tt.resources)
+			require.NoError(t, err)
+			for _, res := range tt.resources {
+				hasMinimumMasterNodes := len(res.Config.HasKeys([]string{settings.DiscoveryZenMinimumMasterNodes})) > 0
+				require.Equal(t, tt.wantMinimumMasterNodesSet, hasMinimumMasterNodes)
+				hasInitialMasterNodes := len(res.Config.HasKeys([]string{settings.ClusterInitialMasterNodes})) > 0
+				require.Equal(t, tt.wantInitialMasterNodesSet, hasInitialMasterNodes)
+			}
+		})
+	}
+}
+
+func Test_adjustResources(t *testing.T) {
+	actualSset := sset.StatefulSetList{sset.TestSset{Version: "6.8.0", Replicas: 3, Master: true, Partition: 2}.Build()}
+	resources := nodespec.ResourcesList{
+		{
+			// upscale to 10 replicas
+			StatefulSet: sset.TestSset{Version: "6.8.0", Replicas: 10, Master: true, Partition: 2}.Build(),
+			Config:      settings.NewCanonicalConfig(),
+		},
+	}
+	upscaleCtx := upscaleCtx{
+		upscaleStateBuilder: &upscaleStateBuilder{
+			once:         onceDone,
+			upscaleState: &upscaleState{isBootstrapped: true, allowMasterCreation: true},
+		},
+	}
+	adjusted, err := adjustResources(upscaleCtx, actualSset, resources)
+	require.NoError(t, err)
+
+	// should have added one more master
+	expectedSset := sset.TestSset{Version: "6.8.0", Replicas: 4, Master: true, Partition: 3}.Build()
+	require.Equal(t, expectedSset, adjusted.StatefulSets()[0])
+	// and set minimum master nodes
+	require.NotEmpty(t, adjusted[0].Config.HasKeys([]string{settings.DiscoveryZenMinimumMasterNodes}))
+
+	// original sset should be kept unmodified
+	require.Equal(t, sset.TestSset{Version: "6.8.0", Replicas: 10, Master: true, Partition: 2}.Build(), resources[0].StatefulSet)
+}
diff --git a/pkg/controller/elasticsearch/sset/fixtures.go b/pkg/controller/elasticsearch/sset/fixtures.go
index 5b0e53b26e9..646fb3648f2 100644
--- a/pkg/controller/elasticsearch/sset/fixtures.go
+++ b/pkg/controller/elasticsearch/sset/fixtures.go
@@ -68,6 +68,7 @@ type TestPod struct {
 	Revision string
 	Master   bool
 	Data     bool
+	Status   corev1.PodStatus
 }
 
 func (t TestPod) Build() corev1.Pod {
@@ -85,6 +86,7 @@ func (t TestPod) Build() corev1.Pod {
 			Name:   t.Name,
 			Labels: labels,
 		},
+		Status: t.Status,
 	}
 }
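A side note on the onceDone helper introduced in upscale_test.go above (illustrative explanation, not part of the patch): it builds a sync.Once whose Do has already fired and copies it into upscaleStateBuilder literals, so InitOnce (see the sketch earlier) can return the injected upscaleState instead of building a fresh one. A minimal standalone sketch of that trick follows; all names here are made up for the example, and go vet's copylocks check does flag copying a sync.Once, which is why the pattern stays confined to test setup:

package main

import (
	"fmt"
	"sync"
)

type builder struct {
	once  sync.Once
	state string
}

func (b *builder) initOnce() string {
	b.once.Do(func() { b.state = "built" })
	return b.state
}

func main() {
	// A sync.Once that has already run: copying it makes any later Do a no-op,
	// so whatever state was injected alongside it wins.
	done := sync.Once{}
	done.Do(func() {})

	b := builder{once: done, state: "injected"}
	fmt.Println(b.initOnce()) // prints "injected", not "built"
}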