From 27648212f5c15067ebbd891fdabfe0e08a3f43dd Mon Sep 17 00:00:00 2001 From: Zhang Jinghui Date: Thu, 7 Nov 2019 00:05:25 +0800 Subject: [PATCH] add queue controller about state --- pkg/controllers/queue/queue_controller.go | 299 ++++++++++-------- .../queue/queue_controller_action.go | 167 ++++++++++ .../queue/queue_controller_handler.go | 186 +++++++++++ .../queue/queue_controller_test.go | 2 +- .../queue/queue_controller_util.go | 40 +++ pkg/controllers/queue/state/closed.go | 58 ++++ pkg/controllers/queue/state/closing.go | 68 ++++ pkg/controllers/queue/state/factory.go | 58 ++++ pkg/controllers/queue/state/open.go | 68 ++++ pkg/controllers/queue/state/unknown.go | 66 ++++ 10 files changed, 882 insertions(+), 130 deletions(-) create mode 100644 pkg/controllers/queue/queue_controller_action.go create mode 100644 pkg/controllers/queue/queue_controller_handler.go create mode 100644 pkg/controllers/queue/queue_controller_util.go create mode 100644 pkg/controllers/queue/state/closed.go create mode 100644 pkg/controllers/queue/state/closing.go create mode 100644 pkg/controllers/queue/state/factory.go create mode 100644 pkg/controllers/queue/state/open.go create mode 100644 pkg/controllers/queue/state/unknown.go diff --git a/pkg/controllers/queue/queue_controller.go b/pkg/controllers/queue/queue_controller.go index 5c8dadbb49..bcf839e71a 100644 --- a/pkg/controllers/queue/queue_controller.go +++ b/pkg/controllers/queue/queue_controller.go @@ -18,21 +18,38 @@ package queue import ( "fmt" - "reflect" "sync" + "time" - "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog" + busv1alpha1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1" 
schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" vcclientset "volcano.sh/volcano/pkg/client/clientset/versioned" + versionedscheme "volcano.sh/volcano/pkg/client/clientset/versioned/scheme" informerfactory "volcano.sh/volcano/pkg/client/informers/externalversions" + busv1alpha1informer "volcano.sh/volcano/pkg/client/informers/externalversions/bus/v1alpha1" schedulinginformer "volcano.sh/volcano/pkg/client/informers/externalversions/scheduling/v1alpha2" + busv1alpha1lister "volcano.sh/volcano/pkg/client/listers/bus/v1alpha1" schedulinglister "volcano.sh/volcano/pkg/client/listers/scheduling/v1alpha2" + queuestate "volcano.sh/volcano/pkg/controllers/queue/state" +) + +const ( + // maxRetries is the number of times a queue or command will be retried before it is dropped out of the queue. + // With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times + // a queue or command is going to be requeued: + // 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s + maxRetries = 15 ) // Controller manages queue status. @@ -52,12 +69,24 @@ type Controller struct { pgLister schedulinglister.PodGroupLister pgSynced cache.InformerSynced + cmdInformer busv1alpha1informer.CommandInformer + cmdLister busv1alpha1lister.CommandLister + cmdSynced cache.InformerSynced + // queues that need to be updated. 
- queue workqueue.RateLimitingInterface + queue workqueue.RateLimitingInterface + commandQueue workqueue.RateLimitingInterface pgMutex sync.RWMutex // queue name -> podgroup namespace/name podGroups map[string]map[string]struct{} + + syncHandler func(req *schedulingv1alpha2.QueueRequest) error + syncCommandHandler func(cmd *busv1alpha1.Command) error + + enqueueQueue func(req *schedulingv1alpha2.QueueRequest) + + recorder record.EventRecorder } // NewQueueController creates a QueueController @@ -68,6 +97,11 @@ func NewQueueController( factory := informerfactory.NewSharedInformerFactory(vcClient, 0) queueInformer := factory.Scheduling().V1alpha2().Queues() pgInformer := factory.Scheduling().V1alpha2().PodGroups() + + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(klog.Infof) + eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) + c := &Controller{ kubeClient: kubeClient, vcClient: vcClient, @@ -81,12 +115,17 @@ func NewQueueController( pgLister: pgInformer.Lister(), pgSynced: pgInformer.Informer().HasSynced, - queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + commandQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + podGroups: make(map[string]map[string]struct{}), + + recorder: eventBroadcaster.NewRecorder(versionedscheme.Scheme, v1.EventSource{Component: "vc-controllers"}), } queueInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.addQueue, + UpdateFunc: c.updateQueue, DeleteFunc: c.deleteQueue, }) @@ -96,22 +135,58 @@ func NewQueueController( DeleteFunc: c.deletePodGroup, }) + c.cmdInformer = informerfactory.NewSharedInformerFactory(c.vcClient, 0).Bus().V1alpha1().Commands() + c.cmdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch 
obj.(type) { + case *busv1alpha1.Command: + cmd := obj.(*busv1alpha1.Command) + return IsQueueReference(cmd.TargetObject) + default: + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: c.addCommand, + }, + }) + c.cmdLister = c.cmdInformer.Lister() + c.cmdSynced = c.cmdInformer.Informer().HasSynced + + queuestate.SyncQueue = c.syncQueue + queuestate.OpenQueue = c.openQueue + queuestate.CloseQueue = c.closeQueue + + c.syncHandler = c.handleQueue + c.syncCommandHandler = c.handleCommand + + c.enqueueQueue = c.enqueue + return c } // Run starts QueueController func (c *Controller) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + defer c.commandQueue.ShutDown() + + klog.Infof("Starting queue controller.") + defer klog.Infof("Shutting down queue controller.") go c.queueInformer.Informer().Run(stopCh) go c.pgInformer.Informer().Run(stopCh) + go c.cmdInformer.Informer().Run(stopCh) - if !cache.WaitForCacheSync(stopCh, c.queueSynced, c.pgSynced) { - klog.Errorf("unable to sync caches for queue controller") + if !cache.WaitForCacheSync(stopCh, c.queueSynced, c.pgSynced, c.cmdSynced) { + klog.Errorf("unable to sync caches for queue controller.") return } go wait.Until(c.worker, 0, stopCh) - klog.Infof("QueueController is running ...... ") + go wait.Until(c.commandWorker, 0, stopCh) + + <-stopCh } // worker runs a worker thread that just dequeues items, processes them, and @@ -124,168 +199,134 @@ func (c *Controller) worker() { } func (c *Controller) processNextWorkItem() bool { - eKey, quit := c.queue.Get() - if quit { + obj, shutdown := c.queue.Get() + if shutdown { return false } - defer c.queue.Done(eKey) + defer c.queue.Done(obj) - if err := c.syncQueue(eKey.(string)); err != nil { - klog.V(2).Infof("Error syncing queues %q, retrying. 
Error: %v", eKey, err) - c.queue.AddRateLimited(eKey) + req, ok := obj.(*schedulingv1alpha2.QueueRequest) + if !ok { + klog.Errorf("%v is not a valid queue request struct.", obj) return true } - c.queue.Forget(eKey) - return true -} - -func (c *Controller) getPodGroups(key string) ([]string, error) { - c.pgMutex.RLock() - defer c.pgMutex.RUnlock() + err := c.syncHandler(req) + c.handleQueueErr(err, obj) - if c.podGroups[key] == nil { - return nil, fmt.Errorf("queue %s has not been seen or deleted", key) - } - podGroups := make([]string, 0, len(c.podGroups[key])) - for pgKey := range c.podGroups[key] { - podGroups = append(podGroups, pgKey) - } - - return podGroups, nil + return true } -func (c *Controller) syncQueue(key string) error { - klog.V(4).Infof("Begin sync queue %s", key) +func (c *Controller) handleQueue(req *schedulingv1alpha2.QueueRequest) error { + startTime := time.Now() + defer func() { + klog.V(4).Infof("Finished syncing queue %s (%v).", req.Name, time.Since(startTime)) + }() - podGroups, err := c.getPodGroups(key) + queue, err := c.queueLister.Get(req.Name) if err != nil { - return err - } - - queueStatus := schedulingv1alpha2.QueueStatus{} - - for _, pgKey := range podGroups { - // Ignore error here, tt can not occur. - ns, name, _ := cache.SplitMetaNamespaceKey(pgKey) - - // TODO: check NotFound error and sync local cache. 
- pg, err := c.pgLister.PodGroups(ns).Get(name) - if err != nil { - return err + if apierrors.IsNotFound(err) { + klog.V(4).Infof("Queue %s has been deleted.", req.Name) + return nil } - switch pg.Status.Phase { - case schedulingv1alpha2.PodGroupPending: - queueStatus.Pending++ - case schedulingv1alpha2.PodGroupRunning: - queueStatus.Running++ - case schedulingv1alpha2.PodGroupUnknown: - queueStatus.Unknown++ - case schedulingv1alpha2.PodGroupInqueue: - queueStatus.Inqueue++ - } + return fmt.Errorf("get queue %s failed for %v", req.Name, err) } - queue, err := c.queueLister.Get(key) - if err != nil { - if errors.IsNotFound(err) { - klog.V(2).Infof("queue %s has been deleted", key) - return nil - } - // TODO: do not retry to syncQueue for this error - return err + queueState := queuestate.NewState(queue) + if queueState == nil { + return fmt.Errorf("queue %s state %s is invalid", queue.Name, queue.Status.State) } - // ignore update when status does not change - if reflect.DeepEqual(queueStatus, queue.Status) { - return nil + if err := queueState.Execute(req.Action); err != nil { + return fmt.Errorf("sync queue %s failed for %v, event is %v, action is %s", + req.Name, err, req.Event, req.Action) } - newQueue := queue.DeepCopy() - newQueue.Status = queueStatus + return nil +} + +func (c *Controller) handleQueueErr(err error, obj interface{}) { + if err == nil { + c.queue.Forget(obj) + return + } - if _, err := c.vcClient.SchedulingV1alpha2().Queues().UpdateStatus(newQueue); err != nil { - klog.Errorf("Failed to update status of Queue %s: %v", newQueue.Name, err) - return err + if c.queue.NumRequeues(obj) < maxRetries { + klog.V(4).Infof("Error syncing queue request %v for %v.", obj, err) + c.queue.AddRateLimited(obj) + return } - return nil + req, _ := obj.(*schedulingv1alpha2.QueueRequest) + c.recordEventsForQueue(req.Name, v1.EventTypeWarning, string(req.Action), + fmt.Sprintf("%v queue failed for %v", req.Action, err)) + klog.V(2).Infof("Dropping queue request %v 
out of the queue for %v.", obj, err) + c.queue.Forget(obj) } -func (c *Controller) addQueue(obj interface{}) { - queue := obj.(*schedulingv1alpha2.Queue) - c.queue.Add(queue.Name) +func (c *Controller) commandWorker() { + for c.processNextCommand() { + } } -func (c *Controller) deleteQueue(obj interface{}) { - queue, ok := obj.(*schedulingv1alpha2.Queue) +func (c *Controller) processNextCommand() bool { + obj, shutdown := c.commandQueue.Get() + if shutdown { + return false + } + defer c.commandQueue.Done(obj) + + cmd, ok := obj.(*busv1alpha1.Command) if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - klog.Errorf("Couldn't get object from tombstone %#v", obj) - return - } - queue, ok = tombstone.Obj.(*schedulingv1alpha2.Queue) - if !ok { - klog.Errorf("Tombstone contained object that is not a Queue: %#v", obj) - return - } + klog.Errorf("%v is not a valid Command struct.", obj) + return true } - c.pgMutex.Lock() - defer c.pgMutex.Unlock() - delete(c.podGroups, queue.Name) + err := c.syncCommandHandler(cmd) + c.handleCommandErr(err, obj) + + return true } -func (c *Controller) addPodGroup(obj interface{}) { - pg := obj.(*schedulingv1alpha2.PodGroup) - key, _ := cache.MetaNamespaceKeyFunc(obj) +func (c *Controller) handleCommand(cmd *busv1alpha1.Command) error { + startTime := time.Now() + defer func() { + klog.V(4).Infof("Finished syncing command %s/%s (%v).", cmd.Namespace, cmd.Name, time.Since(startTime)) + }() - c.pgMutex.Lock() - defer c.pgMutex.Unlock() + err := c.vcClient.BusV1alpha1().Commands(cmd.Namespace).Delete(cmd.Name, nil) + if err != nil { + if true == apierrors.IsNotFound(err) { + return nil + } - if c.podGroups[pg.Spec.Queue] == nil { - c.podGroups[pg.Spec.Queue] = make(map[string]struct{}) + return fmt.Errorf("failed to delete command <%s/%s> for %v", cmd.Namespace, cmd.Name, err) } - c.podGroups[pg.Spec.Queue][key] = struct{}{} - // enqueue - c.queue.Add(pg.Spec.Queue) -} + req := &schedulingv1alpha2.QueueRequest{ + Name: 
cmd.TargetObject.Name, + Event: schedulingv1alpha2.QueueCommandIssuedEvent, + Action: schedulingv1alpha2.QueueAction(cmd.Action), + } -func (c *Controller) updatePodGroup(old, new interface{}) { - oldPG := old.(*schedulingv1alpha2.PodGroup) - newPG := new.(*schedulingv1alpha2.PodGroup) + c.enqueueQueue(req) - // Note: we have no use case update PodGroup.Spec.Queue - // So do not consider it here. - if oldPG.Status.Phase != newPG.Status.Phase { - c.queue.Add(newPG.Spec.Queue) - } + return nil } -func (c *Controller) deletePodGroup(obj interface{}) { - pg, ok := obj.(*schedulingv1alpha2.PodGroup) - if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - klog.Errorf("Couldn't get object from tombstone %#v", obj) - return - } - pg, ok = tombstone.Obj.(*schedulingv1alpha2.PodGroup) - if !ok { - klog.Errorf("Tombstone contained object that is not a PodGroup: %#v", obj) - return - } +func (c *Controller) handleCommandErr(err error, obj interface{}) { + if err == nil { + c.commandQueue.Forget(obj) + return } - key, _ := cache.MetaNamespaceKeyFunc(obj) - - c.pgMutex.Lock() - defer c.pgMutex.Unlock() - - delete(c.podGroups[pg.Spec.Queue], key) + if c.commandQueue.NumRequeues(obj) < maxRetries { + klog.V(4).Infof("Error syncing command %v for %v.", obj, err) + c.commandQueue.AddRateLimited(obj) + return + } - c.queue.Add(pg.Spec.Queue) + klog.V(2).Infof("Dropping command %v out of the queue for %v.", obj, err) + c.commandQueue.Forget(obj) } diff --git a/pkg/controllers/queue/queue_controller_action.go b/pkg/controllers/queue/queue_controller_action.go new file mode 100644 index 0000000000..c405f58f48 --- /dev/null +++ b/pkg/controllers/queue/queue_controller_action.go @@ -0,0 +1,167 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "fmt" + "reflect" + + schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" + "volcano.sh/volcano/pkg/controllers/queue/state" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + + "k8s.io/klog" +) + +func (c *Controller) syncQueue(queue *schedulingv1alpha2.Queue, updateStateFn state.UpdateQueueStatusFn) error { + klog.V(4).Infof("Begin to sync queue %s.", queue.Name) + + podGroups := c.getPodGroups(queue.Name) + queueStatus := schedulingv1alpha2.QueueStatus{} + + for _, pgKey := range podGroups { + // Ignore error here, tt can not occur. + ns, name, _ := cache.SplitMetaNamespaceKey(pgKey) + + // TODO: check NotFound error and sync local cache. 
+ pg, err := c.pgLister.PodGroups(ns).Get(name) + if err != nil { + return err + } + + switch pg.Status.Phase { + case schedulingv1alpha2.PodGroupPending: + queueStatus.Pending++ + case schedulingv1alpha2.PodGroupRunning: + queueStatus.Running++ + case schedulingv1alpha2.PodGroupUnknown: + queueStatus.Unknown++ + case schedulingv1alpha2.PodGroupInqueue: + queueStatus.Inqueue++ + } + } + + if updateStateFn != nil { + updateStateFn(&queueStatus, podGroups) + } else { + queueStatus.State = queue.Status.State + } + + // ignore update when status does not change + if reflect.DeepEqual(queueStatus, queue.Status) { + return nil + } + + newQueue := queue.DeepCopy() + newQueue.Status = queueStatus + if _, err := c.vcClient.SchedulingV1alpha2().Queues().UpdateStatus(newQueue); err != nil { + klog.Errorf("Failed to update status of Queue %s: %v.", newQueue.Name, err) + return err + } + + return nil +} + +func (c *Controller) openQueue(queue *schedulingv1alpha2.Queue, updateStateFn state.UpdateQueueStatusFn) error { + klog.V(4).Infof("Begin to open queue %s.", queue.Name) + + newQueue := queue.DeepCopy() + newQueue.Spec.State = schedulingv1alpha2.QueueStateOpen + + if queue.Spec.State != newQueue.Spec.State { + if _, err := c.vcClient.SchedulingV1alpha2().Queues().Update(newQueue); err != nil { + c.recorder.Event(newQueue, v1.EventTypeWarning, string(schedulingv1alpha2.OpenQueueAction), + fmt.Sprintf("Open queue failed for %v", err)) + return err + } + + c.recorder.Event(newQueue, v1.EventTypeNormal, string(schedulingv1alpha2.OpenQueueAction), + fmt.Sprintf("Open queue succeed")) + } else { + return nil + } + + q, err := c.vcClient.SchedulingV1alpha2().Queues().Get(newQueue.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + newQueue = q.DeepCopy() + if updateStateFn != nil { + updateStateFn(&newQueue.Status, nil) + } else { + return fmt.Errorf("internal error, update state function should be provided") + } + + if queue.Status.State != newQueue.Status.State { + 
if _, err := c.vcClient.SchedulingV1alpha2().Queues().UpdateStatus(newQueue); err != nil { + c.recorder.Event(newQueue, v1.EventTypeWarning, string(schedulingv1alpha2.OpenQueueAction), + fmt.Sprintf("Update queue status from %s to %s failed for %v", + queue.Status.State, newQueue.Status.State, err)) + return err + } + } + + return nil +} + +func (c *Controller) closeQueue(queue *schedulingv1alpha2.Queue, updateStateFn state.UpdateQueueStatusFn) error { + klog.V(4).Infof("Begin to close queue %s.", queue.Name) + + newQueue := queue.DeepCopy() + newQueue.Spec.State = schedulingv1alpha2.QueueStateClosed + + if queue.Spec.State != newQueue.Spec.State { + if _, err := c.vcClient.SchedulingV1alpha2().Queues().Update(newQueue); err != nil { + c.recorder.Event(newQueue, v1.EventTypeWarning, string(schedulingv1alpha2.CloseQueueAction), + fmt.Sprintf("Close queue failed for %v", err)) + return err + } + + c.recorder.Event(newQueue, v1.EventTypeNormal, string(schedulingv1alpha2.CloseQueueAction), + fmt.Sprintf("Close queue succeed")) + } else { + return nil + } + + q, err := c.vcClient.SchedulingV1alpha2().Queues().Get(newQueue.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + newQueue = q.DeepCopy() + podGroups := c.getPodGroups(newQueue.Name) + if updateStateFn != nil { + updateStateFn(&newQueue.Status, podGroups) + } else { + return fmt.Errorf("internal error, update state function should be provided") + } + + if queue.Status.State != newQueue.Status.State { + if _, err := c.vcClient.SchedulingV1alpha2().Queues().UpdateStatus(newQueue); err != nil { + c.recorder.Event(newQueue, v1.EventTypeWarning, string(schedulingv1alpha2.CloseQueueAction), + fmt.Sprintf("Update queue status from %s to %s failed for %v", + queue.Status.State, newQueue.Status.State, err)) + return err + } + } + + return nil +} diff --git a/pkg/controllers/queue/queue_controller_handler.go b/pkg/controllers/queue/queue_controller_handler.go new file mode 100644 index 0000000000..7165f8a215 
--- /dev/null +++ b/pkg/controllers/queue/queue_controller_handler.go @@ -0,0 +1,186 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + busv1alpha1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1" + schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" + + "k8s.io/client-go/tools/cache" + + "k8s.io/klog" +) + +func (c *Controller) enqueue(req *schedulingv1alpha2.QueueRequest) { + c.queue.Add(req) +} + +func (c *Controller) addQueue(obj interface{}) { + queue := obj.(*schedulingv1alpha2.Queue) + + req := &schedulingv1alpha2.QueueRequest{ + Name: queue.Name, + + Event: schedulingv1alpha2.QueueOutOfSyncEvent, + Action: schedulingv1alpha2.SyncQueueAction, + } + + c.enqueue(req) +} + +func (c *Controller) deleteQueue(obj interface{}) { + queue, ok := obj.(*schedulingv1alpha2.Queue) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + klog.Errorf("Couldn't get object from tombstone %#v.", obj) + return + } + queue, ok = tombstone.Obj.(*schedulingv1alpha2.Queue) + if !ok { + klog.Errorf("Tombstone contained object that is not a Queue: %#v.", obj) + return + } + } + + c.pgMutex.Lock() + defer c.pgMutex.Unlock() + delete(c.podGroups, queue.Name) +} + +func (c *Controller) updateQueue(old, new interface{}) { + oldQueue, ok := old.(*schedulingv1alpha2.Queue) + if !ok { + klog.Errorf("Can not covert old object %v to queues.scheduling.sigs.dev.", old) + return + } + + newQueue, ok := 
new.(*schedulingv1alpha2.Queue) + if !ok { + klog.Errorf("Can not covert new object %v to queues.scheduling.sigs.dev.", old) + return + } + + if oldQueue.ResourceVersion == newQueue.ResourceVersion { + return + } + + c.addQueue(newQueue) + + return +} + +func (c *Controller) addPodGroup(obj interface{}) { + pg := obj.(*schedulingv1alpha2.PodGroup) + key, _ := cache.MetaNamespaceKeyFunc(obj) + + c.pgMutex.Lock() + defer c.pgMutex.Unlock() + + if c.podGroups[pg.Spec.Queue] == nil { + c.podGroups[pg.Spec.Queue] = make(map[string]struct{}) + } + c.podGroups[pg.Spec.Queue][key] = struct{}{} + + req := &schedulingv1alpha2.QueueRequest{ + Name: pg.Spec.Queue, + + Event: schedulingv1alpha2.QueueOutOfSyncEvent, + Action: schedulingv1alpha2.SyncQueueAction, + } + + c.enqueue(req) +} + +func (c *Controller) updatePodGroup(old, new interface{}) { + oldPG := old.(*schedulingv1alpha2.PodGroup) + newPG := new.(*schedulingv1alpha2.PodGroup) + + // Note: we have no use case update PodGroup.Spec.Queue + // So do not consider it here. 
+ if oldPG.Status.Phase != newPG.Status.Phase { + c.addPodGroup(newPG) + } +} + +func (c *Controller) deletePodGroup(obj interface{}) { + pg, ok := obj.(*schedulingv1alpha2.PodGroup) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + klog.Errorf("Couldn't get object from tombstone %#v.", obj) + return + } + pg, ok = tombstone.Obj.(*schedulingv1alpha2.PodGroup) + if !ok { + klog.Errorf("Tombstone contained object that is not a PodGroup: %#v.", obj) + return + } + } + + key, _ := cache.MetaNamespaceKeyFunc(obj) + + c.pgMutex.Lock() + defer c.pgMutex.Unlock() + + delete(c.podGroups[pg.Spec.Queue], key) + + req := &schedulingv1alpha2.QueueRequest{ + Name: pg.Spec.Queue, + + Event: schedulingv1alpha2.QueueOutOfSyncEvent, + Action: schedulingv1alpha2.SyncQueueAction, + } + + c.enqueue(req) +} + +func (c *Controller) addCommand(obj interface{}) { + cmd, ok := obj.(*busv1alpha1.Command) + if !ok { + klog.Errorf("Obj %v is not command.", obj) + return + } + + c.commandQueue.Add(cmd) +} + +func (c *Controller) getPodGroups(key string) []string { + c.pgMutex.RLock() + defer c.pgMutex.RUnlock() + + if c.podGroups[key] == nil { + return nil + } + podGroups := make([]string, 0, len(c.podGroups[key])) + for pgKey := range c.podGroups[key] { + podGroups = append(podGroups, pgKey) + } + + return podGroups +} + +func (c *Controller) recordEventsForQueue(name, eventType, reason, message string) { + queue, err := c.queueLister.Get(name) + if err != nil { + klog.Errorf("Get queue %s failed for %v.", name, err) + return + } + + c.recorder.Event(queue, eventType, reason, message) + return +} diff --git a/pkg/controllers/queue/queue_controller_test.go b/pkg/controllers/queue/queue_controller_test.go index e1e7806a61..9c4a068456 100644 --- a/pkg/controllers/queue/queue_controller_test.go +++ b/pkg/controllers/queue/queue_controller_test.go @@ -271,7 +271,7 @@ func TestSyncQueue(t *testing.T) { c.queueInformer.Informer().GetIndexer().Add(testcase.queue) 
c.vcClient.SchedulingV1alpha2().Queues().Create(testcase.queue) - err := c.syncQueue(testcase.queue.Name) + err := c.syncQueue(testcase.queue, nil) item, _ := c.vcClient.SchedulingV1alpha2().Queues().Get(testcase.queue.Name, metav1.GetOptions{}) if err != nil && testcase.ExpectValue != item.Status.Pending { t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, c.queue.Len()) diff --git a/pkg/controllers/queue/queue_controller_util.go b/pkg/controllers/queue/queue_controller_util.go new file mode 100644 index 0000000000..28ce7a261b --- /dev/null +++ b/pkg/controllers/queue/queue_controller_util.go @@ -0,0 +1,40 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// IsQueueReference return if ownerReference is Queue Kind +func IsQueueReference(ref *metav1.OwnerReference) bool { + if ref == nil { + return false + } + + if ref.APIVersion != schedulingv1alpha2.SchemeGroupVersion.String() { + return false + } + + if ref.Kind != "Queue" { + return false + } + + return true +} diff --git a/pkg/controllers/queue/state/closed.go b/pkg/controllers/queue/state/closed.go new file mode 100644 index 0000000000..91e402d87c --- /dev/null +++ b/pkg/controllers/queue/state/closed.go @@ -0,0 +1,58 @@ +/* +Copyright 2019 The Volcano Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" +) + +type closedState struct { + queue *v1alpha2.Queue +} + +func (cs *closedState) Execute(action v1alpha2.QueueAction) error { + switch action { + case v1alpha2.OpenQueueAction: + return OpenQueue(cs.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) { + status.State = v1alpha2.QueueStateOpen + return + }) + case v1alpha2.CloseQueueAction: + return SyncQueue(cs.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) { + status.State = v1alpha2.QueueStateClosed + return + }) + default: + return SyncQueue(cs.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) { + specState := cs.queue.Spec.State + if specState == v1alpha2.QueueStateOpen { + status.State = v1alpha2.QueueStateOpen + return + } + + if specState == v1alpha2.QueueStateClosed { + status.State = v1alpha2.QueueStateClosed + return + } + + status.State = v1alpha2.QueueStateUnknown + return + }) + } + + return nil +} diff --git a/pkg/controllers/queue/state/closing.go b/pkg/controllers/queue/state/closing.go new file mode 100644 index 0000000000..afcbdd6b37 --- /dev/null +++ b/pkg/controllers/queue/state/closing.go @@ -0,0 +1,68 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" +) + +type closingState struct { + queue *v1alpha2.Queue +} + +func (cs *closingState) Execute(action v1alpha2.QueueAction) error { + switch action { + case v1alpha2.OpenQueueAction: + return OpenQueue(cs.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) { + status.State = v1alpha2.QueueStateOpen + return + }) + case v1alpha2.CloseQueueAction: + return SyncQueue(cs.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) { + if len(podGroupList) == 0 { + status.State = v1alpha2.QueueStateClosed + return + } + status.State = v1alpha2.QueueStateClosing + + return + }) + default: + return SyncQueue(cs.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) { + specState := cs.queue.Spec.State + if specState == v1alpha2.QueueStateOpen { + status.State = v1alpha2.QueueStateOpen + return + } + + if specState == v1alpha2.QueueStateClosed { + if len(podGroupList) == 0 { + status.State = v1alpha2.QueueStateClosed + return + } + + status.State = v1alpha2.QueueStateClosing + return + } + + status.State = v1alpha2.QueueStateUnknown + return + }) + } + + return nil +} diff --git a/pkg/controllers/queue/state/factory.go b/pkg/controllers/queue/state/factory.go new file mode 100644 index 0000000000..07212a075e --- /dev/null +++ b/pkg/controllers/queue/state/factory.go @@ -0,0 +1,58 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" +) + +// State interface +type State interface { + // Execute executes the actions based on current state. + Execute(action v1alpha2.QueueAction) error +} + +// UpdateQueueStatusFn updates the queue status +type UpdateQueueStatusFn func(status *v1alpha2.QueueStatus, podGroupList []string) + +// QueueActionFn will open, close or sync queue. +type QueueActionFn func(queue *v1alpha2.Queue, fn UpdateQueueStatusFn) error + +var ( + // SyncQueue will sync queue status. + SyncQueue QueueActionFn + // OpenQueue will set state of queue to open + OpenQueue QueueActionFn + // CloseQueue will set state of queue to close + CloseQueue QueueActionFn +) + +// NewState gets the state from queue status +func NewState(queue *v1alpha2.Queue) State { + switch queue.Status.State { + case "", v1alpha2.QueueStateOpen: + return &openState{queue: queue} + case v1alpha2.QueueStateClosed: + return &closedState{queue: queue} + case v1alpha2.QueueStateClosing: + return &closingState{queue: queue} + case v1alpha2.QueueStateUnknown: + return &unknownState{queue: queue} + } + + return nil +} diff --git a/pkg/controllers/queue/state/open.go b/pkg/controllers/queue/state/open.go new file mode 100644 index 0000000000..4d7ad898f9 --- /dev/null +++ b/pkg/controllers/queue/state/open.go @@ -0,0 +1,68 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package state
+
+import (
+	"volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
+)
+
+// openState is the State used for a queue whose status is Open or unset.
+type openState struct {
+	queue *v1alpha2.Queue
+}
+
+// Execute handles action for an open queue. An open action only calls
+// SyncQueue and keeps the status Open. A close action calls CloseQueue: the
+// status becomes Closed when podGroupList is empty and Closing otherwise.
+// Any other action syncs the status from Spec.State, where an empty value
+// counts as Open and anything besides Open or Closed is reported as Unknown.
+func (o *openState) Execute(action v1alpha2.QueueAction) error {
+	switch action {
+	case v1alpha2.OpenQueueAction:
+		return SyncQueue(o.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) {
+			status.State = v1alpha2.QueueStateOpen
+		})
+	case v1alpha2.CloseQueueAction:
+		return CloseQueue(o.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) {
+			if len(podGroupList) == 0 {
+				status.State = v1alpha2.QueueStateClosed
+				return
+			}
+			status.State = v1alpha2.QueueStateClosing
+		})
+	default:
+		return SyncQueue(o.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) {
+			specState := o.queue.Spec.State
+			if len(specState) == 0 || specState == v1alpha2.QueueStateOpen {
+				status.State = v1alpha2.QueueStateOpen
+				return
+			}
+
+			if specState == v1alpha2.QueueStateClosed {
+				if len(podGroupList) == 0 {
+					status.State = v1alpha2.QueueStateClosed
+					return
+				}
+
+				status.State = v1alpha2.QueueStateClosing
+				return
+			}
+
+			status.State = v1alpha2.QueueStateUnknown
+		})
+	}
+}
diff --git a/pkg/controllers/queue/state/unknown.go b/pkg/controllers/queue/state/unknown.go
new file mode 100644
index 0000000000..89bf6bebd9
--- /dev/null
+++ b/pkg/controllers/queue/state/unknown.go
@@ -0,0 +1,66 @@
+/*
+Copyright 2019 The Volcano Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package state
+
+import (
+	"volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
+)
+
+// unknownState is the State used for a queue whose status is Unknown.
+type unknownState struct {
+	queue *v1alpha2.Queue
+}
+
+// Execute handles action for a queue whose state is unknown. An open action
+// calls OpenQueue and marks the status Open. A close action calls
+// CloseQueue: the status becomes Closed when podGroupList is empty and
+// Closing otherwise. Any other action calls SyncQueue and derives the status
+// from Spec.State, which leaves the status Unknown unless Spec.State is
+// exactly Open or Closed.
+func (us *unknownState) Execute(action v1alpha2.QueueAction) error {
+	switch action {
+	case v1alpha2.OpenQueueAction:
+		return OpenQueue(us.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) {
+			status.State = v1alpha2.QueueStateOpen
+		})
+	case v1alpha2.CloseQueueAction:
+		return CloseQueue(us.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) {
+			// Closed is only reported once podGroupList is empty.
+			if len(podGroupList) == 0 {
+				status.State = v1alpha2.QueueStateClosed
+			} else {
+				status.State = v1alpha2.QueueStateClosing
+			}
+		})
+	default:
+		return SyncQueue(us.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) {
+			// An empty Spec.State is not treated as Open here.
+			switch us.queue.Spec.State {
+			case v1alpha2.QueueStateOpen:
+				status.State = v1alpha2.QueueStateOpen
+			case v1alpha2.QueueStateClosed:
+				if len(podGroupList) == 0 {
+					status.State = v1alpha2.QueueStateClosed
+				} else {
+					status.State = v1alpha2.QueueStateClosing
+				}
+			default:
+				status.State = v1alpha2.QueueStateUnknown
+			}
+		})
+	}
+}