Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revise interface for the native api in the controller package #2820

Merged
merged 2 commits into from
Jun 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 38 additions & 27 deletions pkg/controller/pod_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/pdapi"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
Expand All @@ -37,8 +38,8 @@ import (
type PodControlInterface interface {
// TODO change this to UpdatePod
UpdateMetaInfo(*v1alpha1.TidbCluster, *corev1.Pod) (*corev1.Pod, error)
DeletePod(*v1alpha1.TidbCluster, *corev1.Pod) error
UpdatePod(*v1alpha1.TidbCluster, *corev1.Pod) (*corev1.Pod, error)
DeletePod(runtime.Object, *corev1.Pod) error
UpdatePod(runtime.Object, *corev1.Pod) (*corev1.Pod, error)
}

type realPodControl struct {
Expand All @@ -63,9 +64,14 @@ func NewRealPodControl(
}
}

func (rpc *realPodControl) UpdatePod(tc *v1alpha1.TidbCluster, pod *corev1.Pod) (*corev1.Pod, error) {
ns := tc.GetNamespace()
tcName := tc.GetName()
func (rpc *realPodControl) UpdatePod(controller runtime.Object, pod *corev1.Pod) (*corev1.Pod, error) {
controllerMo, ok := controller.(metav1.Object)
if !ok {
return nil, fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller)
}
kind := controller.GetObjectKind().GroupVersionKind().Kind
name := controllerMo.GetName()
namespace := controllerMo.GetNamespace()
podName := pod.GetName()

labels := pod.GetLabels()
Expand All @@ -75,20 +81,20 @@ func (rpc *realPodControl) UpdatePod(tc *v1alpha1.TidbCluster, pod *corev1.Pod)
// don't wait due to limited number of clients, but backoff after the default number of steps
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
var updateErr error
updatePod, updateErr = rpc.kubeCli.CoreV1().Pods(ns).Update(pod)
updatePod, updateErr = rpc.kubeCli.CoreV1().Pods(namespace).Update(pod)
if updateErr == nil {
klog.Infof("Pod: [%s/%s] updated successfully, TidbCluster: [%s/%s]", ns, podName, ns, tcName)
klog.Infof("Pod: [%s/%s] updated successfully, %s: [%s/%s]", namespace, podName, kind, namespace, name)
return nil
}
klog.Errorf("failed to update Pod: [%s/%s], error: %v", ns, podName, updateErr)
klog.Errorf("failed to update Pod: [%s/%s], error: %v", namespace, podName, updateErr)

if updated, err := rpc.podLister.Pods(ns).Get(podName); err == nil {
if updated, err := rpc.podLister.Pods(namespace).Get(podName); err == nil {
// make a copy so we don't mutate the shared cache
pod = updated.DeepCopy()
pod.Labels = labels
pod.Annotations = ann
} else {
utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s from lister: %v", ns, podName, err))
utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s from lister: %v", namespace, podName, err))
}

return updateErr
Expand Down Expand Up @@ -187,34 +193,39 @@ func (rpc *realPodControl) UpdateMetaInfo(tc *v1alpha1.TidbCluster, pod *corev1.
return updatePod, err
}

func (rpc *realPodControl) DeletePod(tc *v1alpha1.TidbCluster, pod *corev1.Pod) error {
ns := tc.GetNamespace()
tcName := tc.GetName()
func (rpc *realPodControl) DeletePod(controller runtime.Object, pod *corev1.Pod) error {
controllerMo, ok := controller.(metav1.Object)
if !ok {
return fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller)
}
kind := controller.GetObjectKind().GroupVersionKind().Kind
name := controllerMo.GetName()
namespace := controllerMo.GetNamespace()

podName := pod.GetName()
preconditions := metav1.Preconditions{UID: &pod.UID, ResourceVersion: &pod.ResourceVersion}
deleteOptions := metav1.DeleteOptions{Preconditions: &preconditions}
err := rpc.kubeCli.CoreV1().Pods(ns).Delete(podName, &deleteOptions)
err := rpc.kubeCli.CoreV1().Pods(namespace).Delete(podName, &deleteOptions)
if err != nil {
klog.Errorf("failed to delete Pod: [%s/%s], TidbCluster: %s, %v", ns, podName, tcName, err)
klog.Errorf("failed to delete Pod: [%s/%s], %s: %s, %v", namespace, podName, kind, namespace, err)
} else {
klog.V(4).Infof("delete Pod: [%s/%s] successfully, TidbCluster: %s", ns, podName, tcName)
klog.V(4).Infof("delete Pod: [%s/%s] successfully, %s: %s", namespace, podName, kind, namespace)
}
rpc.recordPodEvent("delete", tc, podName, err)
rpc.recordPodEvent("delete", kind, name, controller, podName, err)
return err
}

func (rpc *realPodControl) recordPodEvent(verb string, tc *v1alpha1.TidbCluster, podName string, err error) {
tcName := tc.GetName()
func (rpc *realPodControl) recordPodEvent(verb, kind, name string, object runtime.Object, podName string, err error) {
if err == nil {
reason := fmt.Sprintf("Successful%s", strings.Title(verb))
msg := fmt.Sprintf("%s Pod %s in TidbCluster %s successful",
strings.ToLower(verb), podName, tcName)
rpc.recorder.Event(tc, corev1.EventTypeNormal, reason, msg)
msg := fmt.Sprintf("%s Pod %s in %s %s successful",
strings.ToLower(verb), podName, kind, name)
rpc.recorder.Event(object, corev1.EventTypeNormal, reason, msg)
} else {
reason := fmt.Sprintf("Failed%s", strings.Title(verb))
msg := fmt.Sprintf("%s Pod %s in TidbCluster %s failed error: %s",
strings.ToLower(verb), podName, tcName, err)
rpc.recorder.Event(tc, corev1.EventTypeWarning, reason, msg)
msg := fmt.Sprintf("%s Pod %s in %s %s failed error: %s",
strings.ToLower(verb), podName, kind, name, err)
rpc.recorder.Event(object, corev1.EventTypeWarning, reason, msg)
}
}

Expand Down Expand Up @@ -313,7 +324,7 @@ func (fpc *FakePodControl) UpdateMetaInfo(_ *v1alpha1.TidbCluster, pod *corev1.P
return pod, fpc.PodIndexer.Update(pod)
}

func (fpc *FakePodControl) DeletePod(_ *v1alpha1.TidbCluster, pod *corev1.Pod) error {
func (fpc *FakePodControl) DeletePod(_ runtime.Object, pod *corev1.Pod) error {
defer fpc.deletePodTracker.Inc()
if fpc.deletePodTracker.ErrorReady() {
defer fpc.deletePodTracker.Reset()
Expand All @@ -323,7 +334,7 @@ func (fpc *FakePodControl) DeletePod(_ *v1alpha1.TidbCluster, pod *corev1.Pod) e
return fpc.PodIndexer.Delete(pod)
}

func (fpc *FakePodControl) UpdatePod(_ *v1alpha1.TidbCluster, pod *corev1.Pod) (*corev1.Pod, error) {
func (fpc *FakePodControl) UpdatePod(_ runtime.Object, pod *corev1.Pod) (*corev1.Pod, error) {
defer fpc.updatePodTracker.Inc()
if fpc.updatePodTracker.ErrorReady() {
defer fpc.updatePodTracker.Reset()
Expand Down
96 changes: 57 additions & 39 deletions pkg/controller/pvc_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (
"fmt"
"strings"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/label"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
Expand All @@ -34,9 +35,9 @@ import (

// PVCControlInterface manages PVCs used in TidbCluster
type PVCControlInterface interface {
UpdateMetaInfo(*v1alpha1.TidbCluster, *corev1.PersistentVolumeClaim, *corev1.Pod) (*corev1.PersistentVolumeClaim, error)
UpdatePVC(*v1alpha1.TidbCluster, *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error)
DeletePVC(*v1alpha1.TidbCluster, *corev1.PersistentVolumeClaim) error
UpdateMetaInfo(runtime.Object, *corev1.PersistentVolumeClaim, *corev1.Pod) (*corev1.PersistentVolumeClaim, error)
UpdatePVC(runtime.Object, *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error)
DeletePVC(runtime.Object, *corev1.PersistentVolumeClaim) error
GetPVC(name, namespace string) (*corev1.PersistentVolumeClaim, error)
}

Expand All @@ -62,53 +63,71 @@ func (rpc *realPVCControl) GetPVC(name, namespace string) (*corev1.PersistentVol
return rpc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
}

func (rpc *realPVCControl) DeletePVC(tc *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim) error {
ns := tc.GetNamespace()
tcName := tc.GetName()
func (rpc *realPVCControl) DeletePVC(controller runtime.Object, pvc *corev1.PersistentVolumeClaim) error {
controllerMo, ok := controller.(metav1.Object)
if !ok {
return fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller)
}
kind := controller.GetObjectKind().GroupVersionKind().Kind
name := controllerMo.GetName()
namespace := controllerMo.GetNamespace()

pvcName := pvc.GetName()
err := rpc.kubeCli.CoreV1().PersistentVolumeClaims(tc.GetNamespace()).Delete(pvcName, nil)
err := rpc.kubeCli.CoreV1().PersistentVolumeClaims(namespace).Delete(pvcName, nil)
if err != nil {
klog.Errorf("failed to delete PVC: [%s/%s], TidbCluster: %s, %v", ns, pvcName, tcName, err)
klog.Errorf("failed to delete PVC: [%s/%s], %s: %s, %v", namespace, pvcName, kind, name, err)
}
klog.V(4).Infof("delete PVC: [%s/%s] successfully, TidbCluster: %s", ns, pvcName, tcName)
rpc.recordPVCEvent("delete", tc, pvcName, err)
klog.V(4).Infof("delete PVC: [%s/%s] successfully, %s: %s", namespace, pvcName, kind, name)
rpc.recordPVCEvent("delete", kind, name, controller, pvcName, err)
return err
}

func (rpc *realPVCControl) UpdatePVC(tc *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) {
ns := tc.GetNamespace()
tcName := tc.GetName()
func (rpc *realPVCControl) UpdatePVC(controller runtime.Object, pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) {
controllerMo, ok := controller.(metav1.Object)
if !ok {
return nil, fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller)
}
kind := controller.GetObjectKind().GroupVersionKind().Kind
name := controllerMo.GetName()
namespace := controllerMo.GetNamespace()

pvcName := pvc.GetName()

labels := pvc.GetLabels()
ann := pvc.GetAnnotations()
var updatePVC *corev1.PersistentVolumeClaim
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
var updateErr error
updatePVC, updateErr = rpc.kubeCli.CoreV1().PersistentVolumeClaims(ns).Update(pvc)
updatePVC, updateErr = rpc.kubeCli.CoreV1().PersistentVolumeClaims(namespace).Update(pvc)
if updateErr == nil {
klog.Infof("update PVC: [%s/%s] successfully, TidbCluster: %s", ns, pvcName, tcName)
klog.Infof("update PVC: [%s/%s] successfully, %s: %s", namespace, pvcName, kind, name)
return nil
}
klog.Errorf("failed to update PVC: [%s/%s], TidbCluster: %s, error: %v", ns, pvcName, tcName, updateErr)
klog.Errorf("failed to update PVC: [%s/%s], %s: %s, error: %v", namespace, pvcName, kind, name, updateErr)

if updated, err := rpc.pvcLister.PersistentVolumeClaims(ns).Get(pvcName); err == nil {
if updated, err := rpc.pvcLister.PersistentVolumeClaims(namespace).Get(pvcName); err == nil {
// make a copy so we don't mutate the shared cache
pvc = updated.DeepCopy()
pvc.Labels = labels
pvc.Annotations = ann
} else {
utilruntime.HandleError(fmt.Errorf("error getting updated PVC %s/%s from lister: %v", ns, pvcName, err))
utilruntime.HandleError(fmt.Errorf("error getting updated PVC %s/%s from lister: %v", namespace, pvcName, err))
}

return updateErr
})
return updatePVC, err
}

func (rpc *realPVCControl) UpdateMetaInfo(tc *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod) (*corev1.PersistentVolumeClaim, error) {
ns := tc.GetNamespace()
tcName := tc.GetName()
func (rpc *realPVCControl) UpdateMetaInfo(controller runtime.Object, pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod) (*corev1.PersistentVolumeClaim, error) {
controllerMo, ok := controller.(metav1.Object)
if !ok {
return nil, fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller)
}
kind := controller.GetObjectKind().GroupVersionKind().Kind
name := controllerMo.GetName()
namespace := controllerMo.GetNamespace()

pvcName := pvc.GetName()
podName := pod.GetName()

Expand All @@ -129,7 +148,7 @@ func (rpc *realPVCControl) UpdateMetaInfo(tc *v1alpha1.TidbCluster, pvc *corev1.
pvc.Labels[label.StoreIDLabelKey] == storeID &&
pvc.Labels[label.AnnPodNameKey] == podName &&
pvc.Annotations[label.AnnPodNameKey] == podName {
klog.V(4).Infof("pvc %s/%s already has labels and annotations synced, skipping, TidbCluster: %s", ns, pvcName, tcName)
klog.V(4).Infof("pvc %s/%s already has labels and annotations synced, skipping, %s: %s", namespace, pvcName, kind, name)
return pvc, nil
}

Expand All @@ -144,39 +163,38 @@ func (rpc *realPVCControl) UpdateMetaInfo(tc *v1alpha1.TidbCluster, pvc *corev1.
var updatePVC *corev1.PersistentVolumeClaim
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
var updateErr error
updatePVC, updateErr = rpc.kubeCli.CoreV1().PersistentVolumeClaims(ns).Update(pvc)
updatePVC, updateErr = rpc.kubeCli.CoreV1().PersistentVolumeClaims(namespace).Update(pvc)
if updateErr == nil {
klog.V(4).Infof("update PVC: [%s/%s] successfully, TidbCluster: %s", ns, pvcName, tcName)
klog.V(4).Infof("update PVC: [%s/%s] successfully, %s: %s", namespace, pvcName, kind, name)
return nil
}
klog.Errorf("failed to update PVC: [%s/%s], TidbCluster: %s, error: %v", ns, pvcName, tcName, updateErr)
klog.Errorf("failed to update PVC: [%s/%s], %s: %s, error: %v", namespace, pvcName, kind, name, updateErr)

if updated, err := rpc.pvcLister.PersistentVolumeClaims(ns).Get(pvcName); err == nil {
if updated, err := rpc.pvcLister.PersistentVolumeClaims(namespace).Get(pvcName); err == nil {
// make a copy so we don't mutate the shared cache
pvc = updated.DeepCopy()
pvc.Labels = labels
pvc.Annotations = ann
} else {
utilruntime.HandleError(fmt.Errorf("error getting updated PVC %s/%s from lister: %v", ns, pvcName, err))
utilruntime.HandleError(fmt.Errorf("error getting updated PVC %s/%s from lister: %v", namespace, pvcName, err))
}

return updateErr
})
return updatePVC, err
}

func (rpc *realPVCControl) recordPVCEvent(verb string, tc *v1alpha1.TidbCluster, pvcName string, err error) {
tcName := tc.GetName()
func (rpc *realPVCControl) recordPVCEvent(verb, kind, name string, object runtime.Object, pvcName string, err error) {
if err == nil {
reason := fmt.Sprintf("Successful%s", strings.Title(verb))
msg := fmt.Sprintf("%s PVC %s in TidbCluster %s successful",
strings.ToLower(verb), pvcName, tcName)
rpc.recorder.Event(tc, corev1.EventTypeNormal, reason, msg)
msg := fmt.Sprintf("%s PVC %s in %s %s successful",
strings.ToLower(verb), pvcName, kind, name)
rpc.recorder.Event(object, corev1.EventTypeNormal, reason, msg)
} else {
reason := fmt.Sprintf("Failed%s", strings.Title(verb))
msg := fmt.Sprintf("%s PVC %s in TidbCluster %s failed error: %s",
strings.ToLower(verb), pvcName, tcName, err)
rpc.recorder.Event(tc, corev1.EventTypeWarning, reason, msg)
msg := fmt.Sprintf("%s PVC %s in %s %s failed error: %s",
strings.ToLower(verb), pvcName, kind, name, err)
rpc.recorder.Event(object, corev1.EventTypeWarning, reason, msg)
}
}

Expand Down Expand Up @@ -209,7 +227,7 @@ func (fpc *FakePVCControl) SetDeletePVCError(err error, after int) {
}

// DeletePVC deletes the pvc
func (fpc *FakePVCControl) DeletePVC(_ *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim) error {
func (fpc *FakePVCControl) DeletePVC(_ runtime.Object, pvc *corev1.PersistentVolumeClaim) error {
defer fpc.deletePVCTracker.Inc()
if fpc.deletePVCTracker.ErrorReady() {
defer fpc.deletePVCTracker.Reset()
Expand All @@ -220,7 +238,7 @@ func (fpc *FakePVCControl) DeletePVC(_ *v1alpha1.TidbCluster, pvc *corev1.Persis
}

// UpdatePVC updates the annotation, labels and spec of pvc
func (fpc *FakePVCControl) UpdatePVC(_ *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) {
func (fpc *FakePVCControl) UpdatePVC(_ runtime.Object, pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) {
defer fpc.updatePVCTracker.Inc()
if fpc.updatePVCTracker.ErrorReady() {
defer fpc.updatePVCTracker.Reset()
Expand All @@ -231,7 +249,7 @@ func (fpc *FakePVCControl) UpdatePVC(_ *v1alpha1.TidbCluster, pvc *corev1.Persis
}

// UpdateMetaInfo updates the meta info of pvc
func (fpc *FakePVCControl) UpdateMetaInfo(_ *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod) (*corev1.PersistentVolumeClaim, error) {
func (fpc *FakePVCControl) UpdateMetaInfo(_ runtime.Object, pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod) (*corev1.PersistentVolumeClaim, error) {
defer fpc.updatePVCTracker.Inc()
if fpc.updatePVCTracker.ErrorReady() {
defer fpc.updatePVCTracker.Reset()
Expand Down
Loading