Add only one master node at a time
To better match zen1 and zen2 requirements (especially zen1),
we should only create one master node at a time.

This is achieved here by:

* slowly incrementing master StatefulSets replicas, as long as the
upscaleState allows us to do so (see the sketch below)
* inspecting the current nodes at upscaleState initialization. If the
cluster is not bootstrapped yet, any number of masters can be created.
Otherwise, master node creation is only allowed if no other master is
already in the process of being created.
* because the above check can hardly tolerate an out-of-date cache, we
check pods against StatefulSet spec expectations beforehand.
* zen1 & zen2 configuration in elasticsearch.yml now needs to be patched
after adjusting the number of replicas

Some side effects of this feature:

* cluster bootstrap/UUID annotation is moved to its own file, since it is
not only related to zen2
* downscale expectations checks can be removed, since they are now done at
a higher level
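
To illustrate the intent, here is a minimal, hypothetical sketch of the one-master-at-a-time gate described above. The names upscaleGate and limitMasterReplicas are illustrative only, not the actual upscaleState types introduced by this commit; the real logic is wired into HandleUpscaleAndSpecChanges through the upscaleCtx shown in the nodes.go diff below.

package main

import "fmt"

// upscaleGate is a hypothetical, simplified stand-in for this commit's upscaleState.
type upscaleGate struct {
	isBootstrapped      bool // cluster has already formed (UUID annotation present)
	allowMasterCreation bool // true if no master is currently in the process of joining
}

// limitMasterReplicas caps the target replicas of a master StatefulSet so that,
// once the cluster is bootstrapped, at most one new master is created at a time.
func (g *upscaleGate) limitMasterReplicas(target, actual int32) int32 {
	if !g.isBootstrapped {
		// initial cluster creation: all masters can be created at once
		return target
	}
	for actual < target && g.allowMasterCreation {
		// accept one more master; further creations must wait until it has joined
		actual++
		g.allowMasterCreation = false
	}
	return actual
}

func main() {
	g := &upscaleGate{isBootstrapped: true, allowMasterCreation: true}
	// upscaling masters from 3 to 5 replicas only moves to 4 in this reconciliation
	fmt.Println(g.limitMasterReplicas(5, 3))
}
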
sebgl committed Aug 28, 2019
1 parent ac13247 commit e17c3d6
Showing 19 changed files with 582 additions and 503 deletions.
50 changes: 50 additions & 0 deletions pkg/controller/elasticsearch/driver/bootstrap.go
@@ -0,0 +1,50 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package driver

import (
	"github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1"
	"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer"
	"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
)

const (
	// ClusterUUIDAnnotationName used to store the cluster UUID as an annotation when cluster has been bootstrapped.
	ClusterUUIDAnnotationName = "elasticsearch.k8s.elastic.co/cluster-uuid"
)

// AnnotatedForBootstrap returns true if the cluster has been annotated with the UUID already.
func AnnotatedForBootstrap(cluster v1alpha1.Elasticsearch) bool {
	_, bootstrapped := cluster.Annotations[ClusterUUIDAnnotationName]
	return bootstrapped
}

// ReconcileClusterUUID annotates the cluster resource with its UUID once the cluster has bootstrapped.
func ReconcileClusterUUID(c k8s.Client, cluster *v1alpha1.Elasticsearch, observedState observer.State) error {
	if AnnotatedForBootstrap(*cluster) {
		// already annotated, nothing to do.
		return nil
	}
	if clusterIsBootstrapped(observedState) {
		// cluster bootstrapped but not annotated yet
		return annotateWithUUID(cluster, observedState, c)
	}
	// cluster not bootstrapped yet
	return nil
}

// clusterIsBootstrapped returns true if the cluster has formed and has a UUID.
func clusterIsBootstrapped(observedState observer.State) bool {
	return observedState.ClusterState != nil && len(observedState.ClusterState.ClusterUUID) > 0
}

// annotateWithUUID annotates the cluster with its UUID, to mark it as "bootstrapped".
func annotateWithUUID(cluster *v1alpha1.Elasticsearch, observedState observer.State, c k8s.Client) error {
	log.Info("Annotating bootstrapped cluster with its UUID", "namespace", cluster.Namespace, "es_name", cluster.Name)
	if cluster.Annotations == nil {
		cluster.Annotations = make(map[string]string)
	}
	cluster.Annotations[ClusterUUIDAnnotationName] = observedState.ClusterState.ClusterUUID
	return c.Update(cluster)
}
21 changes: 0 additions & 21 deletions pkg/controller/elasticsearch/driver/downscale.go
@@ -26,14 +26,6 @@ func HandleDownscale(
) *reconciler.Results {
results := &reconciler.Results{}

canProceed, err := noOnGoingDeletion(downscaleCtx, actualStatefulSets)
if err != nil {
return results.WithError(err)
}
if !canProceed {
return results.WithResult(defaultRequeue)
}

// compute the list of StatefulSet downscales to perform
downscales := calculateDownscales(expectedStatefulSets, actualStatefulSets)
leavingNodes := leavingNodeNames(downscales)
@@ -64,19 +56,6 @@ func HandleDownscale(
return results
}

// noOnGoingDeletion returns true if some pods deletion or creation may still be in progress
func noOnGoingDeletion(downscaleCtx downscaleContext, actualStatefulSets sset.StatefulSetList) (bool, error) {
// Pods we have may not match replicas specified in the StatefulSets spec.
// This can happen if, for example, replicas were recently downscaled to remove a node,
// but the node isn't completely terminated yet, and may still be part of the cluster.
// Moving on with downscaling more nodes may lead to complications when dealing with
// Elasticsearch shards allocation excludes (should not be cleared if the ghost node isn't removed yet)
// or zen settings (must consider terminating masters that are still there).
// Let's retry once expected pods are there.
// PodReconciliationDone also matches any pod not created yet, for which we'll also requeue.
return actualStatefulSets.PodReconciliationDone(downscaleCtx.k8sClient, downscaleCtx.es)
}

// calculateDownscales compares expected and actual StatefulSets to return a list of ssetDownscale.
// We also include StatefulSets removal (0 replicas) in those downscales.
func calculateDownscales(expectedStatefulSets sset.StatefulSetList, actualStatefulSets sset.StatefulSetList) []ssetDownscale {
9 changes: 0 additions & 9 deletions pkg/controller/elasticsearch/driver/downscale_test.go
@@ -182,15 +182,6 @@ func TestHandleDownscale(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expectedAfterDownscale, actual.Items)

// running the downscale again should requeue since some pods are not terminated yet
results = HandleDownscale(downscaleCtx, requestedStatefulSets, actual.Items)
require.False(t, results.HasError())
require.Equal(t, requeueResults, results)
// no StatefulSet should have been updated
err = k8sClient.List(&client.ListOptions{}, &actual)
require.NoError(t, err)
require.Equal(t, expectedAfterDownscale, actual.Items)

// simulate pods deletion that would be done by the StatefulSet controller
require.NoError(t, k8sClient.Delete(&podsSsetMaster3Replicas[2]))
require.NoError(t, k8sClient.Delete(&podsSsetData4Replicas[3]))
6 changes: 6 additions & 0 deletions pkg/controller/elasticsearch/driver/driver.go
@@ -230,6 +230,12 @@ func (d *defaultDriver) Reconcile() *reconciler.Results {
return results.WithError(err)
}

// set an annotation with the ClusterUUID, if bootstrapped
if err := ReconcileClusterUUID(d.Client, &d.ES, observedState); err != nil {
return results.WithError(err)
}

// reconcile StatefulSets and nodes configuration
res = d.reconcileNodeSpecs(esReachable, esClient, d.ReconcileState, observedState, *resourcesState, keystoreResources)
if results.WithResults(res).HasError() {
return results
68 changes: 40 additions & 28 deletions pkg/controller/elasticsearch/driver/nodes.go
@@ -5,7 +5,6 @@
package driver

import (
"github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/keystore"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler"
esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client"
@@ -33,23 +32,32 @@ func (d *defaultDriver) reconcileNodeSpecs(
return results.WithError(err)
}

if !d.Expectations.GenerationExpected(actualStatefulSets.ObjectMetas()...) {
// Our cache of StatefulSets is out of date compared to previous reconciliation operations.
// Continuing with the reconciliation at this point may lead to:
// - errors on rejected sset updates (conflict since cached resource out of date): that's ok
// - calling ES orchestration settings (zen1/zen2/allocation excludes) with wrong assumptions: that's not ok
// Hence we choose to abort the reconciliation early: will run again later with an updated cache.
log.V(1).Info("StatefulSet cache out-of-date, re-queueing", "namespace", d.ES.Namespace, "es_name", d.ES.Name)
// check if actual StatefulSets and corresponding pods match our expectations before applying any change
ok, err := d.expectationsMet(actualStatefulSets)
if err != nil {
return results.WithError(err)
}
if !ok {
return results.WithResult(defaultRequeue)
}

expectedResources, err := expectedResources(d.Client, d.ES, observedState, keystoreResources)
expectedResources, err := nodespec.BuildExpectedResources(d.ES, keystoreResources)
if err != nil {
return results.WithError(err)
}

esState := NewMemoizingESState(esClient)

// Phase 1: apply expected StatefulSets resources and scale up.
if err := HandleUpscaleAndSpecChanges(d.Client, d.ES, d.Scheme(), expectedResources, actualStatefulSets); err != nil {
upscaleCtx := upscaleCtx{
k8sClient: d.K8sClient(),
es: d.ES,
scheme: d.Scheme(),
observedState: observedState,
esState: esState,
upscaleState: &upscaleStateBuilder{},
}
if err := HandleUpscaleAndSpecChanges(upscaleCtx, actualStatefulSets, expectedResources); err != nil {
return results.WithError(err)
}

@@ -97,7 +105,7 @@

// Phase 3: handle rolling upgrades.
// Control nodes restart (upgrade) by manually decrementing rollingUpdate.Partition.
rollingUpgradesRes := d.handleRollingUpgrades(esClient, actualStatefulSets)
rollingUpgradesRes := d.handleRollingUpgrades(esClient, esState, actualStatefulSets)
results.WithResults(rollingUpgradesRes)
if rollingUpgradesRes.HasError() {
return results
@@ -109,25 +117,29 @@
return results
}

func expectedResources(
k8sClient k8s.Client,
es v1alpha1.Elasticsearch,
observedState observer.State,
keystoreResources *keystore.Resources,
) (nodespec.ResourcesList, error) {
resources, err := nodespec.BuildExpectedResources(es, keystoreResources)
if err != nil {
return nil, err
func (d *defaultDriver) expectationsMet(actualStatefulSets sset.StatefulSetList) (bool, error) {
if !d.Expectations.GenerationExpected(actualStatefulSets.ObjectMetas()...) {
// Our cache of StatefulSets is out of date compared to previous reconciliation operations.
// Continuing with the reconciliation at this point may lead to:
// - errors on rejected sset updates (conflict since cached resource out of date): that's ok
// - calling ES orchestration settings (zen1/zen2/allocation excludes) with wrong assumptions: that's not ok
// Hence we choose to abort the reconciliation early: will run again later with an updated cache.
log.V(1).Info("StatefulSet cache out-of-date, re-queueing", "namespace", d.ES.Namespace, "es_name", d.ES.Name)
return false, nil
}

// patch configs to consider zen1 minimum master nodes
if err := zen1.SetupMinimumMasterNodesConfig(resources); err != nil {
return nil, err
podsReconciled, err := actualStatefulSets.PodReconciliationDone(d.Client)
if err != nil {
return false, err
}
// patch configs to consider zen2 initial master nodes
if err := zen2.SetupInitialMasterNodes(es, observedState, k8sClient, resources); err != nil {
return nil, err
if !podsReconciled {
// Pods we have in the cache do not match StatefulSets we have in the cache.
// This can happen if some pods have been scheduled for creation/deletion/upgrade
// but the operation has not happened or been observed yet.
// Continuing with nodes reconciliation at this point would be dangerous, since
// we may update ES orchestration settings (zen1/zen2/allocation excludes) with
// wrong assumptions (especially on master-eligible and ES version mismatches).
return false, nil
}

return resources, nil
return true, nil
}
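
As an aside, the expectations check added above (d.Expectations.GenerationExpected combined with PodReconciliationDone) boils down to comparing what we last wrote with what the cache currently shows. A hypothetical, simplified sketch of the generation-based part, with illustrative names that are not the operator's actual Expectations API:

package main

import "fmt"

// expectedGenerations records, per StatefulSet name, the metadata.generation we last wrote.
type expectedGenerations map[string]int64

// met returns false if any observed generation is older than the one we expect,
// meaning the cache has not yet caught up with our own previous updates.
func (e expectedGenerations) met(observed map[string]int64) bool {
	for name, expected := range e {
		if observed[name] < expected {
			return false
		}
	}
	return true
}

func main() {
	exp := expectedGenerations{"es-master": 4}
	fmt.Println(exp.met(map[string]int64{"es-master": 3})) // false: cache out of date, requeue
	fmt.Println(exp.met(map[string]int64{"es-master": 4})) // true: safe to reconcile
}
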
82 changes: 0 additions & 82 deletions pkg/controller/elasticsearch/driver/nodes_test.go

This file was deleted.

8 changes: 3 additions & 5 deletions pkg/controller/elasticsearch/driver/upgrade.go
@@ -23,13 +23,11 @@ import (

func (d *defaultDriver) handleRollingUpgrades(
esClient esclient.Client,
esState ESState,
statefulSets sset.StatefulSetList,
) *reconciler.Results {
results := &reconciler.Results{}

// We need an up-to-date ES state, but avoid requesting information we may not need.
esState := NewMemoizingESState(esClient)

// Maybe upgrade some of the nodes.
res := newRollingUpgrade(d, esClient, esState, statefulSets).run()
results.WithResults(res)
@@ -280,11 +278,11 @@ func (d *defaultDriver) MaybeEnableShardsAllocation(
}

// Make sure all pods scheduled for upgrade have been upgraded.
scheduledUpgradesDone, err := sset.ScheduledUpgradesDone(d.Client, statefulSets)
done, err := statefulSets.PodReconciliationDone(d.Client)
if err != nil {
return results.WithError(err)
}
if !scheduledUpgradesDone {
if !done {
log.V(1).Info(
"Rolling upgrade not over yet, some pods don't have the updated revision, keeping shard allocations disabled",
"namespace", d.ES.Namespace,
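
For context on the esState parameter now passed into handleRollingUpgrades: this commit moves the esState := NewMemoizingESState(esClient) call from handleRollingUpgrades up into reconcileNodeSpecs, so the same lazily fetched Elasticsearch state is shared by the upscale and upgrade paths. "Memoizing" here presumably means each piece of ES state is requested at most once and then cached; a generic sketch of that idea, with illustrative names rather than the operator's actual ESState implementation:

package main

import (
	"fmt"
	"sync"
)

// lazyValue caches the result of an expensive call the first time it is needed.
type lazyValue struct {
	once  sync.Once
	value string
	err   error
	fetch func() (string, error)
}

func (l *lazyValue) get() (string, error) {
	l.once.Do(func() {
		l.value, l.err = l.fetch()
	})
	return l.value, l.err
}

func main() {
	calls := 0
	health := &lazyValue{fetch: func() (string, error) {
		calls++ // stands in for an Elasticsearch API call
		return "green", nil
	}}
	for i := 0; i < 3; i++ {
		v, _ := health.get()
		fmt.Println(v)
	}
	fmt.Println("API calls made:", calls) // 1: the result was memoized
}
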
