Skip to content

Commit

Permalink
scaling for tiflash (#2237)
Browse files Browse the repository at this point in the history
* scaling for tiflash

* address comments

* remove minor fixes
  • Loading branch information
DanielZhangQD authored Apr 23, 2020
1 parent 1b8ea5a commit 4cb0761
Show file tree
Hide file tree
Showing 17 changed files with 1,036 additions and 119 deletions.
4 changes: 4 additions & 0 deletions pkg/apis/pingcap/v1alpha1/tidbcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ func (tc *TidbCluster) TiDBUpgrading() bool {
return tc.Status.TiDB.Phase == UpgradePhase
}

func (tc *TidbCluster) TiFlashUpgrading() bool {
return tc.Status.TiFlash.Phase == UpgradePhase
}

func (tc *TidbCluster) PDAllPodsStarted() bool {
return tc.PDStsDesiredReplicas() == tc.PDStsActualReplicas()
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/controller/pod_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (rpc *realPodControl) UpdateMetaInfo(tc *v1alpha1.TidbCluster, pod *corev1.
}
}
}
case label.TiKVLabelVal:
case label.TiKVLabelVal, label.TiFlashLabelVal:
if labels[label.StoreIDLabelKey] == "" {
// get store id
stores, err := pdClient.GetStores()
Expand Down Expand Up @@ -305,7 +305,6 @@ func (fpc *FakePodControl) UpdateMetaInfo(_ *v1alpha1.TidbCluster, pod *corev1.P
}

setIfNotEmpty(pod.Labels, label.NameLabelKey, TestName)
setIfNotEmpty(pod.Labels, label.ComponentLabelKey, TestComponentName)
setIfNotEmpty(pod.Labels, label.ManagedByLabelKey, TestManagedByName)
setIfNotEmpty(pod.Labels, label.InstanceLabelKey, TestClusterName)
setIfNotEmpty(pod.Labels, label.ClusterIDLabelKey, TestClusterID)
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/pvc_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (rpc *realPVCControl) UpdateMetaInfo(tc *v1alpha1.TidbCluster, pvc *corev1.
if pvc.Labels[label.ClusterIDLabelKey] == clusterID &&
pvc.Labels[label.MemberIDLabelKey] == memberID &&
pvc.Labels[label.StoreIDLabelKey] == storeID &&
pvc.Labels[label.AnnPodNameKey] == podName &&
pvc.Annotations[label.AnnPodNameKey] == podName {
klog.V(4).Infof("pvc %s/%s already has labels and annotations synced, skipping, TidbCluster: %s", ns, pvcName, tcName)
return pvc, nil
Expand All @@ -135,6 +136,7 @@ func (rpc *realPVCControl) UpdateMetaInfo(tc *v1alpha1.TidbCluster, pvc *corev1.
setIfNotEmpty(pvc.Labels, label.ClusterIDLabelKey, clusterID)
setIfNotEmpty(pvc.Labels, label.MemberIDLabelKey, memberID)
setIfNotEmpty(pvc.Labels, label.StoreIDLabelKey, storeID)
setIfNotEmpty(pvc.Labels, label.AnnPodNameKey, podName)
setIfNotEmpty(pvc.Annotations, label.AnnPodNameKey, podName)

labels := pvc.GetLabels()
Expand Down Expand Up @@ -244,6 +246,7 @@ func (fpc *FakePVCControl) UpdateMetaInfo(_ *v1alpha1.TidbCluster, pvc *corev1.P
setIfNotEmpty(pvc.Labels, label.ClusterIDLabelKey, pod.Labels[label.ClusterIDLabelKey])
setIfNotEmpty(pvc.Labels, label.MemberIDLabelKey, pod.Labels[label.MemberIDLabelKey])
setIfNotEmpty(pvc.Labels, label.StoreIDLabelKey, pod.Labels[label.StoreIDLabelKey])
setIfNotEmpty(pvc.Labels, label.AnnPodNameKey, pod.GetName())
setIfNotEmpty(pvc.Annotations, label.AnnPodNameKey, pod.GetName())
return nil, fpc.PVCIndexer.Update(pvc)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/tidbcluster/tidb_cluster_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (tcc *defaultTidbClusterControl) updateTidbCluster(tc *v1alpha1.TidbCluster
return err
}

// cleaning all orphan pods(pd or tikv which don't have a related PVC) managed by operator
// cleaning all orphan pods(pd, tikv or tiflash which don't have a related PVC) managed by operator
if _, err := tcc.orphanPodsCleaner.Clean(tc); err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/label/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ func (l Label) TiFlash() Label {
return l
}

// IsTiFlash returns whether label is a TiFlash
func (l Label) IsTiFlash() bool {
return l[ComponentLabelKey] == TiFlashLabelVal
}

// IsTiKV returns whether label is a TiKV
func (l Label) IsTiKV() bool {
return l[ComponentLabelKey] == TiKVLabelVal
Expand Down
51 changes: 29 additions & 22 deletions pkg/manager/member/orphan_pods_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

const (
skipReasonOrphanPodsCleanerIsNotPDOrTiKV = "orphan pods cleaner: member type is not pd or tikv"
skipReasonOrphanPodsCleanerIsNotTarget = "orphan pods cleaner: member type is not pd, tikv or tiflash"
skipReasonOrphanPodsCleanerPVCNameIsEmpty = "orphan pods cleaner: pvcName is empty"
skipReasonOrphanPodsCleanerPVCIsFound = "orphan pods cleaner: pvc is found"
skipReasonOrphanPodsCleanerPodHasBeenScheduled = "orphan pods cleaner: pod has been scheduled"
Expand Down Expand Up @@ -82,8 +82,8 @@ func (opc *orphanPodsCleaner) Clean(tc *v1alpha1.TidbCluster) (map[string]string
for _, pod := range pods {
podName := pod.GetName()
l := label.Label(pod.Labels)
if !(l.IsPD() || l.IsTiKV()) {
skipReason[podName] = skipReasonOrphanPodsCleanerIsNotPDOrTiKV
if !(l.IsPD() || l.IsTiKV() || l.IsTiFlash()) {
skipReason[podName] = skipReasonOrphanPodsCleanerIsNotTarget
continue
}

Expand All @@ -92,39 +92,46 @@ func (opc *orphanPodsCleaner) Clean(tc *v1alpha1.TidbCluster) (map[string]string
continue
}

// TODO support multiple pvcs case?
var pvcName string
var pvcNames []string
for _, vol := range pod.Spec.Volumes {
if vol.PersistentVolumeClaim != nil {
pvcName = vol.PersistentVolumeClaim.ClaimName
break
if vol.PersistentVolumeClaim.ClaimName != "" {
pvcNames = append(pvcNames, vol.PersistentVolumeClaim.ClaimName)
}
}
}
if pvcName == "" {
if len(pvcNames) < 1 {
skipReason[podName] = skipReasonOrphanPodsCleanerPVCNameIsEmpty
continue
}

var err error
// check informer cache
_, err = opc.pvcLister.PersistentVolumeClaims(ns).Get(pvcName)
if err == nil {
skipReason[podName] = skipReasonOrphanPodsCleanerPVCIsFound
continue
}
if !errors.IsNotFound(err) {
return skipReason, err
var pvcNotFound bool
for _, p := range pvcNames {
// check informer cache
_, err = opc.pvcLister.PersistentVolumeClaims(ns).Get(p)
if err == nil {
continue
}
if !errors.IsNotFound(err) {
return skipReason, err
}
// if PVC not found in cache, re-check from apiserver directly to make sure the PVC really not exist
_, err = opc.kubeCli.CoreV1().PersistentVolumeClaims(ns).Get(p, metav1.GetOptions{})
if err == nil {
continue
}
if !errors.IsNotFound(err) {
return skipReason, err
}
pvcNotFound = true
break
}

// if PVC not found in cache, re-check from apiserver directly to make sure the PVC really not exist
_, err = opc.kubeCli.CoreV1().PersistentVolumeClaims(ns).Get(pvcName, metav1.GetOptions{})
if err == nil {
if !pvcNotFound {
skipReason[podName] = skipReasonOrphanPodsCleanerPVCIsFound
continue
}
if !errors.IsNotFound(err) {
return skipReason, err
}

// if the PVC is not found in apiserver (also informer cache) and the
// pod has not been scheduled, delete it and let the stateful
Expand Down
54 changes: 52 additions & 2 deletions pkg/manager/member/orphan_pods_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestOrphanPodsCleanerClean(t *testing.T) {
expectFn: func(g *GomegaWithT, skipReason map[string]string, _ *orphanPodsCleaner, err error) {
g.Expect(err).NotTo(HaveOccurred())
g.Expect(len(skipReason)).To(Equal(1))
g.Expect(skipReason["pod-1"]).To(Equal(skipReasonOrphanPodsCleanerIsNotPDOrTiKV))
g.Expect(skipReason["pod-1"]).To(Equal(skipReasonOrphanPodsCleanerIsNotTarget))
},
},
{
Expand Down Expand Up @@ -226,6 +226,56 @@ func TestOrphanPodsCleanerClean(t *testing.T) {
g.Expect(strings.Contains(err.Error(), "not found")).To(BeTrue())
},
},
{
name: "one of two pvcs is not found",
pods: []*corev1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: metav1.NamespaceDefault,
Labels: label.New().Instance(tc.GetInstanceName()).PD().Labels(),
},
Spec: corev1.PodSpec{
Volumes: []corev1.Volume{
{
Name: "pd1",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: "pvc-1",
},
},
},
{
Name: "pd0",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: "pvc-0",
},
},
},
},
},
Status: corev1.PodStatus{
Phase: corev1.PodPending,
},
},
},
pvcs: []*corev1.PersistentVolumeClaim{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc-1",
Namespace: metav1.NamespaceDefault,
},
},
},
expectFn: func(g *GomegaWithT, skipReason map[string]string, opc *orphanPodsCleaner, err error) {
g.Expect(err).NotTo(HaveOccurred())
g.Expect(len(skipReason)).To(Equal(0))
_, err = opc.podLister.Pods("default").Get("pod-1")
g.Expect(err).To(HaveOccurred())
g.Expect(strings.Contains(err.Error(), "not found")).To(BeTrue())
},
},
{
// in theory, this is is possible because we can't check the PVC
// and pod in an atomic operation.
Expand Down Expand Up @@ -471,7 +521,7 @@ func TestOrphanPodsCleanerClean(t *testing.T) {
g.Expect(len(skipReason)).To(Equal(3))
g.Expect(skipReason["pod-2"]).To(Equal(skipReasonOrphanPodsCleanerPVCNameIsEmpty))
g.Expect(skipReason["pod-3"]).To(Equal(skipReasonOrphanPodsCleanerPVCIsFound))
g.Expect(skipReason["pod-4"]).To(Equal(skipReasonOrphanPodsCleanerIsNotPDOrTiKV))
g.Expect(skipReason["pod-4"]).To(Equal(skipReasonOrphanPodsCleanerIsNotTarget))
g.Expect(err).NotTo(HaveOccurred())
_, err = opc.podLister.Pods("default").Get("pod-1")
g.Expect(err).To(HaveOccurred())
Expand Down
25 changes: 21 additions & 4 deletions pkg/manager/member/pd_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestPDScalerScaleOut(t *testing.T) {

scaler, _, pvcIndexer, pvcControl := newFakePDScaler()

pvc := newPVCForStatefulSet(oldSet, v1alpha1.PDMemberType)
pvc := newPVCForStatefulSet(oldSet, v1alpha1.PDMemberType, tc.Name)
pvc.Name = ordinalPVCName(v1alpha1.PDMemberType, oldSet.GetName(), *oldSet.Spec.Replicas)
if !test.annoIsNil {
pvc.Annotations = map[string]string{}
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestPDScalerScaleOut(t *testing.T) {
name: "pvc annotation is not nil but doesn't contain defer deletion annotation",
update: normalPDMember,
pdUpgrading: false,
hasPVC: false,
hasPVC: true,
hasDeferAnn: false,
annoIsNil: false,
pvcDeleteErr: false,
Expand Down Expand Up @@ -259,7 +259,7 @@ func TestPDScalerScaleIn(t *testing.T) {
scaler, pdControl, pvcIndexer, pvcControl := newFakePDScaler()

if test.hasPVC {
pvc := newPVCForStatefulSet(oldSet, v1alpha1.PDMemberType)
pvc := newScaleInPVCForStatefulSet(oldSet, v1alpha1.PDMemberType, tc.Name)
pvcIndexer.Add(pvc)
}

Expand Down Expand Up @@ -407,11 +407,28 @@ func newStatefulSetForPDScale() *apps.StatefulSet {
return set
}

func newPVCForStatefulSet(set *apps.StatefulSet, memberType v1alpha1.MemberType) *corev1.PersistentVolumeClaim {
func newPVCForStatefulSet(set *apps.StatefulSet, memberType v1alpha1.MemberType, name string) *corev1.PersistentVolumeClaim {
podName := ordinalPodName(memberType, name, *set.Spec.Replicas)
l := label.New().Instance(name)
l[label.AnnPodNameKey] = podName
return &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: ordinalPVCName(memberType, set.GetName(), *set.Spec.Replicas),
Namespace: metav1.NamespaceDefault,
Labels: l,
},
}
}

func newScaleInPVCForStatefulSet(set *apps.StatefulSet, memberType v1alpha1.MemberType, name string) *corev1.PersistentVolumeClaim {
podName := ordinalPodName(memberType, name, *set.Spec.Replicas-1)
l := label.New().Instance(name)
l[label.AnnPodNameKey] = podName
return &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: ordinalPVCName(memberType, set.GetName(), *set.Spec.Replicas-1),
Namespace: metav1.NamespaceDefault,
Labels: l,
},
}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/manager/member/pvc_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

const (
skipReasonPVCCleanerIsNotPDOrTiKV = "pvc cleaner: member type is not pd or tikv"
skipReasonPVCCleanerIsNotTarget = "pvc cleaner: member type is not pd, tikv or tiflash"
skipReasonPVCCleanerDeferDeletePVCNotHasLock = "pvc cleaner: defer delete PVC not has schedule lock"
skipReasonPVCCleanerPVCNotHasLock = "pvc cleaner: pvc not has schedule lock"
skipReasonPVCCleanerPodWaitingForScheduling = "pvc cleaner: waiting for pod scheduling"
Expand Down Expand Up @@ -102,8 +102,8 @@ func (rpc *realPVCCleaner) reclaimPV(tc *v1alpha1.TidbCluster) (map[string]strin
for _, pvc := range pvcs {
pvcName := pvc.GetName()
l := label.Label(pvc.Labels)
if !(l.IsPD() || l.IsTiKV()) {
skipReason[pvcName] = skipReasonPVCCleanerIsNotPDOrTiKV
if !(l.IsPD() || l.IsTiKV() || l.IsTiFlash()) {
skipReason[pvcName] = skipReasonPVCCleanerIsNotTarget
continue
}

Expand Down Expand Up @@ -154,7 +154,7 @@ func (rpc *realPVCCleaner) reclaimPV(tc *v1alpha1.TidbCluster) (map[string]strin
return skipReason, fmt.Errorf("cluster %s/%s get pvc %s pod %s from apiserver failed, err: %v", ns, tcName, pvcName, podName, err)
}

// Without pd or tikv pod reference this defer delete PVC, start to reclaim PV
// Without pod reference this defer delete PVC, start to reclaim PV
pvName := pvc.Spec.VolumeName
pv, err := rpc.pvLister.Get(pvName)
if err != nil {
Expand Down Expand Up @@ -209,8 +209,8 @@ func (rpc *realPVCCleaner) cleanScheduleLock(tc *v1alpha1.TidbCluster) (map[stri
for _, pvc := range pvcs {
pvcName := pvc.GetName()
l := label.Label(pvc.Labels)
if !(l.IsPD() || l.IsTiKV()) {
skipReason[pvcName] = skipReasonPVCCleanerIsNotPDOrTiKV
if !(l.IsPD() || l.IsTiKV() || l.IsTiFlash()) {
skipReason[pvcName] = skipReasonPVCCleanerIsNotTarget
continue
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/manager/member/pvc_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestPVCCleanerReclaimPV(t *testing.T) {
expectFn: func(g *GomegaWithT, skipReason map[string]string, _ *realPVCCleaner, err error) {
g.Expect(err).NotTo(HaveOccurred())
g.Expect(len(skipReason)).To(Equal(1))
g.Expect(skipReason["tidb-test-tidb-0"]).To(Equal(skipReasonPVCCleanerIsNotPDOrTiKV))
g.Expect(skipReason["tidb-test-tidb-0"]).To(Equal(skipReasonPVCCleanerIsNotTarget))
},
},
{
Expand Down Expand Up @@ -866,7 +866,7 @@ func TestPVCCleanerCleanScheduleLock(t *testing.T) {
expectFn: func(g *GomegaWithT, skipReason map[string]string, _ *realPVCCleaner, err error) {
g.Expect(err).NotTo(HaveOccurred())
g.Expect(len(skipReason)).To(Equal(1))
g.Expect(skipReason["tidb-test-tidb-0"]).To(Equal(skipReasonPVCCleanerIsNotPDOrTiKV))
g.Expect(skipReason["tidb-test-tidb-0"]).To(Equal(skipReasonPVCCleanerIsNotTarget))
},
},
{
Expand Down
Loading

0 comments on commit 4cb0761

Please sign in to comment.