This repository has been archived by the owner on Oct 20, 2022. It is now read-only.

Fix pod status which rarely fails in unit tests #145

Merged
8 commits merged on Nov 26, 2019
2 changes: 1 addition & 1 deletion Makefile
@@ -376,7 +376,7 @@ endif

docker-e2e-test-fix-arg:
ifeq ($(E2E_ARGS),)
@echo "args are: RollingRestart ; ClusterScaleDown ; ClusterScaleUp ; ClusterScaleDownSimple" && exit 1
@echo "args are: ExecuteCleanup; RollingRestart ; ClusterScaleDown ; ClusterScaleUp ; ClusterScaleDownSimple" && exit 1
endif
docker run --rm --env GO111MODULE=on -v $(PWD):$(WORKDIR) -v $(KUBECONFIG):/root/.kube/config -v $(MINIKUBE_CONFIG):$(MINIKUBE_CONFIG_MOUNT) $(BUILD_IMAGE):$(OPERATOR_SDK_VERSION) /bin/bash -c 'operator-sdk test local ./test/e2e --debug --image $(E2EIMAGE) --go-test-flags "-v -timeout 60m -run ^TestCassandraCluster$$/^group$$/^$(E2E_ARGS)$$" --namespace cassandra-e2e' || { kubectl get events --all-namespaces --sort-by .metadata.creationTimestamp ; exit 1; }

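With ExecuteCleanup added to the list, the target can presumably be invoked against a single e2e test group along these lines (kubeconfig, minikube and E2EIMAGE are assumed to already be configured the way the Makefile expects):

    $ make docker-e2e-test-fix-arg E2E_ARGS=ExecuteCleanup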
19 changes: 10 additions & 9 deletions cmd/manager/main.go
@@ -19,6 +19,7 @@ import (
"os"
"runtime"
"strconv"
"strings"

"k8s.io/apimachinery/pkg/util/intstr"

@@ -49,8 +50,8 @@
)

const (
LogLevelEnvVar = "LOG_LEVEL"
ResyncPeriodEnvVar = "RESYNC_PERIOD"
logLevelEnvVar = "LOG_LEVEL"
resyncPeriodEnvVar = "RESYNC_PERIOD"
)

//to be set by the compiler with -ldflags "-X main.compileDate=`date -u +.%Y%m%d.%H%M%S`"
@@ -67,18 +68,18 @@ func printVersion() {
}

func getLogLevel() logrus.Level {
- logLevel, found := os.LookupEnv(LogLevelEnvVar)
+ logLevel, found := os.LookupEnv(logLevelEnvVar)
if !found {
return logrus.InfoLevel
}
- switch logLevel {
- case "Debug":
+ switch strings.ToUpper(logLevel) {
+ case "DEBUG":
return logrus.DebugLevel
case "Info":
case "INFO":
return logrus.InfoLevel
case "Error":
case "ERROR":
return logrus.ErrorLevel
case "Warn":
case "WARN":
return logrus.WarnLevel
}
return logrus.InfoLevel
@@ -87,7 +88,7 @@ func getLogLevel() logrus.Level {
func getResyncPeriod() int {
var resyncPeriod int
var err error
- resync, found := os.LookupEnv(ResyncPeriodEnvVar)
+ resync, found := os.LookupEnv(resyncPeriodEnvVar)
if !found {
resyncPeriod = api.DefaultResyncPeriod
} else {
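A side effect of the strings.ToUpper change is that LOG_LEVEL is now matched case-insensitively, so debug, Debug and DEBUG all select logrus.DebugLevel, while an unset or unrecognized value still falls back to Info. A sketch of setting it on the operator, assuming a Deployment named cassandra-k8s-operator (the name is an assumption, not taken from this diff):

    $ kubectl set env deployment/cassandra-k8s-operator LOG_LEVEL=debug RESYNC_PERIOD=30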
52 changes: 31 additions & 21 deletions pkg/controller/cassandracluster/pod_operation.go
@@ -239,14 +239,14 @@ func (rcc *ReconcileCassandraCluster) ensureOperation(cc *api.CassandraCluster,
go rcc.monitorOperation(hostName, cc, dcRackName, pod, operationName)
continue
}
- // Add the operatorName to the last pod operation in case the operator pod is replaced
- status.CassandraRackStatus[dcRackName].PodLastOperation.OperatorName = os.Getenv("POD_NAME")
err := rcc.startOperation(cc, status, pod, dcRackName, operationName)
if err != nil {
logrus.WithFields(logrus.Fields{"cluster": cc.Name, "rack": dcRackName,
"pod": pod.Name, "err": err}).Debug("Failed to start operation on pod")
continue
}
+ // Add the operatorName to the last pod operation in case the operator pod is replaced
+ status.CassandraRackStatus[dcRackName].PodLastOperation.OperatorName = os.Getenv("POD_NAME")
go rcc.runOperation(operationName, hostName, cc, dcRackName, pod, status)
}
}
@@ -512,6 +512,34 @@ func (rcc *ReconcileCassandraCluster) ensureDecommissionFinalizing(cc *api.Cassa
return breakResyncLoop, nil
}

func (rcc *ReconcileCassandraCluster) podsSlice(cc *api.CassandraCluster, status *api.CassandraClusterStatus,
podLastOperation api.PodLastOperation, dcRackName, operationName, operatorName string) ([]v1.Pod, bool) {
checkOnly := false
podsSlice := make([]v1.Pod, 0)
// Operator is different from when the previous operation was started
// Set checkOnly to restart the monitoring function to wait until the operation is done
if podLastOperation.Name == operationName && podLastOperation.Status == api.StatusOngoing &&
podLastOperation.OperatorName != "" && podLastOperation.OperatorName != operatorName {
logrus.WithFields(logrus.Fields{"cluster": cc.Name, "rack": dcRackName,
"podLastOperation.OperatorName": podLastOperation.OperatorName, "operatorName": operatorName,
"operation": strings.Title(operationName)}).Info("Operator's name is different, we enable checking routines")
podLastOperation.OperatorName = operatorName

for _, podName := range podLastOperation.Pods {
p, err := rcc.GetPod(cc.Namespace, podName)
if err != nil || p.Status.Phase != v1.PodRunning || p.DeletionTimestamp != nil {
continue
}
podsSlice = append(podsSlice, *p)
}
checkOnly = true
return podsSlice, checkOnly
}
dcName, rackName := cc.GetDCAndRackFromDCRackName(dcRackName)
podsSlice = rcc.initOperation(cc, status, dcName, rackName, operationName)
return podsSlice, checkOnly
}

// Get pods that need an operation to run on
// Returns if checking is needed (can happen if the operator has been killed during an operation)
func (rcc *ReconcileCassandraCluster) getPodsToWorkOn(cc *api.CassandraCluster, dcName, rackName string,
@@ -533,25 +561,7 @@ func (rcc *ReconcileCassandraCluster) getPodsToWorkOn(cc *api.CassandraCluster,
"podLastOperation.OperatorName": podLastOperation.OperatorName,
"podLastOperation.Pods": podLastOperation.Pods}).Debug("Display information about pods")

- // Operator is different from when the previous operation was started
- // Set checkOnly to restart the monitoring function to wait until the operation is done
- if podLastOperation.Name == operationName && podLastOperation.OperatorName != operatorName &&
- podLastOperation.Status == api.StatusOngoing {
- checkOnly = true
- podLastOperation.OperatorName = operatorName
- logrus.WithFields(logrus.Fields{"cluster": cc.Name, "rack": dcRackName,
- "operation": strings.Title(operationName)}).Debug("Operator's name is different, we enable checking routines")
-
- for _, podName := range podLastOperation.Pods {
- p, err := rcc.GetPod(cc.Namespace, podName)
- if err != nil || p.Status.Phase != v1.PodRunning || p.DeletionTimestamp != nil {
- continue
- }
- podsSlice = append(podsSlice, *p)
- }
- } else {
- podsSlice = rcc.initOperation(cc, status, dcName, rackName, operationName)
- }
+ podsSlice, checkOnly = rcc.podsSlice(cc, status, *podLastOperation, dcRackName, operationName, operatorName)

if checkOnly {
if len(podsSlice) == 0 {
79 changes: 79 additions & 0 deletions pkg/controller/cassandracluster/pod_operation_test.go
@@ -0,0 +1,79 @@
// Copyright 2019 Orange
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandracluster

import (
"testing"

api "github.com/Orange-OpenSource/cassandra-k8s-operator/pkg/apis/db/v1alpha1"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPodsSlice(t *testing.T) {
assert := assert.New(t)

rcc, cc := helperInitCluster(t, "cassandracluster-2DC.yaml")
status := &cc.Status

operatorName := "new-name"
oldOperatorName := "old-name"
operationName := "cleanup"
dcRackName := "dc1-rack1"
podLastOperation := &status.CassandraRackStatus[dcRackName].PodLastOperation
podLastOperation.Name = operationName

// Conditions to return checkOnly set to true with an empty podsSlice
podLastOperation.Status = api.StatusOngoing
podLastOperation.OperatorName = oldOperatorName

podsSlice, checkOnly := rcc.podsSlice(cc, status, *podLastOperation, dcRackName, operationName, operatorName)

assert.Equal(len(podsSlice), 0)
assert.Equal(checkOnly, true)

// Missing condition sets checkOnly to false
podLastOperation.Status = api.StatusDone

podsSlice, checkOnly = rcc.podsSlice(cc, status, *podLastOperation, dcRackName, operationName, operatorName)

assert.Equal(len(podsSlice), 0)
assert.Equal(checkOnly, false)

//Create a pod to have something to put in podsSlice
pod := &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "cassandra-demo-dc1-rack1-0",
Namespace: "ns",
Labels: map[string]string{"app": "cassandracluster"},
},
}
pod.Status.Phase = v1.PodRunning
rcc.CreatePod(pod)

podLastOperation.Status = api.StatusOngoing
podLastOperation.Pods = []string{pod.GetName()}
// Set the operator name to a different value than the current operator name
podLastOperation.OperatorName = oldOperatorName

podsSlice, checkOnly = rcc.podsSlice(cc, status, *podLastOperation, dcRackName, operationName, operatorName)
assert.Equal(podsSlice, []v1.Pod{*pod})
assert.Equal(checkOnly, true)
}
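The new unit test should run with standard Go tooling, assuming the repository's usual test setup (package path taken from the file header above):

    $ go test -v -run TestPodsSlice ./pkg/controller/cassandracluster/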
15 changes: 5 additions & 10 deletions test/e2e/cassandracluster_test.go
@@ -2,7 +2,6 @@ package e2e

import (
goctx "context"
"errors"
"fmt"
"os"
"testing"
@@ -425,22 +424,18 @@ func cassandraClusterCleanupTest(t *testing.T, f *framework.Framework, ctx *fram
return executed, err
}

- checkRack1 := func(cc *api.CassandraCluster) (bool, error) {
- return conditionFunc(cc, "rack1", 0)
- }
-
- checkRack2 := func(cc *api.CassandraCluster) (bool, error) {
- return conditionFunc(cc, "rack2", 0)
+ checkRack := func(rack string) func(cc *api.CassandraCluster) (bool, error) {
+ return func(cc *api.CassandraCluster) (bool, error) { return conditionFunc(cc, rack, 0) }
}

logrus.Infof("Wait for cleanup to finish in rack1\n")
- err = mye2eutil.WaitForStatusChange(t, f, namespace, clusterName, 1*time.Second, 60*time.Second, checkRack1)
+ err = mye2eutil.WaitForStatusChange(t, f, namespace, clusterName, 1*time.Second, 60*time.Second, checkRack("rack1"))
if err != nil {
t.Errorf("WaitForStatusChange failed: %s", err)
}

logrus.Infof("Wait for cleanup to finish in rack2\n")
- err = mye2eutil.WaitForStatusChange(t, f, namespace, clusterName, 1*time.Second, 60*time.Second, checkRack2)
+ err = mye2eutil.WaitForStatusChange(t, f, namespace, clusterName, 1*time.Second, 60*time.Second, checkRack("rack2"))
if err != nil {
t.Errorf("WaitForStatusChange failed: %s", err)
}
@@ -464,7 +459,7 @@ func findServicePort(name string, ports []v1.ServicePort) (*v1.ServicePort, erro
return &port, nil
}
}
- return nil, errors.New(fmt.Sprintf("Failed to find service port: %s", name))
+ return nil, fmt.Errorf("Failed to find service port: %s", name)
}

func getStatefulSet(name string, namespace string, f *framework.Framework, t *testing.T) *appsv1.StatefulSet {