Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync job works concurrently #190

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cmd/controllers/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
)

const (
defaultQPS = 50.0
defaultBurst = 100
defaultQPS = 50.0
defaultBurst = 100
defaultWorkers = 3
)

// ServerOption is the main context object for the controller manager.
Expand All @@ -36,6 +37,9 @@ type ServerOption struct {
KubeAPIBurst int
KubeAPIQPS float32
PrintVersion bool
// WorkerThreads is the number of threads syncing job operations
// concurrently. A larger number means faster job updating, but more CPU load.
WorkerThreads uint32
}

// NewServerOption creates a new CMServer with a default config.
Expand All @@ -54,6 +58,8 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver")
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
fs.Uint32Var(&s.WorkerThreads, "worker-threads", defaultWorkers, "The number of threads syncing job operations concurrently. "+
"Larger number = faster job updating, but more CPU load")
}

// CheckOptionOrDie checks the LockObjectNamespace
Expand Down
9 changes: 5 additions & 4 deletions cmd/controllers/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ func TestAddFlags(t *testing.T) {

// This is a snapshot of expected options parsed by args.
expected := &ServerOption{
Master: "127.0.0.1",
KubeAPIQPS: defaultQPS,
KubeAPIBurst: 200,
PrintVersion: false,
Master: "127.0.0.1",
KubeAPIQPS: defaultQPS,
KubeAPIBurst: 200,
PrintVersion: false,
WorkerThreads: defaultWorkers,
}

if !reflect.DeepEqual(expected, s) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/controllers/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func Run(opt *options.ServerOption) error {
kbClient := kbver.NewForConfigOrDie(config)
vkClient := vkclient.NewForConfigOrDie(config)

jobController := job.NewJobController(kubeClient, kbClient, vkClient)
jobController := job.NewJobController(kubeClient, kbClient, vkClient, opt.WorkerThreads)
queueController := queue.NewQueueController(kubeClient, kbClient)
garbageCollector := garbagecollector.New(vkClient)

Expand Down
6 changes: 6 additions & 0 deletions pkg/controllers/job/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math/rand"
"strings"
"time"
"volcano.sh/volcano/pkg/controllers/apis"
)

const (
Expand Down Expand Up @@ -61,3 +62,8 @@ func genRandomStr(l int) string {
func MakeVolumeClaimName(jobName string) string {
return fmt.Sprintf(VolumeClaimFmt, jobName, genRandomStr(12))
}

// GetJobKeyByReq builds the "<namespace>/<job name>" key that identifies the
// job the given request refers to.
func GetJobKeyByReq(req *apis.Request) string {
	return req.Namespace + "/" + req.JobName
}
82 changes: 72 additions & 10 deletions pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ package job

import (
"fmt"
"hash"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close to other system libs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed

"hash/fnv"
"sync"
"time"

"github.com/golang/glog"

Expand Down Expand Up @@ -95,7 +98,7 @@ type Controller struct {
pcSynced func() bool

// queue that need to sync up
queue workqueue.RateLimitingInterface
queueList []workqueue.RateLimitingInterface
commandQueue workqueue.RateLimitingInterface
cache jobcache.Cache
//Job Event recorder
Expand All @@ -104,13 +107,15 @@ type Controller struct {

sync.Mutex
errTasks workqueue.RateLimitingInterface
workers uint32
}

// NewJobController creates a new Job Controller.
func NewJobController(
kubeClient kubernetes.Interface,
kbClient kbver.Interface,
vkClient vkver.Interface,
workers uint32,
) *Controller {

//Initialize event client
Expand All @@ -123,12 +128,17 @@ func NewJobController(
kubeClients: kubeClient,
vkClients: vkClient,
kbClients: kbClient,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
queueList: make([]workqueue.RateLimitingInterface, workers, workers),
commandQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
cache: jobcache.New(),
errTasks: newRateLimitingQueue(),
recorder: recorder,
priorityClasses: make(map[string]*v1beta1.PriorityClass),
workers: workers,
}
var i uint32
for i = 0; i < workers; i++ {
cc.queueList[i] = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
}

cc.jobInformer = vkinfoext.NewSharedInformerFactory(cc.vkClients, 0).Batch().V1alpha1().Jobs()
Expand Down Expand Up @@ -192,6 +202,7 @@ func NewJobController(

// Run starts the JobController.
func (cc *Controller) Run(stopCh <-chan struct{}) {

go cc.sharedInformers.Start(stopCh)
go cc.jobInformer.Informer().Run(stopCh)
go cc.podInformer.Informer().Run(stopCh)
Expand All @@ -205,7 +216,17 @@ func (cc *Controller) Run(stopCh <-chan struct{}) {
cc.svcSynced, cc.cmdSynced, cc.pvcSynced, cc.pcSynced)

go wait.Until(cc.handleCommands, 0, stopCh)
go wait.Until(cc.worker, 0, stopCh)
var i uint32
for i = 0; i < cc.workers; i++ {
go func(num uint32) {
wait.Until(
func() {
cc.worker(num)
},
time.Second,
stopCh)
}(i)
}

go cc.cache.Run(stopCh)

Expand All @@ -215,20 +236,61 @@ func (cc *Controller) Run(stopCh <-chan struct{}) {
glog.Infof("JobController is running ...... ")
}

func (cc *Controller) worker() {
for cc.processNextReq() {
// worker is the processing loop of the sync routine with index i. It logs that
// the routine started and then keeps calling processNextReq for that index
// until processNextReq reports false.
func (cc *Controller) worker(i uint32) {
	glog.Infof("worker %d start ...... ", i)

	for {
		if more := cc.processNextReq(i); !more {
			return
		}
	}
}

func (cc *Controller) processNextReq() bool {
obj, shutdown := cc.queue.Get()
// belongsToThisRoutine reports whether the job identified by key is assigned
// to the worker routine with index count.
//
// The key is hashed with 32-bit FNV-1 and reduced modulo cc.workers; the job
// belongs to the worker whose index equals that remainder. getWorkerQueue
// uses the same hash to pick a queue, so the two must stay in agreement.
//
// When cc.workers is zero there is no worker that could own the key, so the
// result is false instead of an integer divide-by-zero panic.
func (cc *Controller) belongsToThisRoutine(key string, count uint32) bool {
	if cc.workers == 0 {
		return false
	}

	var hashVal hash.Hash32 = fnv.New32()
	// hash.Hash documents that Write never returns an error.
	hashVal.Write([]byte(key))

	return hashVal.Sum32()%cc.workers == count
}

// getWorkerQueue returns the work queue that requests for the given job key
// are routed to. The queue index is the 32-bit FNV-1 hash of key modulo
// cc.workers, so the same key always maps to the same entry of cc.queueList.
//
// NOTE(review): cc.workers must be non-zero here; a zero value would make the
// modulo operation panic.
func (cc *Controller) getWorkerQueue(key string) workqueue.RateLimitingInterface {
	var hasher hash.Hash32 = fnv.New32()
	hasher.Write([]byte(key))

	index := hasher.Sum32() % cc.workers
	return cc.queueList[index]
}

func (cc *Controller) processNextReq(count uint32) bool {
queue := cc.queueList[count]
hzxuzhonghu marked this conversation as resolved.
Show resolved Hide resolved
obj, shutdown := queue.Get()
if shutdown {
glog.Errorf("Fail to pop item from queue")
return false
}

req := obj.(apis.Request)
defer cc.queue.Done(req)
defer queue.Done(req)

key := jobcache.JobKeyByReq(&req)
if !cc.belongsToThisRoutine(key, count) {
glog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count)
queueLocal := cc.getWorkerQueue(key)
queueLocal.Add(req)
return true
}

glog.V(3).Infof("Try to handle request <%v>", req)

Expand Down Expand Up @@ -259,12 +321,12 @@ func (cc *Controller) processNextReq() bool {
glog.Errorf("Failed to handle Job <%s/%s>: %v",
jobInfo.Job.Namespace, jobInfo.Job.Name, err)
// If any error, requeue it.
cc.queue.AddRateLimited(req)
queue.AddRateLimited(req)
return true
}

// If no error, forget it.
cc.queue.Forget(req)
queue.Forget(req)

return true
}
29 changes: 22 additions & 7 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"volcano.sh/volcano/pkg/controllers/apis"
vkcache "volcano.sh/volcano/pkg/controllers/cache"
vkjobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
)

func (cc *Controller) addCommand(obj interface{}) {
Expand Down Expand Up @@ -65,7 +66,9 @@ func (cc *Controller) addJob(obj interface{}) {
glog.Errorf("Failed to add job <%s/%s>: %v in cache",
job.Namespace, job.Name, err)
}
cc.queue.Add(req)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}

func (cc *Controller) updateJob(oldObj, newObj interface{}) {
Expand Down Expand Up @@ -100,7 +103,9 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) {
Event: vkbatchv1.OutOfSyncEvent,
}

cc.queue.Add(req)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}

func (cc *Controller) deleteJob(obj interface{}) {
Expand Down Expand Up @@ -165,7 +170,9 @@ func (cc *Controller) addPod(obj interface{}) {
glog.Errorf("Failed to add Pod <%s/%s>: %v to cache",
pod.Namespace, pod.Name, err)
}
cc.queue.Add(req)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}

func (cc *Controller) updatePod(oldObj, newObj interface{}) {
Expand Down Expand Up @@ -241,7 +248,9 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
JobVersion: int32(dVersion),
}

cc.queue.Add(req)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}

func (cc *Controller) deletePod(obj interface{}) {
Expand Down Expand Up @@ -302,7 +311,9 @@ func (cc *Controller) deletePod(obj interface{}) {
pod.Namespace, pod.Name, err)
}

cc.queue.Add(req)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}

func (cc *Controller) recordJobEvent(namespace, name string, event vkbatchv1.JobEvent, message string) {
Expand Down Expand Up @@ -347,7 +358,9 @@ func (cc *Controller) processNextCommand() bool {
Action: vkbatchv1.Action(cmd.Action),
}

cc.queue.Add(req)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)

return true
}
Expand Down Expand Up @@ -382,7 +395,9 @@ func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) {
case kbtype.PodGroupInqueue:
req.Action = vkbatchv1.EnqueueAction
}
cc.queue.Add(req)
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}
}

Expand Down
9 changes: 6 additions & 3 deletions pkg/controllers/job/job_controller_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func newController() *Controller {
}

vkclient := vkclientset.NewForConfigOrDie(config)
controller := NewJobController(kubeClientSet, kubeBatchClientSet, vkclient)
controller := NewJobController(kubeClientSet, kubeBatchClientSet, vkclient, 3)

return controller
}
Expand Down Expand Up @@ -160,7 +160,8 @@ func TestJobAddFunc(t *testing.T) {
if job == nil || err != nil {
t.Errorf("Error while Adding Job in case %d with error %s", i, err)
}
len := controller.queue.Len()
queue := controller.getWorkerQueue(key)
len := queue.Len()
if testcase.ExpectValue != len {
t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, len)
}
Expand Down Expand Up @@ -511,7 +512,9 @@ func TestUpdatePodGroupFunc(t *testing.T) {
for i, testcase := range testCases {
controller := newController()
controller.updatePodGroup(testcase.oldPodGroup, testcase.newPodGroup)
len := controller.queue.Len()
key := fmt.Sprintf("%s/%s", testcase.oldPodGroup.Namespace, testcase.oldPodGroup.Name)
queue := controller.getWorkerQueue(key)
len := queue.Len()
if testcase.ExpectValue != len {
t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, len)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/job_controller_plugins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func newFakeController() *Controller {
VolcanoClientSet := volcanoclient.NewSimpleClientset()
KubeClientSet := kubeclient.NewSimpleClientset()

controller := NewJobController(KubeClientSet, KubeBatchClientSet, VolcanoClientSet)
controller := NewJobController(KubeClientSet, KubeBatchClientSet, VolcanoClientSet, 3)
return controller
}

Expand Down