Handle StatefulSets scale up/down/replacement (#1218)
Add support for scaling a StatefulSet up (just update the sset replicas) or
down (migrate data away, then remove nodes once they are ready to go). This
also adds support for renaming a StatefulSet: the new sset is created, then
the existing one is slowly removed with data migration.

Zen1/zen2 considerations are ignored for now (this works fine with zen2 in most
cases), as are change budget considerations.
Unit tests and E2E tests are missing, but a refactoring is expected to handle
zen1/zen2 and the changeBudget; I'd rather wait for that refactoring before
investing in extensive unit tests. Consider this still a work in progress.

The PR also modifies the way pods and ssets are watched by the Elasticsearch controller.
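Below is a minimal, self-contained Go sketch of the replica arithmetic described above (the function names are illustrative, not part of this PR): a scale-up is applied immediately, while a scale-down keeps the current replica count until the highest-ordinal pods have been drained.

package main

import "fmt"

// phase1Replicas returns the replica count to apply while reconciling the
// StatefulSet spec: scale-ups take effect immediately, scale-downs are
// deferred until data has been migrated away from the leaving nodes.
func phase1Replicas(expected, actual int32) int32 {
	if expected < actual {
		return actual // keep the current count for now, drain nodes first
	}
	return expected
}

// leavingNodes lists the pods to drain to go from the actual replica count
// down to the target, highest ordinals first (pods are named <sset>-<ordinal>).
func leavingNodes(ssetName string, actual, target int32) []string {
	var nodes []string
	for i := actual - 1; i >= target; i-- {
		nodes = append(nodes, fmt.Sprintf("%s-%d", ssetName, i))
	}
	return nodes
}

func main() {
	fmt.Println(phase1Replicas(5, 3))             // scale-up: 5, applied right away
	fmt.Println(phase1Replicas(1, 3))             // scale-down: stays at 3 during phase 1
	fmt.Println(leavingNodes("es-default", 3, 1)) // [es-default-2 es-default-1]
}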
sebgl authored Jul 10, 2019
1 parent d495796 commit a15245a
Showing 13 changed files with 301 additions and 111 deletions.
@@ -122,7 +122,7 @@ spec:
name:
description: Name is a logical name for this set of nodes. Used
as a part of the managed Elasticsearch node.name setting.
maxLength: 12
maxLength: 19
pattern: '[a-zA-Z0-9-]+'
type: string
nodeCount:
2 changes: 1 addition & 1 deletion operators/config/samples/apm/apm_es_kibana.yaml
@@ -7,7 +7,7 @@ metadata:
spec:
version: "7.1.0"
nodes:
- name: all
- name: default
nodeCount: 3
---
apiVersion: apm.k8s.elastic.co/v1alpha1
2 changes: 1 addition & 1 deletion operators/config/samples/elasticsearch/elasticsearch.yaml
@@ -6,7 +6,7 @@ metadata:
spec:
version: "7.1.0"
nodes:
- name: all
- name: default
config:
# most Elasticsearch configuration parameters are possible to set, e.g:
node.attr.attr_name: attr_value
@@ -7,7 +7,7 @@ metadata:
spec:
version: "7.1.0"
nodes:
- name: all
- name: default
nodeCount: 3
volumeClaimTemplates:
- metadata:
2 changes: 1 addition & 1 deletion operators/config/samples/kibana/kibana_es.yaml
@@ -6,7 +6,7 @@ metadata:
spec:
version: "7.1.0"
nodes:
- name: all
- name: default
nodeCount: 1
---
apiVersion: kibana.k8s.elastic.co/v1alpha1
@@ -66,7 +66,7 @@ func (es ElasticsearchSpec) NodeCount() int32 {
type NodeSpec struct {
// Name is a logical name for this set of nodes. Used as a part of the managed Elasticsearch node.name setting.
// +kubebuilder:validation:Pattern=[a-zA-Z0-9-]+
// +kubebuilder:validation:MaxLength=12
// +kubebuilder:validation:MaxLength=19
Name string `json:"name"`

// Config represents Elasticsearch configuration.
229 changes: 167 additions & 62 deletions operators/pkg/controller/elasticsearch/driver/default.go
@@ -8,11 +8,15 @@ import (
"crypto/x509"
"fmt"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
controller "sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/migration"
esvolume "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/volume"

"github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/common"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/events"
@@ -39,7 +43,6 @@ import (
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/user"
esversion "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/version"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/version/version6"
esvolume "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/volume"
"github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s"
)

@@ -212,7 +215,6 @@ func (d *defaultDriver) Reconcile(
return results.WithError(err)
}

namespacedName := k8s.ExtractNamespacedName(&es)
//
//// There might be some ongoing creations and deletions our k8s client cache
//// hasn't seen yet. In such case, requeue until we are in-sync.
@@ -323,68 +325,11 @@ func (d *defaultDriver) Reconcile(
)
}

actualStatefulSets, err := sset.RetrieveActualStatefulSets(d.Client, namespacedName)
if err != nil {
return results.WithError(err)
}

nodeSpecResources, err := nodespec.BuildExpectedResources(es, podTemplateSpecBuilder)
if err != nil {
return results.WithError(err)
}

// TODO: handle zen2 initial master nodes more cleanly
// should be empty once cluster is bootstrapped
var initialMasters []string
// TODO: refactor/move
for _, res := range nodeSpecResources {
cfg, err := res.Config.Unpack()
if err != nil {
return results.WithError(err)
}
if cfg.Node.Master {
for i := 0; i < int(*res.StatefulSet.Spec.Replicas); i++ {
initialMasters = append(initialMasters, fmt.Sprintf("%s-%d", res.StatefulSet.Name, i))
}
}
}
for i := range nodeSpecResources {
if err := nodeSpecResources[i].Config.SetStrings(settings.ClusterInitialMasterNodes, initialMasters...); err != nil {
return results.WithError(err)
}
}

// create or update all expected ssets
// TODO: safe upgrades
for _, nodeSpec := range nodeSpecResources {
if err := settings.ReconcileConfig(d.Client, es, nodeSpec.StatefulSet.Name, nodeSpec.Config); err != nil {
return results.WithError(err)
}
if _, err := common.ReconcileService(d.Client, d.Scheme, &nodeSpec.HeadlessService, &es); err != nil {
return results.WithError(err)
}
if err := sset.ReconcileStatefulSet(d.Client, d.Scheme, es, nodeSpec.StatefulSet); err != nil {
return results.WithError(err)
}
}

// delete all unexpected ssets
for _, actual := range actualStatefulSets {
if _, shouldExist := nodeSpecResources.StatefulSets().GetByName(actual.Name); !shouldExist {
// TODO: safe node removal
if err := d.Client.Delete(&actual); err != nil {
return results.WithError(err)
}
}
res = d.reconcileNodeSpecs(es, podTemplateSpecBuilder, esClient, observedState)
if results.WithResults(res).HasError() {
return results
}

// TODO:
// - safe sset replacement
// - safe node removal (data migration)
// - safe node upgrade (rollingUpdate.Partition + shards allocation)
// - change budget
// - zen1, zen2

//
//// Call Zen1 setting updater before new masters are created to ensure that they immediately start with the
//// correct value for minimum_master_nodes.
@@ -568,6 +513,166 @@ func removePodFromList(pods []corev1.Pod, pod corev1.Pod) []corev1.Pod {
return pods
}

func (d *defaultDriver) reconcileNodeSpecs(
es v1alpha1.Elasticsearch,
podSpecBuilder esversion.PodTemplateSpecBuilder,
esClient esclient.Client,
observedState observer.State,
) *reconciler.Results {
results := &reconciler.Results{}

actualStatefulSets, err := sset.RetrieveActualStatefulSets(d.Client, k8s.ExtractNamespacedName(&es))
if err != nil {
return results.WithError(err)
}

nodeSpecResources, err := nodespec.BuildExpectedResources(es, podSpecBuilder)
if err != nil {
return results.WithError(err)
}

// TODO: handle zen2 initial master nodes more cleanly
// should be empty once cluster is bootstrapped
var initialMasters []string
// TODO: refactor/move
for _, res := range nodeSpecResources {
cfg, err := res.Config.Unpack()
if err != nil {
return results.WithError(err)
}
if cfg.Node.Master {
for i := 0; i < int(*res.StatefulSet.Spec.Replicas); i++ {
initialMasters = append(initialMasters, fmt.Sprintf("%s-%d", res.StatefulSet.Name, i))
}
}
}
for i := range nodeSpecResources {
if err := nodeSpecResources[i].Config.SetStrings(settings.ClusterInitialMasterNodes, initialMasters...); err != nil {
return results.WithError(err)
}
}

// Phase 1: apply expected StatefulSets resources, but don't scale down.
// The goal is to:
// 1. scale sset up (eg. go from 3 to 5 replicas).
// 2. apply configuration changes on the sset resource, to be used for future pods creation/recreation,
// but do not rotate pods yet.
// 3. do **not** apply replicas scale down, otherwise nodes would be deleted before
// we handle a clean deletion.
for _, nodeSpecRes := range nodeSpecResources {
// always reconcile config (will apply to new & recreated pods)
if err := settings.ReconcileConfig(d.Client, es, nodeSpecRes.StatefulSet.Name, nodeSpecRes.Config); err != nil {
return results.WithError(err)
}
if _, err := common.ReconcileService(d.Client, d.Scheme, &nodeSpecRes.HeadlessService, &es); err != nil {
return results.WithError(err)
}
ssetToApply := *nodeSpecRes.StatefulSet.DeepCopy()
actual, exists := actualStatefulSets.GetByName(ssetToApply.Name)
if exists && sset.Replicas(ssetToApply) < sset.Replicas(actual) {
// sset needs to be scaled down
// update the sset to use the new spec but don't scale replicas down for now
ssetToApply.Spec.Replicas = actual.Spec.Replicas
}
if err := sset.ReconcileStatefulSet(d.Client, d.Scheme, es, ssetToApply); err != nil {
return results.WithError(err)
}
}

// Phase 2: handle sset scale down.
// We want to safely remove nodes from the cluster, either because the sset requires less replicas,
// or because it should be removed entirely.
for i, actual := range actualStatefulSets {
expected, shouldExist := nodeSpecResources.StatefulSets().GetByName(actual.Name)
switch {
// stateful set removal
case !shouldExist:
target := int32(0)
removalResult := d.scaleStatefulSetDown(&actualStatefulSets[i], target, esClient, observedState)
results.WithResults(removalResult)
if removalResult.HasError() {
return results
}
// stateful set downscale
case actual.Spec.Replicas != nil && sset.Replicas(expected) < sset.Replicas(actual):
target := sset.Replicas(expected)
downscaleResult := d.scaleStatefulSetDown(&actualStatefulSets[i], target, esClient, observedState)
if downscaleResult.HasError() {
return results
}
}
}

// TODO:
// - safe node upgrade (rollingUpdate.Partition + shards allocation)
// - change budget
// - zen1, zen2
return results
}

func (d *defaultDriver) scaleStatefulSetDown(
statefulSet *appsv1.StatefulSet,
targetReplicas int32,
esClient esclient.Client,
observedState observer.State,
) *reconciler.Results {
results := &reconciler.Results{}
logger := log.WithValues("statefulset", k8s.ExtractNamespacedName(statefulSet))

if sset.Replicas(*statefulSet) == 0 && targetReplicas == 0 {
// we don't expect any new replicas in this statefulset, remove it
logger.Info("Deleting statefulset")
if err := d.Client.Delete(statefulSet); err != nil {
return results.WithError(err)
}
}
// copy the current replicas, to be decremented with nodes to remove
initialReplicas := sset.Replicas(*statefulSet)
updatedReplicas := initialReplicas

// leaving nodes names can be built from StatefulSet name and ordinals
// nodes are ordered by highest ordinal first
var leavingNodes []string
for i := initialReplicas - 1; i > targetReplicas-1; i-- {
leavingNodes = append(leavingNodes, sset.PodName(statefulSet.Name, int(i)))
}

// TODO: don't remove last master/last data nodes?
// TODO: detect cases where data migration cannot happen since no nodes to host shards?

// migrate data away from these nodes before removing them
logger.V(1).Info("Migrating data away from nodes", "nodes", leavingNodes)
if err := migration.MigrateData(esClient, leavingNodes); err != nil {
return results.WithError(err)
}

for _, node := range leavingNodes {
if migration.IsMigratingData(observedState, node, leavingNodes) {
// data migration not over yet: schedule a requeue
logger.V(1).Info("Data migration not over yet, skipping node deletion", "node", node)
results.WithResult(defaultRequeue)
// no need to check other nodes since we remove them in order and this one isn't ready anyway
break
}
// data migration over: allow pod to be removed
updatedReplicas--
}

if updatedReplicas != initialReplicas {
// update cluster coordination settings to account for nodes deletion
// TODO: update zen1/zen2

// trigger deletion of nodes whose data migration is over
logger.V(1).Info("Scaling replicas down", "from", initialReplicas, "to", updatedReplicas)
statefulSet.Spec.Replicas = &updatedReplicas
if err := d.Client.Update(statefulSet); err != nil {
return results.WithError(err)
}
}

return nil
}

//
//// calculateChanges calculates the changes we'd need to perform to go from the current cluster configuration to the
//// desired one.
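The data migration step above (migration.MigrateData / migration.IsMigratingData) is what gates the replica decrement. As a standalone illustration, the sketch below drains nodes by name using Elasticsearch shard allocation filtering (cluster.routing.allocation.exclude._name), which is the usual mechanism for this kind of drain; the helper name and the cluster URL are assumptions for the example, not the operator's esclient API.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
)

// excludeFromAllocation asks Elasticsearch to move shards off the named nodes
// via shard allocation filtering. Passing an empty list clears the exclusion.
func excludeFromAllocation(esURL string, leavingNodes []string) error {
	body, err := json.Marshal(map[string]interface{}{
		"transient": map[string]string{
			"cluster.routing.allocation.exclude._name": strings.Join(leavingNodes, ","),
		},
	})
	if err != nil {
		return err
	}
	req, err := http.NewRequest(http.MethodPut, esURL+"/_cluster/settings", bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("cluster settings update failed: %s", resp.Status)
	}
	return nil
}

func main() {
	// Hypothetical local cluster; in the operator this goes through esclient.Client.
	nodes := []string{"es-default-2", "es-default-1"}
	if err := excludeFromAllocation("http://localhost:9200", nodes); err != nil {
		fmt.Println("error:", err)
	}
}

Only once the observed cluster state shows no shards left on a leaving node does the reconciliation decrement spec.replicas, letting the StatefulSet controller delete that pod.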
