From 987df416451681ea2ad1ad681d0e7729416fae71 Mon Sep 17 00:00:00 2001 From: xuzhonghu Date: Thu, 18 Apr 2019 15:33:26 +0800 Subject: [PATCH] simplify resource event handler --- pkg/controllers/job/job_controller.go | 52 +++---------------- pkg/controllers/job/job_controller_handler.go | 41 +++++++++------ 2 files changed, 32 insertions(+), 61 deletions(-) diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go index c3b7715a5d..bbe28ea905 100644 --- a/pkg/controllers/job/job_controller.go +++ b/pkg/controllers/job/job_controller.go @@ -17,12 +17,9 @@ limitations under the License. package job import ( - "fmt" - "github.com/golang/glog" "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -38,8 +35,6 @@ import ( kbinfo "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1" kblister "github.com/kubernetes-sigs/kube-batch/pkg/client/listers/scheduling/v1alpha1" - v1corev1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1" - "volcano.sh/volcano/pkg/apis/helpers" vkver "volcano.sh/volcano/pkg/client/clientset/versioned" vkscheme "volcano.sh/volcano/pkg/client/clientset/versioned/scheme" vkinfoext "volcano.sh/volcano/pkg/client/informers/externalversions" @@ -127,53 +122,18 @@ func NewJobController(config *rest.Config) *Controller { cc.jobSynced = cc.jobInformer.Informer().HasSynced cc.cmdInformer = vkinfoext.NewSharedInformerFactory(cc.vkClients, 0).Bus().V1alpha1().Commands() - cc.cmdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - switch t := obj.(type) { - case *v1corev1.Command: - return helpers.ControlledBy(t, helpers.JobKind) - case cache.DeletedFinalStateUnknown: - if cmd, ok := t.Obj.(*v1corev1.Command); ok { - return helpers.ControlledBy(cmd, helpers.JobKind) - } - runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Command", obj)) - return false - default: - runtime.HandleError(fmt.Errorf("unable to handle object %T", obj)) - return false - } - }, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: cc.addCommand, - }, + cc.cmdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: cc.addCommand, }) cc.cmdLister = cc.cmdInformer.Lister() cc.cmdSynced = cc.cmdInformer.Informer().HasSynced cc.sharedInformers = informers.NewSharedInformerFactory(cc.kubeClients, 0) podInformer := cc.sharedInformers.Core().V1().Pods() - - podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - switch t := obj.(type) { - case *v1.Pod: - return helpers.ControlledBy(t, helpers.JobKind) - case cache.DeletedFinalStateUnknown: - if pod, ok := t.Obj.(*v1.Pod); ok { - return helpers.ControlledBy(pod, helpers.JobKind) - } - runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod", obj)) - return false - default: - runtime.HandleError(fmt.Errorf("unable to handle object %T", obj)) - return false - } - }, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: cc.addPod, - UpdateFunc: cc.updatePod, - DeleteFunc: cc.deletePod, - }, + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: cc.addPod, + UpdateFunc: cc.updatePod, + DeleteFunc: cc.deletePod, }) cc.podLister = podInformer.Lister() diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index 4b9197d506..a32af324c1 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -24,6 +24,7 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/tools/cache" kbtype "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" @@ -103,8 +104,17 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) { func (cc *Controller) deleteJob(obj interface{}) { job, ok := obj.(*vkbatchv1.Job) if !ok { - glog.Errorf("obj is not Job") - return + // If we reached here it means the Job was deleted but its final state is unrecorded. + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("Couldn't get object from tombstone %#v", obj) + return + } + job, ok = tombstone.Obj.(*vkbatchv1.Job) + if !ok { + glog.Errorf("Tombstone contained object that is not a volcano Job: %#v", obj) + return + } } if err := cc.cache.Delete(job); err != nil { @@ -228,20 +238,19 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) { } func (cc *Controller) deletePod(obj interface{}) { - var pod *v1.Pod - switch t := obj.(type) { - case *v1.Pod: - pod = t - case cache.DeletedFinalStateUnknown: - var ok bool - pod, ok = t.Obj.(*v1.Pod) + pod, ok := obj.(*v1.Pod) + if !ok { + // If we reached here it means the pod was deleted but its final state is unrecorded. + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - glog.Errorf("Cannot convert to *v1.Pod: %v", t.Obj) + glog.Errorf("Couldn't get object from tombstone %#v", obj) + return + } + pod, ok = tombstone.Obj.(*v1.Pod) + if !ok { + glog.Errorf("Tombstone contained object that is not a Pod: %#v", obj) return } - default: - glog.Errorf("Cannot convert to *v1.Pod: %v", t) - return } taskName, found := pod.Annotations[vkbatchv1.TaskSpecKey] @@ -314,8 +323,10 @@ func (cc *Controller) processNextCommand() bool { defer cc.commandQueue.Done(cmd) if err := cc.vkClients.BusV1alpha1().Commands(cmd.Namespace).Delete(cmd.Name, nil); err != nil { - glog.Errorf("Failed to delete Command <%s/%s>.", cmd.Namespace, cmd.Name) - cc.commandQueue.AddRateLimited(cmd) + if !apierrors.IsNotFound(err) { + glog.Errorf("Failed to delete Command <%s/%s>.", cmd.Namespace, cmd.Name) + cc.commandQueue.AddRateLimited(cmd) + } return true } cc.recordJobEvent(cmd.Namespace, cmd.TargetObject.Name,