Skip to content

Commit

Permalink
Revise interface for the native api in the controller package (pingca…
Browse files Browse the repository at this point in the history
…p#2820) (pingcap#2824)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>

Co-authored-by: Song Gao <disxiaofei@163.com>
  • Loading branch information
ti-srebot and Yisaer authored Jun 28, 2020
1 parent 553d0f8 commit 1b60ffe
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 127 deletions.
65 changes: 38 additions & 27 deletions pkg/controller/pod_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/pdapi"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
Expand All @@ -37,8 +38,8 @@ import (
type PodControlInterface interface {
// TODO change this to UpdatePod
UpdateMetaInfo(*v1alpha1.TidbCluster, *corev1.Pod) (*corev1.Pod, error)
DeletePod(*v1alpha1.TidbCluster, *corev1.Pod) error
UpdatePod(*v1alpha1.TidbCluster, *corev1.Pod) (*corev1.Pod, error)
DeletePod(runtime.Object, *corev1.Pod) error
UpdatePod(runtime.Object, *corev1.Pod) (*corev1.Pod, error)
}

type realPodControl struct {
Expand All @@ -63,9 +64,14 @@ func NewRealPodControl(
}
}

func (rpc *realPodControl) UpdatePod(tc *v1alpha1.TidbCluster, pod *corev1.Pod) (*corev1.Pod, error) {
ns := tc.GetNamespace()
tcName := tc.GetName()
func (rpc *realPodControl) UpdatePod(controller runtime.Object, pod *corev1.Pod) (*corev1.Pod, error) {
controllerMo, ok := controller.(metav1.Object)
if !ok {
return nil, fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller)
}
kind := controller.GetObjectKind().GroupVersionKind().Kind
name := controllerMo.GetName()
namespace := controllerMo.GetNamespace()
podName := pod.GetName()

labels := pod.GetLabels()
Expand All @@ -75,20 +81,20 @@ func (rpc *realPodControl) UpdatePod(tc *v1alpha1.TidbCluster, pod *corev1.Pod)
// don't wait due to limited number of clients, but backoff after the default number of steps
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
var updateErr error
updatePod, updateErr = rpc.kubeCli.CoreV1().Pods(ns).Update(pod)
updatePod, updateErr = rpc.kubeCli.CoreV1().Pods(namespace).Update(pod)
if updateErr == nil {
klog.Infof("Pod: [%s/%s] updated successfully, TidbCluster: [%s/%s]", ns, podName, ns, tcName)
klog.Infof("Pod: [%s/%s] updated successfully, %s: [%s/%s]", namespace, podName, kind, namespace, name)
return nil
}
klog.Errorf("failed to update Pod: [%s/%s], error: %v", ns, podName, updateErr)
klog.Errorf("failed to update Pod: [%s/%s], error: %v", namespace, podName, updateErr)

if updated, err := rpc.podLister.Pods(ns).Get(podName); err == nil {
if updated, err := rpc.podLister.Pods(namespace).Get(podName); err == nil {
// make a copy so we don't mutate the shared cache
pod = updated.DeepCopy()
pod.Labels = labels
pod.Annotations = ann
} else {
utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s from lister: %v", ns, podName, err))
utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s from lister: %v", namespace, podName, err))
}

return updateErr
Expand Down Expand Up @@ -187,34 +193,39 @@ func (rpc *realPodControl) UpdateMetaInfo(tc *v1alpha1.TidbCluster, pod *corev1.
return updatePod, err
}

func (rpc *realPodControl) DeletePod(tc *v1alpha1.TidbCluster, pod *corev1.Pod) error {
ns := tc.GetNamespace()
tcName := tc.GetName()
func (rpc *realPodControl) DeletePod(controller runtime.Object, pod *corev1.Pod) error {
controllerMo, ok := controller.(metav1.Object)
if !ok {
return fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller)
}
kind := controller.GetObjectKind().GroupVersionKind().Kind
name := controllerMo.GetName()
namespace := controllerMo.GetNamespace()

podName := pod.GetName()
preconditions := metav1.Preconditions{UID: &pod.UID, ResourceVersion: &pod.ResourceVersion}
deleteOptions := metav1.DeleteOptions{Preconditions: &preconditions}
err := rpc.kubeCli.CoreV1().Pods(ns).Delete(podName, &deleteOptions)
err := rpc.kubeCli.CoreV1().Pods(namespace).Delete(podName, &deleteOptions)
if err != nil {
klog.Errorf("failed to delete Pod: [%s/%s], TidbCluster: %s, %v", ns, podName, tcName, err)
klog.Errorf("failed to delete Pod: [%s/%s], %s: %s, %v", namespace, podName, kind, namespace, err)
} else {
klog.V(4).Infof("delete Pod: [%s/%s] successfully, TidbCluster: %s", ns, podName, tcName)
klog.V(4).Infof("delete Pod: [%s/%s] successfully, %s: %s", namespace, podName, kind, namespace)
}
rpc.recordPodEvent("delete", tc, podName, err)
rpc.recordPodEvent("delete", kind, name, controller, podName, err)
return err
}

func (rpc *realPodControl) recordPodEvent(verb string, tc *v1alpha1.TidbCluster, podName string, err error) {
tcName := tc.GetName()
func (rpc *realPodControl) recordPodEvent(verb, kind, name string, object runtime.Object, podName string, err error) {
if err == nil {
reason := fmt.Sprintf("Successful%s", strings.Title(verb))
msg := fmt.Sprintf("%s Pod %s in TidbCluster %s successful",
strings.ToLower(verb), podName, tcName)
rpc.recorder.Event(tc, corev1.EventTypeNormal, reason, msg)
msg := fmt.Sprintf("%s Pod %s in %s %s successful",
strings.ToLower(verb), podName, kind, name)
rpc.recorder.Event(object, corev1.EventTypeNormal, reason, msg)
} else {
reason := fmt.Sprintf("Failed%s", strings.Title(verb))
msg := fmt.Sprintf("%s Pod %s in TidbCluster %s failed error: %s",
strings.ToLower(verb), podName, tcName, err)
rpc.recorder.Event(tc, corev1.EventTypeWarning, reason, msg)
msg := fmt.Sprintf("%s Pod %s in %s %s failed error: %s",
strings.ToLower(verb), podName, kind, name, err)
rpc.recorder.Event(object, corev1.EventTypeWarning, reason, msg)
}
}

Expand Down Expand Up @@ -313,7 +324,7 @@ func (fpc *FakePodControl) UpdateMetaInfo(_ *v1alpha1.TidbCluster, pod *corev1.P
return pod, fpc.PodIndexer.Update(pod)
}

func (fpc *FakePodControl) DeletePod(_ *v1alpha1.TidbCluster, pod *corev1.Pod) error {
func (fpc *FakePodControl) DeletePod(_ runtime.Object, pod *corev1.Pod) error {
defer fpc.deletePodTracker.Inc()
if fpc.deletePodTracker.ErrorReady() {
defer fpc.deletePodTracker.Reset()
Expand All @@ -323,7 +334,7 @@ func (fpc *FakePodControl) DeletePod(_ *v1alpha1.TidbCluster, pod *corev1.Pod) e
return fpc.PodIndexer.Delete(pod)
}

func (fpc *FakePodControl) UpdatePod(_ *v1alpha1.TidbCluster, pod *corev1.Pod) (*corev1.Pod, error) {
func (fpc *FakePodControl) UpdatePod(_ runtime.Object, pod *corev1.Pod) (*corev1.Pod, error) {
defer fpc.updatePodTracker.Inc()
if fpc.updatePodTracker.ErrorReady() {
defer fpc.updatePodTracker.Reset()
Expand Down
96 changes: 57 additions & 39 deletions pkg/controller/pvc_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (
"fmt"
"strings"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/label"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
Expand All @@ -34,9 +35,9 @@ import (

// PVCControlInterface manages PVCs used in TidbCluster
type PVCControlInterface interface {
UpdateMetaInfo(*v1alpha1.TidbCluster, *corev1.PersistentVolumeClaim, *corev1.Pod) (*corev1.PersistentVolumeClaim, error)
UpdatePVC(*v1alpha1.TidbCluster, *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error)
DeletePVC(*v1alpha1.TidbCluster, *corev1.PersistentVolumeClaim) error
UpdateMetaInfo(runtime.Object, *corev1.PersistentVolumeClaim, *corev1.Pod) (*corev1.PersistentVolumeClaim, error)
UpdatePVC(runtime.Object, *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error)
DeletePVC(runtime.Object, *corev1.PersistentVolumeClaim) error
GetPVC(name, namespace string) (*corev1.PersistentVolumeClaim, error)
}

Expand All @@ -62,53 +63,71 @@ func (rpc *realPVCControl) GetPVC(name, namespace string) (*corev1.PersistentVol
return rpc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
}

func (rpc *realPVCControl) DeletePVC(tc *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim) error {
ns := tc.GetNamespace()
tcName := tc.GetName()
func (rpc *realPVCControl) DeletePVC(controller runtime.Object, pvc *corev1.PersistentVolumeClaim) error {
controllerMo, ok := controller.(metav1.Object)
if !ok {
return fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller)
}
kind := controller.GetObjectKind().GroupVersionKind().Kind
name := controllerMo.GetName()
namespace := controllerMo.GetNamespace()

pvcName := pvc.GetName()
err := rpc.kubeCli.CoreV1().PersistentVolumeClaims(tc.GetNamespace()).Delete(pvcName, nil)
err := rpc.kubeCli.CoreV1().PersistentVolumeClaims(namespace).Delete(pvcName, nil)
if err != nil {
klog.Errorf("failed to delete PVC: [%s/%s], TidbCluster: %s, %v", ns, pvcName, tcName, err)
klog.Errorf("failed to delete PVC: [%s/%s], %s: %s, %v", namespace, pvcName, kind, name, err)
}
klog.V(4).Infof("delete PVC: [%s/%s] successfully, TidbCluster: %s", ns, pvcName, tcName)
rpc.recordPVCEvent("delete", tc, pvcName, err)
klog.V(4).Infof("delete PVC: [%s/%s] successfully, %s: %s", namespace, pvcName, kind, name)
rpc.recordPVCEvent("delete", kind, name, controller, pvcName, err)
return err
}

func (rpc *realPVCControl) UpdatePVC(tc *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) {
ns := tc.GetNamespace()
tcName := tc.GetName()
func (rpc *realPVCControl) UpdatePVC(controller runtime.Object, pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) {
controllerMo, ok := controller.(metav1.Object)
if !ok {
return nil, fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller)
}
kind := controller.GetObjectKind().GroupVersionKind().Kind
name := controllerMo.GetName()
namespace := controllerMo.GetNamespace()

pvcName := pvc.GetName()

labels := pvc.GetLabels()
ann := pvc.GetAnnotations()
var updatePVC *corev1.PersistentVolumeClaim
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
var updateErr error
updatePVC, updateErr = rpc.kubeCli.CoreV1().PersistentVolumeClaims(ns).Update(pvc)
updatePVC, updateErr = rpc.kubeCli.CoreV1().PersistentVolumeClaims(namespace).Update(pvc)
if updateErr == nil {
klog.Infof("update PVC: [%s/%s] successfully, TidbCluster: %s", ns, pvcName, tcName)
klog.Infof("update PVC: [%s/%s] successfully, %s: %s", namespace, pvcName, kind, name)
return nil
}
klog.Errorf("failed to update PVC: [%s/%s], TidbCluster: %s, error: %v", ns, pvcName, tcName, updateErr)
klog.Errorf("failed to update PVC: [%s/%s], %s: %s, error: %v", namespace, pvcName, kind, name, updateErr)

if updated, err := rpc.pvcLister.PersistentVolumeClaims(ns).Get(pvcName); err == nil {
if updated, err := rpc.pvcLister.PersistentVolumeClaims(namespace).Get(pvcName); err == nil {
// make a copy so we don't mutate the shared cache
pvc = updated.DeepCopy()
pvc.Labels = labels
pvc.Annotations = ann
} else {
utilruntime.HandleError(fmt.Errorf("error getting updated PVC %s/%s from lister: %v", ns, pvcName, err))
utilruntime.HandleError(fmt.Errorf("error getting updated PVC %s/%s from lister: %v", namespace, pvcName, err))
}

return updateErr
})
return updatePVC, err
}

func (rpc *realPVCControl) UpdateMetaInfo(tc *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod) (*corev1.PersistentVolumeClaim, error) {
ns := tc.GetNamespace()
tcName := tc.GetName()
func (rpc *realPVCControl) UpdateMetaInfo(controller runtime.Object, pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod) (*corev1.PersistentVolumeClaim, error) {
controllerMo, ok := controller.(metav1.Object)
if !ok {
return nil, fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller)
}
kind := controller.GetObjectKind().GroupVersionKind().Kind
name := controllerMo.GetName()
namespace := controllerMo.GetNamespace()

pvcName := pvc.GetName()
podName := pod.GetName()

Expand All @@ -129,7 +148,7 @@ func (rpc *realPVCControl) UpdateMetaInfo(tc *v1alpha1.TidbCluster, pvc *corev1.
pvc.Labels[label.StoreIDLabelKey] == storeID &&
pvc.Labels[label.AnnPodNameKey] == podName &&
pvc.Annotations[label.AnnPodNameKey] == podName {
klog.V(4).Infof("pvc %s/%s already has labels and annotations synced, skipping, TidbCluster: %s", ns, pvcName, tcName)
klog.V(4).Infof("pvc %s/%s already has labels and annotations synced, skipping, %s: %s", namespace, pvcName, kind, name)
return pvc, nil
}

Expand All @@ -144,39 +163,38 @@ func (rpc *realPVCControl) UpdateMetaInfo(tc *v1alpha1.TidbCluster, pvc *corev1.
var updatePVC *corev1.PersistentVolumeClaim
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
var updateErr error
updatePVC, updateErr = rpc.kubeCli.CoreV1().PersistentVolumeClaims(ns).Update(pvc)
updatePVC, updateErr = rpc.kubeCli.CoreV1().PersistentVolumeClaims(namespace).Update(pvc)
if updateErr == nil {
klog.V(4).Infof("update PVC: [%s/%s] successfully, TidbCluster: %s", ns, pvcName, tcName)
klog.V(4).Infof("update PVC: [%s/%s] successfully, %s: %s", namespace, pvcName, kind, name)
return nil
}
klog.Errorf("failed to update PVC: [%s/%s], TidbCluster: %s, error: %v", ns, pvcName, tcName, updateErr)
klog.Errorf("failed to update PVC: [%s/%s], %s: %s, error: %v", namespace, pvcName, kind, name, updateErr)

if updated, err := rpc.pvcLister.PersistentVolumeClaims(ns).Get(pvcName); err == nil {
if updated, err := rpc.pvcLister.PersistentVolumeClaims(namespace).Get(pvcName); err == nil {
// make a copy so we don't mutate the shared cache
pvc = updated.DeepCopy()
pvc.Labels = labels
pvc.Annotations = ann
} else {
utilruntime.HandleError(fmt.Errorf("error getting updated PVC %s/%s from lister: %v", ns, pvcName, err))
utilruntime.HandleError(fmt.Errorf("error getting updated PVC %s/%s from lister: %v", namespace, pvcName, err))
}

return updateErr
})
return updatePVC, err
}

func (rpc *realPVCControl) recordPVCEvent(verb string, tc *v1alpha1.TidbCluster, pvcName string, err error) {
tcName := tc.GetName()
func (rpc *realPVCControl) recordPVCEvent(verb, kind, name string, object runtime.Object, pvcName string, err error) {
if err == nil {
reason := fmt.Sprintf("Successful%s", strings.Title(verb))
msg := fmt.Sprintf("%s PVC %s in TidbCluster %s successful",
strings.ToLower(verb), pvcName, tcName)
rpc.recorder.Event(tc, corev1.EventTypeNormal, reason, msg)
msg := fmt.Sprintf("%s PVC %s in %s %s successful",
strings.ToLower(verb), pvcName, kind, name)
rpc.recorder.Event(object, corev1.EventTypeNormal, reason, msg)
} else {
reason := fmt.Sprintf("Failed%s", strings.Title(verb))
msg := fmt.Sprintf("%s PVC %s in TidbCluster %s failed error: %s",
strings.ToLower(verb), pvcName, tcName, err)
rpc.recorder.Event(tc, corev1.EventTypeWarning, reason, msg)
msg := fmt.Sprintf("%s PVC %s in %s %s failed error: %s",
strings.ToLower(verb), pvcName, kind, name, err)
rpc.recorder.Event(object, corev1.EventTypeWarning, reason, msg)
}
}

Expand Down Expand Up @@ -209,7 +227,7 @@ func (fpc *FakePVCControl) SetDeletePVCError(err error, after int) {
}

// DeletePVC deletes the pvc
func (fpc *FakePVCControl) DeletePVC(_ *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim) error {
func (fpc *FakePVCControl) DeletePVC(_ runtime.Object, pvc *corev1.PersistentVolumeClaim) error {
defer fpc.deletePVCTracker.Inc()
if fpc.deletePVCTracker.ErrorReady() {
defer fpc.deletePVCTracker.Reset()
Expand All @@ -220,7 +238,7 @@ func (fpc *FakePVCControl) DeletePVC(_ *v1alpha1.TidbCluster, pvc *corev1.Persis
}

// UpdatePVC updates the annotation, labels and spec of pvc
func (fpc *FakePVCControl) UpdatePVC(_ *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) {
func (fpc *FakePVCControl) UpdatePVC(_ runtime.Object, pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) {
defer fpc.updatePVCTracker.Inc()
if fpc.updatePVCTracker.ErrorReady() {
defer fpc.updatePVCTracker.Reset()
Expand All @@ -231,7 +249,7 @@ func (fpc *FakePVCControl) UpdatePVC(_ *v1alpha1.TidbCluster, pvc *corev1.Persis
}

// UpdateMetaInfo updates the meta info of pvc
func (fpc *FakePVCControl) UpdateMetaInfo(_ *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod) (*corev1.PersistentVolumeClaim, error) {
func (fpc *FakePVCControl) UpdateMetaInfo(_ runtime.Object, pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod) (*corev1.PersistentVolumeClaim, error) {
defer fpc.updatePVCTracker.Inc()
if fpc.updatePVCTracker.ErrorReady() {
defer fpc.updatePVCTracker.Reset()
Expand Down
Loading

0 comments on commit 1b60ffe

Please sign in to comment.