diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go index c5504f17e1..4250af699f 100644 --- a/pkg/controllers/job/job_controller.go +++ b/pkg/controllers/job/job_controller.go @@ -23,7 +23,6 @@ import ( "github.com/golang/glog" "hash" "hash/fnv" - "sync" "time" "k8s.io/api/core/v1" @@ -106,15 +105,12 @@ type Controller struct { recorder record.EventRecorder priorityClasses map[string]*v1beta1.PriorityClass - sync.Mutex errTasks workqueue.RateLimitingInterface - - - + // To protect the queue list by processing multiple workers + lock sync.RWMutex workers uint32 - } // NewJobController create new Job Controller @@ -271,7 +267,7 @@ func (cc *Controller) belongsToThisRoutine(key string, count uint32) bool { } // TODO we may need to make this sharding more proper if required -func (cc *Controller) getWorkerID(key string) uint32 { +func (cc *Controller) getWorkerID(key string) workqueue.RateLimitingInterface { var hashVal hash.Hash32 var val uint32 @@ -280,42 +276,33 @@ func (cc *Controller) getWorkerID(key string) uint32 { val = hashVal.Sum32() - return val % cc.workers + cc.lock.Lock() + queue := cc.queueList[val%cc.workers] + cc.lock.Unlock() + + return queue } func (cc *Controller) processNextReq(count uint32) bool { - obj, shutdown := cc.queueList[count].Get() + cc.lock.Lock() + queue := cc.queueList[count] + cc.lock.Unlock() + obj, shutdown := queue.Get() if shutdown { glog.Errorf("Fail to pop item from queue") return false } req := obj.(apis.Request) - defer cc.queueList[count].Done(req) + defer queue.Done(req) key := jobcache.JobKeyByReq(&req) // Later we can remove this code if we want if !cc.belongsToThisRoutine(key, count) { glog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count) - cc.queueList[count].AddRateLimited(req) - return true - } - - // prevent multi threads processing the same job simultaneously. - cc.jobsLock.Lock() - if cc.jobsMap[key] { - // the job is being processed by some other thread - cc.queueList[count].AddRateLimited(req) - cc.jobsLock.Unlock() + queueLocal := cc.getWorkerID(key) + queueLocal.Add(req) return true - } else { - cc.jobsMap[key] = true - cc.jobsLock.Unlock() - defer func() { - cc.jobsLock.Lock() - delete(cc.jobsMap, key) - cc.jobsLock.Unlock() - }() } glog.V(3).Infof("Try to handle request <%v>", req) @@ -347,12 +334,12 @@ func (cc *Controller) processNextReq(count uint32) bool { glog.Errorf("Failed to handle Job <%s/%s>: %v", jobInfo.Job.Namespace, jobInfo.Job.Name, err) // If any error, requeue it. - cc.queueList[count].AddRateLimited(req) + queue.AddRateLimited(req) return true } // If no error, forget it. - cc.queueList[count].Forget(req) + queue.Forget(req) return true } diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index 80d6388f4a..7702b42fe3 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -66,8 +66,8 @@ func (cc *Controller) addJob(obj interface{}) { job.Namespace, job.Name, err) } key := vkcache.JobKeyByReq(&req) - i := cc.getWorkerID(key) - cc.queueList[i].Add(req) + queue := cc.getWorkerID(key) + queue.Add(req) } func (cc *Controller) updateJob(oldObj, newObj interface{}) { @@ -103,8 +103,8 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) { } key := vkcache.JobKeyByReq(&req) - i := cc.getWorkerID(key) - cc.queueList[i].Add(req) + queue := cc.getWorkerID(key) + queue.Add(req) } func (cc *Controller) deleteJob(obj interface{}) { @@ -170,8 +170,8 @@ func (cc *Controller) addPod(obj interface{}) { pod.Namespace, pod.Name, err) } key := vkcache.JobKeyByReq(&req) - i := cc.getWorkerID(key) - cc.queueList[i].Add(req) + queue := cc.getWorkerID(key) + queue.Add(req) } func (cc *Controller) updatePod(oldObj, newObj interface{}) { @@ -248,8 +248,8 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) { } key := vkcache.JobKeyByReq(&req) - i := cc.getWorkerID(key) - cc.queueList[i].Add(req) + queue := cc.getWorkerID(key) + queue.Add(req) } func (cc *Controller) deletePod(obj interface{}) { @@ -311,8 +311,8 @@ func (cc *Controller) deletePod(obj interface{}) { } key := vkcache.JobKeyByReq(&req) - i := cc.getWorkerID(key) - cc.queueList[i].Add(req) + queue := cc.getWorkerID(key) + queue.Add(req) } func (cc *Controller) recordJobEvent(namespace, name string, event vkbatchv1.JobEvent, message string) { @@ -358,8 +358,8 @@ func (cc *Controller) processNextCommand() bool { } key := vkcache.JobKeyByReq(&req) - i := cc.getWorkerID(key) - cc.queueList[i].Add(req) + queue := cc.getWorkerID(key) + queue.Add(req) return true } @@ -395,8 +395,8 @@ func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) { req.Action = vkbatchv1.EnqueueAction } key := vkcache.JobKeyByReq(&req) - i := cc.getWorkerID(key) - cc.queueList[i].Add(req) + queue := cc.getWorkerID(key) + queue.Add(req) } }