From 07471c35e8a43dc8ba097384eecfae601f5540fa Mon Sep 17 00:00:00 2001
From: SrinivasChilveri
Date: Mon, 20 May 2019 11:01:25 +0530
Subject: [PATCH 1/5] sync job works concurrently

---
 cmd/controllers/app/options/options.go | 10 ++++++++--
 cmd/controllers/app/server.go          |  2 +-
 pkg/controllers/job/job_controller.go  |  8 ++++++--
 3 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/cmd/controllers/app/options/options.go b/cmd/controllers/app/options/options.go
index d0ea3fe9ca..f07add9160 100644
--- a/cmd/controllers/app/options/options.go
+++ b/cmd/controllers/app/options/options.go
@@ -23,8 +23,9 @@ import (
 )
 
 const (
-    defaultQPS   = 50.0
-    defaultBurst = 100
+    defaultQPS     = 50.0
+    defaultBurst   = 100
+    defaultWorkers = 3
 )
 
 // ServerOption is the main context object for the controller manager.
@@ -36,6 +37,9 @@ type ServerOption struct {
     KubeAPIBurst int
     KubeAPIQPS   float32
     PrintVersion bool
+    // WorkerThreads is the number of threads syncing job operations
+    // concurrently. Larger number = faster job updating,but more CPU load.
+    WorkerThreads int32
 }
 
 // NewServerOption creates a new CMServer with a default config.
@@ -54,6 +58,8 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
     fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver")
     fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver")
     fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
+    fs.Int32Var(&s.WorkerThreads, "worker-threads", defaultWorkers, "The number of threads syncing job operations concurrently. "+
+        "Larger number = faster job updating, but more CPU load")
 }
 
 // CheckOptionOrDie checks the LockObjectNamespace
diff --git a/cmd/controllers/app/server.go b/cmd/controllers/app/server.go
index f3748e3555..7da652244a 100644
--- a/cmd/controllers/app/server.go
+++ b/cmd/controllers/app/server.go
@@ -90,7 +90,7 @@ func Run(opt *options.ServerOption) error {
     garbageCollector := garbagecollector.New(vkClient)
 
     run := func(ctx context.Context) {
-        go jobController.Run(ctx.Done())
+        go jobController.Run((int)(opt.WorkerThreads), ctx.Done())
         go queueController.Run(ctx.Done())
         go garbageCollector.Run(ctx.Done())
         <-ctx.Done()
diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go
index 66e600a0db..048d5ada25 100644
--- a/pkg/controllers/job/job_controller.go
+++ b/pkg/controllers/job/job_controller.go
@@ -21,6 +21,7 @@ import (
     "sync"
 
     "github.com/golang/glog"
+    "time"
 
     "k8s.io/api/core/v1"
     "k8s.io/api/scheduling/v1beta1"
@@ -191,7 +192,7 @@ func NewJobController(
 }
 
 // Run start JobController
-func (cc *Controller) Run(stopCh <-chan struct{}) {
+func (cc *Controller) Run(workers int, stopCh <-chan struct{}) {
     go cc.sharedInformers.Start(stopCh)
     go cc.jobInformer.Informer().Run(stopCh)
     go cc.podInformer.Informer().Run(stopCh)
@@ -205,7 +206,10 @@ func (cc *Controller) Run(stopCh <-chan struct{}) {
         cc.svcSynced, cc.cmdSynced, cc.pvcSynced, cc.pcSynced)
 
     go wait.Until(cc.handleCommands, 0, stopCh)
-    go wait.Until(cc.worker, 0, stopCh)
+
+    for i := 0; i < workers; i++ {
+        go wait.Until(cc.worker, time.Second, stopCh)
+    }
 
     go cc.cache.Run(stopCh)
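The patch above fans the controller's sync loop out to a configurable number of goroutines that all drain the same queue. A minimal, dependency-free sketch of that fan-out pattern follows; runWorkers and process are illustrative names, not Volcano's API, and the real code drives each worker with wait.Until from k8s.io/apimachinery instead of the crude sleep used here.

// Sketch only: fan out N workers that loop until the stop channel closes.
package main

import (
    "fmt"
    "sync"
    "time"
)

func runWorkers(workers int, process func(worker int), stopCh <-chan struct{}) {
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            for {
                select {
                case <-stopCh:
                    return
                default:
                    process(n)
                    time.Sleep(time.Second) // stand-in for wait.Until's period
                }
            }
        }(i)
    }
    wg.Wait()
}

func main() {
    stopCh := make(chan struct{})
    go func() { time.Sleep(3 * time.Second); close(stopCh) }()
    runWorkers(3, func(n int) { fmt.Printf("worker %d syncing\n", n) }, stopCh)
}

Because every worker still pulls from the single shared queue at this point, two workers can pick up requests for the same job at once, which is what the next patch addresses.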
From 752158950138ecfa728f97bcd25e064673c9cc39 Mon Sep 17 00:00:00 2001
From: SrinivasChilveri
Date: Wed, 29 May 2019 18:13:29 +0530
Subject: [PATCH 2/5] preventing the same job handling by multiple workers

---
 pkg/controllers/job/job_controller.go | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go
index 048d5ada25..da6a16e81e 100644
--- a/pkg/controllers/job/job_controller.go
+++ b/pkg/controllers/job/job_controller.go
@@ -21,6 +21,7 @@ import (
     "sync"
 
     "github.com/golang/glog"
+    "sync"
     "time"
 
     "k8s.io/api/core/v1"
@@ -105,6 +106,7 @@ type Controller struct {
 
     sync.Mutex
     errTasks workqueue.RateLimitingInterface
+
 }
 
 // NewJobController create new Job Controller
@@ -130,6 +132,7 @@ func NewJobController(
         errTasks:        newRateLimitingQueue(),
         recorder:        recorder,
         priorityClasses: make(map[string]*v1beta1.PriorityClass),
+
     }
 
     cc.jobInformer = vkinfoext.NewSharedInformerFactory(cc.vkClients, 0).Batch().V1alpha1().Jobs()
@@ -234,6 +237,25 @@ func (cc *Controller) processNextReq() bool {
     req := obj.(apis.Request)
     defer cc.queue.Done(req)
 
+    key := jobcache.JobKeyByReq(&req)
+
+    // prevent multi threads processing the same job simultaneously.
+    cc.jobsLock.Lock()
+    if cc.jobsMap[key] {
+        // the job is being processed by some other thread
+        cc.queue.AddRateLimited(req)
+        cc.jobsLock.Unlock()
+        return true
+    } else {
+        cc.jobsMap[key] = true
+        cc.jobsLock.Unlock()
+        defer func() {
+            cc.jobsLock.Lock()
+            delete(cc.jobsMap, key)
+            cc.jobsLock.Unlock()
+        }()
+    }
+
     glog.V(3).Infof("Try to handle request <%v>", req)
 
     jobInfo, err := cc.cache.Get(jobcache.JobKeyByReq(&req))
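This patch serializes work per job: a worker proceeds only if it can claim the job key first, otherwise it requeues the request. The jobsMap and jobsLock fields it references are not declared in the hunks shown here, so the sketch below assumes a mutex-guarded map[string]bool; keyClaims and tryClaim are hypothetical names used only for illustration.

// Sketch of the claim/release pattern applied around job handling.
package main

import (
    "fmt"
    "sync"
)

type keyClaims struct {
    mu   sync.Mutex
    busy map[string]bool // assumed stand-in for the controller's jobsMap
}

// tryClaim returns a release func and true if the key was free; callers that
// get false should requeue the request instead of processing it.
func (c *keyClaims) tryClaim(key string) (func(), bool) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.busy[key] {
        return nil, false
    }
    c.busy[key] = true
    return func() {
        c.mu.Lock()
        delete(c.busy, key)
        c.mu.Unlock()
    }, true
}

func main() {
    claims := &keyClaims{busy: map[string]bool{}}
    if release, ok := claims.tryClaim("default/job-1"); ok {
        defer release()
        fmt.Println("processing default/job-1")
    } else {
        fmt.Println("default/job-1 is busy on another worker; requeue")
    }
}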
From 6a925b9809cf5eac4b82862b69b617108cf02add Mon Sep 17 00:00:00 2001
From: SrinivasChilveri
Date: Fri, 14 Jun 2019 17:46:41 +0530
Subject: [PATCH 3/5] added sharding with multiple queues

---
 cmd/controllers/app/options/options.go        |  4 +-
 cmd/controllers/app/options/options_test.go   |  9 +-
 cmd/controllers/app/server.go                 |  4 +-
 pkg/controllers/job/job_controller.go         | 90 ++++++++++++++++---
 pkg/controllers/job/job_controller_handler.go | 28 ++++--
 .../job/job_controller_handler_test.go        |  7 +-
 .../job/job_controller_plugins_test.go        |  2 +-
 7 files changed, 111 insertions(+), 33 deletions(-)

diff --git a/cmd/controllers/app/options/options.go b/cmd/controllers/app/options/options.go
index f07add9160..b402cefcb5 100644
--- a/cmd/controllers/app/options/options.go
+++ b/cmd/controllers/app/options/options.go
@@ -39,7 +39,7 @@ type ServerOption struct {
     PrintVersion bool
     // WorkerThreads is the number of threads syncing job operations
     // concurrently. Larger number = faster job updating,but more CPU load.
-    WorkerThreads int32
+    WorkerThreads uint32
 }
 
 // NewServerOption creates a new CMServer with a default config.
@@ -58,7 +58,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
     fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver")
     fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver")
     fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
-    fs.Int32Var(&s.WorkerThreads, "worker-threads", defaultWorkers, "The number of threads syncing job operations concurrently. "+
+    fs.Uint32Var(&s.WorkerThreads, "worker-threads", defaultWorkers, "The number of threads syncing job operations concurrently. "+
         "Larger number = faster job updating, but more CPU load")
 }
 
diff --git a/cmd/controllers/app/options/options_test.go b/cmd/controllers/app/options/options_test.go
index df6d5b7f22..8d46d10933 100644
--- a/cmd/controllers/app/options/options_test.go
+++ b/cmd/controllers/app/options/options_test.go
@@ -35,10 +35,11 @@ func TestAddFlags(t *testing.T) {
 
     // This is a snapshot of expected options parsed by args.
     expected := &ServerOption{
-        Master:       "127.0.0.1",
-        KubeAPIQPS:   defaultQPS,
-        KubeAPIBurst: 200,
-        PrintVersion: false,
+        Master:        "127.0.0.1",
+        KubeAPIQPS:    defaultQPS,
+        KubeAPIBurst:  200,
+        PrintVersion:  false,
+        WorkerThreads: defaultWorkers,
     }
 
     if !reflect.DeepEqual(expected, s) {
diff --git a/cmd/controllers/app/server.go b/cmd/controllers/app/server.go
index 7da652244a..0f8359c303 100644
--- a/cmd/controllers/app/server.go
+++ b/cmd/controllers/app/server.go
@@ -85,12 +85,12 @@ func Run(opt *options.ServerOption) error {
     kbClient := kbver.NewForConfigOrDie(config)
     vkClient := vkclient.NewForConfigOrDie(config)
 
-    jobController := job.NewJobController(kubeClient, kbClient, vkClient)
+    jobController := job.NewJobController(kubeClient, kbClient, vkClient, opt.WorkerThreads)
     queueController := queue.NewQueueController(kubeClient, kbClient)
     garbageCollector := garbagecollector.New(vkClient)
 
     run := func(ctx context.Context) {
-        go jobController.Run((int)(opt.WorkerThreads), ctx.Done())
+        go jobController.Run(opt.WorkerThreads, ctx.Done())
         go queueController.Run(ctx.Done())
         go garbageCollector.Run(ctx.Done())
         <-ctx.Done()
diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go
index da6a16e81e..c5504f17e1 100644
--- a/pkg/controllers/job/job_controller.go
+++ b/pkg/controllers/job/job_controller.go
@@ -21,6 +21,8 @@ import (
     "sync"
 
     "github.com/golang/glog"
+    "hash"
+    "hash/fnv"
     "sync"
     "time"
 
@@ -97,16 +99,22 @@ type Controller struct {
     pcSynced  func() bool
 
     // queue that need to sync up
-    queue        workqueue.RateLimitingInterface
+    queueList    []workqueue.RateLimitingInterface
     commandQueue workqueue.RateLimitingInterface
     cache        jobcache.Cache
     //Job Event recorder
     recorder record.EventRecorder
 
     priorityClasses map[string]*v1beta1.PriorityClass
+
     sync.Mutex
     errTasks workqueue.RateLimitingInterface
+
+
+
+    workers uint32
+
 }
 
 // NewJobController create new Job Controller
@@ -114,6 +122,8 @@ func NewJobController(
     kubeClient kubernetes.Interface,
     kbClient kbver.Interface,
     vkClient vkver.Interface,
+    workers uint32,
+
 ) *Controller {
 
     //Initialize event client
@@ -126,13 +136,17 @@ func NewJobController(
         kubeClients:     kubeClient,
         vkClients:       vkClient,
         kbClients:       kbClient,
-        queue:           workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+        queueList:       make([]workqueue.RateLimitingInterface, workers, workers),
         commandQueue:    workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
         cache:           jobcache.New(),
         errTasks:        newRateLimitingQueue(),
         recorder:        recorder,
         priorityClasses: make(map[string]*v1beta1.PriorityClass),
-
+        workers:         workers,
+    }
+    var i uint32
+    for i = 0; i < workers; i++ {
+        cc.queueList[i] = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
     }
 
     cc.jobInformer = vkinfoext.NewSharedInformerFactory(cc.vkClients, 0).Batch().V1alpha1().Jobs()
@@ -195,7 +209,8 @@ func NewJobController(
 }
 
 // Run start JobController
-func (cc *Controller) Run(workers int, stopCh <-chan struct{}) {
+func (cc *Controller) Run(workers uint32, stopCh <-chan struct{}) {
+
     go cc.sharedInformers.Start(stopCh)
     go cc.jobInformer.Informer().Run(stopCh)
     go cc.podInformer.Informer().Run(stopCh)
@@ -209,9 +224,17 @@ func (cc *Controller) Run(workers uint32, stopCh <-chan struct{}) {
         cc.svcSynced, cc.cmdSynced, cc.pvcSynced, cc.pcSynced)
 
     go wait.Until(cc.handleCommands, 0, stopCh)
+    var i uint32
+    for i = 0; i < workers; i++ {
+        go func(num uint32) {
+            wait.Until(
+                func() {
+                    cc.worker(num)
+                },
+                time.Second,
+                stopCh)
+        }(i)
 
-    for i := 0; i < workers; i++ {
-        go wait.Until(cc.worker, time.Second, stopCh)
     }
 
     go cc.cache.Run(stopCh)
@@ -222,28 +245,67 @@ func (cc *Controller) Run(workers uint32, stopCh <-chan struct{}) {
     glog.Infof("JobController is running ...... ")
 }
 
-func (cc *Controller) worker() {
-    for cc.processNextReq() {
+func (cc *Controller) worker(i uint32) {
+
+    glog.Infof("worker %d start ...... ", i)
+
+    for cc.processNextReq(i) {
     }
 }
 
-func (cc *Controller) processNextReq() bool {
-    obj, shutdown := cc.queue.Get()
+// TODO we may need to make this sharding more proper if required
+func (cc *Controller) belongsToThisRoutine(key string, count uint32) bool {
+    var hashVal hash.Hash32
+    var val uint32
+
+    hashVal = fnv.New32()
+    hashVal.Write([]byte(key))
+
+    val = hashVal.Sum32()
+
+    if val%cc.workers == count {
+        return true
+    }
+
+    return false
+}
+
+// TODO we may need to make this sharding more proper if required
+func (cc *Controller) getWorkerID(key string) uint32 {
+    var hashVal hash.Hash32
+    var val uint32
+
+    hashVal = fnv.New32()
+    hashVal.Write([]byte(key))
+
+    val = hashVal.Sum32()
+
+    return val % cc.workers
+}
+
+func (cc *Controller) processNextReq(count uint32) bool {
+    obj, shutdown := cc.queueList[count].Get()
     if shutdown {
         glog.Errorf("Fail to pop item from queue")
         return false
     }
 
     req := obj.(apis.Request)
-    defer cc.queue.Done(req)
+    defer cc.queueList[count].Done(req)
 
     key := jobcache.JobKeyByReq(&req)
+    // Later we can remove this code if we want
+    if !cc.belongsToThisRoutine(key, count) {
+        glog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count)
+        cc.queueList[count].AddRateLimited(req)
+        return true
+    }
 
     // prevent multi threads processing the same job simultaneously.
     cc.jobsLock.Lock()
     if cc.jobsMap[key] {
         // the job is being processed by some other thread
-        cc.queue.AddRateLimited(req)
+        cc.queueList[count].AddRateLimited(req)
         cc.jobsLock.Unlock()
         return true
     } else {
@@ -285,12 +347,12 @@ func (cc *Controller) processNextReq(count uint32) bool {
         glog.Errorf("Failed to handle Job <%s/%s>: %v",
             jobInfo.Job.Namespace, jobInfo.Job.Name, err)
         // If any error, requeue it.
-        cc.queue.AddRateLimited(req)
+        cc.queueList[count].AddRateLimited(req)
         return true
     }
 
     // If no error, forget it.
-    cc.queue.Forget(req)
+    cc.queueList[count].Forget(req)
     return true
 }
 
diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go
index 0f521ee9aa..80d6388f4a 100644
--- a/pkg/controllers/job/job_controller_handler.go
+++ b/pkg/controllers/job/job_controller_handler.go
@@ -65,7 +65,9 @@ func (cc *Controller) addJob(obj interface{}) {
         glog.Errorf("Failed to add job <%s/%s>: %v in cache",
             job.Namespace, job.Name, err)
     }
-    cc.queue.Add(req)
+    key := vkcache.JobKeyByReq(&req)
+    i := cc.getWorkerID(key)
+    cc.queueList[i].Add(req)
 }
 
 func (cc *Controller) updateJob(oldObj, newObj interface{}) {
@@ -100,7 +102,9 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) {
         Event: vkbatchv1.OutOfSyncEvent,
     }
 
-    cc.queue.Add(req)
+    key := vkcache.JobKeyByReq(&req)
+    i := cc.getWorkerID(key)
+    cc.queueList[i].Add(req)
 }
 
 func (cc *Controller) deleteJob(obj interface{}) {
@@ -165,7 +169,9 @@ func (cc *Controller) addPod(obj interface{}) {
         glog.Errorf("Failed to add Pod <%s/%s>: %v to cache",
             pod.Namespace, pod.Name, err)
     }
-    cc.queue.Add(req)
+    key := vkcache.JobKeyByReq(&req)
+    i := cc.getWorkerID(key)
+    cc.queueList[i].Add(req)
 }
 
 func (cc *Controller) updatePod(oldObj, newObj interface{}) {
@@ -241,7 +247,9 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
         JobVersion: int32(dVersion),
     }
 
-    cc.queue.Add(req)
+    key := vkcache.JobKeyByReq(&req)
+    i := cc.getWorkerID(key)
+    cc.queueList[i].Add(req)
 }
 
 func (cc *Controller) deletePod(obj interface{}) {
@@ -302,7 +310,9 @@ func (cc *Controller) deletePod(obj interface{}) {
             pod.Namespace, pod.Name, err)
     }
 
-    cc.queue.Add(req)
+    key := vkcache.JobKeyByReq(&req)
+    i := cc.getWorkerID(key)
+    cc.queueList[i].Add(req)
 }
 
 func (cc *Controller) recordJobEvent(namespace, name string, event vkbatchv1.JobEvent, message string) {
@@ -347,7 +357,9 @@ func (cc *Controller) processNextCommand() bool {
         Action: vkbatchv1.Action(cmd.Action),
     }
 
-    cc.queue.Add(req)
+    key := vkcache.JobKeyByReq(&req)
+    i := cc.getWorkerID(key)
+    cc.queueList[i].Add(req)
 
     return true
 }
@@ -382,7 +394,9 @@ func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) {
         case kbtype.PodGroupInqueue:
             req.Action = vkbatchv1.EnqueueAction
         }
-        cc.queue.Add(req)
+        key := vkcache.JobKeyByReq(&req)
+        i := cc.getWorkerID(key)
+        cc.queueList[i].Add(req)
     }
 }
 
diff --git a/pkg/controllers/job/job_controller_handler_test.go b/pkg/controllers/job/job_controller_handler_test.go
index b96efcefef..9df646f5e0 100644
--- a/pkg/controllers/job/job_controller_handler_test.go
+++ b/pkg/controllers/job/job_controller_handler_test.go
@@ -59,7 +59,8 @@ func newController() *Controller {
     }
 
     vkclient := vkclientset.NewForConfigOrDie(config)
-    controller := NewJobController(kubeClientSet, kubeBatchClientSet, vkclient)
+    controller := NewJobController(kubeClientSet, kubeBatchClientSet, vkclient, 1)
+    controller.workers = 1
 
     return controller
 }
@@ -160,7 +161,7 @@ func TestJobAddFunc(t *testing.T) {
         if job == nil || err != nil {
             t.Errorf("Error while Adding Job in case %d with error %s", i, err)
         }
-        len := controller.queue.Len()
+        len := controller.queueList[0].Len()
         if testcase.ExpectValue != len {
             t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, len)
         }
@@ -511,7 +512,7 @@ func TestUpdatePodGroupFunc(t *testing.T) {
     for i, testcase := range testCases {
         controller := newController()
         controller.updatePodGroup(testcase.oldPodGroup, testcase.newPodGroup)
-        len := controller.queue.Len()
+        len := controller.queueList[0].Len()
         if testcase.ExpectValue != len {
             t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, len)
         }
diff --git a/pkg/controllers/job/job_controller_plugins_test.go b/pkg/controllers/job/job_controller_plugins_test.go
index 2d3d3141aa..8a15a51580 100644
--- a/pkg/controllers/job/job_controller_plugins_test.go
+++ b/pkg/controllers/job/job_controller_plugins_test.go
@@ -35,7 +35,7 @@ func newFakeController() *Controller {
     VolcanoClientSet := volcanoclient.NewSimpleClientset()
     KubeClientSet := kubeclient.NewSimpleClientset()
 
-    controller := NewJobController(KubeClientSet, KubeBatchClientSet, VolcanoClientSet)
+    controller := NewJobController(KubeClientSet, KubeBatchClientSet, VolcanoClientSet, 1)
 
     return controller
 }
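Patch 3 replaces the single shared queue with one rate-limiting queue per worker and routes every request by an FNV-32 hash of its job key, so a given job always lands on the same worker. A small standard-library sketch of that sharding rule (shardFor is an illustrative name, not Volcano code):

// Sketch: a job key always maps to the same worker index.
package main

import (
    "fmt"
    "hash/fnv"
)

func shardFor(key string, workers uint32) uint32 {
    h := fnv.New32()
    h.Write([]byte(key))
    return h.Sum32() % workers
}

func main() {
    for _, key := range []string{"default/tf-job", "default/mpi-job", "kube-system/spark-job"} {
        fmt.Printf("%-22s -> worker %d of 3\n", key, shardFor(key, 3))
    }
}

With keys pinned to workers this way, the per-key jobsMap guard from patch 2 becomes redundant, which is exactly what the next patch removes.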
From acf01934d1256c1f28cfa4926f6c5303bc5c882d Mon Sep 17 00:00:00 2001
From: SrinivasChilveri
Date: Tue, 18 Jun 2019 12:59:22 +0530
Subject: [PATCH 4/5] Fixed comments

---
 pkg/controllers/job/job_controller.go         | 47 +++++++------------
 pkg/controllers/job/job_controller_handler.go | 28 +++++------
 2 files changed, 31 insertions(+), 44 deletions(-)

diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go
index c5504f17e1..4250af699f 100644
--- a/pkg/controllers/job/job_controller.go
+++ b/pkg/controllers/job/job_controller.go
@@ -23,7 +23,6 @@ import (
     "github.com/golang/glog"
     "hash"
     "hash/fnv"
-    "sync"
     "time"
 
     "k8s.io/api/core/v1"
@@ -106,15 +105,12 @@ type Controller struct {
     recorder record.EventRecorder
 
     priorityClasses map[string]*v1beta1.PriorityClass
-
     sync.Mutex
     errTasks workqueue.RateLimitingInterface
-
-
-
+    // To protect the queue list by processing multiple workers
+    lock    sync.RWMutex
     workers uint32
-
 }
 
 // NewJobController create new Job Controller
@@ -271,7 +267,7 @@ func (cc *Controller) belongsToThisRoutine(key string, count uint32) bool {
 }
 
 // TODO we may need to make this sharding more proper if required
-func (cc *Controller) getWorkerID(key string) uint32 {
+func (cc *Controller) getWorkerID(key string) workqueue.RateLimitingInterface {
     var hashVal hash.Hash32
     var val uint32
 
@@ -280,42 +276,33 @@ func (cc *Controller) getWorkerID(key string) uint32 {
 
     val = hashVal.Sum32()
 
-    return val % cc.workers
+    cc.lock.Lock()
+    queue := cc.queueList[val%cc.workers]
+    cc.lock.Unlock()
+
+    return queue
 }
 
 func (cc *Controller) processNextReq(count uint32) bool {
-    obj, shutdown := cc.queueList[count].Get()
+    cc.lock.Lock()
+    queue := cc.queueList[count]
+    cc.lock.Unlock()
+    obj, shutdown := queue.Get()
     if shutdown {
         glog.Errorf("Fail to pop item from queue")
         return false
     }
 
     req := obj.(apis.Request)
-    defer cc.queueList[count].Done(req)
+    defer queue.Done(req)
 
     key := jobcache.JobKeyByReq(&req)
     // Later we can remove this code if we want
     if !cc.belongsToThisRoutine(key, count) {
         glog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count)
-        cc.queueList[count].AddRateLimited(req)
-        return true
-    }
-
-    // prevent multi threads processing the same job simultaneously.
-    cc.jobsLock.Lock()
-    if cc.jobsMap[key] {
-        // the job is being processed by some other thread
-        cc.queueList[count].AddRateLimited(req)
-        cc.jobsLock.Unlock()
+        queueLocal := cc.getWorkerID(key)
+        queueLocal.Add(req)
         return true
-    } else {
-        cc.jobsMap[key] = true
-        cc.jobsLock.Unlock()
-        defer func() {
-            cc.jobsLock.Lock()
-            delete(cc.jobsMap, key)
-            cc.jobsLock.Unlock()
-        }()
     }
 
     glog.V(3).Infof("Try to handle request <%v>", req)
@@ -347,12 +334,12 @@ func (cc *Controller) processNextReq(count uint32) bool {
         glog.Errorf("Failed to handle Job <%s/%s>: %v",
             jobInfo.Job.Namespace, jobInfo.Job.Name, err)
         // If any error, requeue it.
-        cc.queueList[count].AddRateLimited(req)
+        queue.AddRateLimited(req)
         return true
     }
 
     // If no error, forget it.
-    cc.queueList[count].Forget(req)
+    queue.Forget(req)
     return true
 }
 
diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go
index 80d6388f4a..7702b42fe3 100644
--- a/pkg/controllers/job/job_controller_handler.go
+++ b/pkg/controllers/job/job_controller_handler.go
@@ -66,8 +66,8 @@ func (cc *Controller) addJob(obj interface{}) {
             job.Namespace, job.Name, err)
     }
     key := vkcache.JobKeyByReq(&req)
-    i := cc.getWorkerID(key)
-    cc.queueList[i].Add(req)
+    queue := cc.getWorkerID(key)
+    queue.Add(req)
 }
 
 func (cc *Controller) updateJob(oldObj, newObj interface{}) {
@@ -103,8 +103,8 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) {
     }
 
     key := vkcache.JobKeyByReq(&req)
-    i := cc.getWorkerID(key)
-    cc.queueList[i].Add(req)
+    queue := cc.getWorkerID(key)
+    queue.Add(req)
 }
 
 func (cc *Controller) deleteJob(obj interface{}) {
@@ -170,8 +170,8 @@ func (cc *Controller) addPod(obj interface{}) {
             pod.Namespace, pod.Name, err)
     }
     key := vkcache.JobKeyByReq(&req)
-    i := cc.getWorkerID(key)
-    cc.queueList[i].Add(req)
+    queue := cc.getWorkerID(key)
+    queue.Add(req)
 }
 
 func (cc *Controller) updatePod(oldObj, newObj interface{}) {
@@ -248,8 +248,8 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
     }
 
     key := vkcache.JobKeyByReq(&req)
-    i := cc.getWorkerID(key)
-    cc.queueList[i].Add(req)
+    queue := cc.getWorkerID(key)
+    queue.Add(req)
 }
 
 func (cc *Controller) deletePod(obj interface{}) {
@@ -311,8 +311,8 @@ func (cc *Controller) deletePod(obj interface{}) {
     }
 
     key := vkcache.JobKeyByReq(&req)
-    i := cc.getWorkerID(key)
-    cc.queueList[i].Add(req)
+    queue := cc.getWorkerID(key)
+    queue.Add(req)
 }
 
 func (cc *Controller) recordJobEvent(namespace, name string, event vkbatchv1.JobEvent, message string) {
@@ -357,8 +358,8 @@ func (cc *Controller) processNextCommand() bool {
     }
 
     key := vkcache.JobKeyByReq(&req)
-    i := cc.getWorkerID(key)
-    cc.queueList[i].Add(req)
+    queue := cc.getWorkerID(key)
+    queue.Add(req)
 
     return true
 }
@@ -394,8 +395,8 @@ func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) {
         case kbtype.PodGroupInqueue:
             req.Action = vkbatchv1.EnqueueAction
         }
         key := vkcache.JobKeyByReq(&req)
-        i := cc.getWorkerID(key)
-        cc.queueList[i].Add(req)
+        queue := cc.getWorkerID(key)
+        queue.Add(req)
     }
 }
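After this patch the event handlers hash a job key once and enqueue straight onto the owning worker's queue, a misrouted request is simply re-added to the correct queue, and the per-key jobsMap is gone because the hash already pins each job to one worker. A rough, dependency-free sketch of that dispatch shape, with buffered channels standing in for the list of workqueue.RateLimitingInterface; dispatcher and queueFor are assumed names for illustration only.

// Sketch: key -> owning queue, one queue per worker.
package main

import (
    "fmt"
    "hash/fnv"
)

type dispatcher struct {
    queues []chan string // one queue per worker
}

func newDispatcher(workers int) *dispatcher {
    d := &dispatcher{queues: make([]chan string, workers)}
    for i := range d.queues {
        d.queues[i] = make(chan string, 16)
    }
    return d
}

// queueFor plays the role of getWorkerID/getWorkerQueue in the controller.
func (d *dispatcher) queueFor(key string) chan string {
    h := fnv.New32()
    h.Write([]byte(key))
    return d.queues[h.Sum32()%uint32(len(d.queues))]
}

func main() {
    d := newDispatcher(3)
    d.queueFor("default/job-a") <- "SyncJob"
    d.queueFor("default/job-a") <- "OutOfSync" // same key, same queue
    for i, q := range d.queues {
        fmt.Printf("worker %d has %d pending request(s)\n", i, len(q))
    }
}

Since queueList is built once in NewJobController and never resized, the RWMutex this patch adds around it is dropped again in the final patch of the series.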
From eee74aa251fd91ad54b4f6f18ffd09b5a512a5e6 Mon Sep 17 00:00:00 2001
From: SrinivasChilveri
Date: Wed, 10 Jul 2019 12:17:01 +0530
Subject: [PATCH 5/5] Fixed all the comments

---
 cmd/controllers/app/server.go                 |  2 +-
 pkg/controllers/job/helpers/helpers.go        |  6 ++++
 pkg/controllers/job/job_controller.go         | 29 +++++--------------
 pkg/controllers/job/job_controller_handler.go | 29 ++++++++++---------
 .../job/job_controller_handler_test.go        | 10 ++++---
 .../job/job_controller_plugins_test.go        |  2 +-
 6 files changed, 37 insertions(+), 41 deletions(-)

diff --git a/cmd/controllers/app/server.go b/cmd/controllers/app/server.go
index 0f8359c303..96ff61d978 100644
--- a/cmd/controllers/app/server.go
+++ b/cmd/controllers/app/server.go
@@ -90,7 +90,7 @@ func Run(opt *options.ServerOption) error {
     garbageCollector := garbagecollector.New(vkClient)
 
     run := func(ctx context.Context) {
-        go jobController.Run(opt.WorkerThreads, ctx.Done())
+        go jobController.Run(ctx.Done())
         go queueController.Run(ctx.Done())
         go garbageCollector.Run(ctx.Done())
         <-ctx.Done()
diff --git a/pkg/controllers/job/helpers/helpers.go b/pkg/controllers/job/helpers/helpers.go
index 430a846cc1..905376e31c 100644
--- a/pkg/controllers/job/helpers/helpers.go
+++ b/pkg/controllers/job/helpers/helpers.go
@@ -22,6 +22,7 @@ import (
     "math/rand"
     "strings"
     "time"
+    "volcano.sh/volcano/pkg/controllers/apis"
 )
 
 const (
@@ -61,3 +62,8 @@ func genRandomStr(l int) string {
 func MakeVolumeClaimName(jobName string) string {
     return fmt.Sprintf(VolumeClaimFmt, jobName, genRandomStr(12))
 }
+
+// GetJobKeyByReq gets the key for the job request
+func GetJobKeyByReq(req *apis.Request) string {
+    return fmt.Sprintf("%s/%s", req.Namespace, req.JobName)
+}
diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go
index 4250af699f..e0117b1faa 100644
--- a/pkg/controllers/job/job_controller.go
+++ b/pkg/controllers/job/job_controller.go
@@ -18,13 +18,13 @@ package job
 
 import (
     "fmt"
-    "sync"
-
-    "github.com/golang/glog"
     "hash"
     "hash/fnv"
+    "sync"
     "time"
 
+    "github.com/golang/glog"
+
     "k8s.io/api/core/v1"
     "k8s.io/api/scheduling/v1beta1"
     "k8s.io/apimachinery/pkg/util/wait"
@@ -107,10 +107,7 @@ type Controller struct {
     sync.Mutex
     errTasks workqueue.RateLimitingInterface
 
-    // To protect the queue list by processing multiple workers
-    lock    sync.RWMutex
-    workers uint32
+    workers uint32
 }
 
 // NewJobController create new Job Controller
@@ -119,7 +116,6 @@ func NewJobController(
     kbClient kbver.Interface,
     vkClient vkver.Interface,
     workers uint32,
-
 ) *Controller {
 
     //Initialize event client
@@ -205,7 +201,7 @@ func NewJobController(
 }
 
 // Run start JobController
-func (cc *Controller) Run(workers uint32, stopCh <-chan struct{}) {
+func (cc *Controller) Run(stopCh <-chan struct{}) {
 
     go cc.sharedInformers.Start(stopCh)
     go cc.jobInformer.Informer().Run(stopCh)
@@ -221,7 +217,7 @@ func (cc *Controller) Run(stopCh <-chan struct{}) {
 
     go wait.Until(cc.handleCommands, 0, stopCh)
     var i uint32
-    for i = 0; i < workers; i++ {
+    for i = 0; i < cc.workers; i++ {
         go func(num uint32) {
             wait.Until(
                 func() {
@@ -230,7 +226,6 @@ func (cc *Controller) Run(stopCh <-chan struct{}) {
                 time.Second,
                 stopCh)
         }(i)
-
     }
 
     go cc.cache.Run(stopCh)
@@ -242,14 +237,12 @@ func (cc *Controller) Run(stopCh <-chan struct{}) {
 }
 
 func (cc *Controller) worker(i uint32) {
-
     glog.Infof("worker %d start ...... ", i)
 
     for cc.processNextReq(i) {
     }
 }
 
-// TODO we may need to make this sharding more proper if required
 func (cc *Controller) belongsToThisRoutine(key string, count uint32) bool {
     var hashVal hash.Hash32
     var val uint32
@@ -266,8 +259,7 @@ func (cc *Controller) belongsToThisRoutine(key string, count uint32) bool {
     return false
 }
 
-// TODO we may need to make this sharding more proper if required
-func (cc *Controller) getWorkerID(key string) workqueue.RateLimitingInterface {
+func (cc *Controller) getWorkerQueue(key string) workqueue.RateLimitingInterface {
     var hashVal hash.Hash32
     var val uint32
 
@@ -276,17 +268,13 @@ func (cc *Controller) getWorkerID(key string) workqueue.RateLimitingInterface {
 
     val = hashVal.Sum32()
 
-    cc.lock.Lock()
     queue := cc.queueList[val%cc.workers]
-    cc.lock.Unlock()
 
     return queue
 }
 
 func (cc *Controller) processNextReq(count uint32) bool {
-    cc.lock.Lock()
     queue := cc.queueList[count]
-    cc.lock.Unlock()
     obj, shutdown := queue.Get()
     if shutdown {
         glog.Errorf("Fail to pop item from queue")
@@ -297,10 +285,9 @@ func (cc *Controller) processNextReq(count uint32) bool {
     defer queue.Done(req)
 
     key := jobcache.JobKeyByReq(&req)
-    // Later we can remove this code if we want
     if !cc.belongsToThisRoutine(key, count) {
         glog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count)
-        queueLocal := cc.getWorkerID(key)
+        queueLocal := cc.getWorkerQueue(key)
         queueLocal.Add(req)
         return true
     }
diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go
index 7702b42fe3..b164936ef4 100644
--- a/pkg/controllers/job/job_controller_handler.go
+++ b/pkg/controllers/job/job_controller_handler.go
@@ -34,6 +34,7 @@ import (
 
     "volcano.sh/volcano/pkg/controllers/apis"
     vkcache "volcano.sh/volcano/pkg/controllers/cache"
+    vkjobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
 )
 
 func (cc *Controller) addCommand(obj interface{}) {
@@ -65,8 +66,8 @@ func (cc *Controller) addJob(obj interface{}) {
         glog.Errorf("Failed to add job <%s/%s>: %v in cache",
             job.Namespace, job.Name, err)
     }
-    key := vkcache.JobKeyByReq(&req)
-    queue := cc.getWorkerID(key)
+    key := vkjobhelpers.GetJobKeyByReq(&req)
+    queue := cc.getWorkerQueue(key)
     queue.Add(req)
 }
 
@@ -102,8 +103,8 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) {
         Event: vkbatchv1.OutOfSyncEvent,
     }
 
-    key := vkcache.JobKeyByReq(&req)
-    queue := cc.getWorkerID(key)
+    key := vkjobhelpers.GetJobKeyByReq(&req)
+    queue := cc.getWorkerQueue(key)
     queue.Add(req)
 }
 
@@ -169,8 +170,8 @@ func (cc *Controller) addPod(obj interface{}) {
         glog.Errorf("Failed to add Pod <%s/%s>: %v to cache",
             pod.Namespace, pod.Name, err)
     }
-    key := vkcache.JobKeyByReq(&req)
-    queue := cc.getWorkerID(key)
+    key := vkjobhelpers.GetJobKeyByReq(&req)
+    queue := cc.getWorkerQueue(key)
     queue.Add(req)
 }
 
@@ -247,8 +248,8 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
         JobVersion: int32(dVersion),
     }
 
-    key := vkcache.JobKeyByReq(&req)
-    queue := cc.getWorkerID(key)
+    key := vkjobhelpers.GetJobKeyByReq(&req)
+    queue := cc.getWorkerQueue(key)
     queue.Add(req)
 }
 
@@ -310,8 +311,8 @@ func (cc *Controller) deletePod(obj interface{}) {
             pod.Namespace, pod.Name, err)
     }
 
-    key := vkcache.JobKeyByReq(&req)
-    queue := cc.getWorkerID(key)
+    key := vkjobhelpers.GetJobKeyByReq(&req)
+    queue := cc.getWorkerQueue(key)
     queue.Add(req)
 }
 
 func (cc *Controller) recordJobEvent(namespace, name string, event vkbatchv1.JobEvent, message string) {
@@ -357,8 +358,8 @@ func (cc *Controller) processNextCommand() bool {
         Action: vkbatchv1.Action(cmd.Action),
     }
 
-    key := vkcache.JobKeyByReq(&req)
-    queue := cc.getWorkerID(key)
+    key := vkjobhelpers.GetJobKeyByReq(&req)
+    queue := cc.getWorkerQueue(key)
     queue.Add(req)
 
     return true
}
@@ -394,8 +395,8 @@ func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) {
         case kbtype.PodGroupInqueue:
             req.Action = vkbatchv1.EnqueueAction
         }
-        key := vkcache.JobKeyByReq(&req)
-        queue := cc.getWorkerID(key)
+        key := vkjobhelpers.GetJobKeyByReq(&req)
+        queue := cc.getWorkerQueue(key)
         queue.Add(req)
     }
 }
diff --git a/pkg/controllers/job/job_controller_handler_test.go b/pkg/controllers/job/job_controller_handler_test.go
index 9df646f5e0..59cb928d6c 100644
--- a/pkg/controllers/job/job_controller_handler_test.go
+++ b/pkg/controllers/job/job_controller_handler_test.go
@@ -59,8 +59,7 @@ func newController() *Controller {
     }
 
     vkclient := vkclientset.NewForConfigOrDie(config)
-    controller := NewJobController(kubeClientSet, kubeBatchClientSet, vkclient, 1)
-    controller.workers = 1
+    controller := NewJobController(kubeClientSet, kubeBatchClientSet, vkclient, 3)
 
     return controller
 }
@@ -160,7 +161,8 @@ func TestJobAddFunc(t *testing.T) {
         if job == nil || err != nil {
             t.Errorf("Error while Adding Job in case %d with error %s", i, err)
         }
-        len := controller.queueList[0].Len()
+        queue := controller.getWorkerQueue(key)
+        len := queue.Len()
         if testcase.ExpectValue != len {
             t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, len)
         }
@@ -511,7 +512,9 @@ func TestUpdatePodGroupFunc(t *testing.T) {
     for i, testcase := range testCases {
         controller := newController()
         controller.updatePodGroup(testcase.oldPodGroup, testcase.newPodGroup)
-        len := controller.queueList[0].Len()
+        key := fmt.Sprintf("%s/%s", testcase.oldPodGroup.Namespace, testcase.oldPodGroup.Name)
+        queue := controller.getWorkerQueue(key)
+        len := queue.Len()
         if testcase.ExpectValue != len {
             t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, len)
         }
diff --git a/pkg/controllers/job/job_controller_plugins_test.go b/pkg/controllers/job/job_controller_plugins_test.go
index 8a15a51580..cb5f2dd3de 100644
--- a/pkg/controllers/job/job_controller_plugins_test.go
+++ b/pkg/controllers/job/job_controller_plugins_test.go
@@ -35,7 +35,7 @@ func newFakeController() *Controller {
     VolcanoClientSet := volcanoclient.NewSimpleClientset()
     KubeClientSet := kubeclient.NewSimpleClientset()
 
-    controller := NewJobController(KubeClientSet, KubeBatchClientSet, VolcanoClientSet, 1)
+    controller := NewJobController(KubeClientSet, KubeBatchClientSet, VolcanoClientSet, 3)
 
     return controller
 }
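The final patch moves key construction into helpers.GetJobKeyByReq ("namespace/jobName") and has the tests look up the queue a key hashes to instead of hard-coding queueList[0]. A standalone sketch of that key format and lookup; Request here is a trimmed stand-in for volcano.sh/volcano/pkg/controllers/apis.Request and the worker count of 3 mirrors the updated tests.

// Sketch: build the job key and find the worker that owns it.
package main

import (
    "fmt"
    "hash/fnv"
)

type Request struct {
    Namespace string
    JobName   string
}

// getJobKeyByReq mirrors the shape of helpers.GetJobKeyByReq.
func getJobKeyByReq(req *Request) string {
    return fmt.Sprintf("%s/%s", req.Namespace, req.JobName)
}

func main() {
    req := &Request{Namespace: "default", JobName: "job1"}
    key := getJobKeyByReq(req)

    // A test that previously asserted on queueList[0].Len() now has to look
    // at the queue this key hashes to: index fnv32(key) % workers.
    h := fnv.New32()
    h.Write([]byte(key))
    fmt.Printf("key %q is owned by worker %d of 3\n", key, h.Sum32()%3)
}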