Skip to content
This repository has been archived by the owner on May 25, 2023. It is now read-only.

Commit

Permalink
Added PriorityClass to PodGroup.
Browse files Browse the repository at this point in the history
Signed-off-by: Da K. Ma <klaus1982.cn@gmail.com>
  • Loading branch information
k82cn committed Mar 4, 2019
1 parent 1c750a0 commit 3c929a8
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 12 deletions.
9 changes: 9 additions & 0 deletions pkg/apis/scheduling/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ type PodGroupSpec struct {
// Queue defines the queue to allocate resource for PodGroup; if queue does not exist,
// the PodGroup will not be scheduled.
Queue string `json:"queue,omitempty" protobuf:"bytes,2,opt,name=queue"`

// If specified, indicates the PodGroup's priority. "system-node-critical" and
// "system-cluster-critical" are two special keywords which indicate the
// highest priorities with the former being the highest priority. Any other
// name must be defined by creating a PriorityClass object with that name.
// If not specified, the PodGroup priority will be default or zero if there is no
// default.
// +optional
PriorityClassName string `json:"priorityClassName,omitempty" protobuf:"bytes,3,opt,name=priorityClassName"`
}

// PodGroupStatus represents the current state of a pod group.
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/api/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ type JobInfo struct {

Queue QueueID

Priority int
Priority int32

NodeSelector map[string]string
MinAvailable int32
Expand Down
47 changes: 36 additions & 11 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ import (
"github.com/golang/glog"

"k8s.io/api/core/v1"
"k8s.io/api/scheduling/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
infov1 "k8s.io/client-go/informers/core/v1"
policyv1 "k8s.io/client-go/informers/policy/v1beta1"
schedv1 "k8s.io/client-go/informers/scheduling/v1beta1"
storagev1 "k8s.io/client-go/informers/storage/v1"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -82,6 +84,7 @@ type SchedulerCache struct {
pvInformer infov1.PersistentVolumeInformer
pvcInformer infov1.PersistentVolumeClaimInformer
scInformer storagev1.StorageClassInformer
pcInformer schedv1.PriorityClassInformer

Binder Binder
Evictor Evictor
Expand All @@ -90,9 +93,12 @@ type SchedulerCache struct {

Recorder record.EventRecorder

Jobs map[kbapi.JobID]*kbapi.JobInfo
Nodes map[string]*kbapi.NodeInfo
Queues map[kbapi.QueueID]*kbapi.QueueInfo
Jobs map[kbapi.JobID]*kbapi.JobInfo
Nodes map[string]*kbapi.NodeInfo
Queues map[kbapi.QueueID]*kbapi.QueueInfo
PriorityClasses map[string]*v1beta1.PriorityClass
defaultPriorityClass *v1beta1.PriorityClass
defaultPriority int32

errTasks workqueue.RateLimitingInterface
deletedJobs workqueue.RateLimitingInterface
Expand Down Expand Up @@ -179,14 +185,15 @@ func (dvb *defaultVolumeBinder) BindVolumes(task *api.TaskInfo) error {

func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue string) *SchedulerCache {
sc := &SchedulerCache{
Jobs: make(map[kbapi.JobID]*kbapi.JobInfo),
Nodes: make(map[string]*kbapi.NodeInfo),
Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo),
errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
kubeclient: kubernetes.NewForConfigOrDie(config),
kbclient: kbver.NewForConfigOrDie(config),
defaultQueue: defaultQueue,
Jobs: make(map[kbapi.JobID]*kbapi.JobInfo),
Nodes: make(map[string]*kbapi.NodeInfo),
Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo),
PriorityClasses: make(map[string]*v1beta1.PriorityClass),
errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
kubeclient: kubernetes.NewForConfigOrDie(config),
kbclient: kbver.NewForConfigOrDie(config),
defaultQueue: defaultQueue,
}

// Prepare event clients.
Expand Down Expand Up @@ -263,6 +270,13 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
DeleteFunc: sc.DeletePDB,
})

sc.pcInformer = informerFactory.Scheduling().V1beta1().PriorityClasses()
sc.pcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sc.AddPriorityClass,
UpdateFunc: sc.UpdatePriorityClass,
DeleteFunc: sc.DeletePriorityClass,
})

