Adapt upscale code to add only one master at a time and patch config accordingly
sebgl committed Aug 29, 2019
1 parent 5e944f9 commit 4306cbf
Showing 5 changed files with 353 additions and 133 deletions.
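
Before the file-by-file diff, the idea in isolation: when the expected number of master-eligible replicas is higher than the actual number, the upscale now advances by a single master per reconciliation, and the zen1/zen2 discovery settings are patched against that adjusted topology rather than the final target. The following is a minimal, self-contained sketch of that rule; the function and names are hypothetical and are not part of this commit.

package main

import "fmt"

// nextMasterReplicas caps master-node upscales to one new node per
// reconciliation; downscales are handled in a separate phase and pass through.
func nextMasterReplicas(actual, target int32) int32 {
	if target > actual {
		return actual + 1 // add only one master at a time
	}
	return target
}

func main() {
	// growing from 1 to 3 masters takes two reconciliations: 1 -> 2 -> 3
	fmt.Println(nextMasterReplicas(1, 3)) // 2
	fmt.Println(nextMasterReplicas(2, 3)) // 3
	fmt.Println(nextMasterReplicas(3, 3)) // 3
}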
40 changes: 13 additions & 27 deletions pkg/controller/elasticsearch/driver/nodes.go
@@ -5,7 +5,6 @@
package driver

import (
"github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/keystore"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler"
esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client"
@@ -42,13 +41,23 @@ func (d *defaultDriver) reconcileNodeSpecs(
return results.WithResult(defaultRequeue)
}

expectedResources, err := expectedResources(d.Client, d.ES, observedState, keystoreResources)
expectedResources, err := nodespec.BuildExpectedResources(d.ES, keystoreResources)
if err != nil {
return results.WithError(err)
}

esState := NewMemoizingESState(esClient)

// Phase 1: apply expected StatefulSets resources and scale up.
if err := HandleUpscaleAndSpecChanges(d.Client, d.ES, d.Scheme(), expectedResources, actualStatefulSets); err != nil {
upscaleCtx := upscaleCtx{
k8sClient: d.K8sClient(),
es: d.ES,
scheme: d.Scheme(),
observedState: observedState,
esState: esState,
upscaleStateBuilder: &upscaleStateBuilder{},
}
if err := HandleUpscaleAndSpecChanges(upscaleCtx, actualStatefulSets, expectedResources); err != nil {
return results.WithError(err)
}

@@ -96,7 +105,7 @@ func (d *defaultDriver) reconcileNodeSpecs(

// Phase 3: handle rolling upgrades.
// Control nodes restart (upgrade) by manually decrementing rollingUpdate.Partition.
rollingUpgradesRes := d.handleRollingUpgrades(esClient, actualStatefulSets)
rollingUpgradesRes := d.handleRollingUpgrades(esClient, esState, actualStatefulSets)
results.WithResults(rollingUpgradesRes)
if rollingUpgradesRes.HasError() {
return results
@@ -107,26 +116,3 @@ func (d *defaultDriver) reconcileNodeSpecs(
// - grow and shrink
return results
}

func expectedResources(
k8sClient k8s.Client,
es v1alpha1.Elasticsearch,
observedState observer.State,
keystoreResources *keystore.Resources,
) (nodespec.ResourcesList, error) {
resources, err := nodespec.BuildExpectedResources(es, keystoreResources)
if err != nil {
return nil, err
}

// patch configs to consider zen1 minimum master nodes
if err := zen1.SetupMinimumMasterNodesConfig(resources); err != nil {
return nil, err
}
// patch configs to consider zen2 initial master nodes
if err := zen2.SetupInitialMasterNodes(es, observedState, k8sClient, resources); err != nil {
return nil, err
}

return resources, nil
}
82 changes: 0 additions & 82 deletions pkg/controller/elasticsearch/driver/nodes_test.go

This file was deleted.

4 changes: 1 addition & 3 deletions pkg/controller/elasticsearch/driver/upgrade.go
@@ -23,13 +23,11 @@ import (

func (d *defaultDriver) handleRollingUpgrades(
esClient esclient.Client,
esState ESState,
statefulSets sset.StatefulSetList,
) *reconciler.Results {
results := &reconciler.Results{}

// We need an up-to-date ES state, but avoid requesting information we may not need.
esState := NewMemoizingESState(esClient)

// Maybe upgrade some of the nodes.
res := newRollingUpgrade(d, esClient, esState, statefulSets).run()
results.WithResults(res)
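
The deleted comment above explains why a memoizing ES state existed here in the first place: cluster state should be fetched lazily and at most once. With this change the state is built once in reconcileNodeSpecs and shared by the upscale and rolling-upgrade phases. A rough sketch of that memoization pattern, with simplified, hypothetical types (the real ESState also propagates errors):

package main

import (
	"fmt"
	"sync"
)

type memoizedValue struct {
	once  sync.Once
	value string
}

// get runs fetch only on the first call and caches its result.
func (m *memoizedValue) get(fetch func() string) string {
	m.once.Do(func() { m.value = fetch() })
	return m.value
}

func main() {
	var health memoizedValue
	fetch := func() string {
		fmt.Println("calling the Elasticsearch API")
		return "green"
	}
	// the API is hit once even though two phases ask for the value
	fmt.Println(health.get(fetch)) // upscale phase
	fmt.Println(health.get(fetch)) // rolling-upgrade phase
}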
100 changes: 83 additions & 17 deletions pkg/controller/elasticsearch/driver/upscale.go
@@ -11,50 +11,116 @@ import (
"github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/nodespec"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/settings"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/version/zen1"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/version/zen2"
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
)

type upscaleCtx struct {
k8sClient k8s.Client
es v1alpha1.Elasticsearch
scheme *runtime.Scheme
observedState observer.State
esState ESState
upscaleStateBuilder *upscaleStateBuilder
}

// HandleUpscaleAndSpecChanges reconciles expected NodeSpec resources.
// It does:
// - create any new StatefulSets
// - update existing StatefulSets specification, to be used for future pods rotation
// - upscale StatefulSet for which we expect more replicas
// - limit master node creation to one at a time
// It does not:
// - perform any StatefulSet downscale (left for downscale phase)
// - perform any pod upgrade (left for rolling upgrade phase)
func HandleUpscaleAndSpecChanges(
k8sClient k8s.Client,
es v1alpha1.Elasticsearch,
scheme *runtime.Scheme,
expectedResources nodespec.ResourcesList,
ctx upscaleCtx,
actualStatefulSets sset.StatefulSetList,
expectedResources nodespec.ResourcesList,
) error {
// TODO: there is a split brain possibility here if going from 1 to 3 masters or 3 to 7.
// We should add only one master node at a time for safety.
// See https://github.com/elastic/cloud-on-k8s/issues/1281.

for _, nodeSpecRes := range expectedResources {
// always reconcile config (will apply to new & recreated pods)
if err := settings.ReconcileConfig(k8sClient, es, nodeSpecRes.StatefulSet.Name, nodeSpecRes.Config); err != nil {
// adjust expected replicas to control nodes creation and deletion
adjusted, err := adjustResources(ctx, actualStatefulSets, expectedResources)
if err != nil {
return err
}
// reconcile all resources
for _, res := range adjusted {
if err := settings.ReconcileConfig(ctx.k8sClient, ctx.es, res.StatefulSet.Name, res.Config); err != nil {
return err
}
if _, err := common.ReconcileService(k8sClient, scheme, &nodeSpecRes.HeadlessService, &es); err != nil {
if _, err := common.ReconcileService(ctx.k8sClient, ctx.scheme, &res.HeadlessService, &ctx.es); err != nil {
return err
}
ssetToApply := *nodeSpecRes.StatefulSet.DeepCopy()
actual, alreadyExists := actualStatefulSets.GetByName(ssetToApply.Name)
if alreadyExists {
ssetToApply = adaptForExistingStatefulSet(actual, ssetToApply)
if err := sset.ReconcileStatefulSet(ctx.k8sClient, ctx.scheme, ctx.es, res.StatefulSet); err != nil {
return err
}
if err := sset.ReconcileStatefulSet(k8sClient, scheme, es, ssetToApply); err != nil {
}
return nil
}

func adjustResources(
ctx upscaleCtx,
actualStatefulSets sset.StatefulSetList,
expectedResources nodespec.ResourcesList,
) (nodespec.ResourcesList, error) {
adjustedResources := make(nodespec.ResourcesList, 0, len(expectedResources))
for _, nodeSpecRes := range expectedResources {
adjustedSset, err := adjustStatefulSetReplicas(ctx, actualStatefulSets, *nodeSpecRes.StatefulSet.DeepCopy())
if err != nil {
return nil, err
}
nodeSpecRes.StatefulSet = adjustedSset
adjustedResources = append(adjustedResources, nodeSpecRes)
}
// adapt resources configuration to match adjusted replicas
if err := adjustZenConfig(ctx.es, adjustedResources); err != nil {
return nil, err
}
return adjustedResources, nil
}

func adjustZenConfig(es v1alpha1.Elasticsearch, resources nodespec.ResourcesList) error {
// patch configs to consider zen1 minimum master nodes
if err := zen1.SetupMinimumMasterNodesConfig(resources); err != nil {
return err
}
// patch configs to consider zen2 initial master nodes if cluster is not bootstrapped yet
if !AnnotatedForBootstrap(es) {
if err := zen2.SetupInitialMasterNodes(resources); err != nil {
return err
}
}
return nil
}

func adjustStatefulSetReplicas(
ctx upscaleCtx,
actualStatefulSets sset.StatefulSetList,
expected appsv1.StatefulSet,
) (appsv1.StatefulSet, error) {
actual, alreadyExists := actualStatefulSets.GetByName(expected.Name)
if alreadyExists {
expected = adaptForExistingStatefulSet(actual, expected)
}
if alreadyExists && isReplicaIncrease(actual, expected) {
upscaleState, err := ctx.upscaleStateBuilder.InitOnce(ctx.k8sClient, ctx.es, ctx.esState)
if err != nil {
return appsv1.StatefulSet{}, err
}
expected = upscaleState.limitMasterNodesCreation(actualStatefulSets, expected)
}
return expected, nil
}

// isReplicaIncrease returns true if expected replicas are higher than actual replicas.
func isReplicaIncrease(actual appsv1.StatefulSet, expected appsv1.StatefulSet) bool {
return sset.GetReplicas(expected) > sset.GetReplicas(actual)
}

// adaptForExistingStatefulSet modifies ssetToApply to account for the existing StatefulSet.
// It avoids triggering downscales (done later), and makes sure new pods are created with the newest revision.
func adaptForExistingStatefulSet(actualSset appsv1.StatefulSet, ssetToApply appsv1.StatefulSet) appsv1.StatefulSet {
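The limiting step itself, upscaleState.limitMasterNodesCreation, is not visible in this excerpt. As a rough, hypothetical sketch of the behaviour named in the doc comment above ("limit master node creation to one at a time"): master-eligible StatefulSets are only allowed to grow by one node across the whole cluster per reconciliation, while non-master StatefulSets are left untouched. The types and function below are simplified assumptions, not the operator's code.

package main

import "fmt"

type ssetSpec struct {
	name     string
	masters  bool  // does this StatefulSet hold master-eligible nodes?
	actual   int32 // currently existing replicas
	expected int32 // replicas requested by the spec
}

// limitMasterCreation caps master-node upscales so that at most one new
// master-eligible node is created across the whole cluster per reconciliation.
func limitMasterCreation(specs []ssetSpec, masterAllowed bool) []ssetSpec {
	out := make([]ssetSpec, 0, len(specs))
	for _, s := range specs {
		if s.masters && s.expected > s.actual {
			if masterAllowed {
				s.expected = s.actual + 1 // one new master this round
				masterAllowed = false
			} else {
				s.expected = s.actual // wait for the next reconciliation
			}
		}
		out = append(out, s)
	}
	return out
}

func main() {
	specs := []ssetSpec{
		{name: "masters", masters: true, actual: 1, expected: 3},
		{name: "data", masters: false, actual: 2, expected: 5},
	}
	for _, s := range limitMasterCreation(specs, true) {
		fmt.Printf("%s: %d replicas this round\n", s.name, s.expected)
	}
	// masters: 2 replicas this round (one new master at a time)
	// data: 5 replicas this round (data nodes are not limited)
}

Because the zen config is patched after this adjustment (adjustZenConfig in the diff above), settings such as discovery.zen.minimum_master_nodes are derived from the masters actually being created this round, not from the final target.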
