Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor downscales and add unit tests #1506

Merged
merged 10 commits into from
Aug 9, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions operators/pkg/controller/elasticsearch/client/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,11 @@ type ClusterState struct {
Version int `json:"version"`
MasterNode string `json:"master_node"`
Nodes map[string]ClusterStateNode `json:"nodes"`
RoutingTable struct {
Indices map[string]Shards `json:"indices"`
} `json:"routing_table"`
RoutingTable RoutingTable `json:"routing_table"`
}

// RoutingTable models the "routing_table" section of the Elasticsearch
// cluster state API response: shard routing information, keyed by index name.
type RoutingTable struct {
	Indices map[string]Shards `json:"indices"`
}

// IsEmpty returns true if this is an empty struct without data.
Expand Down
285 changes: 197 additions & 88 deletions operators/pkg/controller/elasticsearch/driver/downscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
package driver

import (
appsv1 "k8s.io/api/apps/v1"

"github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/reconciler"
esclient "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/client"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/label"
Expand All @@ -14,122 +17,228 @@ import (
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/sset"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/version/zen1"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/version/zen2"
appsv1 "k8s.io/api/apps/v1"
"github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s"
)

func (d *defaultDriver) HandleDownscale(
// downscaleContext holds the context of this downscale, including clients and states,
// propagated from the main driver.
type downscaleContext struct {
	// clients
	k8sClient k8s.Client      // Kubernetes API client
	esClient  esclient.Client // Elasticsearch API client
	// driver states
	resourcesState reconcile.ResourcesState  // current in-cluster resources
	observedState  observer.State            // last observed Elasticsearch state
	reconcileState *reconcile.State          // reconciliation status to report on
	expectations   *reconciler.Expectations  // cache-generation expectations
	// ES cluster
	es v1alpha1.Elasticsearch
}

// HandleDownscale attempts to downscale actual StatefulSets towards expected ones.
func HandleDownscale(
downscaleCtx downscaleContext,
expectedStatefulSets sset.StatefulSetList,
actualStatefulSets sset.StatefulSetList,
esClient esclient.Client,
resourcesState reconcile.ResourcesState,
observedState observer.State,
reconcileState *reconcile.State,
) *reconciler.Results {
results := &reconciler.Results{}

// compute the list of nodes leaving the cluster, from which
// data should be migrated away
leavingNodes := []string{}
// compute the list of StatefulSet downscales to perform
downscales := calculateDownscales(expectedStatefulSets, actualStatefulSets)
leavingNodes := leavingNodeNames(downscales)

// process each statefulset for downscale
for i, actual := range actualStatefulSets {
expected, shouldExist := expectedStatefulSets.GetByName(actual.Name)
targetReplicas := int32(0) // sset removal
if shouldExist { // sset downscale
targetReplicas = sset.Replicas(expected)
// migrate data away from nodes that should be removed
if err := scheduleDataMigrations(downscaleCtx.esClient, leavingNodes); err != nil {
return results.WithError(err)
}

for _, downscale := range downscales {
// attempt the StatefulSet downscale (may or may not remove nodes)
requeue, err := attemptDownscale(downscaleCtx, downscale, leavingNodes, actualStatefulSets)
if err != nil {
return results.WithError(err)
}
leaving, removalResult := d.scaleStatefulSetDown(actualStatefulSets, &actualStatefulSets[i], targetReplicas, esClient, resourcesState, observedState, reconcileState)
results.WithResults(removalResult)
if removalResult.HasError() {
return results
if requeue {
// retry downscaling this statefulset later
results.WithResult(defaultRequeue)
}
leavingNodes = append(leavingNodes, leaving...)
}

// migrate data away from nodes leaving the cluster
log.V(1).Info("Migrating data away from nodes", "nodes", leavingNodes)
if err := migration.MigrateData(esClient, leavingNodes); err != nil {
return results.WithError(err)
return results
}

// ssetDownscale helps with the downscale of a single StatefulSet.
// A StatefulSet removal (going from 0 to 0 replicas) is also considered as a Downscale here.
type ssetDownscale struct {
	statefulSet     appsv1.StatefulSet // StatefulSet to downscale (or remove)
	initialReplicas int32              // replica count before the downscale
	targetReplicas  int32              // replica count to reach
}

// leavingNodeNames returns the names of the nodes that are supposed to leave
// the Elasticsearch cluster for this StatefulSet, ordered by highest ordinal first.
func (d ssetDownscale) leavingNodeNames() []string {
	count := d.initialReplicas - d.targetReplicas
	if count <= 0 {
		// not an actual downscale: no node leaves
		return nil
	}
	names := make([]string, 0, count)
	// walk ordinals downwards, from the highest one to the target
	for ordinal := d.initialReplicas - 1; ordinal >= d.targetReplicas; ordinal-- {
		names = append(names, sset.PodName(d.statefulSet.Name, ordinal))
	}
	return names
}

return results
// isRemoval returns true if this downscale is a StatefulSet removal:
// the StatefulSet has no replica, and is not expected to have any.
func (d ssetDownscale) isRemoval() bool {
	return d.targetReplicas == 0 && d.initialReplicas == 0
}

// scaleStatefulSetDown scales the given StatefulSet down to targetReplicas, if possible.
// It returns the names of the nodes that will leave the cluster.
func (d *defaultDriver) scaleStatefulSetDown(
allStatefulSets sset.StatefulSetList,
ssetToScaleDown *appsv1.StatefulSet,
targetReplicas int32,
esClient esclient.Client,
resourcesState reconcile.ResourcesState,
observedState observer.State,
reconcileState *reconcile.State,
) ([]string, *reconciler.Results) {
results := &reconciler.Results{}
logger := log.WithValues("namespace", ssetToScaleDown.Namespace, "statefulset", ssetToScaleDown.Name)
// isReplicaDecrease reports whether this downscale shrinks the replica count.
func (d ssetDownscale) isReplicaDecrease() bool {
	return d.initialReplicas > d.targetReplicas
}

// leavingNodeNames aggregates, across all given downscales, the names of the
// nodes that should leave the cluster.
func leavingNodeNames(downscales []ssetDownscale) []string {
	// deliberately non-nil: callers get an empty (not nil) slice when no node leaves
	names := []string{}
	for _, downscale := range downscales {
		names = append(names, downscale.leavingNodeNames()...)
	}
	return names
}

if sset.Replicas(*ssetToScaleDown) == 0 && targetReplicas == 0 {
// no replicas expected, StatefulSet can be safely deleted
logger.Info("Deleting statefulset", "namespace", ssetToScaleDown.Namespace, "name", ssetToScaleDown.Name)
if err := d.Client.Delete(ssetToScaleDown); err != nil {
return nil, results.WithError(err)
// calculateDownscales compares expected and actual StatefulSets to return a list of ssetDownscale.
// We also include StatefulSets removal (0 replicas) in those downscales.
func calculateDownscales(expectedStatefulSets sset.StatefulSetList, actualStatefulSets sset.StatefulSetList) []ssetDownscale {
downscales := []ssetDownscale{}
for _, actualSset := range actualStatefulSets {
actualReplicas := sset.Replicas(actualSset)
expectedSset, shouldExist := expectedStatefulSets.GetByName(actualSset.Name)
expectedReplicas := int32(0) // sset removal
if shouldExist { // sset downscale
expectedReplicas = sset.Replicas(expectedSset)
}
if expectedReplicas == 0 || // removal
expectedReplicas < actualReplicas { // downscale
downscales = append(downscales, ssetDownscale{
sebgl marked this conversation as resolved.
Show resolved Hide resolved
statefulSet: actualSset,
initialReplicas: actualReplicas,
targetReplicas: expectedReplicas,
})
}
}
// copy the current replicas, to be decremented with nodes to remove
initialReplicas := sset.Replicas(*ssetToScaleDown)
updatedReplicas := initialReplicas

// leaving nodes names can be built from StatefulSet name and ordinals
// nodes are ordered by highest ordinal first
var leavingNodes []string
for i := initialReplicas - 1; i > targetReplicas-1; i-- {
leavingNodes = append(leavingNodes, sset.PodName(ssetToScaleDown.Name, i))
return downscales
}

// scheduleDataMigrations requests Elasticsearch to migrate data away from leavingNodes.
// If leavingNodes is empty, it clears any existing settings.
func scheduleDataMigrations(esClient esclient.Client, leavingNodes []string) error {
if len(leavingNodes) != 0 {
log.V(1).Info("Migrating data away from nodes", "nodes", leavingNodes)
}
return migration.MigrateData(esClient, leavingNodes)
sebgl marked this conversation as resolved.
Show resolved Hide resolved
}

// TODO: don't remove last master/last data nodes?
// TODO: detect cases where data migration cannot happen since no nodes to host shards?
// attemptDownscale attempts to decrement the number of replicas of the given StatefulSet,
// or deletes the StatefulSet entirely if it should not contain any replica.
// Nodes whose data migration is not over will not be removed.
// A boolean is returned to indicate if a requeue should be scheduled if the entire downscale could not be performed.
func attemptDownscale(ctx downscaleContext, downscale ssetDownscale, allLeavingNodes []string, statefulSets sset.StatefulSetList) (bool, error) {
switch {
case downscale.isRemoval():
ssetLogger(downscale.statefulSet).Info("Deleting statefulset")
return false, ctx.k8sClient.Delete(&downscale.statefulSet)

case downscale.isReplicaDecrease():
// adjust the theoretical downscale to one we can safely perform
performable := calculatePerformableDownscale(ctx, downscale, allLeavingNodes)
if !performable.isReplicaDecrease() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice.

// no downscale can be performed for now, let's requeue
return true, nil
}
// do performable downscale, and requeue if needed
shouldRequeue := performable.targetReplicas != downscale.targetReplicas
return shouldRequeue, doDownscale(ctx, performable, statefulSets)

for _, node := range leavingNodes {
if migration.IsMigratingData(observedState, node, leavingNodes) {
// data migration not over yet: schedule a requeue
logger.V(1).Info("Data migration not over yet, skipping node deletion", "node", node)
reconcileState.UpdateElasticsearchMigrating(resourcesState, observedState)
results.WithResult(defaultRequeue)
default:
// nothing to do
return false, nil
}
}

// calculatePerformableDownscale updates the given downscale target replicas to account for nodes
// which cannot be safely deleted yet.
// It returns the updated downscale: the doc previously claimed a boolean was also
// returned, which did not match the signature — requeueing is decided by the caller
// from the returned target replicas.
func calculatePerformableDownscale(
	ctx downscaleContext,
	downscale ssetDownscale,
	allLeavingNodes []string,
) ssetDownscale {
	// TODO: only one master node downscale at a time

	// create another downscale based on the provided one, for which we'll slowly decrease target replicas
	performableDownscale := ssetDownscale{
		statefulSet:     downscale.statefulSet,
		initialReplicas: downscale.initialReplicas,
		targetReplicas:  downscale.initialReplicas, // target set to initial
	}
	// iterate on all leaving nodes (ordered by highest ordinal first)
	for _, node := range downscale.leavingNodeNames() {
		if migration.IsMigratingData(ctx.observedState, node, allLeavingNodes) {
			ssetLogger(downscale.statefulSet).V(1).Info("Data migration not over yet, skipping node deletion", "node", node)
			ctx.reconcileState.UpdateElasticsearchMigrating(ctx.resourcesState, ctx.observedState)
			// no need to check other nodes since we remove them in order and this one isn't ready anyway
			return performableDownscale
		}
		// data migration over: allow pod to be removed
		performableDownscale.targetReplicas--
	}
	return performableDownscale
}

if updatedReplicas < initialReplicas {
// trigger deletion of nodes whose data migration is over
logger.Info("Scaling replicas down", "from", initialReplicas, "to", updatedReplicas)
ssetToScaleDown.Spec.Replicas = &updatedReplicas

if label.IsMasterNodeSet(*ssetToScaleDown) {
// Update Zen1 minimum master nodes API, accounting for the updated downscaled replicas.
_, err := zen1.UpdateMinimumMasterNodes(d.Client, d.ES, esClient, allStatefulSets, reconcileState)
if err != nil {
return nil, results.WithError(err)
}
// Update zen2 settings to exclude leaving master nodes from voting.
excludeNodes := make([]string, 0, initialReplicas-updatedReplicas)
for i := updatedReplicas; i < initialReplicas; i++ {
excludeNodes = append(excludeNodes, sset.PodName(ssetToScaleDown.Name, i))
}
if err := zen2.AddToVotingConfigExclusions(esClient, *ssetToScaleDown, excludeNodes); err != nil {
return nil, results.WithError(err)
}
}
// doDownscale schedules nodes removal for the given downscale, and updates zen settings accordingly.
// Zen settings are updated BEFORE replicas are decreased, so master-eligible nodes are
// excluded from voting before they go away.
// NOTE(review): this span was corrupted by interleaved old-side diff lines;
// reconstructed to the new-side version.
func doDownscale(downscaleCtx downscaleContext, downscale ssetDownscale, actualStatefulSets sset.StatefulSetList) error {
	ssetLogger(downscale.statefulSet).Info(
		"Scaling replicas down",
		"from", downscale.initialReplicas,
		"to", downscale.targetReplicas,
	)

	if err := updateZenSettingsForDownscale(downscaleCtx, downscale, actualStatefulSets); err != nil {
		return err
	}

	downscale.statefulSet.Spec.Replicas = &downscale.targetReplicas
	if err := downscaleCtx.k8sClient.Update(&downscale.statefulSet); err != nil {
		return err
	}

	// Expect the updated statefulset in the cache for next reconciliation.
	downscaleCtx.expectations.ExpectGeneration(downscale.statefulSet.ObjectMeta)

	return nil
}

// updateZenSettingsForDownscale makes sure zen1 and zen2 settings are updated to account for nodes
// that will soon be removed.
// NOTE(review): removed a stray old-side diff line that had leaked into this span,
// and folded the bare `_, err :=` into an if-initializer for idiomatic scoping.
func updateZenSettingsForDownscale(ctx downscaleContext, downscale ssetDownscale, actualStatefulSets sset.StatefulSetList) error {
	if !label.IsMasterNodeSet(downscale.statefulSet) {
		// nothing to do
		return nil
	}

	// TODO: only update in case 2->1 masters.
	// Update Zen1 minimum master nodes API, accounting for the updated downscaled replicas.
	if _, err := zen1.UpdateMinimumMasterNodes(ctx.k8sClient, ctx.es, ctx.esClient, actualStatefulSets, ctx.reconcileState); err != nil {
		return err
	}

	// Update zen2 settings to exclude leaving master nodes from voting.
	if err := zen2.AddToVotingConfigExclusions(ctx.esClient, downscale.statefulSet, downscale.leavingNodeNames()); err != nil {
		return err
	}

	return nil
}
Loading