kbinformer := kbinfo.NewSharedInformerFactory(sc.kbclient, 0)
// create informer for PodGroup information
sc.podGroupInformer = kbinformer.Scheduling().V1alpha1().PodGroups()
Expand Down Expand Up @@ -292,6 +306,7 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
go sc.pvcInformer.Informer().Run(stopCh)
go sc.scInformer.Informer().Run(stopCh)
go sc.queueInformer.Informer().Run(stopCh)
go sc.pcInformer.Informer().Run(stopCh)

// Re-sync error tasks.
go wait.Until(sc.processResyncTask, 0, stopCh)
Expand All @@ -311,6 +326,7 @@ func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool {
sc.pvcInformer.Informer().HasSynced,
sc.scInformer.Informer().HasSynced,
sc.queueInformer.Informer().HasSynced,
sc.pcInformer.Informer().HasSynced,
)
}

Expand Down Expand Up @@ -527,6 +543,15 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo {
continue
}

if value.PodGroup != nil {
value.Priority = sc.defaultPriority

priName := value.PodGroup.Spec.PriorityClassName
if priorityClass, found := sc.PriorityClasses[priName]; found {
value.Priority = priorityClass.Value
}
}

snapshot.Jobs[value.UID] = value.Clone()
}

Expand Down
95 changes: 95 additions & 0 deletions pkg/scheduler/cache/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cache

import (
"fmt"
"k8s.io/api/scheduling/v1beta1"
"reflect"

"github.com/golang/glog"
Expand Down Expand Up @@ -671,3 +672,97 @@ func (sc *SchedulerCache) deleteQueue(queue *kbv1.Queue) error {

return nil
}

func (sc *SchedulerCache) DeletePriorityClass(obj interface{}) {
var ss *v1beta1.PriorityClass
switch t := obj.(type) {
case *v1beta1.PriorityClass:
ss = t
case cache.DeletedFinalStateUnknown:
var ok bool
ss, ok = t.Obj.(*v1beta1.PriorityClass)
if !ok {
glog.Errorf("Cannot convert to *v1beta1.PriorityClass: %v", t.Obj)
return
}
default:
glog.Errorf("Cannot convert to *v1beta1.PriorityClass: %v", t)
return
}

sc.Mutex.Lock()
defer sc.Mutex.Unlock()

sc.deletePriorityClass(ss)
}

func (sc *SchedulerCache) UpdatePriorityClass(oldObj, newObj interface{}) {
oldSS, ok := oldObj.(*v1beta1.PriorityClass)
if !ok {
glog.Errorf("Cannot convert oldObj to *v1beta1.PriorityClass: %v", oldObj)

return

}

newSS, ok := newObj.(*v1beta1.PriorityClass)
if !ok {
glog.Errorf("Cannot convert newObj to *v1beta1.PriorityClass: %v", newObj)

return

}

sc.Mutex.Lock()
defer sc.Mutex.Unlock()

sc.deletePriorityClass(oldSS)
sc.addPriorityClass(newSS)
}

func (sc *SchedulerCache) AddPriorityClass(obj interface{}) {
var ss *v1beta1.PriorityClass
switch t := obj.(type) {
case *v1beta1.PriorityClass:
ss = t
case cache.DeletedFinalStateUnknown:
var ok bool
ss, ok = t.Obj.(*v1beta1.PriorityClass)
if !ok {
glog.Errorf("Cannot convert to *v1beta1.PriorityClass: %v", t.Obj)
return
}
default:
glog.Errorf("Cannot convert to *v1beta1.PriorityClass: %v", t)
return
}

sc.Mutex.Lock()
defer sc.Mutex.Unlock()

sc.addPriorityClass(ss)
}

func (sc *SchedulerCache) deletePriorityClass(pc *v1beta1.PriorityClass) {
if pc.GlobalDefault {
sc.defaultPriorityClass = nil
sc.defaultPriority = 0

}

delete(sc.PriorityClasses, pc.Name)
}

func (sc *SchedulerCache) addPriorityClass(pc *v1beta1.PriorityClass) {
if pc.GlobalDefault {
if sc.defaultPriorityClass != nil {
glog.Errorf("Updated default priority class from <%s> to <%s> forcefully.",
sc.defaultPriorityClass.Name, pc.Name)

}
sc.defaultPriorityClass = pc
sc.defaultPriority = pc.Value
}

sc.PriorityClasses[pc.Name] = pc
}

0 comments on commit 3c929a8

Please sign in to comment.