added sharding with multiple queues
SrinivasChilveri committed Jun 17, 2019
1 parent 9247fc8 commit c00d12c
Showing 7 changed files with 107 additions and 32 deletions.
4 changes: 2 additions & 2 deletions cmd/controllers/app/options/options.go
@@ -39,7 +39,7 @@ type ServerOption struct {
PrintVersion bool
// WorkerThreads is the number of threads syncing job operations
// concurrently. Larger number = faster job updating,but more CPU load.
WorkerThreads int32
WorkerThreads uint32
}

// NewServerOption creates a new CMServer with a default config.
@@ -58,7 +58,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver")
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
fs.Int32Var(&s.WorkerThreads, "worker-threads", defaultWorkers, "The number of threads syncing job operations concurrently. "+
fs.Uint32Var(&s.WorkerThreads, "worker-threads", defaultWorkers, "The number of threads syncing job operations concurrently. "+
"Larger number = faster job updating, but more CPU load")
}

9 changes: 5 additions & 4 deletions cmd/controllers/app/options/options_test.go
@@ -35,10 +35,11 @@ func TestAddFlags(t *testing.T) {

// This is a snapshot of expected options parsed by args.
expected := &ServerOption{
Master: "127.0.0.1",
KubeAPIQPS: defaultQPS,
KubeAPIBurst: 200,
PrintVersion: false,
Master: "127.0.0.1",
KubeAPIQPS: defaultQPS,
KubeAPIBurst: 200,
PrintVersion: false,
WorkerThreads: defaultWorkers,
}

if !reflect.DeepEqual(expected, s) {
4 changes: 2 additions & 2 deletions cmd/controllers/app/server.go
@@ -85,12 +85,12 @@ func Run(opt *options.ServerOption) error {
kbClient := kbver.NewForConfigOrDie(config)
vkClient := vkclient.NewForConfigOrDie(config)

jobController := job.NewJobController(kubeClient, kbClient, vkClient)
jobController := job.NewJobController(kubeClient, kbClient, vkClient, opt.WorkerThreads)
queueController := queue.NewQueueController(kubeClient, kbClient)
garbageCollector := garbagecollector.New(vkClient)

run := func(ctx context.Context) {
go jobController.Run((int)(opt.WorkerThreads), ctx.Done())
go jobController.Run(opt.WorkerThreads, ctx.Done())
go queueController.Run(ctx.Done())
go garbageCollector.Run(ctx.Done())
<-ctx.Done()
85 changes: 72 additions & 13 deletions pkg/controllers/job/job_controller.go
@@ -18,6 +18,8 @@ package job

import (
"github.com/golang/glog"
"hash"
"hash/fnv"
"sync"
"time"

@@ -93,7 +95,7 @@ type Controller struct {
pcSynced func() bool

// queue that need to sync up
queue workqueue.RateLimitingInterface
queueList []workqueue.RateLimitingInterface
commandQueue workqueue.RateLimitingInterface
cache jobcache.Cache
//Job Event recorder
@@ -104,13 +106,17 @@ type Controller struct {
jobsLock sync.RWMutex
// The jobsMap stores the flag of the jobs that are being handled by the workers
jobsMap map[string]bool

workers uint32
}

// NewJobController create new Job Controller
func NewJobController(
kubeClient kubernetes.Interface,
kbClient kbver.Interface,
vkClient vkver.Interface,
workers uint32,

) *Controller {

//Initialize event client
@@ -123,12 +129,17 @@ func NewJobController(
kubeClients: kubeClient,
vkClients: vkClient,
kbClients: kbClient,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
queueList: make([]workqueue.RateLimitingInterface, workers, workers),
commandQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
cache: jobcache.New(),
recorder: recorder,
priorityClasses: make(map[string]*v1beta1.PriorityClass),
jobsMap: map[string]bool{},
workers: workers,
}
var i uint32
for i = 0; i < workers; i++ {
cc.queueList[i] = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
}

cc.jobInformer = vkinfoext.NewSharedInformerFactory(cc.vkClients, 0).Batch().V1alpha1().Jobs()
@@ -191,7 +202,8 @@ func NewJobController(
}

// Run start JobController
func (cc *Controller) Run(workers int, stopCh <-chan struct{}) {
func (cc *Controller) Run(workers uint32, stopCh <-chan struct{}) {

go cc.sharedInformers.Start(stopCh)
go cc.jobInformer.Informer().Run(stopCh)
go cc.podInformer.Informer().Run(stopCh)
@@ -205,38 +217,85 @@ func (cc *Controller) Run(workers int, stopCh <-chan struct{}) {
cc.svcSynced, cc.cmdSynced, cc.pvcSynced, cc.pcSynced)

go wait.Until(cc.handleCommands, 0, stopCh)
var i uint32
for i = 0; i < workers; i++ {
go func(num uint32) {
wait.Until(
func() {
cc.worker(num)
},
time.Second,
stopCh)
}(i)

for i := 0; i < workers; i++ {
go wait.Until(cc.worker, time.Second, stopCh)
}

go cc.cache.Run(stopCh)

glog.Infof("JobController is running ...... ")
}
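The Run loop above starts one long-lived goroutine per worker, each pinned to its own queue index. Below is a minimal standalone sketch of that pattern (not the controller's code; processOnce and the worker count are made-up stand-ins). Passing the loop variable into the closure as a parameter is what keeps each goroutine on its own index instead of every goroutine reading the final value of i.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// processOnce stands in for cc.worker(num): one pass over that worker's own queue.
func processOnce(num uint32) {
	fmt.Printf("worker %d: draining its queue\n", num)
}

func main() {
	const workers uint32 = 3
	stopCh := make(chan struct{})

	var i uint32
	for i = 0; i < workers; i++ {
		go func(num uint32) {
			// wait.Until re-invokes the function every second until stopCh is closed.
			wait.Until(func() { processOnce(num) }, time.Second, stopCh)
		}(i)
	}

	time.Sleep(2 * time.Second)
	close(stopCh)
}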

func (cc *Controller) worker() {
for cc.processNextReq() {
func (cc *Controller) worker(i uint32) {

glog.Infof("worker %d start ...... ", i)

for cc.processNextReq(i) {
}
}

func (cc *Controller) processNextReq() bool {
obj, shutdown := cc.queue.Get()
// TODO we may need to make this sharding more proper if required
func (cc *Controller) belongsToThisRoutine(key string, count uint32) bool {
var hashVal hash.Hash32
var val uint32

hashVal = fnv.New32()
hashVal.Write([]byte(key))

val = hashVal.Sum32()

if val%cc.workers == count {
return true
}

return false
}

// TODO we may need to make this sharding more proper if required
func (cc *Controller) getWorkerID(key string) uint32 {
var hashVal hash.Hash32
var val uint32

hashVal = fnv.New32()
hashVal.Write([]byte(key))

val = hashVal.Sum32()

return val % cc.workers
}
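belongsToThisRoutine and getWorkerID encode the sharding rule: hash the job key with FNV-32 and reduce it modulo the worker count, so every event for a given job deterministically lands on the same queue and is handled by a single worker. A small self-contained sketch of that idea follows (the job keys are made-up examples, not from this repository). Requests only move to a different worker if the worker count itself changes, which is why the count is fixed when the controller is constructed.

package main

import (
	"fmt"
	"hash/fnv"
)

// workerID mirrors getWorkerID: hash the key, then take it modulo the worker count.
func workerID(key string, workers uint32) uint32 {
	h := fnv.New32()
	h.Write([]byte(key))
	return h.Sum32() % workers
}

func main() {
	const workers uint32 = 5
	for _, key := range []string{"default/tf-job-1", "default/tf-job-2", "kube-system/mpi-job"} {
		// The same key always maps to the same worker, no matter how often it is hashed.
		fmt.Printf("job %q -> worker %d\n", key, workerID(key, workers))
	}
}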

func (cc *Controller) processNextReq(count uint32) bool {
obj, shutdown := cc.queueList[count].Get()
if shutdown {
glog.Errorf("Fail to pop item from queue")
return false
}

req := obj.(apis.Request)
defer cc.queue.Done(req)
defer cc.queueList[count].Done(req)

key := jobcache.JobKeyByReq(&req)
// Later we can remove this code if we want
if !cc.belongsToThisRoutine(key, count) {
glog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count)
cc.queueList[count].AddRateLimited(req)
return true
}

// prevent multi threads processing the same job simultaneously.
cc.jobsLock.Lock()
if cc.jobsMap[key] {
// the job is being processed by some other thread
cc.queue.AddRateLimited(req)
cc.queueList[count].AddRateLimited(req)
cc.jobsLock.Unlock()
return true
} else {
@@ -273,12 +332,12 @@ func (cc *Controller) processNextReq() bool {
glog.Errorf("Failed to handle Job <%s/%s>: %v",
jobInfo.Job.Namespace, jobInfo.Job.Name, err)
// If any error, requeue it.
cc.queue.AddRateLimited(req)
cc.queueList[count].AddRateLimited(req)
return true
}

// If no error, forget it.
cc.queue.Forget(req)
cc.queueList[count].Forget(req)

return true
}
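Sharding already keeps all events for one job on one queue; the jobsMap check above is a second guard so that the same job is never handled by two threads at once, with the request requeued if the flag is set. The collapsed lines presumably set and later clear that flag. The sketch below only illustrates this claim/release pattern with simplified, made-up names; it is not the controller's implementation.

package main

import (
	"fmt"
	"sync"
)

// jobGuard is a simplified stand-in for the controller's jobsLock/jobsMap pair.
type jobGuard struct {
	mu   sync.Mutex
	busy map[string]bool
}

// tryClaim reports whether the caller may handle the job now; if another worker
// already holds the key, the caller should requeue the request instead.
func (g *jobGuard) tryClaim(key string) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.busy[key] {
		return false
	}
	g.busy[key] = true
	return true
}

// release clears the flag once the job has been handled.
func (g *jobGuard) release(key string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	delete(g.busy, key)
}

func main() {
	g := &jobGuard{busy: map[string]bool{}}
	key := "default/tf-job-1"
	if g.tryClaim(key) {
		fmt.Println("processing", key)
		g.release(key)
	} else {
		fmt.Println("busy, requeue", key)
	}
}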
28 changes: 21 additions & 7 deletions pkg/controllers/job/job_controller_handler.go
@@ -65,7 +65,9 @@ func (cc *Controller) addJob(obj interface{}) {
glog.Errorf("Failed to add job <%s/%s>: %v in cache",
job.Namespace, job.Name, err)
}
cc.queue.Add(req)
key := vkcache.JobKeyByReq(&req)
i := cc.getWorkerID(key)
cc.queueList[i].Add(req)
}

func (cc *Controller) updateJob(oldObj, newObj interface{}) {
@@ -100,7 +102,9 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) {
Event: vkbatchv1.OutOfSyncEvent,
}

cc.queue.Add(req)
key := vkcache.JobKeyByReq(&req)
i := cc.getWorkerID(key)
cc.queueList[i].Add(req)
}

func (cc *Controller) deleteJob(obj interface{}) {
@@ -165,7 +169,9 @@ func (cc *Controller) addPod(obj interface{}) {
glog.Errorf("Failed to add Pod <%s/%s>: %v to cache",
pod.Namespace, pod.Name, err)
}
cc.queue.Add(req)
key := vkcache.JobKeyByReq(&req)
i := cc.getWorkerID(key)
cc.queueList[i].Add(req)
}

func (cc *Controller) updatePod(oldObj, newObj interface{}) {
@@ -241,7 +247,9 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
JobVersion: int32(dVersion),
}

cc.queue.Add(req)
key := vkcache.JobKeyByReq(&req)
i := cc.getWorkerID(key)
cc.queueList[i].Add(req)
}

func (cc *Controller) deletePod(obj interface{}) {
@@ -302,7 +310,9 @@ func (cc *Controller) deletePod(obj interface{}) {
pod.Namespace, pod.Name, err)
}

cc.queue.Add(req)
key := vkcache.JobKeyByReq(&req)
i := cc.getWorkerID(key)
cc.queueList[i].Add(req)
}

func (cc *Controller) recordJobEvent(namespace, name string, event vkbatchv1.JobEvent, message string) {
@@ -347,7 +357,9 @@ func (cc *Controller) processNextCommand() bool {
Action: vkbatchv1.Action(cmd.Action),
}

cc.queue.Add(req)
key := vkcache.JobKeyByReq(&req)
i := cc.getWorkerID(key)
cc.queueList[i].Add(req)

return true
}
@@ -382,7 +394,9 @@ func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) {
case kbtype.PodGroupInqueue:
req.Action = vkbatchv1.EnqueueAction
}
cc.queue.Add(req)
key := vkcache.JobKeyByReq(&req)
i := cc.getWorkerID(key)
cc.queueList[i].Add(req)
}
}

7 changes: 4 additions & 3 deletions pkg/controllers/job/job_controller_handler_test.go
@@ -59,7 +59,8 @@ func newController() *Controller {
}

vkclient := vkclientset.NewForConfigOrDie(config)
controller := NewJobController(kubeClientSet, kubeBatchClientSet, vkclient)
controller := NewJobController(kubeClientSet, kubeBatchClientSet, vkclient, 1)
controller.workers = 1

return controller
}
@@ -160,7 +161,7 @@ func TestJobAddFunc(t *testing.T) {
if job == nil || err != nil {
t.Errorf("Error while Adding Job in case %d with error %s", i, err)
}
len := controller.queue.Len()
len := controller.queueList[0].Len()
if testcase.ExpectValue != len {
t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, len)
}
@@ -511,7 +512,7 @@ func TestUpdatePodGroupFunc(t *testing.T) {
for i, testcase := range testCases {
controller := newController()
controller.updatePodGroup(testcase.oldPodGroup, testcase.newPodGroup)
len := controller.queue.Len()
len := controller.queueList[0].Len()
if testcase.ExpectValue != len {
t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, len)
}
2 changes: 1 addition & 1 deletion pkg/controllers/job/job_controller_plugins_test.go
@@ -35,7 +35,7 @@ func newFakeController() *Controller {
VolcanoClientSet := volcanoclient.NewSimpleClientset()
KubeClientSet := kubeclient.NewSimpleClientset()

controller := NewJobController(KubeClientSet, KubeBatchClientSet, VolcanoClientSet)
controller := NewJobController(KubeClientSet, KubeBatchClientSet, VolcanoClientSet, 1)
return controller
}

