diff --git a/pkg/apis/scheduling/v1alpha1/types.go b/pkg/apis/scheduling/v1alpha1/types.go index 8ddf64259..c8ff2c7db 100644 --- a/pkg/apis/scheduling/v1alpha1/types.go +++ b/pkg/apis/scheduling/v1alpha1/types.go @@ -114,6 +114,15 @@ type PodGroupSpec struct { // Queue defines the queue to allocate resource for PodGroup; if queue does not exist, // the PodGroup will not be scheduled. Queue string `json:"queue,omitempty" protobuf:"bytes,2,opt,name=queue"` + + // If specified, indicates the PodGroup's priority. "system-node-critical" and + // "system-cluster-critical" are two special keywords which indicate the + // highest priorities with the former being the highest priority. Any other + // name must be defined by creating a PriorityClass object with that name. + // If not specified, the PodGroup priority will be default or zero if there is no + // default. + // +optional + PriorityClassName string `json:"priorityClassName,omitempty" protobuf:"bytes,3,opt,name=priorityClassName"` } // PodGroupStatus represents the current state of a pod group. diff --git a/pkg/scheduler/api/job_info.go b/pkg/scheduler/api/job_info.go index 4e9afd801..e773e53f9 100644 --- a/pkg/scheduler/api/job_info.go +++ b/pkg/scheduler/api/job_info.go @@ -124,7 +124,7 @@ type JobInfo struct { Queue QueueID - Priority int + Priority int32 NodeSelector map[string]string MinAvailable int32 diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index dea066285..e054ba813 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -25,6 +25,7 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" + "k8s.io/api/scheduling/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -32,6 +33,7 @@ import ( "k8s.io/client-go/informers" infov1 "k8s.io/client-go/informers/core/v1" policyv1 "k8s.io/client-go/informers/policy/v1beta1" + schedv1 "k8s.io/client-go/informers/scheduling/v1beta1" storagev1 "k8s.io/client-go/informers/storage/v1" "k8s.io/client-go/kubernetes" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -82,6 +84,7 @@ type SchedulerCache struct { pvInformer infov1.PersistentVolumeInformer pvcInformer infov1.PersistentVolumeClaimInformer scInformer storagev1.StorageClassInformer + pcInformer schedv1.PriorityClassInformer Binder Binder Evictor Evictor @@ -90,9 +93,12 @@ type SchedulerCache struct { Recorder record.EventRecorder - Jobs map[kbapi.JobID]*kbapi.JobInfo - Nodes map[string]*kbapi.NodeInfo - Queues map[kbapi.QueueID]*kbapi.QueueInfo + Jobs map[kbapi.JobID]*kbapi.JobInfo + Nodes map[string]*kbapi.NodeInfo + Queues map[kbapi.QueueID]*kbapi.QueueInfo + PriorityClasses map[string]*v1beta1.PriorityClass + defaultPriorityClass *v1beta1.PriorityClass + defaultPriority int32 errTasks workqueue.RateLimitingInterface deletedJobs workqueue.RateLimitingInterface @@ -179,14 +185,15 @@ func (dvb *defaultVolumeBinder) BindVolumes(task *api.TaskInfo) error { func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue string) *SchedulerCache { sc := &SchedulerCache{ - Jobs: make(map[kbapi.JobID]*kbapi.JobInfo), - Nodes: make(map[string]*kbapi.NodeInfo), - Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo), - errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - kubeclient: kubernetes.NewForConfigOrDie(config), - kbclient: kbver.NewForConfigOrDie(config), - defaultQueue: defaultQueue, + Jobs: make(map[kbapi.JobID]*kbapi.JobInfo), + Nodes: make(map[string]*kbapi.NodeInfo), + Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo), + PriorityClasses: make(map[string]*v1beta1.PriorityClass), + errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + kubeclient: kubernetes.NewForConfigOrDie(config), + kbclient: kbver.NewForConfigOrDie(config), + defaultQueue: defaultQueue, } // Prepare event clients. @@ -263,6 +270,13 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s DeleteFunc: sc.DeletePDB, }) + sc.pcInformer = informerFactory.Scheduling().V1beta1().PriorityClasses() + sc.pcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: sc.AddPriorityClass, + UpdateFunc: sc.UpdatePriorityClass, + DeleteFunc: sc.DeletePriorityClass, + }) + kbinformer := kbinfo.NewSharedInformerFactory(sc.kbclient, 0) // create informer for PodGroup information sc.podGroupInformer = kbinformer.Scheduling().V1alpha1().PodGroups() @@ -292,6 +306,7 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) { go sc.pvcInformer.Informer().Run(stopCh) go sc.scInformer.Informer().Run(stopCh) go sc.queueInformer.Informer().Run(stopCh) + go sc.pcInformer.Informer().Run(stopCh) // Re-sync error tasks. go wait.Until(sc.processResyncTask, 0, stopCh) @@ -311,6 +326,7 @@ func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool { sc.pvcInformer.Informer().HasSynced, sc.scInformer.Informer().HasSynced, sc.queueInformer.Informer().HasSynced, + sc.pcInformer.Informer().HasSynced, ) } @@ -527,6 +543,15 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo { continue } + if value.PodGroup != nil { + value.Priority = sc.defaultPriority + + priName := value.PodGroup.Spec.PriorityClassName + if priorityClass, found := sc.PriorityClasses[priName]; found { + value.Priority = priorityClass.Value + } + } + snapshot.Jobs[value.UID] = value.Clone() } diff --git a/pkg/scheduler/cache/event_handlers.go b/pkg/scheduler/cache/event_handlers.go index 39c77a72c..9f2db863d 100644 --- a/pkg/scheduler/cache/event_handlers.go +++ b/pkg/scheduler/cache/event_handlers.go @@ -18,6 +18,7 @@ package cache import ( "fmt" + "k8s.io/api/scheduling/v1beta1" "reflect" "github.com/golang/glog" @@ -671,3 +672,97 @@ func (sc *SchedulerCache) deleteQueue(queue *kbv1.Queue) error { return nil } + +func (sc *SchedulerCache) DeletePriorityClass(obj interface{}) { + var ss *v1beta1.PriorityClass + switch t := obj.(type) { + case *v1beta1.PriorityClass: + ss = t + case cache.DeletedFinalStateUnknown: + var ok bool + ss, ok = t.Obj.(*v1beta1.PriorityClass) + if !ok { + glog.Errorf("Cannot convert to *v1beta1.PriorityClass: %v", t.Obj) + return + } + default: + glog.Errorf("Cannot convert to *v1beta1.PriorityClass: %v", t) + return + } + + sc.Mutex.Lock() + defer sc.Mutex.Unlock() + + sc.deletePriorityClass(ss) +} + +func (sc *SchedulerCache) UpdatePriorityClass(oldObj, newObj interface{}) { + oldSS, ok := oldObj.(*v1beta1.PriorityClass) + if !ok { + glog.Errorf("Cannot convert oldObj to *v1beta1.PriorityClass: %v", oldObj) + + return + + } + + newSS, ok := newObj.(*v1beta1.PriorityClass) + if !ok { + glog.Errorf("Cannot convert newObj to *v1beta1.PriorityClass: %v", newObj) + + return + + } + + sc.Mutex.Lock() + defer sc.Mutex.Unlock() + + sc.deletePriorityClass(oldSS) + sc.addPriorityClass(newSS) +} + +func (sc *SchedulerCache) AddPriorityClass(obj interface{}) { + var ss *v1beta1.PriorityClass + switch t := obj.(type) { + case *v1beta1.PriorityClass: + ss = t + case cache.DeletedFinalStateUnknown: + var ok bool + ss, ok = t.Obj.(*v1beta1.PriorityClass) + if !ok { + glog.Errorf("Cannot convert to *v1beta1.PriorityClass: %v", t.Obj) + return + } + default: + glog.Errorf("Cannot convert to *v1beta1.PriorityClass: %v", t) + return + } + + sc.Mutex.Lock() + defer sc.Mutex.Unlock() + + sc.addPriorityClass(ss) +} + +func (sc *SchedulerCache) deletePriorityClass(pc *v1beta1.PriorityClass) { + if pc.GlobalDefault { + sc.defaultPriorityClass = nil + sc.defaultPriority = 0 + + } + + delete(sc.PriorityClasses, pc.Name) +} + +func (sc *SchedulerCache) addPriorityClass(pc *v1beta1.PriorityClass) { + if pc.GlobalDefault { + if sc.defaultPriorityClass != nil { + glog.Errorf("Updated default priority class from <%s> to <%s> forcefully.", + sc.defaultPriorityClass.Name, pc.Name) + + } + sc.defaultPriorityClass = pc + sc.defaultPriority = pc.Value + } + + sc.PriorityClasses[pc.Name] = pc +}