Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure there is no ongoing Pod deletion before downscaling #1534

Merged
merged 6 commits into from
Aug 12, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions operators/pkg/controller/elasticsearch/driver/downscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ func HandleDownscale(
) *reconciler.Results {
results := &reconciler.Results{}

canProceed, err := noOnGoingDeletion(downscaleCtx, actualStatefulSets)
if err != nil {
return results.WithError(err)
}
if !canProceed {
return results.WithResult(defaultRequeue)
}

// compute the list of StatefulSet downscales to perform
downscales := calculateDownscales(expectedStatefulSets, actualStatefulSets)
leavingNodes := leavingNodeNames(downscales)
Expand All @@ -67,6 +75,19 @@ func HandleDownscale(
return results
}

// noOnGoingDeletion returns true if some pods deletion or creation may still be in progress
func noOnGoingDeletion(downscaleCtx downscaleContext, actualStatefulSets sset.StatefulSetList) (bool, error) {
// Pods we have may not match replicas specified in the StatefulSets spec.
// This can happen if, for example, replicas were recently downscaled to remove a node,
// but the node isn't completely terminated yet, and may still be part of the cluster.
// Moving on with downscaling more nodes may lead to complications when dealing with
// Elasticsearch shards allocation excludes (should not be cleared if the ghost node isn't removed yet)
// or zen settings (must consider terminating masters that are still there).
// Let's retry once expected pods are there.
// MatchActualPods also matches any pod not created yet, for which we'll also requeue.
return actualStatefulSets.MatchActualPods(downscaleCtx.k8sClient, downscaleCtx.es)
}
thbkrkr marked this conversation as resolved.
Show resolved Hide resolved

// ssetDownscale helps with the downscale of a single StatefulSet.
// A StatefulSet removal (going from 0 to 0 replicas) is also considered as a Downscale here.
type ssetDownscale struct {
Expand Down
87 changes: 82 additions & 5 deletions operators/pkg/controller/elasticsearch/driver/downscale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -29,8 +30,60 @@ import (

// Sample StatefulSets to use in tests
var (
sset3Replicas = nodespec.CreateTestSset("sset3Replicas", "7.2.0", 3, true, true)
sset4Replicas = nodespec.CreateTestSset("sset4Replicas", "7.2.0", 4, true, true)
sset3Replicas = nodespec.CreateTestSset("sset3Replicas", "7.2.0", 3, true, true)
podsSset3Replicas = []corev1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: sset3Replicas.Namespace,
Name: sset.PodName(sset3Replicas.Name, 0),
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: sset3Replicas.Namespace,
Name: sset.PodName(sset3Replicas.Name, 1),
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: sset3Replicas.Namespace,
Name: sset.PodName(sset3Replicas.Name, 2),
},
},
}
sset4Replicas = nodespec.CreateTestSset("sset4Replicas", "7.2.0", 4, true, true)
podsSset4Replicas = []corev1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: sset3Replicas.Namespace,
Name: sset.PodName(sset4Replicas.Name, 0),
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: sset3Replicas.Namespace,
Name: sset.PodName(sset4Replicas.Name, 1),
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: sset3Replicas.Namespace,
Name: sset.PodName(sset4Replicas.Name, 2),
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: sset3Replicas.Namespace,
Name: sset.PodName(sset4Replicas.Name, 3),
},
},
}
runtimeObjs = []runtime.Object{&sset3Replicas, &sset4Replicas,
&podsSset3Replicas[0], &podsSset3Replicas[1], &podsSset3Replicas[2],
&podsSset4Replicas[0], &podsSset4Replicas[1], &podsSset4Replicas[2], &podsSset4Replicas[3],
}
requeueResults = (&reconciler.Results{}).WithResult(defaultRequeue)
emptyResults = &reconciler.Results{}
)

