From d405a127b4e3d6b6ed9172064129beff121681a2 Mon Sep 17 00:00:00 2001 From: sebgl Date: Fri, 18 Oct 2019 09:45:00 +0200 Subject: [PATCH 1/3] Perform forced rolling upgrade even if ES is reachable There are cases where Elasticsearch is reachable (some Pods are Ready), but cannot respond to any requests. For example, if there is 1/2 master nodes available. See https://github.com/elastic/cloud-on-k8s/issues/1847. In such case, the bootlooping/pending 2nd master node will stay stuck forever since we will never reach the force upgrade part of the reconciliation. This commit fixes it by running force upgrades (if required) right after the upscale/spec change phase. This force upgrade phase becomes the new "Step 2". Following steps (downscale and regular upgrade) require the Elasticsearch cluster to be reachable. Due to how this force rolling upgrade deletes some pods and set some expectations, I chose to requeue immediately if it was attempted. This way we don't continue the reconciliation based on a transient state that would require us re-checking expectations. The next reconciliation can be a "regular" one. I think this also tends to simplify a bit the general logic: we first do everything that does not require the ES API (steps 1 and 2), then move on with downscales and standard rolling upgrades if ES is reachable (steps 3 and 4); instead of passing an `esReachable` bool around. --- pkg/controller/elasticsearch/driver/nodes.go | 82 ++++++++++--------- .../elasticsearch/driver/upgrade.go | 17 ---- .../elasticsearch/driver/upgrade_forced.go | 18 +++- .../driver/upgrade_forced_test.go | 4 +- 4 files changed, 63 insertions(+), 58 deletions(-) diff --git a/pkg/controller/elasticsearch/driver/nodes.go b/pkg/controller/elasticsearch/driver/nodes.go index 0235401dfd..af0340583a 100644 --- a/pkg/controller/elasticsearch/driver/nodes.go +++ b/pkg/controller/elasticsearch/driver/nodes.go @@ -79,49 +79,57 @@ func (d *defaultDriver) reconcileNodeSpecs( return results.WithError(err) } - if esReachable { - // Update Zen1 minimum master nodes through the API, corresponding to the current nodes we have. - requeue, err := zen1.UpdateMinimumMasterNodes(d.Client, d.ES, esClient, actualStatefulSets) - if err != nil { - return results.WithError(err) - } - if requeue { - results.WithResult(defaultRequeue) - } - // Maybe clear zen2 voting config exclusions. - requeue, err = zen2.ClearVotingConfigExclusions(d.ES, d.Client, esClient, actualStatefulSets) - if err != nil { - return results.WithError(err) - } - if requeue { - results.WithResult(defaultRequeue) - } + // Phase 2: if there is any Pending or bootlooping Pod to upgrade, do it. + attempted, err := d.MaybeForceUpgrade(actualStatefulSets) + if err != nil || attempted { // if attempted, we're in a transient state where it's safer to requeue + reconcileState.UpdateElasticsearchApplyingChanges(resourcesState.CurrentPods) + return results.WithError(err) + } - // Phase 2: handle sset scale down. - // We want to safely remove nodes from the cluster, either because the sset requires less replicas, - // or because it should be removed entirely. - downscaleCtx := newDownscaleContext( - d.Client, - esClient, - resourcesState, - observedState, - reconcileState, - d.Expectations, - d.ES, - ) - downscaleRes := HandleDownscale(downscaleCtx, expectedResources.StatefulSets(), actualStatefulSets) - results.WithResults(downscaleRes) - if downscaleRes.HasError() { - return results - } - } else { - // ES cannot be reached right now, let's make sure we requeue. + // Next operations require the Elasticsearch API to be available. + if !esReachable { + log.Info("ES cannot be reached yet, re-queuing", "namespace", d.ES.Namespace, "es_name", d.ES.Name) reconcileState.UpdateElasticsearchApplyingChanges(resourcesState.CurrentPods) + return results.WithResult(defaultRequeue) + } + + // Update Zen1 minimum master nodes through the API, corresponding to the current nodes we have. + requeue, err := zen1.UpdateMinimumMasterNodes(d.Client, d.ES, esClient, actualStatefulSets) + if err != nil { + return results.WithError(err) + } + if requeue { results.WithResult(defaultRequeue) } + // Maybe clear zen2 voting config exclusions. + requeue, err = zen2.ClearVotingConfigExclusions(d.ES, d.Client, esClient, actualStatefulSets) + if err != nil { + return results.WithError(err) + } + if requeue { + results.WithResult(defaultRequeue) + } + + // Phase 2: handle sset scale down. + // We want to safely remove nodes from the cluster, either because the sset requires less replicas, + // or because it should be removed entirely. + downscaleCtx := newDownscaleContext( + d.Client, + esClient, + resourcesState, + observedState, + reconcileState, + d.Expectations, + d.ES, + ) + downscaleRes := HandleDownscale(downscaleCtx, expectedResources.StatefulSets(), actualStatefulSets) + results.WithResults(downscaleRes) + if downscaleRes.HasError() { + return results + } // Phase 3: handle rolling upgrades. - rollingUpgradesRes := d.handleRollingUpgrades(esClient, esReachable, esState, actualStatefulSets, expectedResources.MasterNodesNames()) + rollingUpgradesRes := d.handleRollingUpgrades(esClient, esState, actualStatefulSets, expectedResources.MasterNodesNames()) results.WithResults(rollingUpgradesRes) if rollingUpgradesRes.HasError() { return results diff --git a/pkg/controller/elasticsearch/driver/upgrade.go b/pkg/controller/elasticsearch/driver/upgrade.go index bd9edfa8fb..aaf7f49ef4 100644 --- a/pkg/controller/elasticsearch/driver/upgrade.go +++ b/pkg/controller/elasticsearch/driver/upgrade.go @@ -21,7 +21,6 @@ import ( func (d *defaultDriver) handleRollingUpgrades( esClient esclient.Client, - esReachable bool, esState ESState, statefulSets sset.StatefulSetList, expectedMaster []string, @@ -44,27 +43,11 @@ func (d *defaultDriver) handleRollingUpgrades( if err != nil { return results.WithError(err) } - actualPods, err := statefulSets.GetActualPods(d.Client) - if err != nil { - return results.WithError(err) - } - - // Maybe force upgrade all Pods, bypassing any safety check and ES interaction. - if forced, err := d.maybeForceUpgrade(actualPods, podsToUpgrade); err != nil || forced { - return results.WithError(err) - } - - if !esReachable { - // Cannot move on with rolling upgrades if ES cannot be reached. - return results.WithResult(defaultRequeue) - } - // Get the healthy Pods (from a K8S point of view + in the ES cluster) healthyPods, err := healthyPods(d.Client, statefulSets, esState) if err != nil { return results.WithError(err) } - // Get current masters actualMasters, err := sset.GetActualMastersForCluster(d.Client, d.ES) if err != nil { diff --git a/pkg/controller/elasticsearch/driver/upgrade_forced.go b/pkg/controller/elasticsearch/driver/upgrade_forced.go index c35895caa7..1a95e2d9f4 100644 --- a/pkg/controller/elasticsearch/driver/upgrade_forced.go +++ b/pkg/controller/elasticsearch/driver/upgrade_forced.go @@ -11,13 +11,27 @@ import ( "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1beta1" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset" "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" ) -// maybeForceUpgrade may attempt a forced upgrade of all podsToUpgrade if allowed to, +func (d *defaultDriver) MaybeForceUpgrade(statefulSets sset.StatefulSetList) (bool, error) { + // Get the pods to upgrade + podsToUpgrade, err := podsToUpgrade(d.Client, statefulSets) + if err != nil { + return false, err + } + actualPods, err := statefulSets.GetActualPods(d.Client) + if err != nil { + return false, err + } + return d.maybeForceUpgradePods(actualPods, podsToUpgrade) +} + +// maybeForceUpgradePods may attempt a forced upgrade of all podsToUpgrade if allowed to, // in order to unlock situations where the reconciliation may otherwise be stuck // (eg. no cluster formed, all nodes have a bad spec). -func (d *defaultDriver) maybeForceUpgrade(actualPods []corev1.Pod, podsToUpgrade []corev1.Pod) (attempted bool, err error) { +func (d *defaultDriver) maybeForceUpgradePods(actualPods []corev1.Pod, podsToUpgrade []corev1.Pod) (attempted bool, err error) { actualBySset := podsByStatefulSetName(actualPods) toUpgradeBySset := podsByStatefulSetName(podsToUpgrade) diff --git a/pkg/controller/elasticsearch/driver/upgrade_forced_test.go b/pkg/controller/elasticsearch/driver/upgrade_forced_test.go index 6c2ac7c8c8..e2f4543654 100644 --- a/pkg/controller/elasticsearch/driver/upgrade_forced_test.go +++ b/pkg/controller/elasticsearch/driver/upgrade_forced_test.go @@ -19,7 +19,7 @@ import ( corev1 "k8s.io/api/core/v1" ) -func Test_defaultDriver_maybeForceUpgrade(t *testing.T) { +func Test_defaultDriver_maybeForceUpgradePods(t *testing.T) { tests := []struct { name string actualPods []corev1.Pod @@ -172,7 +172,7 @@ func Test_defaultDriver_maybeForceUpgrade(t *testing.T) { }, } - attempted, err := d.maybeForceUpgrade(tt.actualPods, tt.podsToUpgrade) + attempted, err := d.maybeForceUpgradePods(tt.actualPods, tt.podsToUpgrade) require.NoError(t, err) require.Equal(t, tt.wantAttempted, attempted) var pods corev1.PodList From 0e63149848150cc827cb96f37922fe9b3f7d7424 Mon Sep 17 00:00:00 2001 From: sebgl Date: Fri, 18 Oct 2019 10:20:21 +0200 Subject: [PATCH 2/3] Modify e2e test to cover the es reachable case --- test/e2e/es/forced_upgrade_test.go | 109 +++++++++++------- .../elasticsearch/steps_forced_upgrade.go | 8 +- 2 files changed, 73 insertions(+), 44 deletions(-) diff --git a/test/e2e/es/forced_upgrade_test.go b/test/e2e/es/forced_upgrade_test.go index c377ad3940..6d65437d9c 100644 --- a/test/e2e/es/forced_upgrade_test.go +++ b/test/e2e/es/forced_upgrade_test.go @@ -5,10 +5,12 @@ package es import ( + "errors" "fmt" "testing" "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1beta1" + estype "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1beta1" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/name" @@ -34,7 +36,7 @@ func TestForceUpgradePendingPods(t *testing.T) { k, initial, // wait for all initial Pods to be Pending - elasticsearch.CheckESPodsPending(initial, k), + []test.Step{elasticsearch.CheckESPodsPending(initial, k)}, fixed, ).RunSequential(t) } @@ -43,13 +45,23 @@ func TestForceUpgradePendingPodsInOneStatefulSet(t *testing.T) { // create a cluster in which one StatefulSet is OK, // and the second one will have Pods that stay Pending forever initial := elasticsearch.NewBuilder("force-upgrade-pending-sset"). - WithESMasterDataNodes(1, elasticsearch.DefaultResources). - WithESDataNodes(2, elasticsearch.DefaultResources) + WithNodeSet(estype.NodeSet{ + Name: "ok", + Count: 1, + PodTemplate: elasticsearch.ESPodTemplate(elasticsearch.DefaultResources), + }). + WithNodeSet(estype.NodeSet{ + Name: "pending", + Count: 1, + PodTemplate: elasticsearch.ESPodTemplate(elasticsearch.DefaultResources), + }) + // make Pods of the 2nds NodeSet pending initial.Elasticsearch.Spec.NodeSets[1].PodTemplate.Spec.NodeSelector = map[string]string{ "cannot": "be-scheduled", } - // fix that cluster to remove the wrong NodeSelector + + // eventually fix that cluster to remove the wrong NodeSelector fixed := elasticsearch.Builder{} fixed.Elasticsearch = *initial.Elasticsearch.DeepCopy() fixed.Elasticsearch.Spec.NodeSets[1].PodTemplate.Spec.NodeSelector = nil @@ -58,28 +70,43 @@ func TestForceUpgradePendingPodsInOneStatefulSet(t *testing.T) { elasticsearch.ForcedUpgradeTestSteps( k, initial, - test.Step{ - Name: "Wait for Pods of the first StatefulSet to be running, and second StatefulSet to be Pending", - Test: test.Eventually(func() error { - pendingSset := name.StatefulSet(initial.Elasticsearch.Name, initial.Elasticsearch.Spec.NodeSets[1].Name) - pods, err := k.GetPods(test.ESPodListOptions(initial.Elasticsearch.Namespace, initial.Elasticsearch.Name)...) - if err != nil { - return err - } - if int32(len(pods)) != initial.Elasticsearch.Spec.NodeCount() { - return fmt.Errorf("expected %d pods, got %d", len(pods), initial.Elasticsearch.Spec.NodeCount()) - } - for _, p := range pods { - expectedPhase := corev1.PodRunning - if p.Labels[label.StatefulSetNameLabelName] == pendingSset { - expectedPhase = corev1.PodPending + []test.Step{ + { + Name: "Wait for Pods of the first StatefulSet to be running, and second StatefulSet to be Pending", + Test: test.Eventually(func() error { + pendingSset := name.StatefulSet(initial.Elasticsearch.Name, initial.Elasticsearch.Spec.NodeSets[1].Name) + pods, err := k.GetPods(test.ESPodListOptions(initial.Elasticsearch.Namespace, initial.Elasticsearch.Name)...) + if err != nil { + return err + } + if int32(len(pods)) != initial.Elasticsearch.Spec.NodeCount() { + return fmt.Errorf("expected %d pods, got %d", len(pods), initial.Elasticsearch.Spec.NodeCount()) + } + for _, p := range pods { + expectedPhase := corev1.PodRunning + if p.Labels[label.StatefulSetNameLabelName] == pendingSset { + expectedPhase = corev1.PodPending + } + if p.Status.Phase != expectedPhase { + return fmt.Errorf("pod %s not %s", p.Name, expectedPhase) + } + } + return nil + }), + }, + { + Name: "Wait for the ES service to have endpoints and become technically reachable", + Test: test.Eventually(func() error { + endpoints, err := k.GetEndpoints(initial.Elasticsearch.Namespace, name.HTTPService(initial.Elasticsearch.Name)) + if err != nil { + return err } - if p.Status.Phase != expectedPhase { - return fmt.Errorf("pod %s not %s", p.Name, expectedPhase) + if len(endpoints.Subsets) == 0 || len(endpoints.Subsets[0].Addresses) == 0 { + return errors.New("elasticsearch HTTP service does not have endpoint") } - } - return nil - }), + return nil + }), + }, }, fixed, ).RunSequential(t) @@ -103,23 +130,25 @@ func TestForceUpgradeBootloopingPods(t *testing.T) { k, initial, // wait for Pods to restart due to wrong config - elasticsearch.CheckPodsCondition( - initial, - k, - "Pods should have restarted at least once due to wrong ES config", - func(p corev1.Pod) error { - for _, containerStatus := range p.Status.ContainerStatuses { - if containerStatus.Name != v1beta1.ElasticsearchContainerName { - continue - } - if containerStatus.RestartCount < 1 { - return fmt.Errorf("container not restarted yet") + []test.Step{ + elasticsearch.CheckPodsCondition( + initial, + k, + "Pods should have restarted at least once due to wrong ES config", + func(p corev1.Pod) error { + for _, containerStatus := range p.Status.ContainerStatuses { + if containerStatus.Name != v1beta1.ElasticsearchContainerName { + continue + } + if containerStatus.RestartCount < 1 { + return fmt.Errorf("container not restarted yet") + } + return nil } - return nil - } - return fmt.Errorf("container %s not found in pod %s", v1beta1.ElasticsearchContainerName, p.Name) - }, - ), + return fmt.Errorf("container %s not found in pod %s", v1beta1.ElasticsearchContainerName, p.Name) + }, + ), + }, fixed, ).RunSequential(t) } diff --git a/test/e2e/test/elasticsearch/steps_forced_upgrade.go b/test/e2e/test/elasticsearch/steps_forced_upgrade.go index 1ae0445262..8e2542c86c 100644 --- a/test/e2e/test/elasticsearch/steps_forced_upgrade.go +++ b/test/e2e/test/elasticsearch/steps_forced_upgrade.go @@ -8,15 +8,15 @@ import ( "github.com/elastic/cloud-on-k8s/test/e2e/test" ) -// ForcedUpgradeTestSteps creates the initial cluster that is not expected to run, wait for condition to be reached, +// ForcedUpgradeTestSteps creates the initial cluster that is not expected to run, wait for conditions to be met, // then mutates it to the fixed cluster, that is expected to become healthy. -func ForcedUpgradeTestSteps(k *test.K8sClient, initial Builder, condition test.Step, fixed Builder) test.StepList { +func ForcedUpgradeTestSteps(k *test.K8sClient, initial Builder, conditions []test.Step, fixed Builder) test.StepList { return test.StepList{}. // create the initial (failing) cluster WithSteps(initial.InitTestSteps(k)). WithSteps(initial.CreationTestSteps(k)). - // wait for condition to be met - WithStep(condition). + // wait for conditions to be met + WithSteps(conditions). // apply the fixed Elasticsearch resource WithSteps(fixed.UpgradeTestSteps(k)). // ensure the cluster eventually becomes healthy From 91c09a4647b3d6f2472e6bec9c136e3a9d29ad47 Mon Sep 17 00:00:00 2001 From: sebgl Date: Thu, 24 Oct 2019 10:42:05 +0200 Subject: [PATCH 3/3] Improve comment --- pkg/controller/elasticsearch/driver/nodes.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/controller/elasticsearch/driver/nodes.go b/pkg/controller/elasticsearch/driver/nodes.go index af0340583a..48dfb09c66 100644 --- a/pkg/controller/elasticsearch/driver/nodes.go +++ b/pkg/controller/elasticsearch/driver/nodes.go @@ -81,7 +81,10 @@ func (d *defaultDriver) reconcileNodeSpecs( // Phase 2: if there is any Pending or bootlooping Pod to upgrade, do it. attempted, err := d.MaybeForceUpgrade(actualStatefulSets) - if err != nil || attempted { // if attempted, we're in a transient state where it's safer to requeue + if err != nil || attempted { + // If attempted, we're in a transient state where it's safer to requeue. + // We don't want to re-upgrade in a regular way the pods we just force-upgraded. + // Next reconciliation will check expectations again. reconcileState.UpdateElasticsearchApplyingChanges(resourcesState.CurrentPods) return results.WithError(err) }