
Commit

Merge pull request #654 from hzxuzhonghu/pdb
Remove pdb support
volcano-sh-bot authored Dec 30, 2019
2 parents 10f61a6 + db1e557 commit 8a7680a
Showing 6 changed files with 4 additions and 225 deletions.
4 changes: 1 addition & 3 deletions pkg/scheduler/api/helpers.go
@@ -100,7 +100,5 @@ func MergeErrors(errs ...error) error {
 
 // JobTerminated checks whether job was terminated.
 func JobTerminated(job *JobInfo) bool {
-    return job.PodGroup == nil &&
-        job.PDB == nil &&
-        len(job.Tasks) == 0
+    return job.PodGroup == nil && len(job.Tasks) == 0
 }
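
With the PDB field gone, job termination is decided purely by the PodGroup and the remaining tasks. A minimal sketch of calling the simplified helper; the import path volcano.sh/volcano/pkg/scheduler/api is an assumption for illustration and is not part of this diff:

package main

import (
    "fmt"

    api "volcano.sh/volcano/pkg/scheduler/api"
)

func main() {
    // A JobInfo with no PodGroup and no remaining tasks now counts as terminated;
    // the removed PDB field no longer participates in the check.
    job := &api.JobInfo{}
    fmt.Println(api.JobTerminated(job)) // true
}
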
23 changes: 0 additions & 23 deletions pkg/scheduler/api/job_info.go
@@ -22,10 +22,8 @@ import (
     "strings"
 
     "k8s.io/api/core/v1"
-    policyv1 "k8s.io/api/policy/v1beta1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
-    "k8s.io/apimachinery/pkg/util/intstr"
 
     "volcano.sh/volcano/pkg/apis/scheduling"
     "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
@@ -150,9 +148,6 @@ type JobInfo struct {
 
     CreationTimestamp metav1.Time
     PodGroup *PodGroup
-
-    // TODO(k82cn): keep backward compatibility, removed it when v1alpha1 finalized.
-    PDB *policyv1.PodDisruptionBudget
 }
 
 // NewJobInfo creates a new jobInfo for set of tasks
@@ -194,23 +189,6 @@ func (ji *JobInfo) SetPodGroup(pg *PodGroup) {
     ji.PodGroup = pg
 }
 
-// SetPDB sets PDB to a job
-func (ji *JobInfo) SetPDB(pdb *policyv1.PodDisruptionBudget) {
-    ji.Name = pdb.Name
-    defaultMinAvailable := intstr.IntOrString{Type: intstr.Int, IntVal: int32(0)}
-    minAvailable := pdb.Spec.MinAvailable
-    ji.MinAvailable = int32(intstr.ValueOrDefault(minAvailable, defaultMinAvailable).IntValue())
-    ji.Namespace = pdb.Namespace
-
-    ji.CreationTimestamp = pdb.GetCreationTimestamp()
-    ji.PDB = pdb
-}
-
-// UnsetPDB removes PDB info of a job
-func (ji *JobInfo) UnsetPDB() {
-    ji.PDB = nil
-}
-
 func (ji *JobInfo) addTaskIndex(ti *TaskInfo) {
     if _, found := ji.TaskStatusIndex[ti.Status]; !found {
         ji.TaskStatusIndex[ti.Status] = tasksMap{}
@@ -292,7 +270,6 @@ func (ji *JobInfo) Clone() *JobInfo {
 
         NodesFitErrors: make(map[TaskID]*FitErrors),
 
-        PDB: ji.PDB,
         PodGroup: ji.PodGroup,
 
         TaskStatusIndex: map[TaskStatus]tasksMap{},
23 changes: 0 additions & 23 deletions pkg/scheduler/api/job_info_test.go
@@ -21,9 +21,7 @@ import (
     "testing"
 
     v1 "k8s.io/api/core/v1"
-    policyv1 "k8s.io/api/policy/v1beta1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/apimachinery/pkg/util/intstr"
 )
 
 func jobInfoEqual(l, r *JobInfo) bool {
@@ -200,24 +198,3 @@ func TestDeleteTaskInfo(t *testing.T) {
         }
     }
 }
-
-func TestJobInfo_SetPDB(t *testing.T) {
-
-    info := &JobInfo{}
-    // case1
-    //intOrString := &intstr.IntOrString{Type:intstr.Int, IntVal:1}
-    //spec := policyv1.PodDisruptionBudgetSpec{MinAvailable:intOrString}
-    // case2
-    //spec := policyv1.PodDisruptionBudgetSpec{}
-    // case3
-    //intOrString := &intstr.IntOrString{}
-    //spec := policyv1.PodDisruptionBudgetSpec{MinAvailable:intOrString}
-    // case4
-    intOrString := &intstr.IntOrString{Type: intstr.String, StrVal: "1"}
-    spec := policyv1.PodDisruptionBudgetSpec{MinAvailable: intOrString}
-
-    pdb := &policyv1.PodDisruptionBudget{
-        Spec: spec,
-    }
-    info.SetPDB(pdb)
-}
47 changes: 0 additions & 47 deletions pkg/scheduler/api/test_utils.go
@@ -18,44 +18,13 @@ package api
 
 import (
     "fmt"
-    "reflect"
 
     v1 "k8s.io/api/core/v1"
-    "k8s.io/api/policy/v1beta1"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
-    "k8s.io/apimachinery/pkg/util/intstr"
 )
 
-func nodesEqual(l, r map[string]*NodeInfo) bool {
-    if len(l) != len(r) {
-        return false
-    }
-
-    for k, n := range l {
-        if !reflect.DeepEqual(n, r[k]) {
-            return false
-        }
-    }
-
-    return true
-}
-
-func podsEqual(l, r map[string]*TaskInfo) bool {
-    if len(l) != len(r) {
-        return false
-    }
-
-    for k, p := range l {
-        if !reflect.DeepEqual(p, r[k]) {
-            return false
-        }
-    }
-
-    return true
-}
-
 func buildNode(name string, alloc v1.ResourceList) *v1.Node {
     return &v1.Node{
         ObjectMeta: metav1.ObjectMeta{
@@ -93,22 +62,6 @@ func buildPod(ns, n, nn string, p v1.PodPhase, req v1.ResourceList, owner []meta
     }
 }
 
-func buildPdb(n string, min int, selectorMap map[string]string) *v1beta1.PodDisruptionBudget {
-    selector := &metav1.LabelSelector{
-        MatchLabels: selectorMap,
-    }
-    minAvailable := intstr.FromInt(min)
-    return &v1beta1.PodDisruptionBudget{
-        ObjectMeta: metav1.ObjectMeta{
-            Name: n,
-        },
-        Spec: v1beta1.PodDisruptionBudgetSpec{
-            Selector: selector,
-            MinAvailable: &minAvailable,
-        },
-    }
-}
-
 func buildResourceList(cpu string, memory string) v1.ResourceList {
     return v1.ResourceList{
         v1.ResourceCPU: resource.MustParse(cpu),
18 changes: 3 additions & 15 deletions pkg/scheduler/cache/cache.go
@@ -30,7 +30,6 @@ import (
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/informers"
     infov1 "k8s.io/client-go/informers/core/v1"
-    policyv1 "k8s.io/client-go/informers/policy/v1beta1"
     schedv1 "k8s.io/client-go/informers/scheduling/v1beta1"
     storagev1 "k8s.io/client-go/informers/storage/v1"
     "k8s.io/client-go/kubernetes"
@@ -82,7 +81,6 @@ type SchedulerCache struct {
 
     podInformer infov1.PodInformer
     nodeInformer infov1.NodeInformer
-    pdbInformer policyv1.PodDisruptionBudgetInformer
     nsInformer infov1.NamespaceInformer
     podGroupInformerV1alpha1 vcinformerv1.PodGroupInformer
     podGroupInformerV1alpha2 vcinformerv2.PodGroupInformer
@@ -369,13 +367,6 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
         },
     })
 
-    sc.pdbInformer = informerFactory.Policy().V1beta1().PodDisruptionBudgets()
-    sc.pdbInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-        AddFunc: sc.AddPDB,
-        UpdateFunc: sc.UpdatePDB,
-        DeleteFunc: sc.DeletePDB,
-    })
-
     sc.pcInformer = informerFactory.Scheduling().V1beta1().PriorityClasses()
     sc.pcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
         AddFunc: sc.AddPriorityClass,
@@ -428,7 +419,6 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
 
 // Run starts the schedulerCache
 func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
-    go sc.pdbInformer.Informer().Run(stopCh)
     go sc.podInformer.Informer().Run(stopCh)
     go sc.nodeInformer.Informer().Run(stopCh)
     go sc.podGroupInformerV1alpha1.Informer().Run(stopCh)
@@ -457,7 +447,6 @@ func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool {
     return cache.WaitForCacheSync(stopCh,
         func() []cache.InformerSynced {
             informerSynced := []cache.InformerSynced{
-                sc.pdbInformer.Informer().HasSynced,
                 sc.podInformer.Informer().HasSynced,
                 sc.podGroupInformerV1alpha1.Informer().HasSynced,
                 sc.podGroupInformerV1alpha2.Informer().HasSynced,
@@ -736,6 +725,7 @@ func (sc *SchedulerCache) Snapshot() *schedulingapi.ClusterInfo {
     var wg sync.WaitGroup
 
     cloneJob := func(value *schedulingapi.JobInfo) {
+        defer wg.Done()
         if value.PodGroup != nil {
             value.Priority = sc.defaultPriority
 
@@ -753,7 +743,6 @@ func (sc *SchedulerCache) Snapshot() *schedulingapi.ClusterInfo {
         cloneJobLock.Lock()
         snapshot.Jobs[value.UID] = clonedJob
         cloneJobLock.Unlock()
-        wg.Done()
     }
 
     for _, value := range sc.NamespaceCollection {
@@ -765,7 +754,7 @@ func (sc *SchedulerCache) Snapshot() *schedulingapi.ClusterInfo {
 
     for _, value := range sc.Jobs {
         // If no scheduling spec, does not handle it.
-        if value.PodGroup == nil && value.PDB == nil {
+        if value.PodGroup == nil {
             klog.V(4).Infof("The scheduling spec of Job <%v:%s/%s> is nil, ignore it.",
                 value.UID, value.Namespace, value.Name)
 
@@ -840,10 +829,9 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *schedulingapi.JobInfo) {
         (job.PodGroup.Status.Phase == scheduling.PodGroupUnknown ||
             job.PodGroup.Status.Phase == scheduling.PodGroupPending ||
             job.PodGroup.Status.Phase == scheduling.PodGroupInqueue)
-    pdbUnschedulabe := job.PDB != nil && len(job.TaskStatusIndex[schedulingapi.Pending]) != 0
 
     // If pending or unschedulable, record unschedulable event.
-    if pgUnschedulable || pdbUnschedulabe {
+    if pgUnschedulable {
         msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v",
             len(job.TaskStatusIndex[schedulingapi.Pending]),
             len(job.Tasks),
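
Besides dropping the PDB checks, the Snapshot hunks above move wg.Done() into a defer at the top of cloneJob, so the WaitGroup counter is released on every return path, including a panic inside the clone. A generic sketch of that pattern, not Volcano code:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    jobs := []string{"job-a", "job-b", "job-c"}

    for _, name := range jobs {
        wg.Add(1)
        go func(n string) {
            // defer guarantees Done() runs even on early returns or panics.
            defer wg.Done()
            if n == "job-b" {
                return // early exit still releases the WaitGroup
            }
            fmt.Println("cloned", n)
        }(name)
    }

    wg.Wait()
}
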
114 changes: 0 additions & 114 deletions pkg/scheduler/cache/event_handlers.go
@@ -20,7 +20,6 @@ import (
     "fmt"
 
     "k8s.io/api/core/v1"
-    policyv1 "k8s.io/api/policy/v1beta1"
     "k8s.io/api/scheduling/v1beta1"
     "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -579,119 +578,6 @@ func (sc *SchedulerCache) DeletePodGroupV1alpha2(obj interface{}) {
     return
 }
 
-// Assumes that lock is already acquired.
-func (sc *SchedulerCache) setPDB(pdb *policyv1.PodDisruptionBudget) error {
-    job := schedulingapi.JobID(utils.GetController(pdb))
-
-    if len(job) == 0 {
-        return fmt.Errorf("the controller of PodDisruptionBudget is empty")
-    }
-
-    if _, found := sc.Jobs[job]; !found {
-        sc.Jobs[job] = schedulingapi.NewJobInfo(job)
-    }
-
-    sc.Jobs[job].SetPDB(pdb)
-    // Set it to default queue, as PDB did not support queue right now.
-    sc.Jobs[job].Queue = schedulingapi.QueueID(sc.defaultQueue)
-
-    return nil
-}
-
-// Assumes that lock is already acquired.
-func (sc *SchedulerCache) updatePDB(oldPDB, newPDB *policyv1.PodDisruptionBudget) error {
-    return sc.setPDB(newPDB)
-}
-
-// Assumes that lock is already acquired.
-func (sc *SchedulerCache) deletePDB(pdb *policyv1.PodDisruptionBudget) error {
-    jobID := schedulingapi.JobID(utils.GetController(pdb))
-
-    job, found := sc.Jobs[jobID]
-    if !found {
-        return fmt.Errorf("can not found job %v:%v/%v", jobID, pdb.Namespace, pdb.Name)
-    }
-
-    // Unset SchedulingSpec
-    job.UnsetPDB()
-
-    sc.deleteJob(job)
-
-    return nil
-}
-
-// AddPDB add pdb to scheduler cache
-func (sc *SchedulerCache) AddPDB(obj interface{}) {
-    pdb, ok := obj.(*policyv1.PodDisruptionBudget)
-    if !ok {
-        klog.Errorf("Cannot convert to *policyv1.PodDisruptionBudget: %v", obj)
-        return
-    }
-
-    sc.Mutex.Lock()
-    defer sc.Mutex.Unlock()
-
-    err := sc.setPDB(pdb)
-    if err != nil {
-        klog.Errorf("Failed to add PodDisruptionBudget %s into cache: %v", pdb.Name, err)
-        return
-    }
-    return
-}
-
-//UpdatePDB update pdb to scheduler cache
-func (sc *SchedulerCache) UpdatePDB(oldObj, newObj interface{}) {
-    oldPDB, ok := oldObj.(*policyv1.PodDisruptionBudget)
-    if !ok {
-        klog.Errorf("Cannot convert oldObj to *policyv1.PodDisruptionBudget: %v", oldObj)
-        return
-    }
-    newPDB, ok := newObj.(*policyv1.PodDisruptionBudget)
-    if !ok {
-        klog.Errorf("Cannot convert newObj to *policyv1.PodDisruptionBudget: %v", newObj)
-        return
-    }
-
-    sc.Mutex.Lock()
-    defer sc.Mutex.Unlock()
-
-    err := sc.updatePDB(oldPDB, newPDB)
-    if err != nil {
-        klog.Errorf("Failed to update PodDisruptionBudget %s into cache: %v", oldPDB.Name, err)
-        return
-    }
-    return
-}
-
-//DeletePDB delete pdb from scheduler cache
-func (sc *SchedulerCache) DeletePDB(obj interface{}) {
-    var pdb *policyv1.PodDisruptionBudget
-    switch t := obj.(type) {
-    case *policyv1.PodDisruptionBudget:
-        pdb = t
-    case cache.DeletedFinalStateUnknown:
-        var ok bool
-        pdb, ok = t.Obj.(*policyv1.PodDisruptionBudget)
-        if !ok {
-            klog.Errorf("Cannot convert to *policyv1.PodDisruptionBudget: %v", t.Obj)
-            return
-        }
-    default:
-        klog.Errorf("Cannot convert to *policyv1.PodDisruptionBudget: %v", t)
-        return
-    }
-
-    sc.Mutex.Lock()
-    defer sc.Mutex.Unlock()
-
-    err := sc.deletePDB(pdb)
-    if err != nil {
-        klog.Errorf("Failed to delete PodDisruptionBudget %s from cache: %v", pdb.Name, err)
-        return
-    }
-    return
-}
-
 // AddQueueV1alpha1 add queue to scheduler cache
 func (sc *SchedulerCache) AddQueueV1alpha1(obj interface{}) {
     ss, ok := obj.(*schedulingv1.Queue)
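
The deleted handlers followed the standard client-go shape: typed setters guarded by the cache mutex, plus Add/Update/Delete callbacks registered on an informer, with the delete callback unwrapping cache.DeletedFinalStateUnknown tombstones. A generic sketch of that delete pattern, shown only to illustrate what was removed; the Pod type here is a stand-in, not part of this diff:

package main

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/klog"
)

// onDelete mirrors the tombstone handling the removed DeletePDB performed.
func onDelete(obj interface{}) {
    var pod *v1.Pod
    switch t := obj.(type) {
    case *v1.Pod:
        pod = t
    case cache.DeletedFinalStateUnknown:
        var ok bool
        pod, ok = t.Obj.(*v1.Pod)
        if !ok {
            klog.Errorf("Cannot convert to *v1.Pod: %v", t.Obj)
            return
        }
    default:
        klog.Errorf("Cannot convert to *v1.Pod: %v", t)
        return
    }

    klog.V(3).Infof("Pod %s/%s deleted from cache", pod.Namespace, pod.Name)
}

func main() {
    // The handler would be registered on an informer the same way the removed
    // pdbInformer wiring in cache.go registered its callbacks.
    _ = cache.ResourceEventHandlerFuncs{DeleteFunc: onDelete}
}
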
