Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perform forced rolling upgrade even if ES is reachable #2022

Merged
merged 4 commits into from
Oct 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 48 additions & 37 deletions pkg/controller/elasticsearch/driver/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,49 +79,60 @@ func (d *defaultDriver) reconcileNodeSpecs(
return results.WithError(err)
}

if esReachable {
// Update Zen1 minimum master nodes through the API, corresponding to the current nodes we have.
requeue, err := zen1.UpdateMinimumMasterNodes(d.Client, d.ES, esClient, actualStatefulSets)
if err != nil {
return results.WithError(err)
}
if requeue {
results.WithResult(defaultRequeue)
}
// Maybe clear zen2 voting config exclusions.
requeue, err = zen2.ClearVotingConfigExclusions(d.ES, d.Client, esClient, actualStatefulSets)
if err != nil {
return results.WithError(err)
}
if requeue {
results.WithResult(defaultRequeue)
}
// Phase 2: if there is any Pending or bootlooping Pod to upgrade, do it.
attempted, err := d.MaybeForceUpgrade(actualStatefulSets)
if err != nil || attempted {
// If attempted, we're in a transient state where it's safer to requeue.
// We don't want to re-upgrade in a regular way the pods we just force-upgraded.
// Next reconciliation will check expectations again.
reconcileState.UpdateElasticsearchApplyingChanges(resourcesState.CurrentPods)
return results.WithError(err)
}

// Phase 2: handle sset scale down.
// We want to safely remove nodes from the cluster, either because the sset requires less replicas,
// or because it should be removed entirely.
downscaleCtx := newDownscaleContext(
d.Client,
esClient,
resourcesState,
observedState,
reconcileState,
d.Expectations,
d.ES,
)
downscaleRes := HandleDownscale(downscaleCtx, expectedResources.StatefulSets(), actualStatefulSets)
results.WithResults(downscaleRes)
if downscaleRes.HasError() {
return results
}
} else {
// ES cannot be reached right now, let's make sure we requeue.
// Next operations require the Elasticsearch API to be available.
if !esReachable {
log.Info("ES cannot be reached yet, re-queuing", "namespace", d.ES.Namespace, "es_name", d.ES.Name)
reconcileState.UpdateElasticsearchApplyingChanges(resourcesState.CurrentPods)
return results.WithResult(defaultRequeue)
}

// Update Zen1 minimum master nodes through the API, corresponding to the current nodes we have.
requeue, err := zen1.UpdateMinimumMasterNodes(d.Client, d.ES, esClient, actualStatefulSets)
if err != nil {
return results.WithError(err)
}
if requeue {
results.WithResult(defaultRequeue)
}
// Maybe clear zen2 voting config exclusions.
requeue, err = zen2.ClearVotingConfigExclusions(d.ES, d.Client, esClient, actualStatefulSets)
if err != nil {
return results.WithError(err)
}
if requeue {
results.WithResult(defaultRequeue)
}

// Phase 2: handle sset scale down.
// We want to safely remove nodes from the cluster, either because the sset requires less replicas,
// or because it should be removed entirely.
downscaleCtx := newDownscaleContext(
d.Client,
esClient,
resourcesState,
observedState,
reconcileState,
d.Expectations,
d.ES,
)
downscaleRes := HandleDownscale(downscaleCtx, expectedResources.StatefulSets(), actualStatefulSets)
results.WithResults(downscaleRes)
if downscaleRes.HasError() {
return results
}

// Phase 3: handle rolling upgrades.
rollingUpgradesRes := d.handleRollingUpgrades(esClient, esReachable, esState, expectedResources.MasterNodesNames())
rollingUpgradesRes := d.handleRollingUpgrades(esClient, esState, expectedResources.MasterNodesNames())
results.WithResults(rollingUpgradesRes)
if rollingUpgradesRes.HasError() {
return results
Expand Down
17 changes: 0 additions & 17 deletions pkg/controller/elasticsearch/driver/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

func (d *defaultDriver) handleRollingUpgrades(
esClient esclient.Client,
esReachable bool,
esState ESState,
expectedMaster []string,
) *reconciler.Results {
Expand All @@ -47,27 +46,11 @@ func (d *defaultDriver) handleRollingUpgrades(
if err != nil {
return results.WithError(err)
}
actualPods, err := statefulSets.GetActualPods(d.Client)
if err != nil {
return results.WithError(err)
}

// Maybe force upgrade all Pods, bypassing any safety check and ES interaction.
if forced, err := d.maybeForceUpgrade(actualPods, podsToUpgrade); err != nil || forced {
return results.WithError(err)
}

if !esReachable {
// Cannot move on with rolling upgrades if ES cannot be reached.
return results.WithResult(defaultRequeue)
}

// Get the healthy Pods (from a K8S point of view + in the ES cluster)
healthyPods, err := healthyPods(d.Client, statefulSets, esState)
if err != nil {
return results.WithError(err)
}

// Get current masters
actualMasters, err := sset.GetActualMastersForCluster(d.Client, d.ES)
if err != nil {
Expand Down
18 changes: 16 additions & 2 deletions pkg/controller/elasticsearch/driver/upgrade_forced.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,27 @@ import (

"github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1beta1"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
)

// maybeForceUpgrade may attempt a forced upgrade of all podsToUpgrade if allowed to,
func (d *defaultDriver) MaybeForceUpgrade(statefulSets sset.StatefulSetList) (bool, error) {
thbkrkr marked this conversation as resolved.
Show resolved Hide resolved
// Get the pods to upgrade
podsToUpgrade, err := podsToUpgrade(d.Client, statefulSets)
if err != nil {
return false, err
}
actualPods, err := statefulSets.GetActualPods(d.Client)
if err != nil {
return false, err
}
return d.maybeForceUpgradePods(actualPods, podsToUpgrade)
}

// maybeForceUpgradePods may attempt a forced upgrade of all podsToUpgrade if allowed to,
// in order to unlock situations where the reconciliation may otherwise be stuck
// (eg. no cluster formed, all nodes have a bad spec).
func (d *defaultDriver) maybeForceUpgrade(actualPods []corev1.Pod, podsToUpgrade []corev1.Pod) (attempted bool, err error) {
func (d *defaultDriver) maybeForceUpgradePods(actualPods []corev1.Pod, podsToUpgrade []corev1.Pod) (attempted bool, err error) {
actualBySset := podsByStatefulSetName(actualPods)
toUpgradeBySset := podsByStatefulSetName(podsToUpgrade)

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/elasticsearch/driver/upgrade_forced_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
corev1 "k8s.io/api/core/v1"
)

func Test_defaultDriver_maybeForceUpgrade(t *testing.T) {
func Test_defaultDriver_maybeForceUpgradePods(t *testing.T) {
tests := []struct {
name string
actualPods []corev1.Pod
Expand Down Expand Up @@ -169,7 +169,7 @@ func Test_defaultDriver_maybeForceUpgrade(t *testing.T) {
},
}

attempted, err := d.maybeForceUpgrade(tt.actualPods, tt.podsToUpgrade)
attempted, err := d.maybeForceUpgradePods(tt.actualPods, tt.podsToUpgrade)
require.NoError(t, err)
require.Equal(t, tt.wantAttempted, attempted)
var pods corev1.PodList
Expand Down
109 changes: 69 additions & 40 deletions test/e2e/es/forced_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
package es

import (
"errors"
"fmt"
"testing"

"github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1beta1"
estype "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1beta1"

"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/name"
Expand All @@ -34,7 +36,7 @@ func TestForceUpgradePendingPods(t *testing.T) {
k,
initial,
// wait for all initial Pods to be Pending
elasticsearch.CheckESPodsPending(initial, k),
[]test.Step{elasticsearch.CheckESPodsPending(initial, k)},
fixed,
).RunSequential(t)
}
Expand All @@ -43,13 +45,23 @@ func TestForceUpgradePendingPodsInOneStatefulSet(t *testing.T) {
// create a cluster in which one StatefulSet is OK,
// and the second one will have Pods that stay Pending forever
initial := elasticsearch.NewBuilder("force-upgrade-pending-sset").
WithESMasterDataNodes(1, elasticsearch.DefaultResources).
WithESDataNodes(2, elasticsearch.DefaultResources)
WithNodeSet(estype.NodeSet{
Name: "ok",
Count: 1,
PodTemplate: elasticsearch.ESPodTemplate(elasticsearch.DefaultResources),
}).
WithNodeSet(estype.NodeSet{
Name: "pending",
Count: 1,
PodTemplate: elasticsearch.ESPodTemplate(elasticsearch.DefaultResources),
})

// make Pods of the 2nds NodeSet pending
initial.Elasticsearch.Spec.NodeSets[1].PodTemplate.Spec.NodeSelector = map[string]string{
"cannot": "be-scheduled",
}
// fix that cluster to remove the wrong NodeSelector

// eventually fix that cluster to remove the wrong NodeSelector
fixed := elasticsearch.Builder{}
fixed.Elasticsearch = *initial.Elasticsearch.DeepCopy()
fixed.Elasticsearch.Spec.NodeSets[1].PodTemplate.Spec.NodeSelector = nil
Expand All @@ -58,28 +70,43 @@ func TestForceUpgradePendingPodsInOneStatefulSet(t *testing.T) {
elasticsearch.ForcedUpgradeTestSteps(
k,
initial,
test.Step{
Name: "Wait for Pods of the first StatefulSet to be running, and second StatefulSet to be Pending",
Test: test.Eventually(func() error {
pendingSset := name.StatefulSet(initial.Elasticsearch.Name, initial.Elasticsearch.Spec.NodeSets[1].Name)
pods, err := k.GetPods(test.ESPodListOptions(initial.Elasticsearch.Namespace, initial.Elasticsearch.Name)...)
if err != nil {
return err
}
if int32(len(pods)) != initial.Elasticsearch.Spec.NodeCount() {
return fmt.Errorf("expected %d pods, got %d", len(pods), initial.Elasticsearch.Spec.NodeCount())
}
for _, p := range pods {
expectedPhase := corev1.PodRunning
if p.Labels[label.StatefulSetNameLabelName] == pendingSset {
expectedPhase = corev1.PodPending
[]test.Step{
{
Name: "Wait for Pods of the first StatefulSet to be running, and second StatefulSet to be Pending",
Test: test.Eventually(func() error {
pendingSset := name.StatefulSet(initial.Elasticsearch.Name, initial.Elasticsearch.Spec.NodeSets[1].Name)
pods, err := k.GetPods(test.ESPodListOptions(initial.Elasticsearch.Namespace, initial.Elasticsearch.Name)...)
if err != nil {
return err
}
if int32(len(pods)) != initial.Elasticsearch.Spec.NodeCount() {
return fmt.Errorf("expected %d pods, got %d", len(pods), initial.Elasticsearch.Spec.NodeCount())
}
for _, p := range pods {
expectedPhase := corev1.PodRunning
if p.Labels[label.StatefulSetNameLabelName] == pendingSset {
expectedPhase = corev1.PodPending
}
if p.Status.Phase != expectedPhase {
return fmt.Errorf("pod %s not %s", p.Name, expectedPhase)
}
}
return nil
}),
},
{
Name: "Wait for the ES service to have endpoints and become technically reachable",
Test: test.Eventually(func() error {
endpoints, err := k.GetEndpoints(initial.Elasticsearch.Namespace, name.HTTPService(initial.Elasticsearch.Name))
if err != nil {
return err
}
if p.Status.Phase != expectedPhase {
return fmt.Errorf("pod %s not %s", p.Name, expectedPhase)
if len(endpoints.Subsets) == 0 || len(endpoints.Subsets[0].Addresses) == 0 {
return errors.New("elasticsearch HTTP service does not have endpoint")
}
}
return nil
}),
return nil
}),
},
},
fixed,
).RunSequential(t)
Expand All @@ -103,23 +130,25 @@ func TestForceUpgradeBootloopingPods(t *testing.T) {
k,
initial,
// wait for Pods to restart due to wrong config
elasticsearch.CheckPodsCondition(
initial,
k,
"Pods should have restarted at least once due to wrong ES config",
func(p corev1.Pod) error {
for _, containerStatus := range p.Status.ContainerStatuses {
if containerStatus.Name != v1beta1.ElasticsearchContainerName {
continue
}
if containerStatus.RestartCount < 1 {
return fmt.Errorf("container not restarted yet")
[]test.Step{
elasticsearch.CheckPodsCondition(
initial,
k,
"Pods should have restarted at least once due to wrong ES config",
func(p corev1.Pod) error {
for _, containerStatus := range p.Status.ContainerStatuses {
if containerStatus.Name != v1beta1.ElasticsearchContainerName {
continue
}
if containerStatus.RestartCount < 1 {
return fmt.Errorf("container not restarted yet")
}
return nil
}
return nil
}
return fmt.Errorf("container %s not found in pod %s", v1beta1.ElasticsearchContainerName, p.Name)
},
),
return fmt.Errorf("container %s not found in pod %s", v1beta1.ElasticsearchContainerName, p.Name)
},
),
},
fixed,
).RunSequential(t)
}
8 changes: 4 additions & 4 deletions test/e2e/test/elasticsearch/steps_forced_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import (
"github.com/elastic/cloud-on-k8s/test/e2e/test"
)

// ForcedUpgradeTestSteps creates the initial cluster that is not expected to run, wait for condition to be reached,
// ForcedUpgradeTestSteps creates the initial cluster that is not expected to run, wait for conditions to be met,
// then mutates it to the fixed cluster, that is expected to become healthy.
func ForcedUpgradeTestSteps(k *test.K8sClient, initial Builder, condition test.Step, fixed Builder) test.StepList {
func ForcedUpgradeTestSteps(k *test.K8sClient, initial Builder, conditions []test.Step, fixed Builder) test.StepList {
return test.StepList{}.
// create the initial (failing) cluster
WithSteps(initial.InitTestSteps(k)).
WithSteps(initial.CreationTestSteps(k)).
// wait for condition to be met
WithStep(condition).
// wait for conditions to be met
WithSteps(conditions).
// apply the fixed Elasticsearch resource
WithSteps(fixed.UpgradeTestSteps(k)).
// ensure the cluster eventually becomes healthy
Expand Down