Skip to content

Commit

Permalink
simplify resource event handler
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxuzhonghu committed Apr 18, 2019
1 parent 0252cff commit 987df41
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 61 deletions.
52 changes: 6 additions & 46 deletions pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@ limitations under the License.
package job

import (
"fmt"

"github.com/golang/glog"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
Expand All @@ -38,8 +35,6 @@ import (
kbinfo "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1"
kblister "github.com/kubernetes-sigs/kube-batch/pkg/client/listers/scheduling/v1alpha1"

v1corev1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1"
"volcano.sh/volcano/pkg/apis/helpers"
vkver "volcano.sh/volcano/pkg/client/clientset/versioned"
vkscheme "volcano.sh/volcano/pkg/client/clientset/versioned/scheme"
vkinfoext "volcano.sh/volcano/pkg/client/informers/externalversions"
Expand Down Expand Up @@ -127,53 +122,18 @@ func NewJobController(config *rest.Config) *Controller {
cc.jobSynced = cc.jobInformer.Informer().HasSynced

cc.cmdInformer = vkinfoext.NewSharedInformerFactory(cc.vkClients, 0).Bus().V1alpha1().Commands()
cc.cmdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1corev1.Command:
return helpers.ControlledBy(t, helpers.JobKind)
case cache.DeletedFinalStateUnknown:
if cmd, ok := t.Obj.(*v1corev1.Command); ok {
return helpers.ControlledBy(cmd, helpers.JobKind)
}
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Command", obj))
return false
default:
runtime.HandleError(fmt.Errorf("unable to handle object %T", obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: cc.addCommand,
},
cc.cmdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addCommand,
})
cc.cmdLister = cc.cmdInformer.Lister()
cc.cmdSynced = cc.cmdInformer.Informer().HasSynced

cc.sharedInformers = informers.NewSharedInformerFactory(cc.kubeClients, 0)
podInformer := cc.sharedInformers.Core().V1().Pods()

podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return helpers.ControlledBy(t, helpers.JobKind)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return helpers.ControlledBy(pod, helpers.JobKind)
}
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod", obj))
return false
default:
runtime.HandleError(fmt.Errorf("unable to handle object %T", obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: cc.addPod,
UpdateFunc: cc.updatePod,
DeleteFunc: cc.deletePod,
},
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addPod,
UpdateFunc: cc.updatePod,
DeleteFunc: cc.deletePod,
})

cc.podLister = podInformer.Lister()
Expand Down
41 changes: 26 additions & 15 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/golang/glog"

"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/cache"

kbtype "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
Expand Down Expand Up @@ -103,8 +104,17 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) {
func (cc *Controller) deleteJob(obj interface{}) {
job, ok := obj.(*vkbatchv1.Job)
if !ok {
glog.Errorf("obj is not Job")
return
// If we reached here it means the Job was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %#v", obj)
return
}
job, ok = tombstone.Obj.(*vkbatchv1.Job)
if !ok {
glog.Errorf("Tombstone contained object that is not a volcano Job: %#v", obj)
return
}
}

if err := cc.cache.Delete(job); err != nil {
Expand Down Expand Up @@ -228,20 +238,19 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
}

func (cc *Controller) deletePod(obj interface{}) {
var pod *v1.Pod
switch t := obj.(type) {
case *v1.Pod:
pod = t
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*v1.Pod)
pod, ok := obj.(*v1.Pod)
if !ok {
// If we reached here it means the pod was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Cannot convert to *v1.Pod: %v", t.Obj)
glog.Errorf("Couldn't get object from tombstone %#v", obj)
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a Pod: %#v", obj)
return
}
default:
glog.Errorf("Cannot convert to *v1.Pod: %v", t)
return
}

taskName, found := pod.Annotations[vkbatchv1.TaskSpecKey]
Expand Down Expand Up @@ -314,8 +323,10 @@ func (cc *Controller) processNextCommand() bool {
defer cc.commandQueue.Done(cmd)

if err := cc.vkClients.BusV1alpha1().Commands(cmd.Namespace).Delete(cmd.Name, nil); err != nil {
glog.Errorf("Failed to delete Command <%s/%s>.", cmd.Namespace, cmd.Name)
cc.commandQueue.AddRateLimited(cmd)
if !apierrors.IsNotFound(err) {
glog.Errorf("Failed to delete Command <%s/%s>.", cmd.Namespace, cmd.Name)
cc.commandQueue.AddRateLimited(cmd)
}
return true
}
cc.recordJobEvent(cmd.Namespace, cmd.TargetObject.Name,
Expand Down

0 comments on commit 987df41

Please sign in to comment.