diff --git a/pkg/controller/pod_control.go b/pkg/controller/pod_control.go index c28e77df05..b101eea501 100644 --- a/pkg/controller/pod_control.go +++ b/pkg/controller/pod_control.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb-operator/pkg/pdapi" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" @@ -37,8 +38,8 @@ import ( type PodControlInterface interface { // TODO change this to UpdatePod UpdateMetaInfo(*v1alpha1.TidbCluster, *corev1.Pod) (*corev1.Pod, error) - DeletePod(*v1alpha1.TidbCluster, *corev1.Pod) error - UpdatePod(*v1alpha1.TidbCluster, *corev1.Pod) (*corev1.Pod, error) + DeletePod(runtime.Object, *corev1.Pod) error + UpdatePod(runtime.Object, *corev1.Pod) (*corev1.Pod, error) } type realPodControl struct { @@ -63,9 +64,14 @@ func NewRealPodControl( } } -func (rpc *realPodControl) UpdatePod(tc *v1alpha1.TidbCluster, pod *corev1.Pod) (*corev1.Pod, error) { - ns := tc.GetNamespace() - tcName := tc.GetName() +func (rpc *realPodControl) UpdatePod(controller runtime.Object, pod *corev1.Pod) (*corev1.Pod, error) { + controllerMo, ok := controller.(metav1.Object) + if !ok { + return nil, fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller) + } + kind := controller.GetObjectKind().GroupVersionKind().Kind + name := controllerMo.GetName() + namespace := controllerMo.GetNamespace() podName := pod.GetName() labels := pod.GetLabels() @@ -75,20 +81,20 @@ func (rpc *realPodControl) UpdatePod(tc *v1alpha1.TidbCluster, pod *corev1.Pod) // don't wait due to limited number of clients, but backoff after the default number of steps err := retry.RetryOnConflict(retry.DefaultRetry, func() error { var updateErr error - updatePod, updateErr = rpc.kubeCli.CoreV1().Pods(ns).Update(pod) + updatePod, updateErr = rpc.kubeCli.CoreV1().Pods(namespace).Update(pod) if updateErr == nil { - klog.Infof("Pod: [%s/%s] updated successfully, TidbCluster: [%s/%s]", ns, podName, ns, tcName) + klog.Infof("Pod: [%s/%s] updated successfully, %s: [%s/%s]", namespace, podName, kind, namespace, name) return nil } - klog.Errorf("failed to update Pod: [%s/%s], error: %v", ns, podName, updateErr) + klog.Errorf("failed to update Pod: [%s/%s], error: %v", namespace, podName, updateErr) - if updated, err := rpc.podLister.Pods(ns).Get(podName); err == nil { + if updated, err := rpc.podLister.Pods(namespace).Get(podName); err == nil { // make a copy so we don't mutate the shared cache pod = updated.DeepCopy() pod.Labels = labels pod.Annotations = ann } else { - utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s from lister: %v", ns, podName, err)) + utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s from lister: %v", namespace, podName, err)) } return updateErr @@ -187,34 +193,39 @@ func (rpc *realPodControl) UpdateMetaInfo(tc *v1alpha1.TidbCluster, pod *corev1. return updatePod, err } -func (rpc *realPodControl) DeletePod(tc *v1alpha1.TidbCluster, pod *corev1.Pod) error { - ns := tc.GetNamespace() - tcName := tc.GetName() +func (rpc *realPodControl) DeletePod(controller runtime.Object, pod *corev1.Pod) error { + controllerMo, ok := controller.(metav1.Object) + if !ok { + return fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller) + } + kind := controller.GetObjectKind().GroupVersionKind().Kind + name := controllerMo.GetName() + namespace := controllerMo.GetNamespace() + podName := pod.GetName() preconditions := metav1.Preconditions{UID: &pod.UID, ResourceVersion: &pod.ResourceVersion} deleteOptions := metav1.DeleteOptions{Preconditions: &preconditions} - err := rpc.kubeCli.CoreV1().Pods(ns).Delete(podName, &deleteOptions) + err := rpc.kubeCli.CoreV1().Pods(namespace).Delete(podName, &deleteOptions) if err != nil { - klog.Errorf("failed to delete Pod: [%s/%s], TidbCluster: %s, %v", ns, podName, tcName, err) + klog.Errorf("failed to delete Pod: [%s/%s], %s: %s, %v", namespace, podName, kind, namespace, err) } else { - klog.V(4).Infof("delete Pod: [%s/%s] successfully, TidbCluster: %s", ns, podName, tcName) + klog.V(4).Infof("delete Pod: [%s/%s] successfully, %s: %s", namespace, podName, kind, namespace) } - rpc.recordPodEvent("delete", tc, podName, err) + rpc.recordPodEvent("delete", kind, name, controller, podName, err) return err } -func (rpc *realPodControl) recordPodEvent(verb string, tc *v1alpha1.TidbCluster, podName string, err error) { - tcName := tc.GetName() +func (rpc *realPodControl) recordPodEvent(verb, kind, name string, object runtime.Object, podName string, err error) { if err == nil { reason := fmt.Sprintf("Successful%s", strings.Title(verb)) - msg := fmt.Sprintf("%s Pod %s in TidbCluster %s successful", - strings.ToLower(verb), podName, tcName) - rpc.recorder.Event(tc, corev1.EventTypeNormal, reason, msg) + msg := fmt.Sprintf("%s Pod %s in %s %s successful", + strings.ToLower(verb), podName, kind, name) + rpc.recorder.Event(object, corev1.EventTypeNormal, reason, msg) } else { reason := fmt.Sprintf("Failed%s", strings.Title(verb)) - msg := fmt.Sprintf("%s Pod %s in TidbCluster %s failed error: %s", - strings.ToLower(verb), podName, tcName, err) - rpc.recorder.Event(tc, corev1.EventTypeWarning, reason, msg) + msg := fmt.Sprintf("%s Pod %s in %s %s failed error: %s", + strings.ToLower(verb), podName, kind, name, err) + rpc.recorder.Event(object, corev1.EventTypeWarning, reason, msg) } } @@ -313,7 +324,7 @@ func (fpc *FakePodControl) UpdateMetaInfo(_ *v1alpha1.TidbCluster, pod *corev1.P return pod, fpc.PodIndexer.Update(pod) } -func (fpc *FakePodControl) DeletePod(_ *v1alpha1.TidbCluster, pod *corev1.Pod) error { +func (fpc *FakePodControl) DeletePod(_ runtime.Object, pod *corev1.Pod) error { defer fpc.deletePodTracker.Inc() if fpc.deletePodTracker.ErrorReady() { defer fpc.deletePodTracker.Reset() @@ -323,7 +334,7 @@ func (fpc *FakePodControl) DeletePod(_ *v1alpha1.TidbCluster, pod *corev1.Pod) e return fpc.PodIndexer.Delete(pod) } -func (fpc *FakePodControl) UpdatePod(_ *v1alpha1.TidbCluster, pod *corev1.Pod) (*corev1.Pod, error) { +func (fpc *FakePodControl) UpdatePod(_ runtime.Object, pod *corev1.Pod) (*corev1.Pod, error) { defer fpc.updatePodTracker.Inc() if fpc.updatePodTracker.ErrorReady() { defer fpc.updatePodTracker.Reset() diff --git a/pkg/controller/pvc_control.go b/pkg/controller/pvc_control.go index 899eaa3c05..e85701edcb 100644 --- a/pkg/controller/pvc_control.go +++ b/pkg/controller/pvc_control.go @@ -17,9 +17,10 @@ import ( "fmt" "strings" - "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" "github.com/pingcap/tidb-operator/pkg/label" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" @@ -34,9 +35,9 @@ import ( // PVCControlInterface manages PVCs used in TidbCluster type PVCControlInterface interface { - UpdateMetaInfo(*v1alpha1.TidbCluster, *corev1.PersistentVolumeClaim, *corev1.Pod) (*corev1.PersistentVolumeClaim, error) - UpdatePVC(*v1alpha1.TidbCluster, *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) - DeletePVC(*v1alpha1.TidbCluster, *corev1.PersistentVolumeClaim) error + UpdateMetaInfo(runtime.Object, *corev1.PersistentVolumeClaim, *corev1.Pod) (*corev1.PersistentVolumeClaim, error) + UpdatePVC(runtime.Object, *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) + DeletePVC(runtime.Object, *corev1.PersistentVolumeClaim) error GetPVC(name, namespace string) (*corev1.PersistentVolumeClaim, error) } @@ -62,22 +63,34 @@ func (rpc *realPVCControl) GetPVC(name, namespace string) (*corev1.PersistentVol return rpc.pvcLister.PersistentVolumeClaims(namespace).Get(name) } -func (rpc *realPVCControl) DeletePVC(tc *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim) error { - ns := tc.GetNamespace() - tcName := tc.GetName() +func (rpc *realPVCControl) DeletePVC(controller runtime.Object, pvc *corev1.PersistentVolumeClaim) error { + controllerMo, ok := controller.(metav1.Object) + if !ok { + return fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller) + } + kind := controller.GetObjectKind().GroupVersionKind().Kind + name := controllerMo.GetName() + namespace := controllerMo.GetNamespace() + pvcName := pvc.GetName() - err := rpc.kubeCli.CoreV1().PersistentVolumeClaims(tc.GetNamespace()).Delete(pvcName, nil) + err := rpc.kubeCli.CoreV1().PersistentVolumeClaims(namespace).Delete(pvcName, nil) if err != nil { - klog.Errorf("failed to delete PVC: [%s/%s], TidbCluster: %s, %v", ns, pvcName, tcName, err) + klog.Errorf("failed to delete PVC: [%s/%s], %s: %s, %v", namespace, pvcName, kind, name, err) } - klog.V(4).Infof("delete PVC: [%s/%s] successfully, TidbCluster: %s", ns, pvcName, tcName) - rpc.recordPVCEvent("delete", tc, pvcName, err) + klog.V(4).Infof("delete PVC: [%s/%s] successfully, %s: %s", namespace, pvcName, kind, name) + rpc.recordPVCEvent("delete", kind, name, controller, pvcName, err) return err } -func (rpc *realPVCControl) UpdatePVC(tc *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) { - ns := tc.GetNamespace() - tcName := tc.GetName() +func (rpc *realPVCControl) UpdatePVC(controller runtime.Object, pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) { + controllerMo, ok := controller.(metav1.Object) + if !ok { + return nil, fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller) + } + kind := controller.GetObjectKind().GroupVersionKind().Kind + name := controllerMo.GetName() + namespace := controllerMo.GetNamespace() + pvcName := pvc.GetName() labels := pvc.GetLabels() @@ -85,20 +98,20 @@ func (rpc *realPVCControl) UpdatePVC(tc *v1alpha1.TidbCluster, pvc *corev1.Persi var updatePVC *corev1.PersistentVolumeClaim err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { var updateErr error - updatePVC, updateErr = rpc.kubeCli.CoreV1().PersistentVolumeClaims(ns).Update(pvc) + updatePVC, updateErr = rpc.kubeCli.CoreV1().PersistentVolumeClaims(namespace).Update(pvc) if updateErr == nil { - klog.Infof("update PVC: [%s/%s] successfully, TidbCluster: %s", ns, pvcName, tcName) + klog.Infof("update PVC: [%s/%s] successfully, %s: %s", namespace, pvcName, kind, name) return nil } - klog.Errorf("failed to update PVC: [%s/%s], TidbCluster: %s, error: %v", ns, pvcName, tcName, updateErr) + klog.Errorf("failed to update PVC: [%s/%s], %s: %s, error: %v", namespace, pvcName, kind, name, updateErr) - if updated, err := rpc.pvcLister.PersistentVolumeClaims(ns).Get(pvcName); err == nil { + if updated, err := rpc.pvcLister.PersistentVolumeClaims(namespace).Get(pvcName); err == nil { // make a copy so we don't mutate the shared cache pvc = updated.DeepCopy() pvc.Labels = labels pvc.Annotations = ann } else { - utilruntime.HandleError(fmt.Errorf("error getting updated PVC %s/%s from lister: %v", ns, pvcName, err)) + utilruntime.HandleError(fmt.Errorf("error getting updated PVC %s/%s from lister: %v", namespace, pvcName, err)) } return updateErr @@ -106,9 +119,15 @@ func (rpc *realPVCControl) UpdatePVC(tc *v1alpha1.TidbCluster, pvc *corev1.Persi return updatePVC, err } -func (rpc *realPVCControl) UpdateMetaInfo(tc *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod) (*corev1.PersistentVolumeClaim, error) { - ns := tc.GetNamespace() - tcName := tc.GetName() +func (rpc *realPVCControl) UpdateMetaInfo(controller runtime.Object, pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod) (*corev1.PersistentVolumeClaim, error) { + controllerMo, ok := controller.(metav1.Object) + if !ok { + return nil, fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller) + } + kind := controller.GetObjectKind().GroupVersionKind().Kind + name := controllerMo.GetName() + namespace := controllerMo.GetNamespace() + pvcName := pvc.GetName() podName := pod.GetName() @@ -129,7 +148,7 @@ func (rpc *realPVCControl) UpdateMetaInfo(tc *v1alpha1.TidbCluster, pvc *corev1. pvc.Labels[label.StoreIDLabelKey] == storeID && pvc.Labels[label.AnnPodNameKey] == podName && pvc.Annotations[label.AnnPodNameKey] == podName { - klog.V(4).Infof("pvc %s/%s already has labels and annotations synced, skipping, TidbCluster: %s", ns, pvcName, tcName) + klog.V(4).Infof("pvc %s/%s already has labels and annotations synced, skipping, %s: %s", namespace, pvcName, kind, name) return pvc, nil } @@ -144,20 +163,20 @@ func (rpc *realPVCControl) UpdateMetaInfo(tc *v1alpha1.TidbCluster, pvc *corev1. var updatePVC *corev1.PersistentVolumeClaim err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { var updateErr error - updatePVC, updateErr = rpc.kubeCli.CoreV1().PersistentVolumeClaims(ns).Update(pvc) + updatePVC, updateErr = rpc.kubeCli.CoreV1().PersistentVolumeClaims(namespace).Update(pvc) if updateErr == nil { - klog.V(4).Infof("update PVC: [%s/%s] successfully, TidbCluster: %s", ns, pvcName, tcName) + klog.V(4).Infof("update PVC: [%s/%s] successfully, %s: %s", namespace, pvcName, kind, name) return nil } - klog.Errorf("failed to update PVC: [%s/%s], TidbCluster: %s, error: %v", ns, pvcName, tcName, updateErr) + klog.Errorf("failed to update PVC: [%s/%s], %s: %s, error: %v", namespace, pvcName, kind, name, updateErr) - if updated, err := rpc.pvcLister.PersistentVolumeClaims(ns).Get(pvcName); err == nil { + if updated, err := rpc.pvcLister.PersistentVolumeClaims(namespace).Get(pvcName); err == nil { // make a copy so we don't mutate the shared cache pvc = updated.DeepCopy() pvc.Labels = labels pvc.Annotations = ann } else { - utilruntime.HandleError(fmt.Errorf("error getting updated PVC %s/%s from lister: %v", ns, pvcName, err)) + utilruntime.HandleError(fmt.Errorf("error getting updated PVC %s/%s from lister: %v", namespace, pvcName, err)) } return updateErr @@ -165,18 +184,17 @@ func (rpc *realPVCControl) UpdateMetaInfo(tc *v1alpha1.TidbCluster, pvc *corev1. return updatePVC, err } -func (rpc *realPVCControl) recordPVCEvent(verb string, tc *v1alpha1.TidbCluster, pvcName string, err error) { - tcName := tc.GetName() +func (rpc *realPVCControl) recordPVCEvent(verb, kind, name string, object runtime.Object, pvcName string, err error) { if err == nil { reason := fmt.Sprintf("Successful%s", strings.Title(verb)) - msg := fmt.Sprintf("%s PVC %s in TidbCluster %s successful", - strings.ToLower(verb), pvcName, tcName) - rpc.recorder.Event(tc, corev1.EventTypeNormal, reason, msg) + msg := fmt.Sprintf("%s PVC %s in %s %s successful", + strings.ToLower(verb), pvcName, kind, name) + rpc.recorder.Event(object, corev1.EventTypeNormal, reason, msg) } else { reason := fmt.Sprintf("Failed%s", strings.Title(verb)) - msg := fmt.Sprintf("%s PVC %s in TidbCluster %s failed error: %s", - strings.ToLower(verb), pvcName, tcName, err) - rpc.recorder.Event(tc, corev1.EventTypeWarning, reason, msg) + msg := fmt.Sprintf("%s PVC %s in %s %s failed error: %s", + strings.ToLower(verb), pvcName, kind, name, err) + rpc.recorder.Event(object, corev1.EventTypeWarning, reason, msg) } } @@ -209,7 +227,7 @@ func (fpc *FakePVCControl) SetDeletePVCError(err error, after int) { } // DeletePVC deletes the pvc -func (fpc *FakePVCControl) DeletePVC(_ *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim) error { +func (fpc *FakePVCControl) DeletePVC(_ runtime.Object, pvc *corev1.PersistentVolumeClaim) error { defer fpc.deletePVCTracker.Inc() if fpc.deletePVCTracker.ErrorReady() { defer fpc.deletePVCTracker.Reset() @@ -220,7 +238,7 @@ func (fpc *FakePVCControl) DeletePVC(_ *v1alpha1.TidbCluster, pvc *corev1.Persis } // UpdatePVC updates the annotation, labels and spec of pvc -func (fpc *FakePVCControl) UpdatePVC(_ *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) { +func (fpc *FakePVCControl) UpdatePVC(_ runtime.Object, pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) { defer fpc.updatePVCTracker.Inc() if fpc.updatePVCTracker.ErrorReady() { defer fpc.updatePVCTracker.Reset() @@ -231,7 +249,7 @@ func (fpc *FakePVCControl) UpdatePVC(_ *v1alpha1.TidbCluster, pvc *corev1.Persis } // UpdateMetaInfo updates the meta info of pvc -func (fpc *FakePVCControl) UpdateMetaInfo(_ *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod) (*corev1.PersistentVolumeClaim, error) { +func (fpc *FakePVCControl) UpdateMetaInfo(_ runtime.Object, pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod) (*corev1.PersistentVolumeClaim, error) { defer fpc.updatePVCTracker.Inc() if fpc.updatePVCTracker.ErrorReady() { defer fpc.updatePVCTracker.Reset() diff --git a/pkg/controller/service_control.go b/pkg/controller/service_control.go index 5a0dd1cf56..d8ca9e54eb 100644 --- a/pkg/controller/service_control.go +++ b/pkg/controller/service_control.go @@ -17,11 +17,11 @@ import ( "fmt" "strings" - "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" tcinformers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions/pingcap/v1alpha1" v1listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" @@ -37,9 +37,9 @@ var ExternalTrafficPolicy string // ServiceControlInterface manages Services used in TidbCluster type ServiceControlInterface interface { - CreateService(*v1alpha1.TidbCluster, *corev1.Service) error - UpdateService(*v1alpha1.TidbCluster, *corev1.Service) (*corev1.Service, error) - DeleteService(*v1alpha1.TidbCluster, *corev1.Service) error + CreateService(runtime.Object, *corev1.Service) error + UpdateService(runtime.Object, *corev1.Service) (*corev1.Service, error) + DeleteService(runtime.Object, *corev1.Service) error } type realServiceControl struct { @@ -57,29 +57,41 @@ func NewRealServiceControl(kubeCli kubernetes.Interface, svcLister corelisters.S } } -func (sc *realServiceControl) CreateService(tc *v1alpha1.TidbCluster, svc *corev1.Service) error { - _, err := sc.kubeCli.CoreV1().Services(tc.Namespace).Create(svc) - sc.recordServiceEvent("create", tc, svc, err) +func (sc *realServiceControl) CreateService(controller runtime.Object, svc *corev1.Service) error { + controllerMo, ok := controller.(metav1.Object) + if !ok { + return fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller) + } + kind := controller.GetObjectKind().GroupVersionKind().Kind + name := controllerMo.GetName() + namespace := controllerMo.GetNamespace() + _, err := sc.kubeCli.CoreV1().Services(namespace).Create(svc) + sc.recordServiceEvent("create", name, kind, controller, svc, err) return err } -func (sc *realServiceControl) UpdateService(tc *v1alpha1.TidbCluster, svc *corev1.Service) (*corev1.Service, error) { - ns := tc.GetNamespace() - tcName := tc.GetName() +func (sc *realServiceControl) UpdateService(controller runtime.Object, svc *corev1.Service) (*corev1.Service, error) { + controllerMo, ok := controller.(metav1.Object) + if !ok { + return nil, fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller) + } + kind := controller.GetObjectKind().GroupVersionKind().Kind + name := controllerMo.GetName() + namespace := controllerMo.GetNamespace() svcName := svc.GetName() svcSpec := svc.Spec.DeepCopy() var updateSvc *corev1.Service err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { var updateErr error - updateSvc, updateErr = sc.kubeCli.CoreV1().Services(ns).Update(svc) + updateSvc, updateErr = sc.kubeCli.CoreV1().Services(namespace).Update(svc) if updateErr == nil { - klog.Infof("update Service: [%s/%s] successfully, TidbCluster: %s", ns, svcName, tcName) + klog.Infof("update Service: [%s/%s] successfully, kind: %s, name: %s", namespace, svcName, kind, name) return nil } - if updated, err := sc.svcLister.Services(tc.Namespace).Get(svcName); err != nil { - utilruntime.HandleError(fmt.Errorf("error getting updated Service %s/%s from lister: %v", ns, svcName, err)) + if updated, err := sc.svcLister.Services(namespace).Get(svcName); err != nil { + utilruntime.HandleError(fmt.Errorf("error getting updated Service %s/%s from lister: %v", namespace, svcName, err)) } else { svc = updated.DeepCopy() svc.Spec = *svcSpec @@ -90,25 +102,32 @@ func (sc *realServiceControl) UpdateService(tc *v1alpha1.TidbCluster, svc *corev return updateSvc, err } -func (sc *realServiceControl) DeleteService(tc *v1alpha1.TidbCluster, svc *corev1.Service) error { - err := sc.kubeCli.CoreV1().Services(tc.Namespace).Delete(svc.Name, nil) - sc.recordServiceEvent("delete", tc, svc, err) +func (sc *realServiceControl) DeleteService(controller runtime.Object, svc *corev1.Service) error { + controllerMo, ok := controller.(metav1.Object) + if !ok { + return fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller) + } + kind := controller.GetObjectKind().GroupVersionKind().Kind + name := controllerMo.GetName() + namespace := controllerMo.GetNamespace() + + err := sc.kubeCli.CoreV1().Services(namespace).Delete(svc.Name, nil) + sc.recordServiceEvent("delete", name, kind, controller, svc, err) return err } -func (sc *realServiceControl) recordServiceEvent(verb string, tc *v1alpha1.TidbCluster, svc *corev1.Service, err error) { - tcName := tc.GetName() +func (sc *realServiceControl) recordServiceEvent(verb, name, kind string, object runtime.Object, svc *corev1.Service, err error) { svcName := svc.GetName() if err == nil { reason := fmt.Sprintf("Successful%s", strings.Title(verb)) - msg := fmt.Sprintf("%s Service %s in TidbCluster %s successful", - strings.ToLower(verb), svcName, tcName) - sc.recorder.Event(tc, corev1.EventTypeNormal, reason, msg) + msg := fmt.Sprintf("%s Service %s in %s %s successful", + strings.ToLower(verb), svcName, kind, name) + sc.recorder.Event(object, corev1.EventTypeNormal, reason, msg) } else { reason := fmt.Sprintf("Failed%s", strings.Title(verb)) - msg := fmt.Sprintf("%s Service %s in TidbCluster %s failed error: %s", - strings.ToLower(verb), svcName, tcName, err) - sc.recorder.Event(tc, corev1.EventTypeWarning, reason, msg) + msg := fmt.Sprintf("%s Service %s in %s %s failed error: %s", + strings.ToLower(verb), svcName, kind, name, err) + sc.recorder.Event(object, corev1.EventTypeWarning, reason, msg) } } @@ -156,7 +175,7 @@ func (ssc *FakeServiceControl) SetDeleteServiceError(err error, after int) { } // CreateService adds the service to SvcIndexer -func (ssc *FakeServiceControl) CreateService(_ *v1alpha1.TidbCluster, svc *corev1.Service) error { +func (ssc *FakeServiceControl) CreateService(_ runtime.Object, svc *corev1.Service) error { defer ssc.createServiceTracker.Inc() if ssc.createServiceTracker.ErrorReady() { defer ssc.createServiceTracker.Reset() @@ -181,7 +200,7 @@ func (ssc *FakeServiceControl) CreateService(_ *v1alpha1.TidbCluster, svc *corev } // UpdateService updates the service of SvcIndexer -func (ssc *FakeServiceControl) UpdateService(_ *v1alpha1.TidbCluster, svc *corev1.Service) (*corev1.Service, error) { +func (ssc *FakeServiceControl) UpdateService(_ runtime.Object, svc *corev1.Service) (*corev1.Service, error) { defer ssc.updateServiceTracker.Inc() if ssc.updateServiceTracker.ErrorReady() { defer ssc.updateServiceTracker.Reset() @@ -204,7 +223,7 @@ func (ssc *FakeServiceControl) UpdateService(_ *v1alpha1.TidbCluster, svc *corev } // DeleteService deletes the service of SvcIndexer -func (ssc *FakeServiceControl) DeleteService(_ *v1alpha1.TidbCluster, _ *corev1.Service) error { +func (ssc *FakeServiceControl) DeleteService(_ runtime.Object, _ *corev1.Service) error { return nil } diff --git a/pkg/controller/stateful_set_control.go b/pkg/controller/stateful_set_control.go index cbe83a02e2..a1fbdd95a2 100644 --- a/pkg/controller/stateful_set_control.go +++ b/pkg/controller/stateful_set_control.go @@ -17,12 +17,13 @@ import ( "fmt" "strings" - "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" tcinformers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions/pingcap/v1alpha1" v1listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1" apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" appsinformers "k8s.io/client-go/informers/apps/v1" "k8s.io/client-go/kubernetes" @@ -35,12 +36,9 @@ import ( // StatefulSetControlInterface defines the interface that uses to create, update, and delete StatefulSets, type StatefulSetControlInterface interface { - // CreateStatefulSet creates a StatefulSet in a TidbCluster. - CreateStatefulSet(*v1alpha1.TidbCluster, *apps.StatefulSet) error - // UpdateStatefulSet updates a StatefulSet in a TidbCluster. - UpdateStatefulSet(*v1alpha1.TidbCluster, *apps.StatefulSet) (*apps.StatefulSet, error) - // DeleteStatefulSet deletes a StatefulSet in a TidbCluster. - DeleteStatefulSet(*v1alpha1.TidbCluster, *apps.StatefulSet) error + CreateStatefulSet(runtime.Object, *apps.StatefulSet) error + UpdateStatefulSet(runtime.Object, *apps.StatefulSet) (*apps.StatefulSet, error) + DeleteStatefulSet(runtime.Object, *apps.StatefulSet) error } type realStatefulSetControl struct { @@ -54,21 +52,35 @@ func NewRealStatefuSetControl(kubeCli kubernetes.Interface, setLister appslister return &realStatefulSetControl{kubeCli, setLister, recorder} } -// CreateStatefulSet create a StatefulSet in a TidbCluster. -func (sc *realStatefulSetControl) CreateStatefulSet(tc *v1alpha1.TidbCluster, set *apps.StatefulSet) error { - _, err := sc.kubeCli.AppsV1().StatefulSets(tc.Namespace).Create(set) +// CreateStatefulSet create a StatefulSet for a controller +func (sc *realStatefulSetControl) CreateStatefulSet(controller runtime.Object, set *apps.StatefulSet) error { + controllerMo, ok := controller.(metav1.Object) + if !ok { + return fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller) + } + kind := controller.GetObjectKind().GroupVersionKind().Kind + name := controllerMo.GetName() + namespace := controllerMo.GetNamespace() + + _, err := sc.kubeCli.AppsV1().StatefulSets(namespace).Create(set) // sink already exists errors if apierrors.IsAlreadyExists(err) { return err } - sc.recordStatefulSetEvent("create", tc, set, err) + sc.recordStatefulSetEvent("create", kind, name, controller, set, err) return err } // UpdateStatefulSet update a StatefulSet in a TidbCluster. -func (sc *realStatefulSetControl) UpdateStatefulSet(tc *v1alpha1.TidbCluster, set *apps.StatefulSet) (*apps.StatefulSet, error) { - ns := tc.GetNamespace() - tcName := tc.GetName() +func (sc *realStatefulSetControl) UpdateStatefulSet(controller runtime.Object, set *apps.StatefulSet) (*apps.StatefulSet, error) { + controllerMo, ok := controller.(metav1.Object) + if !ok { + return nil, fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller) + } + kind := controller.GetObjectKind().GroupVersionKind().Kind + name := controllerMo.GetName() + namespace := controllerMo.GetNamespace() + setName := set.GetName() setSpec := set.Spec.DeepCopy() var updatedSS *apps.StatefulSet @@ -76,19 +88,19 @@ func (sc *realStatefulSetControl) UpdateStatefulSet(tc *v1alpha1.TidbCluster, se err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { // TODO: verify if StatefulSet identity(name, namespace, labels) matches TidbCluster var updateErr error - updatedSS, updateErr = sc.kubeCli.AppsV1().StatefulSets(ns).Update(set) + updatedSS, updateErr = sc.kubeCli.AppsV1().StatefulSets(namespace).Update(set) if updateErr == nil { - klog.Infof("TidbCluster: [%s/%s]'s StatefulSet: [%s/%s] updated successfully", ns, tcName, ns, setName) + klog.Infof("%s: [%s/%s]'s StatefulSet: [%s/%s] updated successfully", kind, namespace, name, namespace, setName) return nil } - klog.Errorf("failed to update TidbCluster: [%s/%s]'s StatefulSet: [%s/%s], error: %v", ns, tcName, ns, setName, updateErr) + klog.Errorf("failed to update %s: [%s/%s]'s StatefulSet: [%s/%s], error: %v", kind, namespace, name, namespace, setName, updateErr) - if updated, err := sc.setLister.StatefulSets(ns).Get(setName); err == nil { + if updated, err := sc.setLister.StatefulSets(namespace).Get(setName); err == nil { // make a copy so we don't mutate the shared cache set = updated.DeepCopy() set.Spec = *setSpec } else { - utilruntime.HandleError(fmt.Errorf("error getting updated StatefulSet %s/%s from lister: %v", ns, setName, err)) + utilruntime.HandleError(fmt.Errorf("error getting updated StatefulSet %s/%s from lister: %v", namespace, setName, err)) } return updateErr }) @@ -97,25 +109,32 @@ func (sc *realStatefulSetControl) UpdateStatefulSet(tc *v1alpha1.TidbCluster, se } // DeleteStatefulSet delete a StatefulSet in a TidbCluster. -func (sc *realStatefulSetControl) DeleteStatefulSet(tc *v1alpha1.TidbCluster, set *apps.StatefulSet) error { - err := sc.kubeCli.AppsV1().StatefulSets(tc.Namespace).Delete(set.Name, nil) - sc.recordStatefulSetEvent("delete", tc, set, err) +func (sc *realStatefulSetControl) DeleteStatefulSet(controller runtime.Object, set *apps.StatefulSet) error { + controllerMo, ok := controller.(metav1.Object) + if !ok { + return fmt.Errorf("%T is not a metav1.Object, cannot call setControllerReference", controller) + } + kind := controller.GetObjectKind().GroupVersionKind().Kind + name := controllerMo.GetName() + namespace := controllerMo.GetNamespace() + + err := sc.kubeCli.AppsV1().StatefulSets(namespace).Delete(set.Name, nil) + sc.recordStatefulSetEvent("delete", kind, name, controller, set, err) return err } -func (sc *realStatefulSetControl) recordStatefulSetEvent(verb string, tc *v1alpha1.TidbCluster, set *apps.StatefulSet, err error) { - tcName := tc.Name +func (sc *realStatefulSetControl) recordStatefulSetEvent(verb, kind, name string, object runtime.Object, set *apps.StatefulSet, err error) { setName := set.Name if err == nil { reason := fmt.Sprintf("Successful%s", strings.Title(verb)) - message := fmt.Sprintf("%s StatefulSet %s in TidbCluster %s successful", - strings.ToLower(verb), setName, tcName) - sc.recorder.Event(tc, corev1.EventTypeNormal, reason, message) + message := fmt.Sprintf("%s StatefulSet %s in %s %s successful", + strings.ToLower(verb), setName, kind, name) + sc.recorder.Event(object, corev1.EventTypeNormal, reason, message) } else { reason := fmt.Sprintf("Failed%s", strings.Title(verb)) - message := fmt.Sprintf("%s StatefulSet %s in TidbCluster %s failed error: %s", - strings.ToLower(verb), setName, tcName, err) - sc.recorder.Event(tc, corev1.EventTypeWarning, reason, message) + message := fmt.Sprintf("%s StatefulSet %s in %s %s failed error: %s", + strings.ToLower(verb), setName, kind, name, err) + sc.recorder.Event(object, corev1.EventTypeWarning, reason, message) } } @@ -167,7 +186,7 @@ func (ssc *FakeStatefulSetControl) SetStatusChange(fn func(*apps.StatefulSet)) { } // CreateStatefulSet adds the statefulset to SetIndexer -func (ssc *FakeStatefulSetControl) CreateStatefulSet(_ *v1alpha1.TidbCluster, set *apps.StatefulSet) error { +func (ssc *FakeStatefulSetControl) CreateStatefulSet(_ runtime.Object, set *apps.StatefulSet) error { defer func() { ssc.createStatefulSetTracker.Inc() ssc.statusChange = nil @@ -186,7 +205,7 @@ func (ssc *FakeStatefulSetControl) CreateStatefulSet(_ *v1alpha1.TidbCluster, se } // UpdateStatefulSet updates the statefulset of SetIndexer -func (ssc *FakeStatefulSetControl) UpdateStatefulSet(_ *v1alpha1.TidbCluster, set *apps.StatefulSet) (*apps.StatefulSet, error) { +func (ssc *FakeStatefulSetControl) UpdateStatefulSet(_ runtime.Object, set *apps.StatefulSet) (*apps.StatefulSet, error) { defer func() { ssc.updateStatefulSetTracker.Inc() ssc.statusChange = nil @@ -204,7 +223,7 @@ func (ssc *FakeStatefulSetControl) UpdateStatefulSet(_ *v1alpha1.TidbCluster, se } // DeleteStatefulSet deletes the statefulset of SetIndexer -func (ssc *FakeStatefulSetControl) DeleteStatefulSet(_ *v1alpha1.TidbCluster, _ *apps.StatefulSet) error { +func (ssc *FakeStatefulSetControl) DeleteStatefulSet(_ runtime.Object, _ *apps.StatefulSet) error { return nil }