// fakeESClient mocks the ES client to register function calls that were made.
Expand Down Expand Up @@ -74,7 +127,7 @@ func TestHandleDownscale(t *testing.T) {
// We want to downscale 2 StatefulSets (3 -> 1 and 4 -> 2) in version 7.X,
// but should only be allowed a partial downscale (3 -> 1 and 4 -> 3).

k8sClient := k8s.WrapClient(fake.NewFakeClient(&sset3Replicas, &sset4Replicas))
k8sClient := k8s.WrapClient(fake.NewFakeClient(runtimeObjs...))
esClient := &fakeESClient{}
actualStatefulSets := sset.StatefulSetList{sset3Replicas, sset4Replicas}
downscaleCtx := downscaleContext{
Expand Down Expand Up @@ -134,6 +187,9 @@ func TestHandleDownscale(t *testing.T) {
sset4ReplicasExpectedAfterDownscale.Spec.Replicas = common.Int32(3)
expectedAfterDownscale := []appsv1.StatefulSet{sset3ReplicasDownscaled, sset4ReplicasExpectedAfterDownscale}

// a requeue should be requested since all nodes were not downscaled
require.Equal(t, requeueResults, results)

// voting config exclusion should have been added for leaving masters
require.True(t, esClient.AddVotingConfigExclusionsCalled)
require.Equal(t, []string{"sset3Replicas-2", "sset3Replicas-1", "sset4Replicas-3"}, esClient.AddVotingConfigExclusionsCalledWith)
Expand All @@ -144,18 +200,35 @@ func TestHandleDownscale(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expectedAfterDownscale, actual.Items)

// running the downscale again should give the same results
// running the downscale again should requeue since some pods are not terminated yet
results = HandleDownscale(downscaleCtx, requestedStatefulSets, actual.Items)
require.False(t, results.HasError())
require.Equal(t, requeueResults, results)
// no StatefulSet should have been updated
err = k8sClient.List(&client.ListOptions{}, &actual)
require.NoError(t, err)
require.Equal(t, expectedAfterDownscale, actual.Items)

// simulate pods deletion that would be done by the StatefulSet controller
require.NoError(t, k8sClient.Delete(&podsSset3Replicas[2]))
require.NoError(t, k8sClient.Delete(&podsSset3Replicas[1]))
require.NoError(t, k8sClient.Delete(&podsSset4Replicas[3]))

// running the downscale again should requeue since data migration is still not over
results = HandleDownscale(downscaleCtx, requestedStatefulSets, actual.Items)
require.False(t, results.HasError())
require.Equal(t, requeueResults, results)
// no StatefulSet should have been updated
err = k8sClient.List(&client.ListOptions{}, &actual)
require.NoError(t, err)
require.Equal(t, expectedAfterDownscale, actual.Items)

// once data migration is over the complete downscale should go through
// once data migration is over the downscale should continue
downscaleCtx.observedState.ClusterState.RoutingTable.Indices["index-1"].Shards["0"][0].Node = "sset4Replicas-1"
expectedAfterDownscale[1].Spec.Replicas = common.Int32(2)
results = HandleDownscale(downscaleCtx, requestedStatefulSets, actual.Items)
require.False(t, results.HasError())
require.Equal(t, emptyResults, results)
err = k8sClient.List(&client.ListOptions{}, &actual)
require.NoError(t, err)
require.Equal(t, expectedAfterDownscale, actual.Items)
Expand All @@ -164,9 +237,13 @@ func TestHandleDownscale(t *testing.T) {
require.True(t, esClient.ExcludeFromShardAllocationCalled)
require.Equal(t, "sset4Replicas-2", esClient.ExcludeFromShardAllocationCalledWith)

// simulate pod deletion
require.NoError(t, k8sClient.Delete(&podsSset4Replicas[2]))

// running the downscale again should not remove any new node
results = HandleDownscale(downscaleCtx, requestedStatefulSets, actual.Items)
require.False(t, results.HasError())
require.Equal(t, emptyResults, results)
err = k8sClient.List(&client.ListOptions{}, &actual)
require.NoError(t, err)
require.Equal(t, expectedAfterDownscale, actual.Items)
Expand Down
22 changes: 20 additions & 2 deletions operators/pkg/controller/elasticsearch/sset/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"

"github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/version"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s"
"github.com/elastic/cloud-on-k8s/operators/pkg/utils/stringsutil"
)

var log = logf.Log.WithName("statefulset")
Expand Down Expand Up @@ -68,11 +70,11 @@ func (l StatefulSetList) PodNames() []string {
return names
}

// GetActualPods returns the list of pods currently existing in the StatefulSetList.
// GetActualPodsForStatefulSet returns the list of pods currently existing in the StatefulSetList.
sebgl marked this conversation as resolved.
Show resolved Hide resolved
func (l StatefulSetList) GetActualPods(c k8s.Client) ([]corev1.Pod, error) {
allPods := []corev1.Pod{}
for _, statefulSet := range l {
pods, err := GetActualPods(c, statefulSet)
pods, err := GetActualPodsForStatefulSet(c, statefulSet)
if err != nil {
return nil, err
}
Expand All @@ -81,6 +83,22 @@ func (l StatefulSetList) GetActualPods(c k8s.Client) ([]corev1.Pod, error) {
return allPods, nil
}

// MatchActualPods returns true if actual existing pods match what is specified in the StatefulSetList.
// It may return false if there are pods in the process of being created (but not created yet)
// or terminated (but not removed yet).
func (l StatefulSetList) MatchActualPods(c k8s.Client, es v1alpha1.Elasticsearch) (bool, error) {
sebgl marked this conversation as resolved.
Show resolved Hide resolved
// pods we expect to be there based on StatefulSets spec
expectedPods := l.PodNames()
// pods that are there for this cluster
actualRawPods, err := GetActualPodsForCluster(c, es)
if err != nil {
return false, err
}
actualPods := k8s.PodNames(actualRawPods)
// check if they match
return len(expectedPods) == len(actualPods) && stringsutil.StringsInSlice(expectedPods, actualPods), nil
sebgl marked this conversation as resolved.
Show resolved Hide resolved
}

// DeepCopy returns a copy of the StatefulSetList with no reference to the original StatefulSetList.
func (l StatefulSetList) DeepCopy() StatefulSetList {
result := make(StatefulSetList, 0, len(l))
Expand Down
90 changes: 90 additions & 0 deletions operators/pkg/controller/elasticsearch/sset/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/common"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/version"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s"
Expand Down Expand Up @@ -86,3 +88,91 @@ func TestStatefulSetList_GetExistingPods(t *testing.T) {
// This cannot be done currently since the fake client does not support label list options.
// See https://github.com/kubernetes-sigs/controller-runtime/pull/311
}

func TestStatefulSetList_MatchActualPods(t *testing.T) {
es := v1alpha1.Elasticsearch{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "cluster"},
}
statefulSet1 := appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "sset1"},
Spec: appsv1.StatefulSetSpec{
Replicas: common.Int32(2),
},
}
statefulSet2 := appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "sset2"},
Spec: appsv1.StatefulSetSpec{
Replicas: common.Int32(1),
},
}
sset1Pod0 := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: es.Namespace, Name: "sset1-0", Labels: map[string]string{
label.ClusterNameLabelName: es.Name,
}},
}
sset1Pod1 := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: es.Namespace, Name: "sset1-1", Labels: map[string]string{
label.ClusterNameLabelName: es.Name,
}},
}
sset1Pod2 := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: es.Namespace, Name: "sset1-2", Labels: map[string]string{
label.ClusterNameLabelName: es.Name,
}},
}
sset2Pod0 := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: es.Namespace, Name: "sset2-0", Labels: map[string]string{
label.ClusterNameLabelName: es.Name,
}},
}
tests := []struct {
name string
l StatefulSetList
c k8s.Client
want bool
}{
{
name: "no pods, no sset",
l: nil,
c: k8s.WrapClient(fake.NewFakeClient()),
want: true,
},
{
name: "some pods, no sset",
l: nil,
c: k8s.WrapClient(fake.NewFakeClient(&sset1Pod0)),
want: false,
},
{
name: "some statefulSets, no pod",
l: StatefulSetList{statefulSet1, statefulSet2},
c: k8s.WrapClient(fake.NewFakeClient(&statefulSet1, &statefulSet2)),
want: false,
},
{
name: "missing sset1Pod1",
l: StatefulSetList{statefulSet1, statefulSet2},
c: k8s.WrapClient(fake.NewFakeClient(&statefulSet1, &statefulSet2, &sset1Pod0, &sset2Pod0)),
want: false,
},
{
name: "additional pod sset1Pod2 that shouldn't be there",
l: StatefulSetList{statefulSet1, statefulSet2},
c: k8s.WrapClient(fake.NewFakeClient(&statefulSet1, &statefulSet2, &sset1Pod0, &sset1Pod1, &sset1Pod2, &sset2Pod0)),
want: false,
},
{
name: "pods match sset spec",
l: StatefulSetList{statefulSet1, statefulSet2},
c: k8s.WrapClient(fake.NewFakeClient(&statefulSet1, &statefulSet2, &sset1Pod0, &sset1Pod1, &sset2Pod0)),
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.l.MatchActualPods(tt.c, es)
require.NoError(t, err)
require.Equal(t, tt.want, got)
})
}
}
23 changes: 13 additions & 10 deletions operators/pkg/controller/elasticsearch/sset/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s"
)
Expand All @@ -34,9 +35,9 @@ func PodRevision(pod corev1.Pod) string {
return pod.Labels[appsv1.StatefulSetRevisionLabel]
}

