wip - add unit tests
sebgl committed Aug 29, 2019
1 parent e17c3d6 commit 9894f81
Showing 8 changed files with 542 additions and 61 deletions.
36 changes: 36 additions & 0 deletions pkg/controller/elasticsearch/driver/expectations.go
@@ -0,0 +1,36 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package driver

import (
    "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
)

func (d *defaultDriver) expectationsMet(actualStatefulSets sset.StatefulSetList) (bool, error) {
    if !d.Expectations.GenerationExpected(actualStatefulSets.ObjectMetas()...) {
        // Our cache of StatefulSets is out of date compared to previous reconciliation operations.
        // Continuing with the reconciliation at this point may lead to:
        // - errors on rejected sset updates (conflict since cached resource out of date): that's ok
        // - calling ES orchestration settings (zen1/zen2/allocation excludes) with wrong assumptions: that's not ok
        // Hence we choose to abort the reconciliation early: will run again later with an updated cache.
        log.V(1).Info("StatefulSet cache out-of-date, re-queueing", "namespace", d.ES.Namespace, "es_name", d.ES.Name)
        return false, nil
    }

    podsReconciled, err := actualStatefulSets.PodReconciliationDone(d.Client)
    if err != nil {
        return false, err
    }
    if !podsReconciled {
        // Pods we have in the cache do not match StatefulSets we have in the cache.
        // This can happen if some pods have been scheduled for creation/deletion/upgrade
        // but the operation has not happened or been observed yet.
        // Continuing with nodes reconciliation at this point would be dangerous, since
        // we may update ES orchestration settings (zen1/zen2/allocation excludes) with
        // wrong assumptions (especially on master-eligible and ES version mismatches).
        return false, nil
    }
    return true, nil
}
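
For orientation, a minimal sketch of how a caller in the driver package might consume this helper. The re-queue wiring below is illustrative only: results.WithError, results.WithResult and defaultRequeue are assumed helpers from the surrounding reconciliation code, not part of this file.

// Hedged caller sketch (not part of this commit): abort the reconciliation early
// when the caches are stale, and re-queue so it runs again with fresh data.
expectationsOK, err := d.expectationsMet(actualStatefulSets)
if err != nil {
    return results.WithError(err)
}
if !expectationsOK {
    // Nothing is wrong: the cache simply needs to catch up with recent writes.
    return results.WithResult(defaultRequeue)
}
// Safe to continue: cached StatefulSets and Pods reflect previous operations.
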
61 changes: 61 additions & 0 deletions pkg/controller/elasticsearch/driver/expectations_test.go
@@ -0,0 +1,61 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package driver

import (
    "testing"

    "sigs.k8s.io/controller-runtime/pkg/client/fake"

    "github.com/elastic/cloud-on-k8s/pkg/controller/common"
    "github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler"
    "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
    "github.com/elastic/cloud-on-k8s/pkg/utils/k8s"

    "github.com/stretchr/testify/require"
)

func Test_defaultDriver_expectationsMet(t *testing.T) {
    d := &defaultDriver{DefaultDriverParameters{
        Expectations: reconciler.NewExpectations(),
        Client:       k8s.WrapClient(fake.NewFakeClient()),
    }}

    // no expectations set
    met, err := d.expectationsMet(sset.StatefulSetList{})
    require.NoError(t, err)
    require.True(t, met)

    // a sset generation is expected
    statefulSet := sset.TestSset{Name: "sset"}.Build()
    statefulSet.Generation = 123
    d.Expectations.ExpectGeneration(statefulSet.ObjectMeta)
    // but not met yet
    statefulSet.Generation = 122
    met, err = d.expectationsMet(sset.StatefulSetList{statefulSet})
    require.NoError(t, err)
    require.False(t, met)
    // met now
    statefulSet.Generation = 123
    met, err = d.expectationsMet(sset.StatefulSetList{statefulSet})
    require.NoError(t, err)
    require.True(t, met)

    // we expect some sset replicas to exist
    // but corresponding pod does not exist
    statefulSet.Spec.Replicas = common.Int32(1)
    // expectations should not be met: we miss a pod
    met, err = d.expectationsMet(sset.StatefulSetList{statefulSet})
    require.NoError(t, err)
    require.False(t, met)

    // add the missing pod
    pod := sset.TestPod{Name: "sset-0", StatefulSetName: statefulSet.Name}.Build()
    d.Client = k8s.WrapClient(fake.NewFakeClient(&pod))
    // expectations should be met
    met, err = d.expectationsMet(sset.StatefulSetList{statefulSet})
    require.NoError(t, err)
    require.True(t, met)
}
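
The test leans on the sset.TestSset and sset.TestPod builders, whose definitions are not part of this diff. Conceptually they are thin fixtures along the following lines; this is an illustrative sketch only, and the label key used to tie a Pod back to its StatefulSet is an assumption.

// Rough, assumed shape of the test fixtures used above (illustrative only).
import (
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type TestSset struct {
    Name      string
    Namespace string
}

func (t TestSset) Build() appsv1.StatefulSet {
    return appsv1.StatefulSet{
        ObjectMeta: metav1.ObjectMeta{Name: t.Name, Namespace: t.Namespace},
    }
}

type TestPod struct {
    Name            string
    Namespace       string
    StatefulSetName string
}

func (t TestPod) Build() corev1.Pod {
    return corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:      t.Name,
            Namespace: t.Namespace,
            // Assumed label key linking the Pod to its parent StatefulSet,
            // which is what PodReconciliationDone would match Pods against.
            Labels: map[string]string{"elasticsearch.k8s.elastic.co/statefulset-name": t.StatefulSetName},
        },
    }
}
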
39 changes: 6 additions & 33 deletions pkg/controller/elasticsearch/driver/nodes.go
@@ -50,12 +50,12 @@ func (d *defaultDriver) reconcileNodeSpecs(

    // Phase 1: apply expected StatefulSets resources and scale up.
    upscaleCtx := upscaleCtx{
-        k8sClient:     d.K8sClient(),
-        es:            d.ES,
-        scheme:        d.Scheme(),
-        observedState: observedState,
-        esState:       esState,
-        upscaleState:  &upscaleStateBuilder{},
+        k8sClient:           d.K8sClient(),
+        es:                  d.ES,
+        scheme:              d.Scheme(),
+        observedState:       observedState,
+        esState:             esState,
+        upscaleStateBuilder: &upscaleStateBuilder{},
    }
    if err := HandleUpscaleAndSpecChanges(upscaleCtx, actualStatefulSets, expectedResources); err != nil {
        return results.WithError(err)
@@ -116,30 +116,3 @@ func (d *defaultDriver) reconcileNodeSpecs(
    // - grow and shrink
    return results
}
-
-func (d *defaultDriver) expectationsMet(actualStatefulSets sset.StatefulSetList) (bool, error) {
-    if !d.Expectations.GenerationExpected(actualStatefulSets.ObjectMetas()...) {
-        // Our cache of StatefulSets is out of date compared to previous reconciliation operations.
-        // Continuing with the reconciliation at this point may lead to:
-        // - errors on rejected sset updates (conflict since cached resource out of date): that's ok
-        // - calling ES orchestration settings (zen1/zen2/allocation excludes) with wrong assumptions: that's not ok
-        // Hence we choose to abort the reconciliation early: will run again later with an updated cache.
-        log.V(1).Info("StatefulSet cache out-of-date, re-queueing", "namespace", d.ES.Namespace, "es_name", d.ES.Name)
-        return false, nil
-    }
-
-    podsReconciled, err := actualStatefulSets.PodReconciliationDone(d.Client)
-    if err != nil {
-        return false, err
-    }
-    if !podsReconciled {
-        // Pods we have in the cache do not match StatefulSets we have in the cache.
-        // This can happen if some pods have been scheduled for creation/deletion/upgrade
-        // but the operation has not happened or been observed yet.
-        // Continuing with nodes reconciliation at this point would be dangerous, since
-        // we may update ES orchestration settings (zen1/zen2/allocation excludes) with
-        // wrong assumptions (especially on master-eligible and ES version mismatches).
-        return false, nil
-    }
-    return true, nil
-}
26 changes: 13 additions & 13 deletions pkg/controller/elasticsearch/driver/upscale.go
@@ -20,12 +20,12 @@ import (
)

type upscaleCtx struct {
-    k8sClient     k8s.Client
-    es            v1alpha1.Elasticsearch
-    scheme        *runtime.Scheme
-    observedState observer.State
-    esState       ESState
-    upscaleState  *upscaleStateBuilder
+    k8sClient           k8s.Client
+    es                  v1alpha1.Elasticsearch
+    scheme              *runtime.Scheme
+    observedState       observer.State
+    esState             ESState
+    upscaleStateBuilder *upscaleStateBuilder
}

// HandleUpscaleAndSpecChanges reconciles expected NodeSpec resources.
@@ -43,7 +43,7 @@ func HandleUpscaleAndSpecChanges(
    expectedResources nodespec.ResourcesList,
) error {
    // adjust expected replicas to control nodes creation and deletion
-    adjusted, err := adjustReplicas(ctx, actualStatefulSets, expectedResources)
+    adjusted, err := adjustResources(ctx, actualStatefulSets, expectedResources)
    if err != nil {
        return err
    }
@@ -62,34 +62,34 @@ func HandleUpscaleAndSpecChanges(
    return nil
}

-func adjustReplicas(
+func adjustResources(
    ctx upscaleCtx,
    actualStatefulSets sset.StatefulSetList,
    expectedResources nodespec.ResourcesList,
) (nodespec.ResourcesList, error) {
    adjustedResources := make(nodespec.ResourcesList, 0, len(expectedResources))
    for _, nodeSpecRes := range expectedResources {
-        adjustedSset, err := adjustStatefulSetReplicas(ctx, actualStatefulSets, nodeSpecRes.StatefulSet)
+        adjustedSset, err := adjustStatefulSetReplicas(ctx, actualStatefulSets, *nodeSpecRes.StatefulSet.DeepCopy())
        if err != nil {
            return nil, err
        }
        nodeSpecRes.StatefulSet = adjustedSset
        adjustedResources = append(adjustedResources, nodeSpecRes)
    }
    // adapt resources configuration to match adjusted replicas
-    if err := adjustZenConfig(ctx, adjustedResources); err != nil {
+    if err := adjustZenConfig(ctx.es, adjustedResources); err != nil {
        return nil, err
    }
    return adjustedResources, nil
}

-func adjustZenConfig(ctx upscaleCtx, resources nodespec.ResourcesList) error {
+func adjustZenConfig(es v1alpha1.Elasticsearch, resources nodespec.ResourcesList) error {
    // patch configs to consider zen1 minimum master nodes
    if err := zen1.SetupMinimumMasterNodesConfig(resources); err != nil {
        return err
    }
    // patch configs to consider zen2 initial master nodes if cluster is not bootstrapped yet
-    if !AnnotatedForBootstrap(ctx.es) {
+    if !AnnotatedForBootstrap(es) {
        if err := zen2.SetupInitialMasterNodes(resources); err != nil {
            return err
        }
Expand All @@ -107,7 +107,7 @@ func adjustStatefulSetReplicas(
expected = adaptForExistingStatefulSet(actual, expected)
}
if alreadyExists && isReplicaIncrease(actual, expected) {
upscaleState, err := ctx.upscaleState.InitOnce(ctx.k8sClient, ctx.es, ctx.esState)
upscaleState, err := ctx.upscaleStateBuilder.InitOnce(ctx.k8sClient, ctx.es, ctx.esState)
if err != nil {
return appsv1.StatefulSet{}, err
}
Expand Down
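
The rename from upscaleState to upscaleStateBuilder matches what the field actually is: a lazily-initialized builder whose InitOnce computes the upscale state on first use and hands back the cached result on later calls within the same reconciliation. A minimal sketch of that pattern, assuming a sync.Once-based implementation and a newUpscaleState constructor (both are assumptions; the real code lives in upscale_state.go):

// Hedged sketch of a once-initialized builder (not the actual implementation).
type upscaleStateBuilder struct {
    once  sync.Once
    state *upscaleState
    err   error
}

func (b *upscaleStateBuilder) InitOnce(c k8s.Client, es v1alpha1.Elasticsearch, esState ESState) (*upscaleState, error) {
    b.once.Do(func() {
        // Computed at most once, then shared by every StatefulSet adjustment of this reconciliation.
        b.state, b.err = newUpscaleState(c, es, esState)
    })
    return b.state, b.err
}
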
26 changes: 17 additions & 9 deletions pkg/controller/elasticsearch/driver/upscale_state.go
@@ -1,3 +1,7 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package driver

import (
@@ -64,20 +64,24 @@ func isMasterNodeJoining(pod corev1.Pod, esState ESState) (bool, error) {
    if pod.Status.Phase == corev1.PodPending {
        return true, nil
    }

    // - Running but not Ready (ES process still starting)
    if pod.Status.Phase == corev1.PodRunning && !k8s.IsPodReady(pod) {
        return true, nil
    }

    // - Running & Ready but not part of the cluster
-    // This does a synchronous request to Elasticsearch.
-    // Relying instead on a previous (out of date) observed ES state would risk a mismatch
-    // if a node was removed then re-added into the cluster.
-    inCluster, err := esState.NodesInCluster([]string{pod.Name})
-    if err != nil {
-        return false, err
-    }
-    if !inCluster {
-        return true, nil
+    if pod.Status.Phase == corev1.PodRunning && k8s.IsPodReady(pod) {
+        // This does a synchronous request to Elasticsearch.
+        // Relying instead on a previous (out of date) observed ES state would risk a mismatch
+        // if a node was removed then re-added into the cluster.
+        inCluster, err := esState.NodesInCluster([]string{pod.Name})
+        if err != nil {
+            return false, err
+        }
+        if !inCluster {
+            return true, nil
+        }
+    }

    // Otherwise, consider the pod is not in the process of joining the cluster.
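
Given the commit message, a natural next step is unit coverage for the restructured isMasterNodeJoining. The pending-pod branch returns before any Elasticsearch call, so it can be exercised with a nil ESState; a minimal illustrative test sketch (not part of this commit):

// Illustrative test sketch for the pending-pod branch.
func Test_isMasterNodeJoining_pendingPod(t *testing.T) {
    pendingPod := corev1.Pod{Status: corev1.PodStatus{Phase: corev1.PodPending}}
    // The pending branch never reaches NodesInCluster, so a nil ESState is safe here.
    joining, err := isMasterNodeJoining(pendingPod, nil)
    require.NoError(t, err)
    require.True(t, joining)
}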