From 4306cbf9ba1270739abd4ca573632e0bcf437f1c Mon Sep 17 00:00:00 2001 From: sebgl Date: Thu, 29 Aug 2019 12:32:54 +0200 Subject: [PATCH] Adapt upscale code to add only one master at a time and patch config accordingly --- pkg/controller/elasticsearch/driver/nodes.go | 40 +-- .../elasticsearch/driver/nodes_test.go | 82 ------ .../elasticsearch/driver/upgrade.go | 4 +- .../elasticsearch/driver/upscale.go | 100 +++++-- .../elasticsearch/driver/upscale_test.go | 260 +++++++++++++++++- 5 files changed, 353 insertions(+), 133 deletions(-) delete mode 100644 pkg/controller/elasticsearch/driver/nodes_test.go diff --git a/pkg/controller/elasticsearch/driver/nodes.go b/pkg/controller/elasticsearch/driver/nodes.go index 52d6ff650f..d3ae8754da 100644 --- a/pkg/controller/elasticsearch/driver/nodes.go +++ b/pkg/controller/elasticsearch/driver/nodes.go @@ -5,7 +5,6 @@ package driver import ( - "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1" "github.com/elastic/cloud-on-k8s/pkg/controller/common/keystore" "github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler" esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client" @@ -42,13 +41,23 @@ func (d *defaultDriver) reconcileNodeSpecs( return results.WithResult(defaultRequeue) } - expectedResources, err := expectedResources(d.Client, d.ES, observedState, keystoreResources) + expectedResources, err := nodespec.BuildExpectedResources(d.ES, keystoreResources) if err != nil { return results.WithError(err) } + esState := NewMemoizingESState(esClient) + // Phase 1: apply expected StatefulSets resources and scale up. - if err := HandleUpscaleAndSpecChanges(d.Client, d.ES, d.Scheme(), expectedResources, actualStatefulSets); err != nil { + upscaleCtx := upscaleCtx{ + k8sClient: d.K8sClient(), + es: d.ES, + scheme: d.Scheme(), + observedState: observedState, + esState: esState, + upscaleStateBuilder: &upscaleStateBuilder{}, + } + if err := HandleUpscaleAndSpecChanges(upscaleCtx, actualStatefulSets, expectedResources); err != nil { return results.WithError(err) } @@ -96,7 +105,7 @@ func (d *defaultDriver) reconcileNodeSpecs( // Phase 3: handle rolling upgrades. // Control nodes restart (upgrade) by manually decrementing rollingUpdate.Partition. 
- rollingUpgradesRes := d.handleRollingUpgrades(esClient, actualStatefulSets) + rollingUpgradesRes := d.handleRollingUpgrades(esClient, esState, actualStatefulSets) results.WithResults(rollingUpgradesRes) if rollingUpgradesRes.HasError() { return results @@ -107,26 +116,3 @@ func (d *defaultDriver) reconcileNodeSpecs( // - grow and shrink return results } - -func expectedResources( - k8sClient k8s.Client, - es v1alpha1.Elasticsearch, - observedState observer.State, - keystoreResources *keystore.Resources, -) (nodespec.ResourcesList, error) { - resources, err := nodespec.BuildExpectedResources(es, keystoreResources) - if err != nil { - return nil, err - } - - // patch configs to consider zen1 minimum master nodes - if err := zen1.SetupMinimumMasterNodesConfig(resources); err != nil { - return nil, err - } - // patch configs to consider zen2 initial master nodes - if err := zen2.SetupInitialMasterNodes(es, observedState, k8sClient, resources); err != nil { - return nil, err - } - - return resources, nil -} diff --git a/pkg/controller/elasticsearch/driver/nodes_test.go b/pkg/controller/elasticsearch/driver/nodes_test.go deleted file mode 100644 index 5883a9a812..0000000000 --- a/pkg/controller/elasticsearch/driver/nodes_test.go +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package driver - -import ( - "testing" - - "github.com/stretchr/testify/require" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - - "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1" - "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer" - "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/settings" - "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" -) - -func Test_expectedResources_zen2(t *testing.T) { - es := v1alpha1.Elasticsearch{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ns", - Name: "cluster", - }, - Spec: v1alpha1.ElasticsearchSpec{ - Version: "7.2.0", - Nodes: []v1alpha1.NodeSpec{ - { - Name: "masters", - NodeCount: 3, - }, - { - Name: "data", - NodeCount: 3, - }, - }, - }, - } - - resources, err := expectedResources(k8s.WrapClient(fake.NewFakeClient()), es, observer.State{}, nil) - require.NoError(t, err) - - // 2 StatefulSets should be created - require.Equal(t, 2, len(resources.StatefulSets())) - // master nodes config should be patched to account for zen2 initial master nodes - require.NotEmpty(t, resources[0].Config.HasKeys([]string{settings.ClusterInitialMasterNodes})) - // zen1 m_m_n specific config should not be set - require.Empty(t, resources[0].Config.HasKeys([]string{settings.DiscoveryZenMinimumMasterNodes})) -} - -func Test_expectedResources_zen1(t *testing.T) { - es := v1alpha1.Elasticsearch{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ns", - Name: "cluster", - }, - Spec: v1alpha1.ElasticsearchSpec{ - Version: "6.8.0", - Nodes: []v1alpha1.NodeSpec{ - { - Name: "masters", - NodeCount: 3, - }, - { - Name: "data", - NodeCount: 3, - }, - }, - }, - } - - resources, err := expectedResources(k8s.WrapClient(fake.NewFakeClient()), es, observer.State{}, nil) - require.NoError(t, err) - - // 2 StatefulSets should be created - require.Equal(t, 2, len(resources.StatefulSets())) - // master nodes config should be patched to account for zen1 minimum_master_nodes - require.NotEmpty(t, 
resources[0].Config.HasKeys([]string{settings.DiscoveryZenMinimumMasterNodes})) - // zen2 config should not be set - require.Empty(t, resources[0].Config.HasKeys([]string{settings.ClusterInitialMasterNodes})) -} diff --git a/pkg/controller/elasticsearch/driver/upgrade.go b/pkg/controller/elasticsearch/driver/upgrade.go index bfd8d04757..7270d0b131 100644 --- a/pkg/controller/elasticsearch/driver/upgrade.go +++ b/pkg/controller/elasticsearch/driver/upgrade.go @@ -23,13 +23,11 @@ import ( func (d *defaultDriver) handleRollingUpgrades( esClient esclient.Client, + esState ESState, statefulSets sset.StatefulSetList, ) *reconciler.Results { results := &reconciler.Results{} - // We need an up-to-date ES state, but avoid requesting information we may not need. - esState := NewMemoizingESState(esClient) - // Maybe upgrade some of the nodes. res := newRollingUpgrade(d, esClient, esState, statefulSets).run() results.WithResults(res) diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 46223868db..291d5202c6 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -11,50 +11,116 @@ import ( "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1" "github.com/elastic/cloud-on-k8s/pkg/controller/common" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/nodespec" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/settings" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/version/zen1" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/version/zen2" "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" ) +type upscaleCtx struct { + k8sClient k8s.Client + es v1alpha1.Elasticsearch + scheme *runtime.Scheme + observedState observer.State + esState ESState + upscaleStateBuilder *upscaleStateBuilder +} + // HandleUpscaleAndSpecChanges reconciles expected NodeSpec resources. // It does: // - create any new StatefulSets // - update existing StatefulSets specification, to be used for future pods rotation // - upscale StatefulSet for which we expect more replicas +// - limit master node creation to one at a time // It does not: // - perform any StatefulSet downscale (left for downscale phase) // - perform any pod upgrade (left for rolling upgrade phase) func HandleUpscaleAndSpecChanges( - k8sClient k8s.Client, - es v1alpha1.Elasticsearch, - scheme *runtime.Scheme, - expectedResources nodespec.ResourcesList, + ctx upscaleCtx, actualStatefulSets sset.StatefulSetList, + expectedResources nodespec.ResourcesList, ) error { - // TODO: there is a split brain possibility here if going from 1 to 3 masters or 3 to 7. - // We should add only one master node at a time for safety. - // See https://github.com/elastic/cloud-on-k8s/issues/1281. 
- - for _, nodeSpecRes := range expectedResources { - // always reconcile config (will apply to new & recreated pods) - if err := settings.ReconcileConfig(k8sClient, es, nodeSpecRes.StatefulSet.Name, nodeSpecRes.Config); err != nil { + // adjust expected replicas to control nodes creation and deletion + adjusted, err := adjustResources(ctx, actualStatefulSets, expectedResources) + if err != nil { + return err + } + // reconcile all resources + for _, res := range adjusted { + if err := settings.ReconcileConfig(ctx.k8sClient, ctx.es, res.StatefulSet.Name, res.Config); err != nil { return err } - if _, err := common.ReconcileService(k8sClient, scheme, &nodeSpecRes.HeadlessService, &es); err != nil { + if _, err := common.ReconcileService(ctx.k8sClient, ctx.scheme, &res.HeadlessService, &ctx.es); err != nil { return err } - ssetToApply := *nodeSpecRes.StatefulSet.DeepCopy() - actual, alreadyExists := actualStatefulSets.GetByName(ssetToApply.Name) - if alreadyExists { - ssetToApply = adaptForExistingStatefulSet(actual, ssetToApply) + if err := sset.ReconcileStatefulSet(ctx.k8sClient, ctx.scheme, ctx.es, res.StatefulSet); err != nil { + return err } - if err := sset.ReconcileStatefulSet(k8sClient, scheme, es, ssetToApply); err != nil { + } + return nil +} + +func adjustResources( + ctx upscaleCtx, + actualStatefulSets sset.StatefulSetList, + expectedResources nodespec.ResourcesList, +) (nodespec.ResourcesList, error) { + adjustedResources := make(nodespec.ResourcesList, 0, len(expectedResources)) + for _, nodeSpecRes := range expectedResources { + adjustedSset, err := adjustStatefulSetReplicas(ctx, actualStatefulSets, *nodeSpecRes.StatefulSet.DeepCopy()) + if err != nil { + return nil, err + } + nodeSpecRes.StatefulSet = adjustedSset + adjustedResources = append(adjustedResources, nodeSpecRes) + } + // adapt resources configuration to match adjusted replicas + if err := adjustZenConfig(ctx.es, adjustedResources); err != nil { + return nil, err + } + return adjustedResources, nil +} + +func adjustZenConfig(es v1alpha1.Elasticsearch, resources nodespec.ResourcesList) error { + // patch configs to consider zen1 minimum master nodes + if err := zen1.SetupMinimumMasterNodesConfig(resources); err != nil { + return err + } + // patch configs to consider zen2 initial master nodes if cluster is not bootstrapped yet + if !AnnotatedForBootstrap(es) { + if err := zen2.SetupInitialMasterNodes(resources); err != nil { return err } } return nil } +func adjustStatefulSetReplicas( + ctx upscaleCtx, + actualStatefulSets sset.StatefulSetList, + expected appsv1.StatefulSet, +) (appsv1.StatefulSet, error) { + actual, alreadyExists := actualStatefulSets.GetByName(expected.Name) + if alreadyExists { + expected = adaptForExistingStatefulSet(actual, expected) + } + if alreadyExists && isReplicaIncrease(actual, expected) { + upscaleState, err := ctx.upscaleStateBuilder.InitOnce(ctx.k8sClient, ctx.es, ctx.esState) + if err != nil { + return appsv1.StatefulSet{}, err + } + expected = upscaleState.limitMasterNodesCreation(actualStatefulSets, expected) + } + return expected, nil +} + +// isReplicaIncrease returns true if expected replicas are higher than actual replicas. +func isReplicaIncrease(actual appsv1.StatefulSet, expected appsv1.StatefulSet) bool { + return sset.GetReplicas(expected) > sset.GetReplicas(actual) +} + // adaptForExistingStatefulSet modifies ssetToApply to account for the existing StatefulSet. // It avoids triggering downscales (done later), and makes sure new pods are created with the newest revision. 
func adaptForExistingStatefulSet(actualSset appsv1.StatefulSet, ssetToApply appsv1.StatefulSet) appsv1.StatefulSet { diff --git a/pkg/controller/elasticsearch/driver/upscale_test.go b/pkg/controller/elasticsearch/driver/upscale_test.go index e7d6b1c8f9..5be013b1e4 100644 --- a/pkg/controller/elasticsearch/driver/upscale_test.go +++ b/pkg/controller/elasticsearch/driver/upscale_test.go @@ -5,6 +5,8 @@ package driver import ( + "reflect" + "sync" "testing" "github.com/stretchr/testify/require" @@ -19,15 +21,30 @@ import ( "github.com/elastic/cloud-on-k8s/pkg/controller/common" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/name" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/nodespec" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/settings" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset" "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" ) +var onceDone = &sync.Once{} + +func init() { + onceDone.Do(func() {}) +} + func TestHandleUpscaleAndSpecChanges(t *testing.T) { require.NoError(t, v1alpha1.AddToScheme(scheme.Scheme)) k8sClient := k8s.WrapClient(fake.NewFakeClient()) es := v1alpha1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "es"}} + ctx := upscaleCtx{ + k8sClient: k8sClient, + es: es, + scheme: scheme.Scheme, + observedState: observer.State{}, + esState: nil, + upscaleStateBuilder: &upscaleStateBuilder{}, + } expectedResources := nodespec.ResourcesList{ { StatefulSet: appsv1.StatefulSet{ @@ -81,7 +98,7 @@ func TestHandleUpscaleAndSpecChanges(t *testing.T) { // when no StatefulSets already exists actualStatefulSets := sset.StatefulSetList{} - err := HandleUpscaleAndSpecChanges(k8sClient, es, scheme.Scheme, expectedResources, actualStatefulSets) + err := HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResources) require.NoError(t, err) // StatefulSets should be created with their expected replicas var sset1 appsv1.StatefulSet @@ -100,7 +117,7 @@ func TestHandleUpscaleAndSpecChanges(t *testing.T) { // upscale data nodes actualStatefulSets = sset.StatefulSetList{sset1, sset2} expectedResources[1].StatefulSet.Spec.Replicas = common.Int32(10) - err = HandleUpscaleAndSpecChanges(k8sClient, es, scheme.Scheme, expectedResources, actualStatefulSets) + err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResources) require.NoError(t, err) require.NoError(t, k8sClient.Get(types.NamespacedName{Namespace: "ns", Name: "sset2"}, &sset2)) require.Equal(t, common.Int32(10), sset2.Spec.Replicas) @@ -110,7 +127,7 @@ func TestHandleUpscaleAndSpecChanges(t *testing.T) { // apply a spec change actualStatefulSets = sset.StatefulSetList{sset1, sset2} expectedResources[1].StatefulSet.Spec.Template.Labels = map[string]string{"a": "b"} - err = HandleUpscaleAndSpecChanges(k8sClient, es, scheme.Scheme, expectedResources, actualStatefulSets) + err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResources) require.NoError(t, err) require.NoError(t, k8sClient.Get(types.NamespacedName{Namespace: "ns", Name: "sset2"}, &sset2)) require.Equal(t, "b", sset2.Spec.Template.Labels["a"]) @@ -121,7 +138,7 @@ func TestHandleUpscaleAndSpecChanges(t *testing.T) { actualStatefulSets = sset.StatefulSetList{sset1, sset2} expectedResources[1].StatefulSet.Spec.Replicas = common.Int32(2) expectedResources[1].StatefulSet.Spec.Template.Labels = map[string]string{"a": "c"} - err = HandleUpscaleAndSpecChanges(k8sClient, es, scheme.Scheme, 
expectedResources, actualStatefulSets) + err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResources) require.NoError(t, err) require.NoError(t, k8sClient.Get(types.NamespacedName{Namespace: "ns", Name: "sset2"}, &sset2)) // spec should be updated @@ -131,3 +148,238 @@ func TestHandleUpscaleAndSpecChanges(t *testing.T) { // partition should be updated to 10 so current pods don't get rotated automatically require.Equal(t, common.Int32(10), sset2.Spec.UpdateStrategy.RollingUpdate.Partition) } + +func Test_adaptForExistingStatefulSet(t *testing.T) { + tests := []struct { + name string + actualSset appsv1.StatefulSet + ssetToApply appsv1.StatefulSet + want appsv1.StatefulSet + }{ + { + name: "nothing to do", + actualSset: sset.TestSset{Replicas: 3, Partition: 3}.Build(), + ssetToApply: sset.TestSset{Replicas: 3}.Build(), + want: sset.TestSset{Replicas: 3, Partition: 3}.Build(), + }, + { + name: "upscale: set Partition to the actual replicas", + actualSset: sset.TestSset{Replicas: 3, Partition: 1}.Build(), + ssetToApply: sset.TestSset{Replicas: 10}.Build(), + want: sset.TestSset{Replicas: 10, Partition: 3}.Build(), + }, + { + name: "downscale: set replicas to the actual replicas", + actualSset: sset.TestSset{Replicas: 3, Partition: 1}.Build(), + ssetToApply: sset.TestSset{Replicas: 1}.Build(), + want: sset.TestSset{Replicas: 3, Partition: 3}.Build(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := adaptForExistingStatefulSet(tt.actualSset, tt.ssetToApply); !reflect.DeepEqual(got, tt.want) { + t.Errorf("adaptForExistingStatefulSet() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_isReplicaIncrease(t *testing.T) { + tests := []struct { + name string + actual appsv1.StatefulSet + expected appsv1.StatefulSet + want bool + }{ + { + name: "increase", + actual: sset.TestSset{Replicas: 3}.Build(), + expected: sset.TestSset{Replicas: 5}.Build(), + want: true, + }, + { + name: "decrease", + actual: sset.TestSset{Replicas: 5}.Build(), + expected: sset.TestSset{Replicas: 3}.Build(), + want: false, + }, + { + name: "same value", + actual: sset.TestSset{Replicas: 3}.Build(), + expected: sset.TestSset{Replicas: 3}.Build(), + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isReplicaIncrease(tt.actual, tt.expected); got != tt.want { + t.Errorf("isReplicaIncrease() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_adjustStatefulSetReplicas(t *testing.T) { + type args struct { + ctx upscaleCtx + actualStatefulSets sset.StatefulSetList + expected appsv1.StatefulSet + } + tests := []struct { + name string + args args + want appsv1.StatefulSet + }{ + { + name: "new StatefulSet to create", + args: args{ + actualStatefulSets: sset.StatefulSetList{}, + expected: sset.TestSset{Name: "new-sset", Replicas: 3}.Build(), + }, + want: sset.TestSset{Name: "new-sset", Replicas: 3}.Build(), + }, + { + name: "same StatefulSet already exists", + args: args{ + actualStatefulSets: sset.StatefulSetList{sset.TestSset{Name: "sset", Replicas: 3}.Build()}, + expected: sset.TestSset{Name: "sset", Replicas: 3}.Build(), + }, + want: sset.TestSset{Name: "sset", Replicas: 3, Partition: 3}.Build(), + }, + { + name: "downscale case", + args: args{ + actualStatefulSets: sset.StatefulSetList{sset.TestSset{Name: "sset", Replicas: 3, Partition: 2}.Build()}, + expected: sset.TestSset{Name: "sset", Replicas: 1}.Build(), + }, + want: sset.TestSset{Name: "sset", Replicas: 3, Partition: 3}.Build(), + }, + { + name: "upscale 
case: data nodes", + args: args{ + ctx: upscaleCtx{ + upscaleStateBuilder: &upscaleStateBuilder{ + once: onceDone, + upscaleState: &upscaleState{isBootstrapped: true, allowMasterCreation: false}, + }, + }, + actualStatefulSets: sset.StatefulSetList{sset.TestSset{Name: "sset", Replicas: 3, Partition: 2, Master: false, Data: true}.Build()}, + expected: sset.TestSset{Name: "sset", Replicas: 5, Master: false, Data: true}.Build(), + }, + want: sset.TestSset{Name: "sset", Replicas: 5, Partition: 3, Master: false, Data: true}.Build(), + }, + { + name: "upscale case: master nodes - one by one", + args: args{ + ctx: upscaleCtx{ + upscaleStateBuilder: &upscaleStateBuilder{ + once: onceDone, + upscaleState: &upscaleState{isBootstrapped: true, allowMasterCreation: true}, + }, + }, + actualStatefulSets: sset.StatefulSetList{sset.TestSset{Name: "sset", Replicas: 3, Partition: 2, Master: true, Data: true}.Build()}, + expected: sset.TestSset{Name: "sset", Replicas: 5, Master: true, Data: true}.Build(), + }, + want: sset.TestSset{Name: "sset", Replicas: 4, Partition: 3, Master: true, Data: true}.Build(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := adjustStatefulSetReplicas(tt.args.ctx, tt.args.actualStatefulSets, tt.args.expected) + require.NoError(t, err) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("adjustStatefulSetReplicas() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_adjustZenConfig(t *testing.T) { + bootstrappedES := v1alpha1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ClusterUUIDAnnotationName: "uuid"}}} + notBootstrappedES := v1alpha1.Elasticsearch{} + + tests := []struct { + name string + es v1alpha1.Elasticsearch + resources nodespec.ResourcesList + wantMinimumMasterNodesSet bool + wantInitialMasterNodesSet bool + }{ + { + name: "adjust zen1 minimum_master_nodes", + es: bootstrappedES, + resources: nodespec.ResourcesList{ + { + StatefulSet: sset.TestSset{Version: "6.8.0", Replicas: 3, Master: true, Data: true}.Build(), + Config: settings.NewCanonicalConfig(), + }, + }, + wantMinimumMasterNodesSet: true, + wantInitialMasterNodesSet: false, + }, + { + name: "adjust zen2 initial master nodes when cluster is not bootstrapped yet", + es: notBootstrappedES, + resources: nodespec.ResourcesList{ + { + StatefulSet: sset.TestSset{Version: "7.2.0", Replicas: 3, Master: true, Data: true}.Build(), + Config: settings.NewCanonicalConfig(), + }, + }, + wantMinimumMasterNodesSet: false, + wantInitialMasterNodesSet: true, + }, + { + name: "don't adjust zen2 initial master nodes when cluster is already bootstrapped", + es: bootstrappedES, + resources: nodespec.ResourcesList{ + { + StatefulSet: sset.TestSset{Version: "7.2.0", Replicas: 3, Master: true, Data: true}.Build(), + Config: settings.NewCanonicalConfig(), + }, + }, + wantMinimumMasterNodesSet: false, + wantInitialMasterNodesSet: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := adjustZenConfig(tt.es, tt.resources) + require.NoError(t, err) + for _, res := range tt.resources { + hasMinimumMasterNodes := len(res.Config.HasKeys([]string{settings.DiscoveryZenMinimumMasterNodes})) > 0 + require.Equal(t, tt.wantMinimumMasterNodesSet, hasMinimumMasterNodes) + hasInitialMasterNodes := len(res.Config.HasKeys([]string{settings.ClusterInitialMasterNodes})) > 0 + require.Equal(t, tt.wantInitialMasterNodesSet, hasInitialMasterNodes) + } + }) + } +} + +func Test_adjustResources(t *testing.T) { + actualSset := 
sset.StatefulSetList{sset.TestSset{Version: "6.8.0", Replicas: 3, Master: true, Partition: 2}.Build()} + resources := nodespec.ResourcesList{ + { + // upscale to 10 replicas + StatefulSet: sset.TestSset{Version: "6.8.0", Replicas: 10, Master: true, Partition: 2}.Build(), + Config: settings.NewCanonicalConfig(), + }, + } + upscaleCtx := upscaleCtx{ + upscaleStateBuilder: &upscaleStateBuilder{ + once: onceDone, + upscaleState: &upscaleState{isBootstrapped: true, allowMasterCreation: true}, + }, + } + adjusted, err := adjustResources(upscaleCtx, actualSset, resources) + require.NoError(t, err) + + // should have added one more master + expectedSset := sset.TestSset{Version: "6.8.0", Replicas: 4, Master: true, Partition: 3}.Build() + require.Equal(t, expectedSset, adjusted.StatefulSets()[0]) + // and set minimum master nodes + require.NotEmpty(t, adjusted[0].Config.HasKeys([]string{settings.DiscoveryZenMinimumMasterNodes})) + + // original sset should be kept unmodified + require.Equal(t, sset.TestSset{Version: "6.8.0", Replicas: 10, Master: true, Partition: 2}.Build(), resources[0].StatefulSet) +}
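
Note on the one-master-at-a-time rule: the patch calls upscaleState.limitMasterNodesCreation but does not include its implementation, which lives outside the files touched here. The sketch below is a simplified, self-contained reading of the behavior the new tests encode: a master StatefulSet may grow by at most one replica per reconciliation, further master creation is held while a previous master is still joining, and non-master StatefulSets are not limited. The signature and the raw replica-count arguments are stand-ins for brevity, not the operator's actual API.

package main

import "fmt"

// upscaleState mirrors the fields exercised by the tests above.
type upscaleState struct {
	isBootstrapped      bool
	allowMasterCreation bool
}

// limitMasterReplicas caps a master-node upscale to one new node per
// reconciliation round. The real method operates on appsv1.StatefulSet
// objects rather than raw replica counts (assumption for brevity).
func (s *upscaleState) limitMasterReplicas(actual, target int32, isMaster bool) int32 {
	if !isMaster || target <= actual {
		return target // data-only node sets and non-upscales are not limited here
	}
	if !s.allowMasterCreation {
		return actual // a master is still joining: hold further creations
	}
	s.allowMasterCreation = false // at most one master creation per round
	return actual + 1
}

func main() {
	s := &upscaleState{isBootstrapped: true, allowMasterCreation: true}
	fmt.Println(s.limitMasterReplicas(3, 5, true))  // 4: masters are added one by one
	fmt.Println(s.limitMasterReplicas(4, 5, true))  // 4: creation now blocked
	fmt.Println(s.limitMasterReplicas(3, 5, false)) // 5: data nodes unconstrained
}

This matches Test_adjustStatefulSetReplicas above: a 3 to 5 master upscale is trimmed to 4 replicas, while the same upscale on a data-only StatefulSet goes straight to 5.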
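Note on adjustZenConfig: zen1 minimum_master_nodes is reconciled regardless of bootstrap status (and only applies to pre-7.x versions, as Test_adjustZenConfig shows), while zen2 cluster.initial_master_nodes is only set as long as the cluster has not formed yet, which the patch detects through the cluster UUID annotation (AnnotatedForBootstrap / ClusterUUIDAnnotationName). A minimal sketch of that decision, assuming the standard strict-quorum formula for zen1 discovery; the real code patches CanonicalConfig objects in place rather than returning a map:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// zenSettings returns the discovery settings to apply for a set of
// master-eligible node names. The zen2 flag stands in for real version
// parsing (zen2 is used by Elasticsearch 7.x and above).
func zenSettings(masterNames []string, zen2, bootstrapped bool) map[string]string {
	settings := map[string]string{}
	if !zen2 {
		// zen1: minimum_master_nodes must be a strict quorum of
		// master-eligible nodes to avoid split brain
		settings["discovery.zen.minimum_master_nodes"] = strconv.Itoa(len(masterNames)/2 + 1)
	} else if !bootstrapped {
		// zen2: initial master nodes only matter for the very first election
		settings["cluster.initial_master_nodes"] = strings.Join(masterNames, ",")
	}
	return settings
}

func main() {
	masters := []string{"sset-0", "sset-1", "sset-2"}
	fmt.Println(zenSettings(masters, false, true)) // zen1: minimum_master_nodes=2
	fmt.Println(zenSettings(masters, true, false)) // zen2, not bootstrapped: initial masters set
	fmt.Println(zenSettings(masters, true, true))  // zen2, bootstrapped: nothing to set
}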
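Note on adaptForExistingStatefulSet: the replica and partition rules that Test_adaptForExistingStatefulSet encodes can be read as two invariants. Downscales are never applied in this phase (the actual replica count is kept), and rollingUpdate.Partition is always pinned to the actual replica count so that existing pods keep their current revision and only newly created, higher-ordinal pods receive the new spec. A reduced sketch over plain integers; the real function copies these values into the StatefulSet to apply:

package main

import "fmt"

// adaptReplicas reproduces the replica/partition rules from the tests
// above, reduced to plain integers for illustration.
func adaptReplicas(actualReplicas, targetReplicas int32) (replicas, partition int32) {
	if targetReplicas < actualReplicas {
		// downscales are deferred to a later phase: keep the actual count
		targetReplicas = actualReplicas
	}
	// pin rollingUpdate.Partition to the actual replica count so existing
	// pods are not rotated automatically; only new ordinals get the new spec
	return targetReplicas, actualReplicas
}

func main() {
	fmt.Println(adaptReplicas(3, 3))  // 3 3: nothing to do
	fmt.Println(adaptReplicas(3, 10)) // 10 3: upscale, existing pods untouched
	fmt.Println(adaptReplicas(3, 1))  // 3 3: downscale deferred
}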