Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

some minor cleanup and optmization #57

Merged
merged 1 commit into from
Apr 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions pkg/controllers/job/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move sync close to fmt.


"github.com/golang/glog"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -268,17 +267,22 @@ func (jc jobCache) TaskCompleted(jobKey, taskName string) bool {
return completed >= taskReplicas
}

func (jc *jobCache) processCleanupJob() {
func (jc *jobCache) worker() {
for jc.processCleanupJob() {
}
}

func (jc *jobCache) processCleanupJob() bool {
obj, shutdown := jc.deletedJobs.Get()
if shutdown {
return
return false
}
defer jc.deletedJobs.Done(obj)

job, ok := obj.(*apis.JobInfo)
if !ok {
glog.Errorf("failed to convert %v to *apis.JobInfo", obj)
return
return true
}

jc.Mutex.Lock()
Expand All @@ -293,6 +297,7 @@ func (jc *jobCache) processCleanupJob() {
// Retry
jc.deleteJob(job)
}
return true
}

func (jc *jobCache) deleteJob(job *apis.JobInfo) {
Expand Down
51 changes: 27 additions & 24 deletions pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
Expand Down Expand Up @@ -60,12 +59,10 @@ type Controller struct {
vkClients *vkver.Clientset
kbClients *kbver.Clientset

jobInformer vkbatchinfo.JobInformer
podInformer coreinformers.PodInformer
pvcInformer coreinformers.PersistentVolumeClaimInformer
pgInformer kbinfo.PodGroupInformer
svcInformer coreinformers.ServiceInformer
cmdInformer vkcoreinfo.CommandInformer
jobInformer vkbatchinfo.JobInformer
pgInformer kbinfo.PodGroupInformer
cmdInformer vkcoreinfo.CommandInformer
sharedInformers informers.SharedInformerFactory

// A store of jobs
jobLister vkbatchlister.JobLister
Expand Down Expand Up @@ -152,9 +149,10 @@ func NewJobController(config *rest.Config) *Controller {
cc.cmdLister = cc.cmdInformer.Lister()
cc.cmdSynced = cc.cmdInformer.Informer().HasSynced

cc.podInformer = informers.NewSharedInformerFactory(cc.kubeClients, 0).Core().V1().Pods()
cc.sharedInformers = informers.NewSharedInformerFactory(cc.kubeClients, 0)
podInformer := cc.sharedInformers.Core().V1().Pods()

cc.podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
Expand All @@ -177,16 +175,16 @@ func NewJobController(config *rest.Config) *Controller {
},
})

cc.podLister = cc.podInformer.Lister()
cc.podSynced = cc.podInformer.Informer().HasSynced
cc.podLister = podInformer.Lister()
cc.podSynced = podInformer.Informer().HasSynced

cc.pvcInformer = informers.NewSharedInformerFactory(cc.kubeClients, 0).Core().V1().PersistentVolumeClaims()
cc.pvcLister = cc.pvcInformer.Lister()
cc.pvcSynced = cc.pvcInformer.Informer().HasSynced
pvcInformer := cc.sharedInformers.Core().V1().PersistentVolumeClaims()
cc.pvcLister = pvcInformer.Lister()
cc.pvcSynced = pvcInformer.Informer().HasSynced

cc.svcInformer = informers.NewSharedInformerFactory(cc.kubeClients, 0).Core().V1().Services()
cc.svcLister = cc.svcInformer.Lister()
cc.svcSynced = cc.svcInformer.Informer().HasSynced
svcInformer := cc.sharedInformers.Core().V1().Services()
cc.svcLister = svcInformer.Lister()
cc.svcSynced = svcInformer.Informer().HasSynced

cc.pgInformer = kbinfoext.NewSharedInformerFactory(cc.kbClients, 0).Scheduling().V1alpha1().PodGroups()
cc.pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand All @@ -205,11 +203,9 @@ func NewJobController(config *rest.Config) *Controller {
// Run start JobController
func (cc *Controller) Run(stopCh <-chan struct{}) {
go cc.jobInformer.Informer().Run(stopCh)
go cc.podInformer.Informer().Run(stopCh)
go cc.pvcInformer.Informer().Run(stopCh)
go cc.pgInformer.Informer().Run(stopCh)
go cc.svcInformer.Informer().Run(stopCh)
go cc.cmdInformer.Informer().Run(stopCh)
go cc.sharedInformers.Start(stopCh)

cache.WaitForCacheSync(stopCh, cc.jobSynced, cc.podSynced, cc.pgSynced,
cc.svcSynced, cc.cmdSynced, cc.pvcSynced)
Expand All @@ -223,10 +219,15 @@ func (cc *Controller) Run(stopCh <-chan struct{}) {
}

func (cc *Controller) worker() {
for cc.processNextReq() {
}
}

func (cc *Controller) processNextReq() bool {
obj, shutdown := cc.queue.Get()
if shutdown {
glog.Errorf("Fail to pop item from queue")
return
return false
}

req := obj.(apis.Request)
Expand All @@ -238,14 +239,14 @@ func (cc *Controller) worker() {
if err != nil {
// TODO(k82cn): ignore not-ready error.
glog.Errorf("Failed to get job by <%v> from cache: %v", req, err)
return
return true
}

st := state.NewState(jobInfo)
if st == nil {
glog.Errorf("Invalid state <%s> of Job <%v/%v>",
jobInfo.Job.Status.State, jobInfo.Job.Namespace, jobInfo.Job.Name)
return
return true
}

action := applyPolicies(jobInfo.Job, &req)
Expand All @@ -257,9 +258,11 @@ func (cc *Controller) worker() {
jobInfo.Job.Namespace, jobInfo.Job.Name, err)
// If any error, requeue it.
cc.queue.AddRateLimited(req)
return
return true
}

// If no error, forget it.
cc.queue.Forget(req)

return true
}
10 changes: 8 additions & 2 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,17 +301,22 @@ func (cc *Controller) recordJobEvent(namespace, name string, event vkbatchv1.Job
}

func (cc *Controller) handleCommands() {
for cc.processNextCommand() {
}
}

func (cc *Controller) processNextCommand() bool {
obj, shutdown := cc.commandQueue.Get()
if shutdown {
return
return false
}
cmd := obj.(*vkbusv1.Command)
defer cc.commandQueue.Done(cmd)

if err := cc.vkClients.BusV1alpha1().Commands(cmd.Namespace).Delete(cmd.Name, nil); err != nil {
glog.Errorf("Failed to delete Command <%s/%s>.", cmd.Namespace, cmd.Name)
cc.commandQueue.AddRateLimited(cmd)
return
return true
}
cc.recordJobEvent(cmd.Namespace, cmd.TargetObject.Name,
vkbatchv1.CommandIssued,
Expand All @@ -326,6 +331,7 @@ func (cc *Controller) handleCommands() {

cc.queue.Add(req)

return true
}

func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) {
Expand Down