Refactor downscales and add unit tests #1506

Merged: 10 commits, Aug 9, 2019

Changes from 3 commits
53 changes: 31 additions & 22 deletions operators/pkg/controller/elasticsearch/driver/downscale.go
@@ -5,6 +5,8 @@
 package driver
 
 import (
+	appsv1 "k8s.io/api/apps/v1"
+
 	"github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1"
 	"github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/reconciler"
 	esclient "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/client"
@@ -16,7 +18,6 @@ import (
 	"github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/version/zen1"
 	"github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/version/zen2"
 	"github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s"
-	appsv1 "k8s.io/api/apps/v1"
 )
 
 // downscaleContext holds the context of this downscale, including clients and states,
@@ -67,32 +68,37 @@ func HandleDownscale(
 }
 
 // ssetDownscale helps with the downscale of a single StatefulSet.
+// A StatefulSet removal (going from 0 to 0 replicas) is also considered as a Downscale here.
 type ssetDownscale struct {
 	statefulSet     appsv1.StatefulSet
 	initialReplicas int32
 	targetReplicas  int32
 }
 
 // leavingNodeNames returns names of the nodes that are supposed to leave the Elasticsearch cluster
-// for this StatefulSet.
+// for this StatefulSet. They are ordered by highest ordinal first.
 func (d ssetDownscale) leavingNodeNames() []string {
 	if d.targetReplicas >= d.initialReplicas {
 		return nil
 	}
 	leavingNodes := make([]string, 0, d.initialReplicas-d.targetReplicas)
 	// nodes are ordered by highest ordinal first
 	for i := d.initialReplicas - 1; i >= d.targetReplicas; i-- {
 		leavingNodes = append(leavingNodes, sset.PodName(d.statefulSet.Name, i))
 	}
 	return leavingNodes
 }
 
-// canRemove returns true if the StatefulSet can be safely removed (no replicas).
-func (d ssetDownscale) canRemoveStatefulSet() bool {
+// isRemoval returns true if this downscale is a StatefulSet removal.
+func (d ssetDownscale) isRemoval() bool {
+	// StatefulSet does not have any replica, and should not have one
 	return d.initialReplicas == 0 && d.targetReplicas == 0
 }
 
+// isReplicaDecrease returns true if this downscale corresponds to decreasing replicas.
+func (d ssetDownscale) isReplicaDecrease() bool {
+	return d.targetReplicas < d.initialReplicas
+}
+
 // leavingNodeNames returns the names of all nodes that should leave the cluster (across StatefulSets).
 func leavingNodeNames(downscales []ssetDownscale) []string {
 	leavingNodes := []string{}
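Side note on the ordering contract above: highest ordinals leave first, which matches the order in which Kubernetes itself removes StatefulSet pods on scale-down. A minimal standalone sketch of that contract; the `<ssetName>-<ordinal>` pod-name format is an assumption mirroring `sset.PodName`, not taken from this diff:

package main

import "fmt"

// podName mirrors the assumed behavior of sset.PodName: "<ssetName>-<ordinal>".
func podName(ssetName string, ordinal int32) string {
	return fmt.Sprintf("%s-%d", ssetName, ordinal)
}

func main() {
	initialReplicas, targetReplicas := int32(5), int32(3)
	// walk ordinals from highest to lowest, like leavingNodeNames does
	for i := initialReplicas - 1; i >= targetReplicas; i-- {
		fmt.Println(podName("default-es-data", i))
	}
	// Output:
	// default-es-data-4
	// default-es-data-3
}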
@@ -103,6 +109,7 @@ func leavingNodeNames(downscales []ssetDownscale) []string {
 }
 
 // calculateDownscales compares expected and actual StatefulSets to return a list of ssetDownscale.
+// We also include StatefulSets removal (0 replicas) in those downscales.
 func calculateDownscales(expectedStatefulSets sset.StatefulSetList, actualStatefulSets sset.StatefulSetList) []ssetDownscale {
 	downscales := []ssetDownscale{}
 	for _, actualSset := range actualStatefulSets {
@@ -138,25 +145,26 @@ func scheduleDataMigrations(esClient esclient.Client, leavingNodes []string) err
 // Nodes whose data migration is not over will not be removed.
 // A boolean is returned to indicate if a requeue should be scheduled if the entire downscale could not be performed.
 func attemptDownscale(ctx downscaleContext, downscale ssetDownscale, allLeavingNodes []string, statefulSets sset.StatefulSetList) (bool, error) {
-	// TODO: only one master node downscale at a time
-	if downscale.canRemoveStatefulSet() {
+	switch {
+	case downscale.isRemoval():
 		ssetLogger(downscale.statefulSet).Info("Deleting statefulset")
-		if err := ctx.k8sClient.Delete(&downscale.statefulSet); err != nil {
-			return false, err
-		}
-		return false, nil
-	}
+		return false, ctx.k8sClient.Delete(&downscale.statefulSet)
+
+	case downscale.isReplicaDecrease():
+		// adjust the theoretical downscale to one we can safely perform
+		performable := calculatePerformableDownscale(ctx, downscale, allLeavingNodes)
+		if !performable.isReplicaDecrease() {
+			// no downscale can be performed for now, let's requeue
+			return true, nil
+		}
+		// do performable downscale, and requeue if needed
+		shouldRequeue := performable.targetReplicas != downscale.targetReplicas
+		return shouldRequeue, doDownscale(ctx, performable, statefulSets)
 
-	if downscale.targetReplicas >= downscale.initialReplicas {
+	default:
 		// nothing to do
 		return false, nil
 	}
-
-	// adjust the theoretical downscale to one we can safely perform
-	performable, shouldRequeue := calculatePerformableDownscale(ctx, downscale, allLeavingNodes)
-
-	// do the performable downscale
-	return shouldRequeue, doDownscale(ctx, performable, statefulSets)
 }

Inline review comment from a Contributor on the line `if !performable.isReplicaDecrease() {`: Nice.
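To make the new requeue contract in attemptDownscale concrete, here is a standalone sketch with hypothetical replica counts (not taken from the diff): a requested 3 -> 1 downscale where only one node's data migration is over yields a performable 3 -> 2 step, and the caller requeues because the performable target differs from the requested one.

package main

import "fmt"

// minimal stand-in for the ssetDownscale fields used by the requeue decision
type ssetDownscale struct {
	initialReplicas, targetReplicas int32
}

func main() {
	requested := ssetDownscale{initialReplicas: 3, targetReplicas: 1}
	// calculatePerformableDownscale would stop at the first node still migrating
	performable := ssetDownscale{initialReplicas: 3, targetReplicas: 2}

	// same test as in the new attemptDownscale: requeue whenever the
	// performable step falls short of the requested target
	shouldRequeue := performable.targetReplicas != requested.targetReplicas
	fmt.Println(shouldRequeue) // true
}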

@@ -166,7 +174,9 @@
 // calculatePerformableDownscale updates the given downscale target replicas to account for nodes
 func calculatePerformableDownscale(
 	ctx downscaleContext,
 	downscale ssetDownscale,
 	allLeavingNodes []string,
-) (ssetDownscale, bool) {
+) ssetDownscale {
+	// TODO: only one master node downscale at a time
+
 	// create another downscale based on the provided one, for which we'll slowly decrease target replicas
 	performableDownscale := ssetDownscale{
 		statefulSet:     downscale.statefulSet,
@@ -179,13 +189,12 @@ func calculatePerformableDownscale(
 			ssetLogger(downscale.statefulSet).V(1).Info("Data migration not over yet, skipping node deletion", "node", node)
 			ctx.reconcileState.UpdateElasticsearchMigrating(ctx.resourcesState, ctx.observedState)
 			// no need to check other nodes since we remove them in order and this one isn't ready anyway
-			// make sure we requeue
-			return performableDownscale, true
+			return performableDownscale
 		}
 		// data migration over: allow pod to be removed
 		performableDownscale.targetReplicas--
 	}
-	return performableDownscale, false
+	return performableDownscale
 }
 
 // doDownscale schedules nodes removal for the given downscale, and updates zen settings accordingly.
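The loop above is the core of the data-migration gate: target replicas are decremented one node at a time, and the function returns early at the first node still migrating. A standalone simulation under assumed names (the migrating stub stands in for the operator's real data-migration check):

package main

import "fmt"

// migrating is a hypothetical stub standing in for the operator's real
// data-migration check on a leaving node.
func migrating(node string) bool {
	return node == "es-data-3" // pretend this node still holds shard data
}

func main() {
	initial, target := int32(5), int32(3)
	performableTarget := initial
	// visit leaving nodes from highest ordinal down, as leavingNodeNames orders them
	for i := initial - 1; i >= target; i-- {
		node := fmt.Sprintf("es-data-%d", i)
		if migrating(node) {
			break // calculatePerformableDownscale returns early here
		}
		performableTarget--
	}
	fmt.Println(performableTarget) // 4: only es-data-4 is safe to remove for now
}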
19 changes: 6 additions & 13 deletions operators/pkg/controller/elasticsearch/driver/downscale_test.go
@@ -1,9 +1,9 @@
-package driver
-
 // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 // or more contributor license agreements. Licensed under the Elastic License;
 // you may not use this file except in compliance with the Elastic License.
 
+package driver
+
 import (
 	"context"
 	"reflect"
@@ -423,10 +423,9 @@ func Test_calculatePerformableDownscale(t *testing.T) {
 		allLeavingNodes []string
 	}
 	tests := []struct {
-		name        string
-		args        args
-		want        ssetDownscale
-		wantRequeue bool
+		name string
+		args args
+		want ssetDownscale
 	}{
 		{
 			name: "no downscale planned",
@@ -442,7 +441,6 @@ func Test_calculatePerformableDownscale(t *testing.T) {
 				initialReplicas: 3,
 				targetReplicas:  3,
 			},
-			wantRequeue: false,
 		},
 		{
 			name: "downscale possible from 3 to 1",
@@ -465,7 +463,6 @@ func Test_calculatePerformableDownscale(t *testing.T) {
 				initialReplicas: 3,
 				targetReplicas:  1,
 			},
-			wantRequeue: false,
 		},
 		{
 			name: "downscale not possible: data migration not ready",
@@ -489,18 +486,14 @@ func Test_calculatePerformableDownscale(t *testing.T) {
 				initialReplicas: 3,
 				targetReplicas:  3,
 			},
-			wantRequeue: true,
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			got, gotRequeue := calculatePerformableDownscale(tt.args.ctx, tt.args.downscale, tt.args.allLeavingNodes)
+			got := calculatePerformableDownscale(tt.args.ctx, tt.args.downscale, tt.args.allLeavingNodes)
 			if !reflect.DeepEqual(got, tt.want) {
 				t.Errorf("calculatePerformableDownscale() got = %v, want %v", got, tt.want)
 			}
-			if gotRequeue != tt.wantRequeue {
-				t.Errorf("calculatePerformableDownscale() got1 = %v, want %v", gotRequeue, tt.wantRequeue)
-			}
 		})
 	}
 }
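With the requeue boolean gone from calculatePerformableDownscale, the test table only has to check the returned ssetDownscale. These tests can be run in isolation with standard Go tooling, e.g. `go test ./operators/pkg/controller/elasticsearch/driver/ -run Test_calculatePerformableDownscale`.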