Skip to content

Commit

Permalink
Fixed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
SrinivasChilveri committed Jun 27, 2019
1 parent 6a925b9 commit acf0193
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 44 deletions.
47 changes: 17 additions & 30 deletions pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/golang/glog"
"hash"
"hash/fnv"
"sync"
"time"

"k8s.io/api/core/v1"
Expand Down Expand Up @@ -106,15 +105,12 @@ type Controller struct {
recorder record.EventRecorder
priorityClasses map[string]*v1beta1.PriorityClass


sync.Mutex
errTasks workqueue.RateLimitingInterface




// To protect the queue list by processing multiple workers
lock sync.RWMutex
workers uint32

}

// NewJobController create new Job Controller
Expand Down Expand Up @@ -271,7 +267,7 @@ func (cc *Controller) belongsToThisRoutine(key string, count uint32) bool {
}

// TODO we may need to make this sharding more proper if required
func (cc *Controller) getWorkerID(key string) uint32 {
func (cc *Controller) getWorkerID(key string) workqueue.RateLimitingInterface {
var hashVal hash.Hash32
var val uint32

Expand All @@ -280,42 +276,33 @@ func (cc *Controller) getWorkerID(key string) uint32 {

val = hashVal.Sum32()

return val % cc.workers
cc.lock.Lock()
queue := cc.queueList[val%cc.workers]
cc.lock.Unlock()

return queue
}

func (cc *Controller) processNextReq(count uint32) bool {
obj, shutdown := cc.queueList[count].Get()
cc.lock.Lock()
queue := cc.queueList[count]
cc.lock.Unlock()
obj, shutdown := queue.Get()
if shutdown {
glog.Errorf("Fail to pop item from queue")
return false
}

req := obj.(apis.Request)
defer cc.queueList[count].Done(req)
defer queue.Done(req)

key := jobcache.JobKeyByReq(&req)
// Later we can remove this code if we want
if !cc.belongsToThisRoutine(key, count) {
glog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count)
cc.queueList[count].AddRateLimited(req)
return true
}

// prevent multi threads processing the same job simultaneously.
cc.jobsLock.Lock()
if cc.jobsMap[key] {
// the job is being processed by some other thread
cc.queueList[count].AddRateLimited(req)
cc.jobsLock.Unlock()
queueLocal := cc.getWorkerID(key)
queueLocal.Add(req)
return true
} else {
cc.jobsMap[key] = true
cc.jobsLock.Unlock()
defer func() {
cc.jobsLock.Lock()
delete(cc.jobsMap, key)
cc.jobsLock.Unlock()
}()
}

glog.V(3).Infof("Try to handle request <%v>", req)
Expand Down Expand Up @@ -347,12 +334,12 @@ func (cc *Controller) processNextReq(count uint32) bool {
glog.Errorf("Failed to handle Job <%s/%s>: %v",
jobInfo.Job.Namespace, jobInfo.Job.Name, err)
// If any error, requeue it.
cc.queueList[count].AddRateLimited(req)
queue.AddRateLimited(req)
return true
}

// If no error, forget it.
cc.queueList[count].Forget(req)
queue.Forget(req)

return true
}
28 changes: 14 additions & 14 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (cc *Controller) addJob(obj interface{}) {
job.Namespace, job.Name, err)
}
key := vkcache.JobKeyByReq(&req)
i := cc.getWorkerID(key)
cc.queueList[i].Add(req)
queue := cc.getWorkerID(key)
queue.Add(req)
}

func (cc *Controller) updateJob(oldObj, newObj interface{}) {
Expand Down Expand Up @@ -103,8 +103,8 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) {
}

key := vkcache.JobKeyByReq(&req)
i := cc.getWorkerID(key)
cc.queueList[i].Add(req)
queue := cc.getWorkerID(key)
queue.Add(req)
}

func (cc *Controller) deleteJob(obj interface{}) {
Expand Down Expand Up @@ -170,8 +170,8 @@ func (cc *Controller) addPod(obj interface{}) {
pod.Namespace, pod.Name, err)
}
key := vkcache.JobKeyByReq(&req)
i := cc.getWorkerID(key)
cc.queueList[i].Add(req)
queue := cc.getWorkerID(key)
queue.Add(req)
}

func (cc *Controller) updatePod(oldObj, newObj interface{}) {
Expand Down Expand Up @@ -248,8 +248,8 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
}

key := vkcache.JobKeyByReq(&req)
i := cc.getWorkerID(key)
cc.queueList[i].Add(req)
queue := cc.getWorkerID(key)
queue.Add(req)
}

func (cc *Controller) deletePod(obj interface{}) {
Expand Down Expand Up @@ -311,8 +311,8 @@ func (cc *Controller) deletePod(obj interface{}) {
}

key := vkcache.JobKeyByReq(&req)
i := cc.getWorkerID(key)
cc.queueList[i].Add(req)
queue := cc.getWorkerID(key)
queue.Add(req)
}

func (cc *Controller) recordJobEvent(namespace, name string, event vkbatchv1.JobEvent, message string) {
Expand Down Expand Up @@ -358,8 +358,8 @@ func (cc *Controller) processNextCommand() bool {
}

key := vkcache.JobKeyByReq(&req)
i := cc.getWorkerID(key)
cc.queueList[i].Add(req)
queue := cc.getWorkerID(key)
queue.Add(req)

return true
}
Expand Down Expand Up @@ -395,8 +395,8 @@ func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) {
req.Action = vkbatchv1.EnqueueAction
}
key := vkcache.JobKeyByReq(&req)
i := cc.getWorkerID(key)
cc.queueList[i].Add(req)
queue := cc.getWorkerID(key)
queue.Add(req)
}
}

Expand Down

0 comments on commit acf0193

Please sign in to comment.