Skip to content

Commit

Permalink
Merge pull request #57 from volcano-sh/controller
Browse files Browse the repository at this point in the history
some minor cleanup and optmization
  • Loading branch information
Klaus Ma authored Apr 2, 2019
2 parents 55e1549 + 513bdf5 commit be49d98
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 30 deletions.
13 changes: 9 additions & 4 deletions pkg/controllers/job/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync"

"github.com/golang/glog"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -268,17 +267,22 @@ func (jc jobCache) TaskCompleted(jobKey, taskName string) bool {
return completed >= taskReplicas
}

func (jc *jobCache) processCleanupJob() {
func (jc *jobCache) worker() {
for jc.processCleanupJob() {
}
}

func (jc *jobCache) processCleanupJob() bool {
obj, shutdown := jc.deletedJobs.Get()
if shutdown {
return
return false
}
defer jc.deletedJobs.Done(obj)

job, ok := obj.(*apis.JobInfo)
if !ok {
glog.Errorf("failed to convert %v to *apis.JobInfo", obj)
return
return true
}

jc.Mutex.Lock()
Expand All @@ -293,6 +297,7 @@ func (jc *jobCache) processCleanupJob() {
// Retry
jc.deleteJob(job)
}
return true
}

func (jc *jobCache) deleteJob(job *apis.JobInfo) {
Expand Down
51 changes: 27 additions & 24 deletions pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
Expand Down Expand Up @@ -60,12 +59,10 @@ type Controller struct {
vkClients *vkver.Clientset
kbClients *kbver.Clientset

jobInformer vkbatchinfo.JobInformer
podInformer coreinformers.PodInformer
pvcInformer coreinformers.PersistentVolumeClaimInformer
pgInformer kbinfo.PodGroupInformer
svcInformer coreinformers.ServiceInformer
cmdInformer vkcoreinfo.CommandInformer
jobInformer vkbatchinfo.JobInformer
pgInformer kbinfo.PodGroupInformer
cmdInformer vkcoreinfo.CommandInformer
sharedInformers informers.SharedInformerFactory

// A store of jobs
jobLister vkbatchlister.JobLister
Expand Down Expand Up @@ -152,9 +149,10 @@ func NewJobController(config *rest.Config) *Controller {
cc.cmdLister = cc.cmdInformer.Lister()
cc.cmdSynced = cc.cmdInformer.Informer().HasSynced

cc.podInformer = informers.NewSharedInformerFactory(cc.kubeClients, 0).Core().V1().Pods()
cc.sharedInformers = informers.NewSharedInformerFactory(cc.kubeClients, 0)
podInformer := cc.sharedInformers.Core().V1().Pods()

cc.podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
Expand All @@ -177,16 +175,16 @@ func NewJobController(config *rest.Config) *Controller {
},
})

cc.podLister = cc.podInformer.Lister()
cc.podSynced = cc.podInformer.Informer().HasSynced
cc.podLister = podInformer.Lister()
cc.podSynced = podInformer.Informer().HasSynced

cc.pvcInformer = informers.NewSharedInformerFactory(cc.kubeClients, 0).Core().V1().PersistentVolumeClaims()
cc.pvcLister = cc.pvcInformer.Lister()
cc.pvcSynced = cc.pvcInformer.Informer().HasSynced
pvcInformer := cc.sharedInformers.Core().V1().PersistentVolumeClaims()
cc.pvcLister = pvcInformer.Lister()
cc.pvcSynced = pvcInformer.Informer().HasSynced

cc.svcInformer = informers.NewSharedInformerFactory(cc.kubeClients, 0).Core().V1().Services()
cc.svcLister = cc.svcInformer.Lister()
cc.svcSynced = cc.svcInformer.Informer().HasSynced
svcInformer := cc.sharedInformers.Core().V1().Services()
cc.svcLister = svcInformer.Lister()
cc.svcSynced = svcInformer.Informer().HasSynced

cc.pgInformer = kbinfoext.NewSharedInformerFactory(cc.kbClients, 0).Scheduling().V1alpha1().PodGroups()
cc.pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand All @@ -205,11 +203,9 @@ func NewJobController(config *rest.Config) *Controller {
// Run start JobController
func (cc *Controller) Run(stopCh <-chan struct{}) {
go cc.jobInformer.Informer().Run(stopCh)
go cc.podInformer.Informer().Run(stopCh)
go cc.pvcInformer.Informer().Run(stopCh)
go cc.pgInformer.Informer().Run(stopCh)
go cc.svcInformer.Informer().Run(stopCh)
go cc.cmdInformer.Informer().Run(stopCh)
go cc.sharedInformers.Start(stopCh)

cache.WaitForCacheSync(stopCh, cc.jobSynced, cc.podSynced, cc.pgSynced,
cc.svcSynced, cc.cmdSynced, cc.pvcSynced)
Expand All @@ -223,10 +219,15 @@ func (cc *Controller) Run(stopCh <-chan struct{}) {
}

func (cc *Controller) worker() {
for cc.processNextReq() {
}
}

func (cc *Controller) processNextReq() bool {
obj, shutdown := cc.queue.Get()
if shutdown {
glog.Errorf("Fail to pop item from queue")
return
return false
}

req := obj.(apis.Request)
Expand All @@ -238,14 +239,14 @@ func (cc *Controller) worker() {
if err != nil {
// TODO(k82cn): ignore not-ready error.
glog.Errorf("Failed to get job by <%v> from cache: %v", req, err)
return
return true
}

st := state.NewState(jobInfo)
if st == nil {
glog.Errorf("Invalid state <%s> of Job <%v/%v>",
jobInfo.Job.Status.State, jobInfo.Job.Namespace, jobInfo.Job.Name)
return
return true
}

action := applyPolicies(jobInfo.Job, &req)
Expand All @@ -257,9 +258,11 @@ func (cc *Controller) worker() {
jobInfo.Job.Namespace, jobInfo.Job.Name, err)
// If any error, requeue it.
cc.queue.AddRateLimited(req)
return
return true
}

// If no error, forget it.
cc.queue.Forget(req)

return true
}
10 changes: 8 additions & 2 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,17 +301,22 @@ func (cc *Controller) recordJobEvent(namespace, name string, event vkbatchv1.Job
}

func (cc *Controller) handleCommands() {
for cc.processNextCommand() {
}
}

func (cc *Controller) processNextCommand() bool {
obj, shutdown := cc.commandQueue.Get()
if shutdown {
return
return false
}
cmd := obj.(*vkbusv1.Command)
defer cc.commandQueue.Done(cmd)

if err := cc.vkClients.BusV1alpha1().Commands(cmd.Namespace).Delete(cmd.Name, nil); err != nil {
glog.Errorf("Failed to delete Command <%s/%s>.", cmd.Namespace, cmd.Name)
cc.commandQueue.AddRateLimited(cmd)
return
return true
}
cc.recordJobEvent(cmd.Namespace, cmd.TargetObject.Name,
vkbatchv1.CommandIssued,
Expand All @@ -326,6 +331,7 @@ func (cc *Controller) handleCommands() {

cc.queue.Add(req)

return true
}

func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) {
Expand Down

0 comments on commit be49d98

Please sign in to comment.