// GetActualPods return the existing pods associated to this StatefulSet.
// GetActualPodsForStatefulSet returns the existing pods associated to this StatefulSet.
// The returned pods may not match the expected StatefulSet replicas in a transient situation.
func GetActualPods(c k8s.Client, sset appsv1.StatefulSet) ([]corev1.Pod, error) {
func GetActualPodsForStatefulSet(c k8s.Client, sset appsv1.StatefulSet) ([]corev1.Pod, error) {
var pods corev1.PodList
if err := c.List(&client.ListOptions{
Namespace: sset.Namespace,
Expand All @@ -49,16 +50,18 @@ func GetActualPods(c k8s.Client, sset appsv1.StatefulSet) ([]corev1.Pod, error)
return pods.Items, nil
}

func GetActualPodsNames(c k8s.Client, sset appsv1.StatefulSet) ([]string, error) {
actualPods, err := GetActualPods(c, sset)
if err != nil {
// GetActualPodsForCluster return the existing pods associated to this cluster.
func GetActualPodsForCluster(c k8s.Client, es v1alpha1.Elasticsearch) ([]corev1.Pod, error) {
var pods corev1.PodList
if err := c.List(&client.ListOptions{
Namespace: es.Namespace,
LabelSelector: labels.SelectorFromSet(map[string]string{
label.ClusterNameLabelName: es.Name,
}),
}, &pods); err != nil {
return nil, err
}
names := make([]string, 0, len(actualPods))
for _, p := range actualPods {
names = append(names, p.Name)
}
return names, nil
return pods.Items, nil
}

// ScheduledUpgradesDone returns true if all pods scheduled for upgrade have been upgraded.
Expand Down
Loading