diff --git a/pkg/webhook/pod/pd_deleter.go b/pkg/webhook/pod/pd_deleter.go index aa896ba9ca..0e1fa663d1 100644 --- a/pkg/webhook/pod/pd_deleter.go +++ b/pkg/webhook/pod/pd_deleter.go @@ -84,7 +84,7 @@ func (pc *PodAdmissionControl) admitDeletePdPods(payload *admitPayload) *admissi // check the pd pods which have been upgraded before were all health if isUpgrading { klog.Infof("receive delete pd pod[%s/%s] of tc[%s/%s] is upgrading, make sure former pd upgraded status was health", namespace, name, namespace, tcName) - err = checkFormerPDPodStatus(pc.kubeCli, payload.pdClient, payload.tc, namespace, ordinal, *payload.ownerStatefulSet.Spec.Replicas) + err = checkFormerPDPodStatus(pc.kubeCli, payload.pdClient, payload.tc, payload.ownerStatefulSet, ordinal) if err != nil { return util.ARFail(err) } diff --git a/pkg/webhook/pod/pd_deleter_test.go b/pkg/webhook/pod/pd_deleter_test.go index 4911f8b45c..65cfe2dd6f 100644 --- a/pkg/webhook/pod/pd_deleter_test.go +++ b/pkg/webhook/pod/pd_deleter_test.go @@ -15,8 +15,6 @@ package pod import ( "fmt" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" "testing" . "github.com/onsi/gomega" @@ -30,7 +28,9 @@ import ( admission "k8s.io/api/admission/v1beta1" apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" kubefake "k8s.io/client-go/kubernetes/fake" k8sTesting "k8s.io/client-go/testing" ) @@ -67,7 +67,7 @@ func TestPDDeleterDelete(t *testing.T) { deletePod := newPDPodForPDPodAdmissionControl() ownerStatefulSet := newOwnerStatefulSetForPDPodAdmissionControl() - tc := newTidbClusterForPodAdmissionControl() + tc := newTidbClusterForPodAdmissionControl(pdReplicas, tikvReplicas) kubeCli := kubefake.NewSimpleClientset() if test.UpdatePVCErr { diff --git a/pkg/webhook/pod/pods_test.go b/pkg/webhook/pod/pods_test.go index d9b62a27d9..14633beafc 100644 --- a/pkg/webhook/pod/pods_test.go +++ b/pkg/webhook/pod/pods_test.go @@ -14,6 +14,7 @@ package pod import ( + "strconv" "testing" . "github.com/onsi/gomega" @@ -139,8 +140,8 @@ func newPodAdmissionControl(kubeCli kubernetes.Interface) *PodAdmissionControl { } } -func newTidbClusterForPodAdmissionControl() *v1alpha1.TidbCluster { - return &v1alpha1.TidbCluster{ +func newTidbClusterForPodAdmissionControl(pdReplicas int32, tikvReplicas int32) *v1alpha1.TidbCluster { + tc := &v1alpha1.TidbCluster{ TypeMeta: metav1.TypeMeta{ Kind: "TidbCluster", APIVersion: "pingcap.com/v1alpha1", @@ -171,26 +172,29 @@ func newTidbClusterForPodAdmissionControl() *v1alpha1.TidbCluster { TiKV: v1alpha1.TiKVStatus{ Synced: true, Phase: v1alpha1.NormalPhase, - Stores: map[string]v1alpha1.TiKVStore{ - "0": { - PodName: memberUtils.TikvPodName(tcName, 0), - LeaderCount: 1, - State: v1alpha1.TiKVStateUp, - }, - "1": { - PodName: memberUtils.TikvPodName(tcName, 1), - LeaderCount: 1, - State: v1alpha1.TiKVStateUp, - }, - "2": { - PodName: memberUtils.TikvPodName(tcName, 2), - LeaderCount: 1, - State: v1alpha1.TiKVStateUp, - }, - }, + Stores: map[string]v1alpha1.TiKVStore{}, + }, + PD: v1alpha1.PDStatus{ + Synced: true, + Phase: v1alpha1.NormalPhase, + Members: map[string]v1alpha1.PDMember{}, }, }, } + for i := 0; int32(i) < tikvReplicas; i++ { + tc.Status.TiKV.Stores[strconv.Itoa(i)] = v1alpha1.TiKVStore{ + PodName: memberUtils.TikvPodName(tcName, int32(i)), + LeaderCount: 1, + State: v1alpha1.TiKVStateUp, + } + } + for i := 0; int32(i) < pdReplicas; i++ { + tc.Status.PD.Members[memberUtils.PdPodName(tcName, int32(i))] = v1alpha1.PDMember{ + Health: true, + Name: memberUtils.PdPodName(tcName, int32(i)), + } + } + return tc } func newNormalPod() *corev1.Pod { diff --git a/pkg/webhook/pod/tikv_deleter.go b/pkg/webhook/pod/tikv_deleter.go index 917595e2f0..f1b2e74850 100644 --- a/pkg/webhook/pod/tikv_deleter.go +++ b/pkg/webhook/pod/tikv_deleter.go @@ -183,7 +183,7 @@ func (pc *PodAdmissionControl) admitDeleteUpTiKVPod(payload *admitPayload, store } if isUpgrading { - err = checkFormerTiKVPodStatus(pc.kubeCli, payload.tc, ordinal, *payload.ownerStatefulSet.Spec.Replicas, storesInfo) + err = checkFormerTiKVPodStatus(pc.kubeCli, payload.tc, ordinal, payload.ownerStatefulSet, storesInfo) if err != nil { klog.Infof("tc[%s/%s]'s tikv pod[%s/%s] failed to delete,%v", namespace, tcName, namespace, name, err) return util.ARFail(err) diff --git a/pkg/webhook/pod/tikv_deleter_test.go b/pkg/webhook/pod/tikv_deleter_test.go index b0e7d0c8d3..e13dadf008 100644 --- a/pkg/webhook/pod/tikv_deleter_test.go +++ b/pkg/webhook/pod/tikv_deleter_test.go @@ -59,7 +59,7 @@ func TestTiKVDeleterDelete(t *testing.T) { t.Log(test.name) deleteTiKVPod := newTiKVPod(1) ownerStatefulSet := newOwnerStatefulsetForTikv() - tc := newTidbClusterForPodAdmissionControl() + tc := newTidbClusterForPodAdmissionControl(pdReplicas, tikvReplicas) kubeCli := kubefake.NewSimpleClientset() podAdmissionControl := newPodAdmissionControl(kubeCli) diff --git a/pkg/webhook/pod/tikv_util.go b/pkg/webhook/pod/tikv_util.go index 2689d3f343..8a781cfd4d 100644 --- a/pkg/webhook/pod/tikv_util.go +++ b/pkg/webhook/pod/tikv_util.go @@ -18,25 +18,28 @@ import ( "strings" "time" - "k8s.io/client-go/kubernetes" - "k8s.io/klog" - + "github.com/pingcap/advanced-statefulset/client/apis/apps/v1/helper" "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" memberUtil "github.com/pingcap/tidb-operator/pkg/manager/member" "github.com/pingcap/tidb-operator/pkg/pdapi" apps "k8s.io/api/apps/v1" core "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog" ) // checkFormerTiKVPodStatus would check all the former tikv pods whether their store state were UP during Upgrading // check need both check former pod is ready ,store up, and no evict leader -func checkFormerTiKVPodStatus(kubeCli kubernetes.Interface, tc *v1alpha1.TidbCluster, ordinal int32, replicas int32, storesInfo *pdapi.StoresInfo) error { +func checkFormerTiKVPodStatus(kubeCli kubernetes.Interface, tc *v1alpha1.TidbCluster, ordinal int32, set *apps.StatefulSet, storesInfo *pdapi.StoresInfo) error { tcName := tc.Name namespace := tc.Namespace - for i := replicas - 1; i > ordinal; i-- { + for i := range helper.GetPodOrdinals(tc.Spec.TiKV.Replicas, set) { + if i <= ordinal { + continue + } podName := memberUtil.TikvPodName(tcName, i) pod, err := kubeCli.CoreV1().Pods(namespace).Get(podName, meta.GetOptions{}) if err != nil { diff --git a/pkg/webhook/pod/util.go b/pkg/webhook/pod/util.go index f5d9200a2f..5df239522b 100644 --- a/pkg/webhook/pod/util.go +++ b/pkg/webhook/pod/util.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/advanced-statefulset/client/apis/apps/v1/helper" "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" - "github.com/pingcap/tidb-operator/pkg/features" "github.com/pingcap/tidb-operator/pkg/label" memberUtil "github.com/pingcap/tidb-operator/pkg/manager/member" "github.com/pingcap/tidb-operator/pkg/pdapi" @@ -64,7 +63,7 @@ func addDeferDeletingToPVC(pvc *core.PersistentVolumeClaim, kubeCli kubernetes.I // check whether the former upgraded pd pods were healthy in PD cluster during PD upgrading. // If not,then return an error -func checkFormerPDPodStatus(kubeCli kubernetes.Interface, pdClient pdapi.PDClient, tc *v1alpha1.TidbCluster, namespace string, ordinal int32, replicas int32) error { +func checkFormerPDPodStatus(kubeCli kubernetes.Interface, pdClient pdapi.PDClient, tc *v1alpha1.TidbCluster, set *apps.StatefulSet, ordinal int32) error { healthInfo, err := pdClient.GetHealth() if err != nil { return err @@ -73,9 +72,14 @@ func checkFormerPDPodStatus(kubeCli kubernetes.Interface, pdClient pdapi.PDClien for _, memberHealth := range healthInfo.Healths { membersHealthMap[memberHealth.Name] = memberHealth.Health } + namespace := tc.Namespace tcName := tc.Name - for i := replicas - 1; i > ordinal; i-- { + + for i := range helper.GetPodOrdinals(tc.Spec.PD.Replicas, set) { + if i <= ordinal { + continue + } podName := memberUtil.PdPodName(tcName, i) pod, err := kubeCli.CoreV1().Pods(namespace).Get(podName, meta.GetOptions{}) if err != nil { @@ -155,21 +159,9 @@ func checkFormerPodRestartStatus(kubeCli kubernetes.Interface, memberType v1alph return false, nil } - if features.DefaultFeatureGate.Enabled(features.AdvancedStatefulSet) { - for k := range helper.GetPodOrdinals(replicas, payload.ownerStatefulSet) { - if k > ordinal { - existed, err := f(tc.Name, k, memberType) - if err != nil { - return false, err - } - if existed { - return true, nil - } - } - } - } else { - for i := replicas - 1; i > ordinal; i-- { - existed, err := f(tc.Name, i, memberType) + for k := range helper.GetPodOrdinals(replicas, payload.ownerStatefulSet) { + if k > ordinal { + existed, err := f(tc.Name, k, memberType) if err != nil { return false, err } @@ -178,6 +170,7 @@ func checkFormerPodRestartStatus(kubeCli kubernetes.Interface, memberType v1alph } } } + return false, nil } diff --git a/pkg/webhook/pod/util_test.go b/pkg/webhook/pod/util_test.go new file mode 100644 index 0000000000..320d56e45c --- /dev/null +++ b/pkg/webhook/pod/util_test.go @@ -0,0 +1,223 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pod + +import ( + "fmt" + "github.com/pingcap/kvproto/pkg/metapb" + "testing" + + . "github.com/onsi/gomega" + "github.com/pingcap/advanced-statefulset/client/apis/apps/v1/helper" + "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/controller" + memberUtils "github.com/pingcap/tidb-operator/pkg/manager/member" + "github.com/pingcap/tidb-operator/pkg/pdapi" + operatorUtils "github.com/pingcap/tidb-operator/pkg/util" + apps "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + kubeinformers "k8s.io/client-go/informers" + kubefake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" +) + +func TestCheckPDFormerPodStatus(t *testing.T) { + g := NewGomegaWithT(t) + type testcase struct { + stsReplicas int32 + name string + targetOrdinal int32 + deleteSlots []int32 + permit bool + } + tests := []testcase{ + { + stsReplicas: 5, + name: "last target ordinal", + targetOrdinal: 4, + deleteSlots: []int32{}, + permit: true, + }, + { + stsReplicas: 5, + name: "FirstTargetOrdinal", + targetOrdinal: 0, + deleteSlots: []int32{}, + permit: true, + }, + { + stsReplicas: 4, + name: "mid target ordinal, check success", + targetOrdinal: 1, + deleteSlots: []int32{0}, + permit: true, + }, + { + stsReplicas: 4, + name: "mid target ordinal, check success", + targetOrdinal: 1, + deleteSlots: []int32{2}, + permit: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + kubeCli, _ := newFakeComponent() + pdControl := pdapi.NewFakePDControl(kubeCli) + slots := sets.NewInt32(test.deleteSlots...) + tc := newTidbClusterForPodAdmissionControl(test.stsReplicas, test.stsReplicas) + fakePDClient := controller.NewFakePDClient(pdControl, tc) + sts := buildTargetStatefulSet(tc, v1alpha1.PDMemberType) + err := helper.SetDeleteSlots(sts, slots) + g.Expect(err).NotTo(HaveOccurred()) + healthInfo := &pdapi.HealthInfo{} + for i := range helper.GetPodOrdinals(test.stsReplicas, sts) { + healthInfo.Healths = append(healthInfo.Healths, pdapi.MemberHealth{ + Name: memberUtils.PdPodName(tc.Name, i), + Health: true, + }) + pod := buildPod(tc, v1alpha1.PDMemberType, i) + pod.Labels[apps.ControllerRevisionHashLabelKey] = sts.Status.UpdateRevision + kubeCli.CoreV1().Pods(tc.Namespace).Create(pod) + } + fakePDClient.AddReaction(pdapi.GetHealthActionType, func(action *pdapi.Action) (interface{}, error) { + return healthInfo, nil + }) + + err = checkFormerPDPodStatus(kubeCli, fakePDClient, tc, sts, test.targetOrdinal) + if test.permit { + g.Expect(err).NotTo(HaveOccurred()) + } else { + g.Expect(err).Should(HaveOccurred()) + } + }) + } +} + +func TestCheckTiKVFormerPodStatus(t *testing.T) { + g := NewGomegaWithT(t) + type testcase struct { + stsReplicas int32 + name string + targetOrdinal int32 + deleteSlots []int32 + permit bool + } + tests := []testcase{ + { + stsReplicas: 5, + name: "last target ordinal", + targetOrdinal: 4, + deleteSlots: []int32{}, + permit: true, + }, + { + stsReplicas: 5, + name: "FirstTargetOrdinal", + targetOrdinal: 0, + deleteSlots: []int32{}, + permit: true, + }, + { + stsReplicas: 4, + name: "mid target ordinal, check success", + targetOrdinal: 1, + deleteSlots: []int32{0}, + permit: true, + }, + { + stsReplicas: 4, + name: "mid target ordinal, check success", + targetOrdinal: 1, + deleteSlots: []int32{2}, + permit: true, + }, + } + + for _, test := range tests { + kubeCli, _ := newFakeComponent() + slots := sets.NewInt32(test.deleteSlots...) + tc := newTidbClusterForPodAdmissionControl(test.stsReplicas, test.stsReplicas) + sts := buildTargetStatefulSet(tc, v1alpha1.TiKVMemberType) + err := helper.SetDeleteSlots(sts, slots) + g.Expect(err).NotTo(HaveOccurred()) + for i := range helper.GetPodOrdinals(test.stsReplicas, sts) { + pod := buildPod(tc, v1alpha1.TiKVMemberType, i) + pod.Labels[apps.ControllerRevisionHashLabelKey] = sts.Status.UpdateRevision + kubeCli.CoreV1().Pods(tc.Namespace).Create(pod) + } + err = checkFormerTiKVPodStatus(kubeCli, tc, test.targetOrdinal, sts, buildStoresInfo(tc, sts)) + if test.permit { + g.Expect(err).NotTo(HaveOccurred()) + } else { + g.Expect(err).Should(HaveOccurred()) + } + } +} + +func buildStoresInfo(tc *v1alpha1.TidbCluster, sts *apps.StatefulSet) *pdapi.StoresInfo { + ssi := &pdapi.StoresInfo{ + Stores: []*pdapi.StoreInfo{}, + } + for i := range helper.GetPodOrdinals(tc.Spec.TiKV.Replicas, sts) { + si := buildStoreInfo(tc, i) + ssi.Stores = append(ssi.Stores, si) + } + return ssi +} + +func buildStoreInfo(tc *v1alpha1.TidbCluster, ordinal int32) *pdapi.StoreInfo { + si := &pdapi.StoreInfo{ + Store: &pdapi.MetaStore{ + Store: &metapb.Store{ + Address: fmt.Sprintf("%s-tikv-%d.%s-tikv-peer.%s.svc:20160", tc.Name, ordinal, tc.Name, tc.Namespace), + }, + StateName: v1alpha1.TiKVStateUp, + }, + } + return si +} + +func buildPod(tc *v1alpha1.TidbCluster, memberType v1alpha1.MemberType, ordinal int32) *corev1.Pod { + pod := &corev1.Pod{} + pod.Namespace = tc.Namespace + pod.Name = operatorUtils.GetPodName(tc, memberType, ordinal) + pod.Labels = map[string]string{} + return pod +} + +func buildTargetStatefulSet(tc *v1alpha1.TidbCluster, memberType v1alpha1.MemberType) *apps.StatefulSet { + sts := &apps.StatefulSet{} + sts.Name = fmt.Sprintf("%s-%s", tcName, memberType.String()) + sts.Namespace = tc.Namespace + sts.Status.UpdateRevision = "1" + switch memberType { + case v1alpha1.PDMemberType: + sts.Spec.Replicas = &tc.Spec.PD.Replicas + tc.Status.PD.StatefulSet = &sts.Status + case v1alpha1.TiKVMemberType: + sts.Spec.Replicas = &tc.Spec.TiKV.Replicas + tc.Status.TiKV.StatefulSet = &sts.Status + } + return sts +} + +func newFakeComponent() (*kubefake.Clientset, cache.Indexer) { + kubeCli := kubefake.NewSimpleClientset() + kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeCli, 0) + podInformer := kubeInformerFactory.Core().V1().Pods() + return kubeCli, podInformer.Informer().GetIndexer() +}