Add queue controller #128

Merged · 16 commits · May 10, 2019
15 changes: 13 additions & 2 deletions cmd/controllers/app/server.go
@@ -36,8 +36,12 @@ import (
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"

kbver "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"

"volcano.sh/volcano/cmd/controllers/app/options"
vkclient "volcano.sh/volcano/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/controllers/job"
"volcano.sh/volcano/pkg/controllers/queue"
)

const (
@@ -72,10 +76,17 @@ func Run(opt *options.ServerOption) error {
return err
}

jobController := job.NewJobController(config)
// TODO: add user agent for different controllers
Contributor:
What is user agent?

Collaborator (author):
It is just for the HTTP server to know the client identity.

Collaborator (author):
An HTTP header.
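A minimal sketch of what that TODO could look like, assuming rest.CopyConfig and rest.AddUserAgent from client-go; the helper and controller names are illustrative, not part of this PR:

import (
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

// newClientFor (hypothetical) copies the base config and tags it with a
// per-controller User-Agent, so the apiserver can tell the controllers'
// requests apart in audit logs and metrics.
func newClientFor(base *rest.Config, controllerName string) *kubernetes.Clientset {
    cfg := rest.AddUserAgent(rest.CopyConfig(base), controllerName)
    return kubernetes.NewForConfigOrDie(cfg)
}

// e.g. kubeClient := newClientFor(config, "queue-controller")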

kubeClient := clientset.NewForConfigOrDie(config)
kbClient := kbver.NewForConfigOrDie(config)
vkClient := vkclient.NewForConfigOrDie(config)

jobController := job.NewJobController(kubeClient, kbClient, vkClient)
queueController := queue.NewQueueController(kubeClient, kbClient)

run := func(ctx context.Context) {
jobController.Run(ctx.Done())
go jobController.Run(ctx.Done())
go queueController.Run(ctx.Done())
<-ctx.Done()
}

4 changes: 2 additions & 2 deletions installer/chart/volcano/templates/controllers.yaml
@@ -39,8 +39,8 @@ rules:
resources: ["services", "configmaps"]
verbs: ["get", "list", "watch", "create", "delete"]
- apiGroups: ["scheduling.incubator.k8s.io"]
resources: ["podgroups"]
verbs: ["get", "list", "watch", "create", "delete"]
resources: ["podgroups", "queues", "queues/status"]
verbs: ["get", "list", "watch", "create", "delete", "update"]
- apiGroups: ["scheduling.k8s.io"]
resources: ["priorityclasses"]
verbs: ["get", "list", "watch", "create", "delete"]
@@ -27,3 +27,11 @@ spec:
type: object
type: object
version: v1alpha1
subresources:
status: {}
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []
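The added subresources stanza turns on the /status endpoint of the Queue CRD; together with the queues/status update verb granted in the chart above, this is what lets the new controller write status without touching spec. A minimal sketch of such an update, assuming the kube-batch clientset this PR already imports (the helper, queue name, and count are illustrative):

import (
    kbclientset "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// updateQueueStatus (hypothetical helper) writes only the status subresource
// of a Queue; syncQueue in the controller below does the real tallying.
func updateQueueStatus(kbClient kbclientset.Interface, name string, running int32) error {
    queue, err := kbClient.SchedulingV1alpha1().Queues().Get(name, metav1.GetOptions{})
    if err != nil {
        return err
    }
    newQueue := queue.DeepCopy()
    newQueue.Status.Running = running
    _, err = kbClient.SchedulingV1alpha1().Queues().UpdateStatus(newQueue)
    return err
}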
19 changes: 9 additions & 10 deletions pkg/controllers/job/job_controller.go
@@ -29,7 +29,6 @@ import (
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
pclister "k8s.io/client-go/listers/scheduling/v1beta1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
@@ -53,7 +52,6 @@ import (

// Controller the Job Controller type
type Controller struct {
config *rest.Config
kubeClients *kubernetes.Clientset
vkClients *vkver.Clientset
kbClients *kbver.Clientset
@@ -102,21 +100,22 @@ type Controller struct {
}

// NewJobController create new Job Controller
func NewJobController(config *rest.Config) *Controller {

kubeClients := kubernetes.NewForConfigOrDie(config)
func NewJobController(
kubeClient *kubernetes.Clientset,
kbClient *kbver.Clientset,
vkClient *vkver.Clientset,
) *Controller {

//Initialize event client
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: kubeClients.CoreV1().Events("")})
eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(vkscheme.Scheme, v1.EventSource{Component: "vk-controller"})

cc := &Controller{
config: config,
kubeClients: kubeClients,
vkClients: vkver.NewForConfigOrDie(config),
kbClients: kbver.NewForConfigOrDie(config),
kubeClients: kubeClient,
vkClients: vkClient,
kbClients: kbClient,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
commandQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
cache: jobcache.New(),
278 changes: 278 additions & 0 deletions pkg/controllers/queue/queue_controller.go
@@ -0,0 +1,278 @@
/*
Copyright 2019 The Volcano Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queue

import (
"sync"

"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

kbv1alpha1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
kbclientset "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
kbinformerfactory "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions"
kbinformer "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1"
kblister "github.com/kubernetes-sigs/kube-batch/pkg/client/listers/scheduling/v1alpha1"
)

// Controller manages queue status.
type Controller struct {
kubeClient kubernetes.Interface
kbClient kbclientset.Interface

// informer
queueInformer kbinformer.QueueInformer
pgInformer kbinformer.PodGroupInformer

// queueLister
queueLister kblister.QueueLister
queueSynced cache.InformerSynced

// podGroup lister
pgLister kblister.PodGroupLister
pgSynced cache.InformerSynced

// queues that need to be updated.
queue workqueue.RateLimitingInterface

pgMutex sync.RWMutex
podGroups map[string]map[string]struct{}
}

// NewQueueController creates a QueueController
func NewQueueController(
kubeClient kubernetes.Interface,
kbClient kbclientset.Interface,
) *Controller {
factory := kbinformerfactory.NewSharedInformerFactory(kbClient, 0)
queueInformer := factory.Scheduling().V1alpha1().Queues()
pgInformer := factory.Scheduling().V1alpha1().PodGroups()
c := &Controller{
kubeClient: kubeClient,
kbClient: kbClient,

queueInformer: queueInformer,
pgInformer: pgInformer,

queueLister: queueInformer.Lister(),
queueSynced: queueInformer.Informer().HasSynced,

pgLister: pgInformer.Lister(),
pgSynced: pgInformer.Informer().HasSynced,

queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
podGroups: make(map[string]map[string]struct{}),
}

queueInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addQueue,
DeleteFunc: c.deleteQueue,
})

pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addPodGroup,
UpdateFunc: c.updatePodGroup,
DeleteFunc: c.deletePodGroup,
})

return c
}

// Run starts QueueController
func (c *Controller) Run(stopCh <-chan struct{}) {

go c.queueInformer.Informer().Run(stopCh)
go c.pgInformer.Informer().Run(stopCh)

if !cache.WaitForCacheSync(stopCh, c.queueSynced, c.pgSynced) {
glog.Errorf("unable to sync caches for queue controller")
return
}

go wait.Until(c.worker, 0, stopCh)
glog.Infof("QueueController is running ...")
}

// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. You may run as many of these in parallel as you wish; the
// workqueue guarantees that they will not end up processing the same `queue`
// at the same time.
func (c *Controller) worker() {
for c.processNextWorkItem() {
}
}

func (c *Controller) processNextWorkItem() bool {
eKey, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(eKey)

if err := c.syncQueue(eKey.(string)); err != nil {
glog.V(2).Infof("Error syncing queue %q, retrying. Error: %v", eKey, err)
c.queue.AddRateLimited(eKey)
return true
}

c.queue.Forget(eKey)
return true
}

func (c *Controller) syncQueue(key string) error {
glog.V(4).Infof("Begin sync queue %s", key)

var pending, running, unknown int32
c.pgMutex.RLock()
if c.podGroups[key] == nil {
c.pgMutex.RUnlock()
glog.V(2).Infof("queue %s has not been seen or has been deleted", key)
return nil
}
podGroups := make([]string, 0, len(c.podGroups[key]))
for pgKey := range c.podGroups[key] {
Member:
Prefer to handle this in the Cache: Queue -> Job -> PodGroup.

Collaborator (author):
I do not want to overlap with the job controller. The queue controller only cares about PodGroups, not Jobs or anything else.

podGroups is keyed by queue name, and each value is the set of PodGroups using that queue.

Member:
pkg/cache is a common component in the controllers, and we need to clean up jobs when a queue is deleted.

Collaborator (author):
I see, then we need to record jobs too.

Collaborator (author):
But I have a doubt: why would a user delete their queues before their jobs? Is there a use case? If a malicious user does this, we should not delete the running jobs for any reason.

Collaborator (author):
I can think of a simpler way to do this: I remember there will be an admission controller to validate queue existence before jobs are created. There we can also set the job's owner reference. Then, when the queue is deleted, the k8s garbage collector can reclaim all child resources automatically.

So we have two choices:

  1. Do it in the controller, which needs to record all dependent relationships.

  2. Set the ownerReference, do nothing else, and leave it to GC.

Contributor:
Personally I prefer not adding this to the cache, even though we need to consider deleting jobs when a queue is deleted.

Member:
> Set the ownerReference, do nothing else, and leave it to GC.

ownerReference should be reserved for JobGroup :)

Collaborator (author):
OwnerReferences is an array; there can be more than one.

> ownerReference should be reserved for JobGroup :)

Sorry, there is no JobGroup now.

Member:
Not sure whether GC will check the controlled owner reference.

> there is no JobGroup now.

So "reserve" it.
podGroups = append(podGroups, pgKey)
}
c.pgMutex.RUnlock()

for _, pgKey := range podGroups {
// Ignore error here, it cannot occur.
ns, name, _ := cache.SplitMetaNamespaceKey(pgKey)

pg, err := c.pgLister.PodGroups(ns).Get(name)
if err != nil {
return err
}

switch pg.Status.Phase {
case kbv1alpha1.PodGroupPending:
pending++
case kbv1alpha1.PodGroupRunning:
running++
case kbv1alpha1.PodGroupUnknown:
unknown++
}
}

queue, err := c.queueLister.Get(key)
if err != nil {
if errors.IsNotFound(err) {
glog.V(2).Infof("queue %s has been deleted", key)
return nil
}
return err
}

glog.V(4).Infof("queue %s jobs pending %d, running %d, unknown %d", key, pending, running, unknown)
// ignore update when status does not change
if pending == queue.Status.Pending && running == queue.Status.Running && unknown == queue.Status.Unknown {
return nil
}

newQueue := queue.DeepCopy()
newQueue.Status.Pending = pending
newQueue.Status.Running = running
newQueue.Status.Unknown = unknown

if _, err := c.kbClient.SchedulingV1alpha1().Queues().UpdateStatus(newQueue); err != nil {
glog.Errorf("Failed to update status of Queue %s: %v", newQueue.Name, err)
return err
}

return nil
}

func (c *Controller) addQueue(obj interface{}) {
queue := obj.(*kbv1alpha1.Queue)
c.queue.Add(queue.Name)
}

func (c *Controller) deleteQueue(obj interface{}) {
queue, ok := obj.(*kbv1alpha1.Queue)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %#v", obj)
return
}
queue, ok = tombstone.Obj.(*kbv1alpha1.Queue)
if !ok {
glog.Errorf("Tombstone contained object that is not a Queue: %#v", obj)
return
}
}

c.pgMutex.Lock()
delete(c.podGroups, queue.Name)
c.pgMutex.Unlock()
}

func (c *Controller) addPodGroup(obj interface{}) {
pg := obj.(*kbv1alpha1.PodGroup)
key, _ := cache.MetaNamespaceKeyFunc(obj)

c.pgMutex.Lock()
if c.podGroups[pg.Spec.Queue] == nil {
c.podGroups[pg.Spec.Queue] = make(map[string]struct{})
}
c.podGroups[pg.Spec.Queue][key] = struct{}{}
c.pgMutex.Unlock()

// enqueue
c.queue.Add(pg.Spec.Queue)
}

func (c *Controller) updatePodGroup(old, new interface{}) {
oldPG := old.(*kbv1alpha1.PodGroup)
newPG := new.(*kbv1alpha1.PodGroup)

// Note: we have no use case that updates PodGroup.Spec.Queue,
// so do not consider it here.
if oldPG.Status.Phase != newPG.Status.Phase {
// enqueue
c.queue.Add(newPG.Spec.Queue)
}

}

func (c *Controller) deletePodGroup(obj interface{}) {
pg, ok := obj.(*kbv1alpha1.PodGroup)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %#v", obj)
return
}
pg, ok = tombstone.Obj.(*kbv1alpha1.PodGroup)
if !ok {
glog.Errorf("Tombstone contained object that is not a PodGroup: %#v", obj)
return
}
}

key, _ := cache.MetaNamespaceKeyFunc(obj)

c.pgMutex.Lock()
delete(c.podGroups[pg.Spec.Queue], key)
c.pgMutex.Unlock()

c.queue.Add(pg.Spec.Queue)
}