diff --git a/pkg/apis/scheduling/v1alpha2/types.go b/pkg/apis/scheduling/v1alpha2/types.go index 9e22531aa1a..6482467ccb8 100644 --- a/pkg/apis/scheduling/v1alpha2/types.go +++ b/pkg/apis/scheduling/v1alpha2/types.go @@ -98,6 +98,28 @@ const ( NotEnoughPodsReason string = "NotEnoughTasks" ) +// QueueEvent represent the phase of queue +type QueueEvent string + +const ( + // QueueOutOfSyncEvent is triggered if PodGroup/Queue were updated + QueueOutOfSyncEvent QueueEvent = "OutOfSync" + // QueueCommandIssuedEvent is triggered if a command is raised by user + QueueCommandIssuedEvent QueueEvent = "CommandIssued" +) + +// QueueAction is the action that queue controller will take according to the event. +type QueueAction string + +const ( + // SyncQueueAction is the action to sync queue status. + SyncQueueAction QueueAction = "SyncQueue" + // OpenQueueAction is the action to open queue + OpenQueueAction QueueAction = "OpenQueue" + // CloseQueueAction is the action to close queue + CloseQueueAction QueueAction = "CloseQueue" +) + // +genclient // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go index c93047895b9..10559e3d502 100644 --- a/pkg/controllers/job/job_controller.go +++ b/pkg/controllers/job/job_controller.go @@ -40,6 +40,7 @@ import ( "k8s.io/client-go/util/workqueue" batchv1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + busv1alpha1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1" vcclientset "volcano.sh/volcano/pkg/client/clientset/versioned" vcscheme "volcano.sh/volcano/pkg/client/clientset/versioned/scheme" informerfactory "volcano.sh/volcano/pkg/client/informers/externalversions" @@ -126,7 +127,7 @@ func NewJobController( eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(vcscheme.Scheme, v1.EventSource{Component: "vc-controller"}) + recorder := eventBroadcaster.NewRecorder(vcscheme.Scheme, v1.EventSource{Component: "vc-controllers"}) cc := &Controller{ kubeClient: kubeClient, @@ -155,9 +156,27 @@ func NewJobController( cc.jobSynced = cc.jobInformer.Informer().HasSynced cc.cmdInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Bus().V1alpha1().Commands() - cc.cmdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: cc.addCommand, - }) + cc.cmdInformer.Informer().AddEventHandler( + cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch obj.(type) { + case *busv1alpha1.Command: + cmd := obj.(*busv1alpha1.Command) + if cmd.TargetObject != nil && + cmd.TargetObject.APIVersion == batchv1alpha1.SchemeGroupVersion.String() && + cmd.TargetObject.Kind == "Job" { + return true + } + + return false + default: + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: cc.addCommand, + }, + }) cc.cmdLister = cc.cmdInformer.Lister() cc.cmdSynced = cc.cmdInformer.Informer().HasSynced diff --git a/pkg/controllers/queue/queue_controller.go b/pkg/controllers/queue/queue_controller.go index 2697d5af47f..b7f22f2f8ab 100644 --- a/pkg/controllers/queue/queue_controller.go +++ b/pkg/controllers/queue/queue_controller.go @@ -18,21 +18,40 @@ package queue import ( "fmt" - "reflect" "sync" + "time" "github.com/golang/glog" - "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/api/core/v1" + apierrors 
"k8s.io/apimachinery/pkg/api/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" + busv1alpha1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1" schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" vcclientset "volcano.sh/volcano/pkg/client/clientset/versioned" + versionedscheme "volcano.sh/volcano/pkg/client/clientset/versioned/scheme" informerfactory "volcano.sh/volcano/pkg/client/informers/externalversions" + busv1alpha1informer "volcano.sh/volcano/pkg/client/informers/externalversions/bus/v1alpha1" schedulinginformer "volcano.sh/volcano/pkg/client/informers/externalversions/scheduling/v1alpha2" + busv1alpha1lister "volcano.sh/volcano/pkg/client/listers/bus/v1alpha1" schedulinglister "volcano.sh/volcano/pkg/client/listers/scheduling/v1alpha2" + queuestate "volcano.sh/volcano/pkg/controllers/queue/state" + schedulerapi "volcano.sh/volcano/pkg/scheduler/api" +) + +const ( + // maxRetries is the number of times a deployment will be retried before it is dropped out of the queue. + // With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times + // a deployment is going to be requeued: + // + // 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s + maxRetries = 15 ) // Controller manages queue status. @@ -52,12 +71,24 @@ type Controller struct { pgLister schedulinglister.PodGroupLister pgSynced cache.InformerSynced + cmdInformer busv1alpha1informer.CommandInformer + cmdLister busv1alpha1lister.CommandLister + cmdSynced cache.InformerSynced + // queues that need to be updated. 
- queue workqueue.RateLimitingInterface + queue workqueue.RateLimitingInterface + commandQueue workqueue.RateLimitingInterface pgMutex sync.RWMutex // queue name -> podgroup namespace/name podGroups map[string]map[string]struct{} + + syncHandler func(req *schedulerapi.QueueRequest) error + syncCommandHandler func(cmd *busv1alpha1.Command) error + + enqueueQueue func(req *schedulerapi.QueueRequest) + + recorder record.EventRecorder } // NewQueueController creates a QueueController @@ -68,6 +99,11 @@ func NewQueueController( factory := informerfactory.NewSharedInformerFactory(kbClient, 0) queueInformer := factory.Scheduling().V1alpha2().Queues() pgInformer := factory.Scheduling().V1alpha2().PodGroups() + + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) + c := &Controller{ kubeClient: kubeClient, vcClient: kbClient, @@ -81,12 +117,17 @@ func NewQueueController( pgLister: pgInformer.Lister(), pgSynced: pgInformer.Informer().HasSynced, - queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + commandQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + podGroups: make(map[string]map[string]struct{}), + + recorder: eventBroadcaster.NewRecorder(versionedscheme.Scheme, v1.EventSource{Component: "vc-controllers"}), } queueInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.addQueue, + UpdateFunc: c.updateQueue, DeleteFunc: c.deleteQueue, }) @@ -96,22 +137,64 @@ func NewQueueController( DeleteFunc: c.deletePodGroup, }) + c.cmdInformer = informerfactory.NewSharedInformerFactory(c.vcClient, 0).Bus().V1alpha1().Commands() + c.cmdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch obj.(type) { + case *busv1alpha1.Command: + cmd := obj.(*busv1alpha1.Command) + if cmd.TargetObject != nil && + cmd.TargetObject.APIVersion == schedulingv1alpha2.SchemeGroupVersion.String() && + cmd.TargetObject.Kind == "Queue" { + return true + } + + return false + default: + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: c.addCommand, + }, + }) + c.cmdLister = c.cmdInformer.Lister() + c.cmdSynced = c.cmdInformer.Informer().HasSynced + + queuestate.SyncQueue = c.syncQueue + queuestate.OpenQueue = c.openQueue + queuestate.CloseQueue = c.closeQueue + + c.syncHandler = c.handleQueue + c.syncCommandHandler = c.handleCommand + + c.enqueueQueue = c.enqueue + return c } // Run starts QueueController func (c *Controller) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + defer c.commandQueue.ShutDown() + + glog.Infof("Starting queue controller") + defer glog.Infof("Shutting down queue controller") go c.queueInformer.Informer().Run(stopCh) go c.pgInformer.Informer().Run(stopCh) + go c.cmdInformer.Informer().Run(stopCh) - if !cache.WaitForCacheSync(stopCh, c.queueSynced, c.pgSynced) { + if !cache.WaitForCacheSync(stopCh, c.queueSynced, c.pgSynced, c.cmdSynced) { glog.Errorf("unable to sync caches for queue controller") return } go wait.Until(c.worker, 0, stopCh) - glog.Infof("QueueController is running ...... 
") + go wait.Until(c.commandWorker, 0, stopCh) + + <-stopCh } // worker runs a worker thread that just dequeues items, processes them, and @@ -124,168 +207,131 @@ func (c *Controller) worker() { } func (c *Controller) processNextWorkItem() bool { - eKey, quit := c.queue.Get() - if quit { + obj, shutdown := c.queue.Get() + if shutdown { return false } - defer c.queue.Done(eKey) + defer c.queue.Done(obj) - if err := c.syncQueue(eKey.(string)); err != nil { - glog.V(2).Infof("Error syncing queues %q, retrying. Error: %v", eKey, err) - c.queue.AddRateLimited(eKey) + req, ok := obj.(*schedulerapi.QueueRequest) + if !ok { + glog.V(2).Infof("%v is not a valid queue request struct", obj) return true } - c.queue.Forget(eKey) - return true -} - -func (c *Controller) getPodGroups(key string) ([]string, error) { - c.pgMutex.RLock() - defer c.pgMutex.RUnlock() + err := c.syncHandler(req) + c.handleQueueErr(err, obj) - if c.podGroups[key] == nil { - return nil, fmt.Errorf("queue %s has not been seen or deleted", key) - } - podGroups := make([]string, 0, len(c.podGroups[key])) - for pgKey := range c.podGroups[key] { - podGroups = append(podGroups, pgKey) - } - - return podGroups, nil + return true } -func (c *Controller) syncQueue(key string) error { - glog.V(4).Infof("Begin sync queue %s", key) +func (c *Controller) handleQueue(req *schedulerapi.QueueRequest) error { + startTime := time.Now() + defer func() { + glog.V(4).Infof("Finished syncing queue %s (%v)", req.Name, time.Since(startTime)) + }() - podGroups, err := c.getPodGroups(key) + queue, err := c.queueLister.Get(req.Name) if err != nil { - return err - } - - queueStatus := schedulingv1alpha2.QueueStatus{} - - for _, pgKey := range podGroups { - // Ignore error here, tt can not occur. - ns, name, _ := cache.SplitMetaNamespaceKey(pgKey) - - // TODO: check NotFound error and sync local cache. 
- pg, err := c.pgLister.PodGroups(ns).Get(name) - if err != nil { - return err + if apierrors.IsNotFound(err) { + glog.V(4).Infof("Queue %s has been deleted", req.Name) + return nil } - switch pg.Status.Phase { - case schedulingv1alpha2.PodGroupPending: - queueStatus.Pending++ - case schedulingv1alpha2.PodGroupRunning: - queueStatus.Running++ - case schedulingv1alpha2.PodGroupUnknown: - queueStatus.Unknown++ - case schedulingv1alpha2.PodGroupInqueue: - queueStatus.Inqueue++ - } + return fmt.Errorf("get queue %s failed for %v", req.Name, err) } - queue, err := c.queueLister.Get(key) - if err != nil { - if errors.IsNotFound(err) { - glog.V(2).Infof("queue %s has been deleted", key) - return nil - } - // TODO: do not retry to syncQueue for this error - return err + queueState := queuestate.NewState(queue) + if queueState == nil { + return fmt.Errorf("queue %s state %s is invalid", queue.Name, queue.Status.State) } - // ignore update when status does not change - if reflect.DeepEqual(queueStatus, queue.Status) { - return nil + if err := queueState.Execute(req.Action); err != nil { + return fmt.Errorf("sync queue %s failed for %v, event is %v, action is %s", + req.Name, err, req.Event, req.Action) } - newQueue := queue.DeepCopy() - newQueue.Status = queueStatus + return nil +} + +func (c *Controller) handleQueueErr(err error, obj interface{}) { + if err == nil { + c.queue.Forget(obj) + return + } - if _, err := c.vcClient.SchedulingV1alpha2().Queues().UpdateStatus(newQueue); err != nil { - glog.Errorf("Failed to update status of Queue %s: %v", newQueue.Name, err) - return err + if c.queue.NumRequeues(obj) < maxRetries { + glog.V(2).Infof("Error syncing queue %v for %v", obj, err) + c.queue.AddRateLimited(obj) + return } - return nil + glog.V(2).Infof("Dropping queue %v out of the queue for %v", obj, err) + c.queue.Forget(obj) } -func (c *Controller) addQueue(obj interface{}) { - queue := obj.(*schedulingv1alpha2.Queue) - c.queue.Add(queue.Name) +func (c *Controller) commandWorker() { + for c.processNextCommand() { + } } -func (c *Controller) deleteQueue(obj interface{}) { - queue, ok := obj.(*schedulingv1alpha2.Queue) +func (c *Controller) processNextCommand() bool { + obj, shutdown := c.commandQueue.Get() + if shutdown { + return false + } + defer c.commandQueue.Done(obj) + + cmd, ok := obj.(*busv1alpha1.Command) if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - glog.Errorf("Couldn't get object from tombstone %#v", obj) - return - } - queue, ok = tombstone.Obj.(*schedulingv1alpha2.Queue) - if !ok { - glog.Errorf("Tombstone contained object that is not a Queue: %#v", obj) - return - } + glog.V(2).Infof("%v is not a valid Command struct", obj) + return true } - c.pgMutex.Lock() - defer c.pgMutex.Unlock() - delete(c.podGroups, queue.Name) + err := c.syncCommandHandler(cmd) + c.handleCommandErr(err, obj) + + return true } -func (c *Controller) addPodGroup(obj interface{}) { - pg := obj.(*schedulingv1alpha2.PodGroup) - key, _ := cache.MetaNamespaceKeyFunc(obj) +func (c *Controller) handleCommand(cmd *busv1alpha1.Command) error { + startTime := time.Now() + defer func() { + glog.V(4).Infof("Finished syncing command %s/%s (%v)", cmd.Namespace, cmd.Name, time.Since(startTime)) + }() - c.pgMutex.Lock() - defer c.pgMutex.Unlock() + err := c.vcClient.BusV1alpha1().Commands(cmd.Namespace).Delete(cmd.Name, nil) + if err != nil { + if true == apierrors.IsNotFound(err) { + return nil + } - if c.podGroups[pg.Spec.Queue] == nil { - c.podGroups[pg.Spec.Queue] = 
make(map[string]struct{}) + return fmt.Errorf("failed to delete command <%s/%s> for %v", cmd.Namespace, cmd.Name, err) } - c.podGroups[pg.Spec.Queue][key] = struct{}{} - // enqueue - c.queue.Add(pg.Spec.Queue) -} + req := &schedulerapi.QueueRequest{ + Name: cmd.TargetObject.Name, + Event: schedulingv1alpha2.QueueCommandIssuedEvent, + Action: schedulingv1alpha2.QueueAction(cmd.Action), + } -func (c *Controller) updatePodGroup(old, new interface{}) { - oldPG := old.(*schedulingv1alpha2.PodGroup) - newPG := new.(*schedulingv1alpha2.PodGroup) + c.enqueueQueue(req) - // Note: we have no use case update PodGroup.Spec.Queue - // So do not consider it here. - if oldPG.Status.Phase != newPG.Status.Phase { - c.queue.Add(newPG.Spec.Queue) - } + return nil } -func (c *Controller) deletePodGroup(obj interface{}) { - pg, ok := obj.(*schedulingv1alpha2.PodGroup) - if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - glog.Errorf("Couldn't get object from tombstone %#v", obj) - return - } - pg, ok = tombstone.Obj.(*schedulingv1alpha2.PodGroup) - if !ok { - glog.Errorf("Tombstone contained object that is not a PodGroup: %#v", obj) - return - } +func (c *Controller) handleCommandErr(err error, obj interface{}) { + if err == nil { + c.commandQueue.Forget(obj) + return } - key, _ := cache.MetaNamespaceKeyFunc(obj) - - c.pgMutex.Lock() - defer c.pgMutex.Unlock() - - delete(c.podGroups[pg.Spec.Queue], key) + if c.commandQueue.NumRequeues(obj) < maxRetries { + glog.V(2).Infof("Error syncing command %v for %v", obj, err) + c.commandQueue.AddRateLimited(obj) + return + } - c.queue.Add(pg.Spec.Queue) + glog.V(2).Infof("Dropping command %v out of the queue for %v", obj, err) + c.commandQueue.Forget(obj) } diff --git a/pkg/controllers/queue/queue_controller_handler.go b/pkg/controllers/queue/queue_controller_handler.go new file mode 100644 index 00000000000..09739c41fed --- /dev/null +++ b/pkg/controllers/queue/queue_controller_handler.go @@ -0,0 +1,271 @@ +/* +Copyright 2019 The Volcano Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "fmt" + "reflect" + + busv1alpha1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1" + schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" + schedulingapi "volcano.sh/volcano/pkg/scheduler/api" + + "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" + + "github.com/golang/glog" +) + +func (c *Controller) syncQueue(queue *schedulingv1alpha2.Queue) error { + glog.V(4).Infof("Begin sync queue %s", queue.Name) + + podGroups := c.getPodGroups(queue.Name) + queueStatus := schedulingv1alpha2.QueueStatus{} + + for _, pgKey := range podGroups { + // Ignore error here, tt can not occur. + ns, name, _ := cache.SplitMetaNamespaceKey(pgKey) + + // TODO: check NotFound error and sync local cache. 
+		pg, err := c.pgLister.PodGroups(ns).Get(name)
+		if err != nil {
+			return err
+		}
+
+		switch pg.Status.Phase {
+		case schedulingv1alpha2.PodGroupPending:
+			queueStatus.Pending++
+		case schedulingv1alpha2.PodGroupRunning:
+			queueStatus.Running++
+		case schedulingv1alpha2.PodGroupUnknown:
+			queueStatus.Unknown++
+		case schedulingv1alpha2.PodGroupInqueue:
+			queueStatus.Inqueue++
+		}
+	}
+
+	// If spec.state is empty, the queue status is set to `Open`.
+	// If spec.state is `Open`, the queue status is also `Open`.
+	// If spec.state is `Closed`, we further check whether any PodGroup still
+	// belongs to the queue: if there is at least one PodGroup, the status is
+	// set to `Closing` so that running workloads can drain; if there is none,
+	// the status stays `Closed`.
+	queueStatus.State = queue.Spec.State
+	if len(queueStatus.State) == 0 {
+		queueStatus.State = schedulingv1alpha2.QueueStateOpen
+	}
+
+	if queueStatus.State == schedulingv1alpha2.QueueStateClosed {
+		if len(podGroups) != 0 {
+			queueStatus.State = schedulingv1alpha2.QueueStateClosing
+		}
+	}
+
+	// Ignore the update when the status does not change.
+	if reflect.DeepEqual(queueStatus, queue.Status) {
+		return nil
+	}
+
+	newQueue := queue.DeepCopy()
+	newQueue.Status = queueStatus
+
+	if _, err := c.vcClient.SchedulingV1alpha2().Queues().UpdateStatus(newQueue); err != nil {
+		glog.Errorf("Failed to update status of Queue %s: %v", newQueue.Name, err)
+		return err
+	}
+
+	return nil
+}
+
+func (c *Controller) openQueue(queue *schedulingv1alpha2.Queue) error {
+	glog.V(4).Infof("Begin open queue %s", queue.Name)
+
+	queue.Spec.State = schedulingv1alpha2.QueueStateOpen
+
+	if _, err := c.vcClient.SchedulingV1alpha2().Queues().Update(queue); err != nil {
+		c.recorder.Event(queue, v1.EventTypeNormal, string(schedulingv1alpha2.OpenQueueAction),
+			fmt.Sprintf("Open queue failed for %v", err))
+		return err
+	}
+	c.recorder.Event(queue, v1.EventTypeNormal, string(schedulingv1alpha2.OpenQueueAction),
+		"Open queue succeeded")
+
+	return nil
+}
+
+func (c *Controller) closeQueue(queue *schedulingv1alpha2.Queue) error {
+	glog.V(4).Infof("Begin close queue %s", queue.Name)
+
+	queue.Spec.State = schedulingv1alpha2.QueueStateClosed
+
+	if _, err := c.vcClient.SchedulingV1alpha2().Queues().Update(queue); err != nil {
+		c.recorder.Event(queue, v1.EventTypeNormal, string(schedulingv1alpha2.CloseQueueAction),
+			fmt.Sprintf("Close queue failed for %v", err))
+		glog.Errorf("Failed to close queue %s: %v", queue.Name, err)
+		return err
+	}
+	c.recorder.Event(queue, v1.EventTypeNormal, string(schedulingv1alpha2.CloseQueueAction),
+		"Close queue succeeded")
+
+	return nil
+}
+
+func (c *Controller) enqueue(req *schedulingapi.QueueRequest) {
+	c.queue.Add(req)
+}
+
+func (c *Controller) addQueue(obj interface{}) {
+	queue := obj.(*schedulingv1alpha2.Queue)
+
+	req := &schedulingapi.QueueRequest{
+		Name: queue.Name,
+
+		Event:  schedulingv1alpha2.QueueOutOfSyncEvent,
+		Action: schedulingv1alpha2.SyncQueueAction,
+	}
+
+	c.enqueue(req)
+}
+
+func (c *Controller) deleteQueue(obj interface{}) {
+	queue, ok := obj.(*schedulingv1alpha2.Queue)
+	if !ok {
+		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+		if !ok {
+			glog.Errorf("Couldn't get object from tombstone %#v", obj)
+			return
+		}
+		queue, ok = tombstone.Obj.(*schedulingv1alpha2.Queue)
+		if !ok {
+			glog.Errorf("Tombstone contained object that is not a Queue: %#v", obj)
+			return
+		}
+	}
+
+	c.pgMutex.Lock()
+	defer c.pgMutex.Unlock()
+	delete(c.podGroups, queue.Name)
+}
+
+func (c *Controller) updateQueue(old, new interface{}) {
+	oldQueue, ok := old.(*schedulingv1alpha2.Queue)
+	if !ok {
+		glog.Errorf("Can not convert old object %v to *v1alpha2.Queue", old)
+		return
+	}
+
+	newQueue, ok := new.(*schedulingv1alpha2.Queue)
+	if !ok {
+		glog.Errorf("Can not convert new object %v to *v1alpha2.Queue", new)
+		return
+	}
+
+	if oldQueue.ResourceVersion == newQueue.ResourceVersion {
+		return
+	}
+
+	c.addQueue(newQueue)
+}
+
+func (c *Controller) addPodGroup(obj interface{}) {
+	pg := obj.(*schedulingv1alpha2.PodGroup)
+	key, _ := cache.MetaNamespaceKeyFunc(obj)
+
+	c.pgMutex.Lock()
+	defer c.pgMutex.Unlock()
+
+	if c.podGroups[pg.Spec.Queue] == nil {
+		c.podGroups[pg.Spec.Queue] = make(map[string]struct{})
+	}
+	c.podGroups[pg.Spec.Queue][key] = struct{}{}
+
+	req := &schedulingapi.QueueRequest{
+		Name: pg.Spec.Queue,
+
+		Event:  schedulingv1alpha2.QueueOutOfSyncEvent,
+		Action: schedulingv1alpha2.SyncQueueAction,
+	}
+
+	c.enqueue(req)
+}
+
+func (c *Controller) updatePodGroup(old, new interface{}) {
+	oldPG := old.(*schedulingv1alpha2.PodGroup)
+	newPG := new.(*schedulingv1alpha2.PodGroup)
+
+	// Note: we have no use case that updates PodGroup.Spec.Queue,
+	// so it is not considered here.
+	if oldPG.Status.Phase != newPG.Status.Phase {
+		c.addPodGroup(newPG)
+	}
+}
+
+func (c *Controller) deletePodGroup(obj interface{}) {
+	pg, ok := obj.(*schedulingv1alpha2.PodGroup)
+	if !ok {
+		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+		if !ok {
+			glog.Errorf("Couldn't get object from tombstone %#v", obj)
+			return
+		}
+		pg, ok = tombstone.Obj.(*schedulingv1alpha2.PodGroup)
+		if !ok {
+			glog.Errorf("Tombstone contained object that is not a PodGroup: %#v", obj)
+			return
+		}
+	}
+
+	key, _ := cache.MetaNamespaceKeyFunc(obj)
+
+	c.pgMutex.Lock()
+	defer c.pgMutex.Unlock()
+
+	delete(c.podGroups[pg.Spec.Queue], key)
+
+	req := &schedulingapi.QueueRequest{
+		Name: pg.Spec.Queue,
+
+		Event:  schedulingv1alpha2.QueueOutOfSyncEvent,
+		Action: schedulingv1alpha2.SyncQueueAction,
+	}
+
+	c.enqueue(req)
+}
+
+func (c *Controller) addCommand(obj interface{}) {
+	cmd, ok := obj.(*busv1alpha1.Command)
+	if !ok {
+		glog.Errorf("Obj %v is not a Command", obj)
+		return
+	}
+
+	c.commandQueue.Add(cmd)
+}
+
+func (c *Controller) getPodGroups(key string) []string {
+	c.pgMutex.RLock()
+	defer c.pgMutex.RUnlock()
+
+	if c.podGroups[key] == nil {
+		return nil
+	}
+	podGroups := make([]string, 0, len(c.podGroups[key]))
+	for pgKey := range c.podGroups[key] {
+		podGroups = append(podGroups, pgKey)
+	}
+
+	return podGroups
+}
diff --git a/pkg/controllers/queue/queue_controller_test.go b/pkg/controllers/queue/queue_controller_test.go
index 85b94e63726..1b32cd39f3f 100644
--- a/pkg/controllers/queue/queue_controller_test.go
+++ b/pkg/controllers/queue/queue_controller_test.go
@@ -270,7 +270,7 @@ func TestSyncQueue(t *testing.T) {
 		c.queueInformer.Informer().GetIndexer().Add(testcase.queue)
 		c.vcClient.SchedulingV1alpha2().Queues().Create(testcase.queue)
 
-		err := c.syncQueue(testcase.queue.Name)
+		err := c.syncQueue(testcase.queue)
 		item, _ := c.vcClient.SchedulingV1alpha2().Queues().Get(testcase.queue.Name, metav1.GetOptions{})
 		if err != nil && testcase.ExpectValue != item.Status.Pending {
 			t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, c.queue.Len())
diff --git a/pkg/controllers/queue/state/closed.go b/pkg/controllers/queue/state/closed.go
new file mode 100644
index 00000000000..a413c1d49e4
--- /dev/null
+++ b/pkg/controllers/queue/state/closed.go
@@ -0,0 +1,33 @@
+/*
+Copyright 2019 The Volcano Authors.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package state
+
+import (
+	"volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
+)
+
+type closedState struct {
+	queue *v1alpha2.Queue
+}
+
+func (cs *closedState) Execute(action v1alpha2.QueueAction) error {
+	switch action {
+	case v1alpha2.OpenQueueAction:
+		return OpenQueue(cs.queue)
+	case v1alpha2.CloseQueueAction:
+		return SyncQueue(cs.queue)
+	default:
+		return SyncQueue(cs.queue)
+	}
+}
diff --git a/pkg/controllers/queue/state/closing.go b/pkg/controllers/queue/state/closing.go
new file mode 100644
index 00000000000..fa9901f7ff2
--- /dev/null
+++ b/pkg/controllers/queue/state/closing.go
@@ -0,0 +1,33 @@
+/*
+Copyright 2019 The Volcano Authors.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package state
+
+import (
+	"volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
+)
+
+type closingState struct {
+	queue *v1alpha2.Queue
+}
+
+func (cs *closingState) Execute(action v1alpha2.QueueAction) error {
+	switch action {
+	case v1alpha2.OpenQueueAction:
+		return OpenQueue(cs.queue)
+	case v1alpha2.CloseQueueAction:
+		return SyncQueue(cs.queue)
+	default:
+		return SyncQueue(cs.queue)
+	}
+}
diff --git a/pkg/controllers/queue/state/factory.go b/pkg/controllers/queue/state/factory.go
new file mode 100644
index 00000000000..5ebd8ea2958
--- /dev/null
+++ b/pkg/controllers/queue/state/factory.go
@@ -0,0 +1,50 @@
+/*
+Copyright 2019 The Volcano Authors.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package state
+
+import (
+	"volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
+)
+
+// State is the interface of the queue state machine.
+type State interface {
+	// Execute executes the actions based on current state.
+	Execute(action v1alpha2.QueueAction) error
+}
+
+// ActionFn will sync, open or close a queue.
+type ActionFn func(queue *v1alpha2.Queue) error
+
+var (
+	// SyncQueue will sync queue status.
+	SyncQueue ActionFn
+	// OpenQueue will set the state of the queue to open.
+	OpenQueue ActionFn
+	// CloseQueue will set the state of the queue to closed.
+	CloseQueue ActionFn
+)
+
+// NewState gets the state implementation from the queue status.
+func NewState(queue *v1alpha2.Queue) State {
+	switch queue.Status.State {
+	case "", v1alpha2.QueueStateOpen:
+		return &openState{queue: queue}
+	case v1alpha2.QueueStateClosed:
+		return &closedState{queue: queue}
+	case v1alpha2.QueueStateClosing:
+		return &closingState{queue: queue}
+	}
+
+	return nil
+}
diff --git a/pkg/controllers/queue/state/open.go b/pkg/controllers/queue/state/open.go
new file mode 100644
index 00000000000..b1af2bf0ad0
--- /dev/null
+++ b/pkg/controllers/queue/state/open.go
@@ -0,0 +1,33 @@
+/*
+Copyright 2019 The Volcano Authors.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package state
+
+import (
+	"volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
+)
+
+type openState struct {
+	queue *v1alpha2.Queue
+}
+
+func (os *openState) Execute(action v1alpha2.QueueAction) error {
+	switch action {
+	case v1alpha2.OpenQueueAction:
+		return SyncQueue(os.queue)
+	case v1alpha2.CloseQueueAction:
+		return CloseQueue(os.queue)
+	default:
+		return SyncQueue(os.queue)
+	}
+}
diff --git a/pkg/scheduler/api/queue_info.go b/pkg/scheduler/api/queue_info.go
index 3694adbffe8..e781264bacb 100644
--- a/pkg/scheduler/api/queue_info.go
+++ b/pkg/scheduler/api/queue_info.go
@@ -20,6 +20,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 
 	"volcano.sh/volcano/pkg/apis/scheduling"
+	"volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
 )
 
 // QueueID is UID type, serves as unique ID for each queue
@@ -56,3 +57,14 @@ func (q *QueueInfo) Clone() *QueueInfo {
 		Queue: q.Queue,
 	}
 }
+
+// QueueRequest is a request for the queue controller to reconcile a queue.
+type QueueRequest struct {
+	// Name is the queue name.
+	Name string
+
+	// Event is the event that triggered the request.
+	Event v1alpha2.QueueEvent
+	// Action is the action to be performed on the queue.
+	Action v1alpha2.QueueAction
+}
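// Illustrative sketch (not part of the patch above): a Command only reaches the
// queue controller's command workqueue when its TargetObject references a
// scheduling Queue (see the FilterFunc in NewQueueController). Once accepted,
// handleCommand deletes the Command and enqueues a QueueRequest equivalent to
// the one built by this hypothetical helper; the argument values are
// illustrative only.
package queue

import (
	schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
	schedulerapi "volcano.sh/volcano/pkg/scheduler/api"
)

// requestForQueueCommand mirrors the mapping done in handleCommand.
func requestForQueueCommand(targetName, action string) *schedulerapi.QueueRequest {
	return &schedulerapi.QueueRequest{
		Name:   targetName,                                 // cmd.TargetObject.Name
		Event:  schedulingv1alpha2.QueueCommandIssuedEvent, // command path always reports CommandIssued
		Action: schedulingv1alpha2.QueueAction(action),     // e.g. "OpenQueue" or "CloseQueue"
	}
}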
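// Illustrative sketch (not part of the patch above): the backoff listed next to
// maxRetries comes from workqueue.DefaultControllerRateLimiter(), which the
// controller uses for both queue and commandQueue. This standalone snippet just
// prints the per-item delays; note the default limiter also contains an overall
// token bucket, so observed delays can be larger under load.
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

const maxRetries = 15 // mirrors the constant in queue_controller.go

func main() {
	rl := workqueue.DefaultControllerRateLimiter()
	item := "queue-request"
	for i := 0; i < maxRetries; i++ {
		// The first call returns 5ms, then the delay doubles on every requeue.
		fmt.Printf("requeue %2d -> %v\n", i+1, rl.When(item))
	}
	rl.Forget(item) // handleQueueErr calls Forget on success or after the final drop
}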
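// Illustrative sketch (not part of the patch above): syncQueue derives the
// published status from spec.state plus the PodGroups still tracked for the
// queue. The same rule, extracted into a hypothetical pure function:
package queue

import schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"

func derivedQueueState(spec schedulingv1alpha2.QueueState, podGroupCount int) schedulingv1alpha2.QueueState {
	state := spec
	if len(state) == 0 {
		// An unset spec.state is treated as Open.
		state = schedulingv1alpha2.QueueStateOpen
	}
	if state == schedulingv1alpha2.QueueStateClosed && podGroupCount != 0 {
		// A queue asked to close but still owning PodGroups reports Closing
		// until those PodGroups are gone.
		state = schedulingv1alpha2.QueueStateClosing
	}
	return state
}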
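// Illustrative sketch (not part of the patch above): how the state package is
// meant to be wired and driven. NewQueueController assigns the controller's
// real syncQueue/openQueue/closeQueue methods to these package-level funcs;
// here they are replaced with stubs so the routing of actions can be seen in
// isolation.
package main

import (
	"fmt"

	"volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
	queuestate "volcano.sh/volcano/pkg/controllers/queue/state"
)

func main() {
	queuestate.SyncQueue = func(q *v1alpha2.Queue) error { fmt.Println("sync", q.Name); return nil }
	queuestate.OpenQueue = func(q *v1alpha2.Queue) error { fmt.Println("open", q.Name); return nil }
	queuestate.CloseQueue = func(q *v1alpha2.Queue) error { fmt.Println("close", q.Name); return nil }

	q := &v1alpha2.Queue{}
	q.Name = "default"
	q.Status.State = v1alpha2.QueueStateOpen

	// An open queue routes CloseQueueAction to CloseQueue and any other
	// action (including SyncQueueAction) to SyncQueue.
	if s := queuestate.NewState(q); s != nil {
		_ = s.Execute(v1alpha2.CloseQueueAction)
		_ = s.Execute(v1alpha2.SyncQueueAction)
	}
}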