Add only one master node at a time #1654

Merged · 7 commits · Sep 2, 2019
50 changes: 50 additions & 0 deletions pkg/controller/elasticsearch/driver/bootstrap.go
@@ -0,0 +1,50 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package driver

import (
	"github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1"
	"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer"
	"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
)

const (
	// ClusterUUIDAnnotationName is used to store the cluster UUID as an annotation once the cluster has been bootstrapped.
	ClusterUUIDAnnotationName = "elasticsearch.k8s.elastic.co/cluster-uuid"
)

// AnnotatedForBootstrap returns true if the cluster has been annotated with the UUID already.
func AnnotatedForBootstrap(cluster v1alpha1.Elasticsearch) bool {
	_, bootstrapped := cluster.Annotations[ClusterUUIDAnnotationName]
	return bootstrapped
}

// ReconcileClusterUUID annotates the Elasticsearch resource with its cluster UUID once the cluster is observed as bootstrapped.
func ReconcileClusterUUID(c k8s.Client, cluster *v1alpha1.Elasticsearch, observedState observer.State) error {
	if AnnotatedForBootstrap(*cluster) {
		// already annotated, nothing to do.
		return nil
	}
	if clusterIsBootstrapped(observedState) {
		// cluster bootstrapped but not annotated yet
		return annotateWithUUID(cluster, observedState, c)
	}
	// cluster not bootstrapped yet
	return nil
}

// clusterIsBootstrapped returns true if the cluster has formed and has a UUID.
func clusterIsBootstrapped(observedState observer.State) bool {
	return observedState.ClusterState != nil && len(observedState.ClusterState.ClusterUUID) > 0
}

// annotateWithUUID annotates the cluster with its UUID, to mark it as "bootstrapped".
func annotateWithUUID(cluster *v1alpha1.Elasticsearch, observedState observer.State, c k8s.Client) error {
	log.Info("Annotating bootstrapped cluster with its UUID", "namespace", cluster.Namespace, "es_name", cluster.Name)
	if cluster.Annotations == nil {
		cluster.Annotations = make(map[string]string)
	}
	cluster.Annotations[ClusterUUIDAnnotationName] = observedState.ClusterState.ClusterUUID
	return c.Update(cluster)
}
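The annotation gives later reconciliation steps a cheap way to tell whether the cluster has already formed. As a rough illustration (not part of this diff; the helper name is hypothetical), this is the kind of check that gates settings which must only apply to the very first cluster start, such as zen2's cluster.initial_master_nodes:

	// shouldSetInitialMasterNodes is an illustrative, hypothetical helper:
	// initial master nodes must only be configured before the first successful bootstrap.
	func shouldSetInitialMasterNodes(es v1alpha1.Elasticsearch) bool {
		return !AnnotatedForBootstrap(es)
	}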
91 changes: 91 additions & 0 deletions pkg/controller/elasticsearch/driver/bootstrap_test.go
@@ -0,0 +1,91 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package driver

import (
	"testing"

	"github.com/stretchr/testify/require"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	"github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1"
	"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client"
	"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer"
	"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/scheme"
)

var (
	bootstrappedES    = v1alpha1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Name: "cluster", Annotations: map[string]string{ClusterUUIDAnnotationName: "uuid"}}}
	notBootstrappedES = v1alpha1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Name: "cluster"}}
)

func TestAnnotatedForBootstrap(t *testing.T) {
	require.True(t, AnnotatedForBootstrap(bootstrappedES))
	require.False(t, AnnotatedForBootstrap(notBootstrappedES))
}

func Test_annotateWithUUID(t *testing.T) {
	require.NoError(t, v1alpha1.AddToScheme(scheme.Scheme))

	notBootstrappedCopy := notBootstrappedES.DeepCopy()
	observedState := observer.State{ClusterState: &client.ClusterState{ClusterUUID: "cluster-uuid"}}
	k8sClient := k8s.WrapClient(fake.NewFakeClient(notBootstrappedCopy))

	err := annotateWithUUID(notBootstrappedCopy, observedState, k8sClient)
	require.NoError(t, err)
	require.True(t, AnnotatedForBootstrap(*notBootstrappedCopy))

	var retrieved v1alpha1.Elasticsearch
	err = k8sClient.Get(k8s.ExtractNamespacedName(notBootstrappedCopy), &retrieved)
	require.NoError(t, err)
	require.True(t, AnnotatedForBootstrap(retrieved))
}

func TestReconcileClusterUUID(t *testing.T) {
	require.NoError(t, v1alpha1.AddToScheme(scheme.Scheme))
	tests := []struct {
		name          string
		c             k8s.Client
		cluster       *v1alpha1.Elasticsearch
		observedState observer.State
		wantCluster   *v1alpha1.Elasticsearch
	}{
		{
			name:        "already annotated",
			cluster:     &bootstrappedES,
			wantCluster: &bootstrappedES,
		},
		{
			name:          "not annotated, but not bootstrapped yet (cluster state empty)",
			cluster:       &notBootstrappedES,
			observedState: observer.State{ClusterState: nil},
			wantCluster:   &notBootstrappedES,
		},
		{
			name:          "not annotated, but not bootstrapped yet (cluster UUID empty)",
			cluster:       &notBootstrappedES,
			observedState: observer.State{ClusterState: &client.ClusterState{ClusterUUID: ""}},
			wantCluster:   &notBootstrappedES,
		},
		{
			name:          "not annotated, but bootstrapped",
			c:             k8s.WrapClient(fake.NewFakeClient(&notBootstrappedES)),
			cluster:       &notBootstrappedES,
			observedState: observer.State{ClusterState: &client.ClusterState{ClusterUUID: "uuid"}},
			wantCluster:   &bootstrappedES,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			err := ReconcileClusterUUID(tt.c, tt.cluster, tt.observedState)
			require.NoError(t, err)
			require.Equal(t, tt.wantCluster, tt.cluster)
		})
	}
}
21 changes: 0 additions & 21 deletions pkg/controller/elasticsearch/driver/downscale.go
@@ -26,14 +26,6 @@ func HandleDownscale(
) *reconciler.Results {
	results := &reconciler.Results{}

	canProceed, err := noOnGoingDeletion(downscaleCtx, actualStatefulSets)
	if err != nil {
		return results.WithError(err)
	}
	if !canProceed {
		return results.WithResult(defaultRequeue)
	}

	// compute the list of StatefulSet downscales to perform
	downscales := calculateDownscales(expectedStatefulSets, actualStatefulSets)
	leavingNodes := leavingNodeNames(downscales)
@@ -64,19 +56,6 @@ func HandleDownscale(
	return results
}

// noOnGoingDeletion returns true if some pods deletion or creation may still be in progress
func noOnGoingDeletion(downscaleCtx downscaleContext, actualStatefulSets sset.StatefulSetList) (bool, error) {
	// Pods we have may not match replicas specified in the StatefulSets spec.
	// This can happen if, for example, replicas were recently downscaled to remove a node,
	// but the node isn't completely terminated yet, and may still be part of the cluster.
	// Moving on with downscaling more nodes may lead to complications when dealing with
	// Elasticsearch shards allocation excludes (should not be cleared if the ghost node isn't removed yet)
	// or zen settings (must consider terminating masters that are still there).
	// Let's retry once expected pods are there.
	// PodReconciliationDone also matches any pod not created yet, for which we'll also requeue.
	return actualStatefulSets.PodReconciliationDone(downscaleCtx.k8sClient, downscaleCtx.es)
}

// calculateDownscales compares expected and actual StatefulSets to return a list of ssetDownscale.
// We also include StatefulSets removal (0 replicas) in those downscales.
func calculateDownscales(expectedStatefulSets sset.StatefulSetList, actualStatefulSets sset.StatefulSetList) []ssetDownscale {
9 changes: 0 additions & 9 deletions pkg/controller/elasticsearch/driver/downscale_test.go
@@ -182,15 +182,6 @@ func TestHandleDownscale(t *testing.T) {
	require.NoError(t, err)
	require.Equal(t, expectedAfterDownscale, actual.Items)

	// running the downscale again should requeue since some pods are not terminated yet
	results = HandleDownscale(downscaleCtx, requestedStatefulSets, actual.Items)
	require.False(t, results.HasError())
	require.Equal(t, requeueResults, results)
	// no StatefulSet should have been updated
	err = k8sClient.List(&client.ListOptions{}, &actual)
	require.NoError(t, err)
	require.Equal(t, expectedAfterDownscale, actual.Items)

	// simulate pods deletion that would be done by the StatefulSet controller
	require.NoError(t, k8sClient.Delete(&podsSsetMaster3Replicas[2]))
	require.NoError(t, k8sClient.Delete(&podsSsetData4Replicas[3]))
6 changes: 6 additions & 0 deletions pkg/controller/elasticsearch/driver/driver.go
@@ -230,6 +230,12 @@ func (d *defaultDriver) Reconcile() *reconciler.Results {
		return results.WithError(err)
	}

	// set an annotation with the ClusterUUID, if bootstrapped
	if err := ReconcileClusterUUID(d.Client, &d.ES, observedState); err != nil {
		return results.WithError(err)
	}

	// reconcile StatefulSets and nodes configuration
	res = d.reconcileNodeSpecs(esReachable, esClient, d.ReconcileState, observedState, *resourcesState, keystoreResources)
	if results.WithResults(res).HasError() {
		return results
36 changes: 36 additions & 0 deletions pkg/controller/elasticsearch/driver/expectations.go
@@ -0,0 +1,36 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package driver

import (
	"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
)

func (d *defaultDriver) expectationsMet(actualStatefulSets sset.StatefulSetList) (bool, error) {
	if !d.Expectations.GenerationExpected(actualStatefulSets.ObjectMetas()...) {
		// Our cache of StatefulSets is out of date compared to previous reconciliation operations.
		// Continuing with the reconciliation at this point may lead to:
		// - errors on rejected sset updates (conflict since cached resource out of date): that's ok
		// - calling ES orchestration settings (zen1/zen2/allocation excludes) with wrong assumptions: that's not ok
		// Hence we choose to abort the reconciliation early: will run again later with an updated cache.
		log.V(1).Info("StatefulSet cache out-of-date, re-queueing", "namespace", d.ES.Namespace, "es_name", d.ES.Name)
		return false, nil
	}

	podsReconciled, err := actualStatefulSets.PodReconciliationDone(d.Client)
	if err != nil {
		return false, err
	}
	if !podsReconciled {
		// Pods we have in the cache do not match StatefulSets we have in the cache.
		// This can happen if some pods have been scheduled for creation/deletion/upgrade
		// but the operation has not happened or been observed yet.
		// Continuing with nodes reconciliation at this point would be dangerous, since
		// we may update ES orchestration settings (zen1/zen2/allocation excludes) with
		// wrong assumptions (especially on master-eligible and ES version mismatches).
		return false, nil
	}
	return true, nil
}
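For context, the generation check above relies on remembering the metadata.generation the operator last wrote for each StatefulSet and comparing it with what the (possibly stale) cache returns. A minimal sketch of that idea, assuming a simplified in-memory map rather than the real reconciler.Expectations implementation:

	// Sketch only: a simplified stand-in for the generation-expectation mechanism.
	import (
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		"k8s.io/apimachinery/pkg/types"
	)

	// generationExpectations records the generation we last wrote for each object.
	type generationExpectations map[types.UID]int64

	// expect remembers the generation of an object we just updated.
	func (e generationExpectations) expect(meta metav1.ObjectMeta) {
		e[meta.UID] = meta.Generation
	}

	// satisfied returns false while the cached objects are still older than what we wrote,
	// i.e. the informer cache has not yet caught up with our own updates.
	func (e generationExpectations) satisfied(metas ...metav1.ObjectMeta) bool {
		for _, m := range metas {
			if expected, ok := e[m.UID]; ok && m.Generation < expected {
				return false
			}
		}
		return true
	}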
61 changes: 61 additions & 0 deletions pkg/controller/elasticsearch/driver/expectations_test.go
@@ -0,0 +1,61 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package driver

import (
	"testing"

	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	"github.com/elastic/cloud-on-k8s/pkg/controller/common"
	"github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler"
	"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
	"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"

	"github.com/stretchr/testify/require"
)

func Test_defaultDriver_expectationsMet(t *testing.T) {
	d := &defaultDriver{DefaultDriverParameters{
		Expectations: reconciler.NewExpectations(),
		Client:       k8s.WrapClient(fake.NewFakeClient()),
	}}

	// no expectations set
	met, err := d.expectationsMet(sset.StatefulSetList{})
	require.NoError(t, err)
	require.True(t, met)

	// a sset generation is expected
	statefulSet := sset.TestSset{Name: "sset"}.Build()
	statefulSet.Generation = 123
	d.Expectations.ExpectGeneration(statefulSet.ObjectMeta)
	// but not met yet
	statefulSet.Generation = 122
	met, err = d.expectationsMet(sset.StatefulSetList{statefulSet})
	require.NoError(t, err)
	require.False(t, met)
	// met now
	statefulSet.Generation = 123
	met, err = d.expectationsMet(sset.StatefulSetList{statefulSet})
	require.NoError(t, err)
	require.True(t, met)

	// we expect some sset replicas to exist
	// but corresponding pod does not exist
	statefulSet.Spec.Replicas = common.Int32(1)
	// expectations should not be met: we miss a pod
	met, err = d.expectationsMet(sset.StatefulSetList{statefulSet})
	require.NoError(t, err)
	require.False(t, met)

	// add the missing pod
	pod := sset.TestPod{Name: "sset-0", StatefulSetName: statefulSet.Name}.Build()
	d.Client = k8s.WrapClient(fake.NewFakeClient(&pod))
	// expectations should be met
	met, err = d.expectationsMet(sset.StatefulSetList{statefulSet})
	require.NoError(t, err)
	require.True(t, met)
}
53 changes: 19 additions & 34 deletions pkg/controller/elasticsearch/driver/nodes.go
@@ -5,7 +5,6 @@
package driver

import (
	"github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1alpha1"
	"github.com/elastic/cloud-on-k8s/pkg/controller/common/keystore"
	"github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler"
	esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client"
@@ -33,23 +32,32 @@ func (d *defaultDriver) reconcileNodeSpecs(
		return results.WithError(err)
	}

	if !d.Expectations.GenerationExpected(actualStatefulSets.ObjectMetas()...) {
		// Our cache of StatefulSets is out of date compared to previous reconciliation operations.
		// Continuing with the reconciliation at this point may lead to:
		// - errors on rejected sset updates (conflict since cached resource out of date): that's ok
		// - calling ES orchestration settings (zen1/zen2/allocation excludes) with wrong assumptions: that's not ok
		// Hence we choose to abort the reconciliation early: will run again later with an updated cache.
		log.V(1).Info("StatefulSet cache out-of-date, re-queueing", "namespace", d.ES.Namespace, "es_name", d.ES.Name)
	// check if actual StatefulSets and corresponding pods match our expectations before applying any change
	ok, err := d.expectationsMet(actualStatefulSets)
	if err != nil {
		return results.WithError(err)
	}
	if !ok {
		return results.WithResult(defaultRequeue)
	}

	expectedResources, err := expectedResources(d.Client, d.ES, observedState, keystoreResources)
	expectedResources, err := nodespec.BuildExpectedResources(d.ES, keystoreResources)
	if err != nil {
		return results.WithError(err)
	}
Contributor (PR author) commented:

So after this step we may have updated some ssets, so we'll end up with sset update conflicts in the next steps (downscale, rolling upgrade). Which is fine, but I'm wondering whether we should maybe either:

  • requeue here if the downscale step did any change
  • re-check expectations here to catch any change

Contributor commented:

We may also recheck expectations before downscale and handleRollingUpgrades. It would make it explicit and maybe less error-prone when this code is updated later.


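A rough sketch of the re-check suggested above (not something this diff implements): the same expectationsMet guard could be repeated right before the downscale and rolling-upgrade phases, requeueing if the cache has drifted in the meantime.

	// Illustration only, not part of this PR: re-verify expectations before later phases.
	met, err := d.expectationsMet(actualStatefulSets)
	if err != nil {
		return results.WithError(err)
	}
	if !met {
		return results.WithResult(defaultRequeue)
	}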
	esState := NewMemoizingESState(esClient)

	// Phase 1: apply expected StatefulSets resources and scale up.
	if err := HandleUpscaleAndSpecChanges(d.Client, d.ES, d.Scheme(), expectedResources, actualStatefulSets); err != nil {
	upscaleCtx := upscaleCtx{
		k8sClient:           d.K8sClient(),
		es:                  d.ES,
		scheme:              d.Scheme(),
		observedState:       observedState,
		esState:             esState,
		upscaleStateBuilder: &upscaleStateBuilder{},
	}
	if err := HandleUpscaleAndSpecChanges(upscaleCtx, actualStatefulSets, expectedResources); err != nil {
		return results.WithError(err)
	}

@@ -97,7 +105,7 @@ func (d *defaultDriver) reconcileNodeSpecs(

	// Phase 3: handle rolling upgrades.
	// Control nodes restart (upgrade) by manually decrementing rollingUpdate.Partition.
	rollingUpgradesRes := d.handleRollingUpgrades(esClient, actualStatefulSets)
	rollingUpgradesRes := d.handleRollingUpgrades(esClient, esState, actualStatefulSets)
	results.WithResults(rollingUpgradesRes)
	if rollingUpgradesRes.HasError() {
		return results
@@ -108,26 +116,3 @@ func (d *defaultDriver) reconcileNodeSpecs(
	// - grow and shrink
	return results
}

func expectedResources(
	k8sClient k8s.Client,
	es v1alpha1.Elasticsearch,
	observedState observer.State,
	keystoreResources *keystore.Resources,
) (nodespec.ResourcesList, error) {
	resources, err := nodespec.BuildExpectedResources(es, keystoreResources)
	if err != nil {
		return nil, err
	}

	// patch configs to consider zen1 minimum master nodes
	if err := zen1.SetupMinimumMasterNodesConfig(resources); err != nil {
		return nil, err
	}
	// patch configs to consider zen2 initial master nodes
	if err := zen2.SetupInitialMasterNodes(es, observedState, k8sClient, resources); err != nil {
		return nil, err
	}

	return resources, nil
}
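For readers following the PR title: the upscaleCtx and upscaleStateBuilder introduced above are the hooks through which the upscale step can enforce adding only one master node at a time. A minimal sketch of that invariant, with illustrative names only (the real upscale state presumably also tracks cluster bootstrap status and any master creation already in flight):

	// capMasterCreations illustrates the "one new master at a time" rule: when the spec asks
	// for more master-eligible replicas than currently exist, only allow one extra per
	// reconciliation; the next one is added once the previous master has joined the cluster.
	func capMasterCreations(actualMasters, targetMasters int32) int32 {
		if targetMasters <= actualMasters {
			return targetMasters // not a master upscale: nothing to cap
		}
		return actualMasters + 1 // add a single master-eligible node for now
	}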