Skip to content

Commit

Permalink
Fixed all the comments
Browse files Browse the repository at this point in the history
  • Loading branch information
SrinivasChilveri committed Jul 10, 2019
1 parent acf0193 commit eee74aa
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 41 deletions.
2 changes: 1 addition & 1 deletion cmd/controllers/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func Run(opt *options.ServerOption) error {
garbageCollector := garbagecollector.New(vkClient)

run := func(ctx context.Context) {
go jobController.Run(opt.WorkerThreads, ctx.Done())
go jobController.Run(ctx.Done())
go queueController.Run(ctx.Done())
go garbageCollector.Run(ctx.Done())
<-ctx.Done()
Expand Down
6 changes: 6 additions & 0 deletions pkg/controllers/job/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math/rand"
"strings"
"time"
"volcano.sh/volcano/pkg/controllers/apis"
)

const (
Expand Down Expand Up @@ -61,3 +62,8 @@ func genRandomStr(l int) string {
func MakeVolumeClaimName(jobName string) string {
return fmt.Sprintf(VolumeClaimFmt, jobName, genRandomStr(12))
}

// GetJobKeyByReq gets the key for the job request
func GetJobKeyByReq(req *apis.Request) string {
return fmt.Sprintf("%s/%s", req.Namespace, req.JobName)
}
29 changes: 8 additions & 21 deletions pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ package job

import (
"fmt"
"sync"

"github.com/golang/glog"
"hash"
"hash/fnv"
"sync"
"time"

"github.com/golang/glog"

"k8s.io/api/core/v1"
"k8s.io/api/scheduling/v1beta1"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -107,10 +107,7 @@ type Controller struct {

sync.Mutex
errTasks workqueue.RateLimitingInterface

// To protect the queue list by processing multiple workers
lock sync.RWMutex
workers uint32
workers uint32
}

// NewJobController create new Job Controller
Expand All @@ -119,7 +116,6 @@ func NewJobController(
kbClient kbver.Interface,
vkClient vkver.Interface,
workers uint32,

) *Controller {

//Initialize event client
Expand Down Expand Up @@ -205,7 +201,7 @@ func NewJobController(
}

// Run start JobController
func (cc *Controller) Run(workers uint32, stopCh <-chan struct{}) {
func (cc *Controller) Run(stopCh <-chan struct{}) {

go cc.sharedInformers.Start(stopCh)
go cc.jobInformer.Informer().Run(stopCh)
Expand All @@ -221,7 +217,7 @@ func (cc *Controller) Run(workers uint32, stopCh <-chan struct{}) {

go wait.Until(cc.handleCommands, 0, stopCh)
var i uint32
for i = 0; i < workers; i++ {
for i = 0; i < cc.workers; i++ {
go func(num uint32) {
wait.Until(
func() {
Expand All @@ -230,7 +226,6 @@ func (cc *Controller) Run(workers uint32, stopCh <-chan struct{}) {
time.Second,
stopCh)
}(i)

}

go cc.cache.Run(stopCh)
Expand All @@ -242,14 +237,12 @@ func (cc *Controller) Run(workers uint32, stopCh <-chan struct{}) {
}

func (cc *Controller) worker(i uint32) {

glog.Infof("worker %d start ...... ", i)

for cc.processNextReq(i) {
}
}

// TODO we may need to make this sharding more proper if required
func (cc *Controller) belongsToThisRoutine(key string, count uint32) bool {
var hashVal hash.Hash32
var val uint32
Expand All @@ -266,8 +259,7 @@ func (cc *Controller) belongsToThisRoutine(key string, count uint32) bool {
return false
}

// TODO we may need to make this sharding more proper if required
func (cc *Controller) getWorkerID(key string) workqueue.RateLimitingInterface {
func (cc *Controller) getWorkerQueue(key string) workqueue.RateLimitingInterface {
var hashVal hash.Hash32
var val uint32

Expand All @@ -276,17 +268,13 @@ func (cc *Controller) getWorkerID(key string) workqueue.RateLimitingInterface {

val = hashVal.Sum32()

cc.lock.Lock()
queue := cc.queueList[val%cc.workers]
cc.lock.Unlock()

return queue
}

func (cc *Controller) processNextReq(count uint32) bool {
cc.lock.Lock()
queue := cc.queueList[count]
cc.lock.Unlock()
obj, shutdown := queue.Get()
if shutdown {
glog.Errorf("Fail to pop item from queue")
Expand All @@ -297,10 +285,9 @@ func (cc *Controller) processNextReq(count uint32) bool {
defer queue.Done(req)

key := jobcache.JobKeyByReq(&req)
// Later we can remove this code if we want
if !cc.belongsToThisRoutine(key, count) {
glog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count)
queueLocal := cc.getWorkerID(key)
queueLocal := cc.getWorkerQueue(key)
queueLocal.Add(req)
return true
}
Expand Down
29 changes: 15 additions & 14 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"volcano.sh/volcano/pkg/controllers/apis"
vkcache "volcano.sh/volcano/pkg/controllers/cache"
vkjobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
)

func (cc *Controller) addCommand(obj interface{}) {
Expand Down Expand Up @@ -65,8 +66,8 @@ func (cc *Controller) addJob(obj interface{}) {
glog.Errorf("Failed to add job <%s/%s>: %v in cache",
job.Namespace, job.Name, err)
}
key := vkcache.JobKeyByReq(&req)
queue := cc.getWorkerID(key)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}

Expand Down Expand Up @@ -102,8 +103,8 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) {
Event: vkbatchv1.OutOfSyncEvent,
}

key := vkcache.JobKeyByReq(&req)
queue := cc.getWorkerID(key)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}

Expand Down Expand Up @@ -169,8 +170,8 @@ func (cc *Controller) addPod(obj interface{}) {
glog.Errorf("Failed to add Pod <%s/%s>: %v to cache",
pod.Namespace, pod.Name, err)
}
key := vkcache.JobKeyByReq(&req)
queue := cc.getWorkerID(key)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}

Expand Down Expand Up @@ -247,8 +248,8 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
JobVersion: int32(dVersion),
}

key := vkcache.JobKeyByReq(&req)
queue := cc.getWorkerID(key)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}

Expand Down Expand Up @@ -310,8 +311,8 @@ func (cc *Controller) deletePod(obj interface{}) {
pod.Namespace, pod.Name, err)
}

key := vkcache.JobKeyByReq(&req)
queue := cc.getWorkerID(key)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}

Expand Down Expand Up @@ -357,8 +358,8 @@ func (cc *Controller) processNextCommand() bool {
Action: vkbatchv1.Action(cmd.Action),
}

key := vkcache.JobKeyByReq(&req)
queue := cc.getWorkerID(key)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)

return true
Expand Down Expand Up @@ -394,8 +395,8 @@ func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) {
case kbtype.PodGroupInqueue:
req.Action = vkbatchv1.EnqueueAction
}
key := vkcache.JobKeyByReq(&req)
queue := cc.getWorkerID(key)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/controllers/job/job_controller_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ func newController() *Controller {
}

vkclient := vkclientset.NewForConfigOrDie(config)
controller := NewJobController(kubeClientSet, kubeBatchClientSet, vkclient, 1)
controller.workers = 1
controller := NewJobController(kubeClientSet, kubeBatchClientSet, vkclient, 3)

return controller
}
Expand Down Expand Up @@ -161,7 +160,8 @@ func TestJobAddFunc(t *testing.T) {
if job == nil || err != nil {
t.Errorf("Error while Adding Job in case %d with error %s", i, err)
}
len := controller.queueList[0].Len()
queue := controller.getWorkerQueue(key)
len := queue.Len()
if testcase.ExpectValue != len {
t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, len)
}
Expand Down Expand Up @@ -512,7 +512,9 @@ func TestUpdatePodGroupFunc(t *testing.T) {
for i, testcase := range testCases {
controller := newController()
controller.updatePodGroup(testcase.oldPodGroup, testcase.newPodGroup)
len := controller.queueList[0].Len()
key := fmt.Sprintf("%s/%s", testcase.oldPodGroup.Namespace, testcase.oldPodGroup.Name)
queue := controller.getWorkerQueue(key)
len := queue.Len()
if testcase.ExpectValue != len {
t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, len)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/job_controller_plugins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func newFakeController() *Controller {
VolcanoClientSet := volcanoclient.NewSimpleClientset()
KubeClientSet := kubeclient.NewSimpleClientset()

controller := NewJobController(KubeClientSet, KubeBatchClientSet, VolcanoClientSet, 1)
controller := NewJobController(KubeClientSet, KubeBatchClientSet, VolcanoClientSet, 3)
return controller
}

Expand Down

0 comments on commit eee74aa

Please sign in to comment.