From f19d32f13e9fd0db9669ec38848cc9b4bdeebb0c Mon Sep 17 00:00:00 2001 From: "Da K. Ma" Date: Sun, 3 Mar 2019 14:27:59 +0800 Subject: [PATCH] Removed namespaceAsQueue. Signed-off-by: Da K. Ma --- cmd/kube-batch/app/options/options.go | 16 +- cmd/kube-batch/app/server.go | 7 +- cmd/kube-batch/main.go | 2 +- config/kube-batch-conf.yaml | 11 ++ config/queue/default.yaml | 6 + example/kube-batch-conf.yaml | 1 + hack/run-e2e.sh | 16 +- .../actions/allocate/allocate_test.go | 9 ++ pkg/scheduler/actions/preempt/preempt.go | 21 +-- pkg/scheduler/actions/reclaim/reclaim.go | 6 +- pkg/scheduler/api/job_info.go | 61 +------- pkg/scheduler/api/job_info_test.go | 19 --- pkg/scheduler/api/types.go | 4 - pkg/scheduler/api/util.go | 29 ---- pkg/scheduler/cache/cache.go | 71 ++++----- pkg/scheduler/cache/cache_test.go | 33 ++++- pkg/scheduler/cache/event_handlers.go | 140 ++++-------------- pkg/scheduler/cache/util.go | 60 ++++++++ .../plugins/conformance/conformance.go | 6 +- pkg/scheduler/plugins/factory.go | 2 + pkg/scheduler/scheduler.go | 4 +- test/e2e/queue.go | 3 + test/e2e/util.go | 80 ++-------- 23 files changed, 222 insertions(+), 385 deletions(-) create mode 100644 config/kube-batch-conf.yaml create mode 100644 config/queue/default.yaml delete mode 100644 pkg/scheduler/api/util.go create mode 100644 pkg/scheduler/cache/util.go diff --git a/cmd/kube-batch/app/options/options.go b/cmd/kube-batch/app/options/options.go index 7bcb4bfbe..f8b571439 100644 --- a/cmd/kube-batch/app/options/options.go +++ b/cmd/kube-batch/app/options/options.go @@ -30,24 +30,12 @@ type ServerOption struct { SchedulerName string SchedulerConf string SchedulePeriod string - NamespaceAsQueue bool EnableLeaderElection bool LockObjectNamespace string DefaultQueue string PrintVersion bool } -var ( - opts *ServerOption -) - -func Options() *ServerOption { - if opts == nil { - opts = &ServerOption{} - } - return opts -} - // NewServerOption creates a new CMServer with a default config. func NewServerOption() *ServerOption { s := ServerOption{} @@ -62,12 +50,10 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.SchedulerName, "scheduler-name", "kube-batch", "kube-batch will handle pods with the scheduler-name") fs.StringVar(&s.SchedulerConf, "scheduler-conf", "", "The absolute path of scheduler configuration file") fs.StringVar(&s.SchedulePeriod, "schedule-period", "1s", "The period between each scheduling cycle") - fs.StringVar(&s.DefaultQueue, "default-queue", "", "The name of the queue to fall-back to instead of namespace name") + fs.StringVar(&s.DefaultQueue, "default-queue", "default", "The default queue name of the job") fs.BoolVar(&s.EnableLeaderElection, "leader-elect", s.EnableLeaderElection, "Start a leader election client and gain leadership before "+ "executing the main loop. Enable this when running replicated kube-batch for high availability") - fs.BoolVar(&s.NamespaceAsQueue, "enable-namespace-as-queue", true, "Make Namespace as Queue with weight one, "+ - "but kube-batch will not handle Queue CRD anymore") fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit") fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object") } diff --git a/cmd/kube-batch/app/server.go b/cmd/kube-batch/app/server.go index 4df9309f2..c9a09be10 100644 --- a/cmd/kube-batch/app/server.go +++ b/cmd/kube-batch/app/server.go @@ -66,8 +66,11 @@ func Run(opt *options.ServerOption) error { } // Start policy controller to allocate resources. 
- sched, err := scheduler.NewScheduler(config, opt.SchedulerName, - opt.SchedulerConf, opt.SchedulePeriod, opt.NamespaceAsQueue) + sched, err := scheduler.NewScheduler(config, + opt.SchedulerName, + opt.SchedulerConf, + opt.SchedulePeriod, + opt.DefaultQueue) if err != nil { panic(err) } diff --git a/cmd/kube-batch/main.go b/cmd/kube-batch/main.go index 65739c512..8c1761dff 100644 --- a/cmd/kube-batch/main.go +++ b/cmd/kube-batch/main.go @@ -37,7 +37,7 @@ import ( var logFlushFreq = pflag.Duration("log-flush-frequency", 5*time.Second, "Maximum number of seconds between log flushes") func main() { - s := options.Options() + s := options.NewServerOption() s.AddFlags(pflag.CommandLine) flag.InitFlags() diff --git a/config/kube-batch-conf.yaml b/config/kube-batch-conf.yaml new file mode 100644 index 000000000..4512474b1 --- /dev/null +++ b/config/kube-batch-conf.yaml @@ -0,0 +1,11 @@ +actions: "reclaim, allocate, backfill, preempt" +tiers: +- plugins: + - name: priority + - name: gang + - name: conformance +- plugins: + - name: drf + - name: predicates + - name: proportion + - name: prioritize diff --git a/config/queue/default.yaml b/config/queue/default.yaml new file mode 100644 index 000000000..3aa233f5d --- /dev/null +++ b/config/queue/default.yaml @@ -0,0 +1,6 @@ +apiVersion: scheduling.incubator.k8s.io/v1alpha1 +kind: Queue +metadata: + name: default +spec: + weight: 1 diff --git a/example/kube-batch-conf.yaml b/example/kube-batch-conf.yaml index 274be3cb6..4512474b1 100644 --- a/example/kube-batch-conf.yaml +++ b/example/kube-batch-conf.yaml @@ -3,6 +3,7 @@ tiers: - plugins: - name: priority - name: gang + - name: conformance - plugins: - name: drf - name: predicates diff --git a/hack/run-e2e.sh b/hack/run-e2e.sh index 964afd44c..01c42dac7 100755 --- a/hack/run-e2e.sh +++ b/hack/run-e2e.sh @@ -3,20 +3,11 @@ export PATH="${HOME}/.kubeadm-dind-cluster:${PATH}" export KA_BIN=_output/bin export LOG_LEVEL=3 -export NUM_NODES=4 +export NUM_NODES=3 dind_url=https://cdn.rawgit.com/kubernetes-sigs/kubeadm-dind-cluster/master/fixed/dind-cluster-v1.13.sh dind_dest=./hack/dind-cluster-v1.13.sh -if [ $(echo $RANDOM%2 | bc) -eq 1 ] -then - enable_namespace_as_queue=true -else - enable_namespace_as_queue=false -fi - -export ENABLE_NAMESPACES_AS_QUEUE=$enable_namespace_as_queue - # start k8s dind cluster curl ${dind_url} --output ${dind_dest} chmod +x ${dind_dest} @@ -24,9 +15,10 @@ ${dind_dest} up kubectl create -f config/crds/scheduling_v1alpha1_podgroup.yaml kubectl create -f config/crds/scheduling_v1alpha1_queue.yaml +kubectl create -f config/queue/default.yaml # start kube-batch -nohup ${KA_BIN}/kube-batch --kubeconfig ${HOME}/.kube/config --scheduler-conf=example/kube-batch-conf.yaml --enable-namespace-as-queue=${ENABLE_NAMESPACES_AS_QUEUE} --logtostderr --v ${LOG_LEVEL} > scheduler.log 2>&1 & +nohup ${KA_BIN}/kube-batch --kubeconfig ${HOME}/.kube/config --scheduler-conf=config/kube-batch-conf.yaml --logtostderr --v ${LOG_LEVEL} > scheduler.log 2>&1 & # clean up function cleanup { @@ -43,4 +35,4 @@ function cleanup { trap cleanup EXIT # Run e2e test -go test ./test/e2e -v +go test ./test/e2e -v -timeout 30m diff --git a/pkg/scheduler/actions/allocate/allocate_test.go b/pkg/scheduler/actions/allocate/allocate_test.go index 36562cf6e..1ff6fd829 100644 --- a/pkg/scheduler/actions/allocate/allocate_test.go +++ b/pkg/scheduler/actions/allocate/allocate_test.go @@ -158,6 +158,9 @@ func TestAllocate(t *testing.T) { Name: "pg1", Namespace: "c1", }, + Spec: kbv1.PodGroupSpec{ + Queue: "c1", + }, }, }, 
pods: []*v1.Pod{ @@ -190,12 +193,18 @@ func TestAllocate(t *testing.T) { Name: "pg1", Namespace: "c1", }, + Spec: kbv1.PodGroupSpec{ + Queue: "c1", + }, }, { ObjectMeta: metav1.ObjectMeta{ Name: "pg2", Namespace: "c2", }, + Spec: kbv1.PodGroupSpec{ + Queue: "c2", + }, }, }, diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index ae86aba68..267f024bf 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -174,8 +174,6 @@ func preempt( nodes map[string]*api.NodeInfo, filter func(*api.TaskInfo) bool, ) (bool, error) { - resreq := preemptor.Resreq.Clone() - preempted := api.EmptyResource() predicateNodes := []*api.NodeInfo{} nodeScores := map[int][]*api.NodeInfo{} assigned := false @@ -203,6 +201,9 @@ func preempt( preemptor.Namespace, preemptor.Name, node.Name) var preemptees []*api.TaskInfo + preempted := api.EmptyResource() + resreq := preemptor.Resreq.Clone() + for _, task := range node.Tasks { if filter == nil { preemptees = append(preemptees, task.Clone()) @@ -237,15 +238,17 @@ func preempt( glog.V(3).Infof("Preempted <%v> for task <%s/%s> requested <%v>.", preempted, preemptor.Namespace, preemptor.Name, preemptor.Resreq) - if err := stmt.Pipeline(preemptor, node.Name); err != nil { - glog.Errorf("Failed to pipline Task <%s/%s> on Node <%s>", - preemptor.Namespace, preemptor.Name, node.Name) - } + if preemptor.Resreq.LessEqual(preempted) { + if err := stmt.Pipeline(preemptor, node.Name); err != nil { + glog.Errorf("Failed to pipline Task <%s/%s> on Node <%s>", + preemptor.Namespace, preemptor.Name, node.Name) + } - // Ignore pipeline error, will be corrected in next scheduling loop. - assigned = true + // Ignore pipeline error, will be corrected in next scheduling loop. + assigned = true - break + break + } } return assigned, nil diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index f64db2382..e113a65a9 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -109,9 +109,6 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) { task = tasks.Pop().(*api.TaskInfo) } - resreq := task.Resreq.Clone() - reclaimed := api.EmptyResource() - assigned := false for _, n := range ssn.Nodes { @@ -120,6 +117,9 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) { continue } + resreq := task.Resreq.Clone() + reclaimed := api.EmptyResource() + glog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.", task.Namespace, task.Name, n.Name) diff --git a/pkg/scheduler/api/job_info.go b/pkg/scheduler/api/job_info.go index 70dcc9eec..4e9afd801 100644 --- a/pkg/scheduler/api/job_info.go +++ b/pkg/scheduler/api/job_info.go @@ -26,9 +26,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options" "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" - "github.com/kubernetes-sigs/kube-batch/pkg/apis/utils" ) type TaskID types.UID @@ -40,8 +38,6 @@ type TaskInfo struct { Name string Namespace string - PodGroup *v1alpha1.PodGroup - Resreq *Resource NodeName string @@ -52,34 +48,16 @@ type TaskInfo struct { Pod *v1.Pod } -func getOwners(pod *v1.Pod) (JobID, *v1alpha1.PodGroup) { +func getJobID(pod *v1.Pod) JobID { if len(pod.Annotations) != 0 { if gn, found := pod.Annotations[v1alpha1.GroupNameAnnotationKey]; found && len(gn) != 0 { // Make sure Pod and PodGroup belong to the same namespace. 
jobID := fmt.Sprintf("%s/%s", pod.Namespace, gn) - return JobID(jobID), nil + return JobID(jobID) } } - jobID := JobID(utils.GetController(pod)) - if len(jobID) == 0 { - jobID = JobID(pod.UID) - } - - pg := &v1alpha1.PodGroup{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: pod.Namespace, - Name: string(jobID), - Annotations: map[string]string{ - ShadowPodGroupKey: string(jobID), - }, - }, - Spec: v1alpha1.PodGroupSpec{ - MinMember: 1, - }, - } - - return jobID, pg + return "" } func NewTaskInfo(pod *v1.Pod) *TaskInfo { @@ -90,12 +68,11 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo { req.Add(NewResource(c.Resources.Requests)) } - jobID, pg := getOwners(pod) + jobID := getJobID(pod) ti := &TaskInfo{ UID: TaskID(pod.UID), Job: jobID, - PodGroup: pg, Name: pod.Name, Namespace: pod.Namespace, NodeName: pod.Spec.NodeName, @@ -118,7 +95,6 @@ func (ti *TaskInfo) Clone() *TaskInfo { Job: ti.Job, Name: ti.Name, Namespace: ti.Namespace, - PodGroup: ti.PodGroup, NodeName: ti.NodeName, Status: ti.Status, Priority: ti.Priority, @@ -129,8 +105,8 @@ func (ti *TaskInfo) Clone() *TaskInfo { } func (ti TaskInfo) String() string { - return fmt.Sprintf("Task (%v:%v/%v): job %v, status %v, type %v, pri %v, resreq %v", - ti.UID, ti.Namespace, ti.Name, ti.Job, ti.Status, ti.PodGroup, ti.Priority, ti.Resreq) + return fmt.Sprintf("Task (%v:%v/%v): job %v, status %v, pri %v, resreq %v", + ti.UID, ti.Namespace, ti.Name, ti.Job, ti.Status, ti.Priority, ti.Resreq) } // JobID is the type of JobInfo's ID. @@ -198,21 +174,9 @@ func (ji *JobInfo) SetPodGroup(pg *v1alpha1.PodGroup) { ji.Name = pg.Name ji.Namespace = pg.Namespace ji.MinAvailable = pg.Spec.MinMember - - //set queue name based on the available information - //in the following priority order: - // 1. queue name from PodGroup spec (if available) - // 2. queue name from default-queue command line option (if specified) - // 3. 
namespace name - if len(pg.Spec.Queue) > 0 { - ji.Queue = QueueID(pg.Spec.Queue) - } else if len(options.Options().DefaultQueue) > 0 { - ji.Queue = QueueID(options.Options().DefaultQueue) - } else { - ji.Queue = QueueID(pg.Namespace) - } - + ji.Queue = QueueID(pg.Spec.Queue) ji.CreationTimestamp = pg.GetCreationTimestamp() + ji.PodGroup = pg } @@ -220,11 +184,6 @@ func (ji *JobInfo) SetPDB(pdb *policyv1.PodDisruptionBudget) { ji.Name = pdb.Name ji.MinAvailable = pdb.Spec.MinAvailable.IntVal ji.Namespace = pdb.Namespace - if len(options.Options().DefaultQueue) == 0 { - ji.Queue = QueueID(pdb.Namespace) - } else { - ji.Queue = QueueID(options.Options().DefaultQueue) - } ji.CreationTimestamp = pdb.GetCreationTimestamp() ji.PDB = pdb @@ -265,10 +224,6 @@ func (ji *JobInfo) AddTaskInfo(ti *TaskInfo) { if AllocatedStatus(ti.Status) { ji.Allocated.Add(ti.Resreq) } - - if ji.PodGroup == nil && ti.PodGroup != nil { - ji.SetPodGroup(ti.PodGroup) - } } func (ji *JobInfo) UpdateTaskStatus(task *TaskInfo, status TaskStatus) error { diff --git a/pkg/scheduler/api/job_info_test.go b/pkg/scheduler/api/job_info_test.go index bfbd9f022..e61688046 100644 --- a/pkg/scheduler/api/job_info_test.go +++ b/pkg/scheduler/api/job_info_test.go @@ -47,8 +47,6 @@ func TestAddTaskInfo(t *testing.T) { case01_pod4 := buildPod(case01_ns, "p4", "n1", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string)) case01_task4 := NewTaskInfo(case01_pod4) - _, pg := getOwners(case01_pod1) - tests := []struct { name string uid JobID @@ -61,11 +59,6 @@ func TestAddTaskInfo(t *testing.T) { pods: []*v1.Pod{case01_pod1, case01_pod2, case01_pod3, case01_pod4}, expected: &JobInfo{ UID: case01_uid, - Namespace: case01_ns, - Queue: QueueID(case01_ns), - Name: string(case01_uid), - MinAvailable: 1, - PodGroup: pg, Allocated: buildResource("4000m", "4G"), TotalRequest: buildResource("5000m", "5G"), Tasks: tasksMap{ @@ -117,7 +110,6 @@ func TestDeleteTaskInfo(t *testing.T) { case01_pod2 := buildPod(case01_ns, "p2", "n1", v1.PodRunning, buildResourceList("2000m", "2G"), []metav1.OwnerReference{case01_owner}, make(map[string]string)) case01_pod3 := buildPod(case01_ns, "p3", "n1", v1.PodRunning, buildResourceList("3000m", "3G"), []metav1.OwnerReference{case01_owner}, make(map[string]string)) case01_task3 := NewTaskInfo(case01_pod3) - _, case01_job := getOwners(case01_pod1) // case2 case02_uid := JobID("owner2") @@ -128,7 +120,6 @@ func TestDeleteTaskInfo(t *testing.T) { case02_pod2 := buildPod(case02_ns, "p2", "n1", v1.PodPending, buildResourceList("2000m", "2G"), []metav1.OwnerReference{case02_owner}, make(map[string]string)) case02_pod3 := buildPod(case02_ns, "p3", "n1", v1.PodRunning, buildResourceList("3000m", "3G"), []metav1.OwnerReference{case02_owner}, make(map[string]string)) case02_task3 := NewTaskInfo(case02_pod3) - _, case02_job := getOwners(case02_pod1) tests := []struct { name string @@ -144,11 +135,6 @@ func TestDeleteTaskInfo(t *testing.T) { rmPods: []*v1.Pod{case01_pod2}, expected: &JobInfo{ UID: case01_uid, - Namespace: case01_ns, - Name: string(case01_uid), - Queue: QueueID(case01_ns), - MinAvailable: 1, - PodGroup: case01_job, Allocated: buildResource("3000m", "3G"), TotalRequest: buildResource("4000m", "4G"), Tasks: tasksMap{ @@ -170,11 +156,6 @@ func TestDeleteTaskInfo(t *testing.T) { rmPods: []*v1.Pod{case02_pod2}, expected: &JobInfo{ UID: case02_uid, - Namespace: case02_ns, - Name: string(case02_uid), - Queue: QueueID(case02_ns), - MinAvailable: 1, - PodGroup: 
case02_job, Allocated: buildResource("3000m", "3G"), TotalRequest: buildResource("4000m", "4G"), Tasks: tasksMap{ diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go index a82a89a86..a20992b0e 100644 --- a/pkg/scheduler/api/types.go +++ b/pkg/scheduler/api/types.go @@ -53,10 +53,6 @@ const ( Unknown ) -const ( - ShadowPodGroupKey = "kube-batch/shadow-pod-group" -) - func (ts TaskStatus) String() string { switch ts { case Pending: diff --git a/pkg/scheduler/api/util.go b/pkg/scheduler/api/util.go deleted file mode 100644 index 2acf13546..000000000 --- a/pkg/scheduler/api/util.go +++ /dev/null @@ -1,29 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package api - -import "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" - -func ShadowPodGroup(pg *v1alpha1.PodGroup) bool { - if pg == nil { - return true - } - - _, found := pg.Annotations[ShadowPodGroupKey] - - return found -} diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 829b63c92..dea066285 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -61,8 +61,8 @@ func init() { } // New returns a Cache implementation. -func New(config *rest.Config, schedulerName string, nsAsQueue bool) Cache { - return newSchedulerCache(config, schedulerName, nsAsQueue) +func New(config *rest.Config, schedulerName string, defaultQueue string) Cache { + return newSchedulerCache(config, schedulerName, defaultQueue) } type SchedulerCache struct { @@ -71,6 +71,8 @@ type SchedulerCache struct { kubeclient *kubernetes.Clientset kbclient *kbver.Clientset + defaultQueue string + podInformer infov1.PodInformer nodeInformer infov1.NodeInformer pdbInformer policyv1.PodDisruptionBudgetInformer @@ -94,8 +96,6 @@ type SchedulerCache struct { errTasks workqueue.RateLimitingInterface deletedJobs workqueue.RateLimitingInterface - - namespaceAsQueue bool } type defaultBinder struct { @@ -177,16 +177,16 @@ func (dvb *defaultVolumeBinder) BindVolumes(task *api.TaskInfo) error { return dvb.volumeBinder.Binder.BindPodVolumes(task.Pod) } -func newSchedulerCache(config *rest.Config, schedulerName string, nsAsQueue bool) *SchedulerCache { +func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue string) *SchedulerCache { sc := &SchedulerCache{ - Jobs: make(map[kbapi.JobID]*kbapi.JobInfo), - Nodes: make(map[string]*kbapi.NodeInfo), - Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo), - errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - kubeclient: kubernetes.NewForConfigOrDie(config), - kbclient: kbver.NewForConfigOrDie(config), - namespaceAsQueue: nsAsQueue, + Jobs: make(map[kbapi.JobID]*kbapi.JobInfo), + Nodes: make(map[string]*kbapi.NodeInfo), + Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo), + errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + deletedJobs: 
workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + kubeclient: kubernetes.NewForConfigOrDie(config), + kbclient: kbver.NewForConfigOrDie(config), + defaultQueue: defaultQueue, } // Prepare event clients. @@ -272,23 +272,13 @@ func newSchedulerCache(config *rest.Config, schedulerName string, nsAsQueue bool DeleteFunc: sc.DeletePodGroup, }) - if sc.namespaceAsQueue { - // create informer for Namespace information - sc.nsInformer = informerFactory.Core().V1().Namespaces() - sc.nsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: sc.AddNamespace, - UpdateFunc: sc.UpdateNamespace, - DeleteFunc: sc.DeleteNamespace, - }) - } else { - // create informer for Queue information - sc.queueInformer = kbinformer.Scheduling().V1alpha1().Queues() - sc.queueInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: sc.AddQueue, - UpdateFunc: sc.UpdateQueue, - DeleteFunc: sc.DeleteQueue, - }) - } + // create informer for Queue information + sc.queueInformer = kbinformer.Scheduling().V1alpha1().Queues() + sc.queueInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: sc.AddQueue, + UpdateFunc: sc.UpdateQueue, + DeleteFunc: sc.DeleteQueue, + }) return sc } @@ -301,12 +291,7 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) { go sc.pvInformer.Informer().Run(stopCh) go sc.pvcInformer.Informer().Run(stopCh) go sc.scInformer.Informer().Run(stopCh) - - if sc.namespaceAsQueue { - go sc.nsInformer.Informer().Run(stopCh) - } else { - go sc.queueInformer.Informer().Run(stopCh) - } + go sc.queueInformer.Informer().Run(stopCh) // Re-sync error tasks. go wait.Until(sc.processResyncTask, 0, stopCh) @@ -316,12 +301,6 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) { } func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool { - var queueSync func() bool - if sc.namespaceAsQueue { - queueSync = sc.nsInformer.Informer().HasSynced - } else { - queueSync = sc.queueInformer.Informer().HasSynced - } return cache.WaitForCacheSync(stopCh, sc.pdbInformer.Informer().HasSynced, @@ -331,7 +310,7 @@ func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool { sc.pvInformer.Informer().HasSynced, sc.pvcInformer.Informer().HasSynced, sc.scInformer.Informer().HasSynced, - queueSync, + sc.queueInformer.Informer().HasSynced, ) } @@ -386,7 +365,7 @@ func (sc *SchedulerCache) Evict(taskInfo *kbapi.TaskInfo, reason string) error { } }() - if !kbapi.ShadowPodGroup(job.PodGroup) { + if !shadowPodGroup(job.PodGroup) { sc.Recorder.Eventf(job.PodGroup, v1.EventTypeNormal, "Evict", reason) } @@ -591,7 +570,7 @@ func (sc *SchedulerCache) String() string { func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) { jobErrMsg := job.FitError() - if !kbapi.ShadowPodGroup(job.PodGroup) { + if !shadowPodGroup(job.PodGroup) { pgUnschedulable := job.PodGroup != nil && (job.PodGroup.Status.Phase == v1alpha1.PodGroupUnknown || job.PodGroup.Status.Phase == v1alpha1.PodGroupPending) @@ -619,7 +598,7 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) { // UpdateJobStatus update the status of job and its tasks. 
func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo) (*kbapi.JobInfo, error) { - if !kbapi.ShadowPodGroup(job.PodGroup) { + if !shadowPodGroup(job.PodGroup) { pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup) if err != nil { return nil, err diff --git a/pkg/scheduler/cache/cache_test.go b/pkg/scheduler/cache/cache_test.go index c9be52353..10f53aa1f 100644 --- a/pkg/scheduler/cache/cache_test.go +++ b/pkg/scheduler/cache/cache_test.go @@ -133,11 +133,15 @@ func TestAddPod(t *testing.T) { pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{owner}, make(map[string]string)) pi1 := api.NewTaskInfo(pod1) + pi1.Job = "j1" // The job name is set by cache. pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("1000m", "1G"), []metav1.OwnerReference{owner}, make(map[string]string)) pi2 := api.NewTaskInfo(pod2) + pi2.Job = "j1" // The job name is set by cache. j1 := api.NewJobInfo(api.JobID("j1"), pi1, pi2) + pg := createShadowPodGroup(pod1) + j1.SetPodGroup(pg) node1 := buildNode("n1", buildResourceList("2000m", "10G")) ni1 := api.NewNodeInfo(node1) @@ -184,21 +188,34 @@ func TestAddPod(t *testing.T) { } func TestAddNode(t *testing.T) { + owner1 := buildOwnerReference("j1") + owner2 := buildOwnerReference("j2") + // case 1 node1 := buildNode("n1", buildResourceList("2000m", "10G")) pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), - []metav1.OwnerReference{}, make(map[string]string)) + []metav1.OwnerReference{owner1}, make(map[string]string)) + pi1 := api.NewTaskInfo(pod1) + pi1.Job = "j1" // The job name is set by cache. + pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("1000m", "1G"), - []metav1.OwnerReference{}, make(map[string]string)) + []metav1.OwnerReference{owner2}, make(map[string]string)) pi2 := api.NewTaskInfo(pod2) + pi2.Job = "j2" // The job name is set by cache. ni1 := api.NewNodeInfo(node1) ni1.AddTask(pi2) - j1 := api.NewJobInfo("c1-p1") - j1.AddTaskInfo(api.NewTaskInfo(pod1)) - j2 := api.NewJobInfo("c1-p2") - j2.AddTaskInfo(api.NewTaskInfo(pod2)) + j1 := api.NewJobInfo("j1") + pg1 := createShadowPodGroup(pod1) + j1.SetPodGroup(pg1) + + j2 := api.NewJobInfo("j2") + pg2 := createShadowPodGroup(pod2) + j2.SetPodGroup(pg2) + + j1.AddTaskInfo(pi1) + j2.AddTaskInfo(pi2) tests := []struct { pods []*v1.Pod @@ -213,8 +230,8 @@ func TestAddNode(t *testing.T) { "n1": ni1, }, Jobs: map[api.JobID]*api.JobInfo{ - "c1-p1": j1, - "c1-p2": j2, + "j1": j1, + "j2": j2, }, }, }, diff --git a/pkg/scheduler/cache/event_handlers.go b/pkg/scheduler/cache/event_handlers.go index c4a45889e..39c77a72c 100644 --- a/pkg/scheduler/cache/event_handlers.go +++ b/pkg/scheduler/cache/event_handlers.go @@ -37,15 +37,32 @@ func isTerminated(status kbapi.TaskStatus) bool { return status == kbapi.Succeeded || status == kbapi.Failed } -func (sc *SchedulerCache) addTask(pi *kbapi.TaskInfo) error { - if len(pi.Job) != 0 { +func (sc *SchedulerCache) getOrCreateJob(pi *kbapi.TaskInfo) *kbapi.JobInfo { + if len(pi.Job) == 0 { + pb := createShadowPodGroup(pi.Pod) + pi.Job = kbapi.JobID(pb.Name) + if _, found := sc.Jobs[pi.Job]; !found { - sc.Jobs[pi.Job] = kbapi.NewJobInfo(pi.Job, pi) - } else { - sc.Jobs[pi.Job].AddTaskInfo(pi) + job := kbapi.NewJobInfo(pi.Job) + job.SetPodGroup(pb) + // Set default queue for shadow podgroup. 
+ job.Queue = kbapi.QueueID(sc.defaultQueue) + + sc.Jobs[pi.Job] = job + } + } else { + if _, found := sc.Jobs[pi.Job]; !found { + sc.Jobs[pi.Job] = kbapi.NewJobInfo(pi.Job) } } + return sc.Jobs[pi.Job] +} + +func (sc *SchedulerCache) addTask(pi *kbapi.TaskInfo) error { + job := sc.getOrCreateJob(pi) + job.AddTaskInfo(pi) + if len(pi.NodeName) != 0 { if _, found := sc.Nodes[pi.NodeName]; !found { sc.Nodes[pi.NodeName] = kbapi.NewNodeInfo(nil) @@ -354,6 +371,11 @@ func (sc *SchedulerCache) setPodGroup(ss *kbv1.PodGroup) error { sc.Jobs[job].SetPodGroup(ss) + // TODO(k82cn): set default queue in admission. + if len(ss.Spec.Queue) == 0 { + sc.Jobs[job].Queue = kbapi.QueueID(sc.defaultQueue) + } + return nil } @@ -389,11 +411,6 @@ func (sc *SchedulerCache) AddPodGroup(obj interface{}) { sc.Mutex.Lock() defer sc.Mutex.Unlock() - // If namespace as queue, the `.spec.Queue` of PodGroup is ignored. - if sc.namespaceAsQueue { - ss.Spec.Queue = "" - } - glog.V(4).Infof("Add PodGroup(%s) into cache, spec(%#v)", ss.Name, ss.Spec) err := sc.setPodGroup(ss) if err != nil { @@ -467,6 +484,8 @@ func (sc *SchedulerCache) setPDB(pdb *policyv1.PodDisruptionBudget) error { } sc.Jobs[job].SetPDB(pdb) + // Set it to default queue, as PDB did not support queue right now. + sc.Jobs[job].Queue = kbapi.QueueID(sc.defaultQueue) return nil } @@ -652,104 +671,3 @@ func (sc *SchedulerCache) deleteQueue(queue *kbv1.Queue) error { return nil } - -func (sc *SchedulerCache) AddNamespace(obj interface{}) { - ss, ok := obj.(*v1.Namespace) - if !ok { - glog.Errorf("Cannot convert to *v1.Namespace: %v", obj) - return - } - - sc.Mutex.Lock() - defer sc.Mutex.Unlock() - - glog.V(4).Infof("Add Queue(%s) into cache, spec(%#v)", ss.Name, ss.Spec) - err := sc.addNamespace(ss) - if err != nil { - glog.Errorf("Failed to add Queue %s into cache: %v", ss.Name, err) - return - } - return -} - -func (sc *SchedulerCache) UpdateNamespace(oldObj, newObj interface{}) { - oldSS, ok := oldObj.(*v1.Namespace) - if !ok { - glog.Errorf("Cannot convert oldObj to *v1.Namespace: %v", oldObj) - return - } - newSS, ok := newObj.(*v1.Namespace) - if !ok { - glog.Errorf("Cannot convert newObj to *v1.Namespace: %v", newObj) - return - } - - sc.Mutex.Lock() - defer sc.Mutex.Unlock() - - err := sc.updateNamespace(oldSS, newSS) - if err != nil { - glog.Errorf("Failed to update Queue (NS) %s into cache: %v", oldSS.Name, err) - return - } - return -} - -func (sc *SchedulerCache) DeleteNamespace(obj interface{}) { - var ss *v1.Namespace - switch t := obj.(type) { - case *v1.Namespace: - ss = t - case cache.DeletedFinalStateUnknown: - var ok bool - ss, ok = t.Obj.(*v1.Namespace) - if !ok { - glog.Errorf("Cannot convert to *v1.Namespace: %v", t.Obj) - return - } - default: - glog.Errorf("Cannot convert to *v1.Namespace: %v", t) - return - } - - sc.Mutex.Lock() - defer sc.Mutex.Unlock() - - err := sc.deleteNamespace(ss) - if err != nil { - glog.Errorf("Failed to delete Queue (NS) %s from cache: %v", ss.Name, err) - return - } - return -} - -func (sc *SchedulerCache) addNamespace(ns *v1.Namespace) error { - qi := &kbapi.QueueInfo{ - UID: kbapi.QueueID(ns.Name), - Name: ns.Name, - - Weight: 1, - } - sc.Queues[qi.UID] = qi - - return nil -} - -func (sc *SchedulerCache) updateNamespace(oldObj, newObj *v1.Namespace) error { - sc.deleteNamespace(oldObj) - sc.addNamespace(newObj) - - return nil -} - -func (sc *SchedulerCache) deleteNamespace(ns *v1.Namespace) error { - qi := &kbapi.QueueInfo{ - UID: kbapi.QueueID(ns.Name), - Name: ns.Name, - - Weight: 1, - } - 
delete(sc.Queues, qi.UID) - - return nil -} diff --git a/pkg/scheduler/cache/util.go b/pkg/scheduler/cache/util.go new file mode 100644 index 000000000..411b47552 --- /dev/null +++ b/pkg/scheduler/cache/util.go @@ -0,0 +1,60 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" + "github.com/kubernetes-sigs/kube-batch/pkg/apis/utils" + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" +) + +const ( + shadowPodGroupKey = "kube-batch/shadow-pod-group" +) + +func shadowPodGroup(pg *v1alpha1.PodGroup) bool { + if pg == nil { + return true + } + + _, found := pg.Annotations[shadowPodGroupKey] + + return found +} + +func createShadowPodGroup(pod *v1.Pod) *v1alpha1.PodGroup { + jobID := api.JobID(utils.GetController(pod)) + if len(jobID) == 0 { + jobID = api.JobID(pod.UID) + } + + return &v1alpha1.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: pod.Namespace, + Name: string(jobID), + Annotations: map[string]string{ + shadowPodGroupKey: string(jobID), + }, + }, + Spec: v1alpha1.PodGroupSpec{ + MinMember: 1, + }, + } +} diff --git a/pkg/scheduler/plugins/conformance/conformance.go b/pkg/scheduler/plugins/conformance/conformance.go index 5178e9157..779c3f79a 100644 --- a/pkg/scheduler/plugins/conformance/conformance.go +++ b/pkg/scheduler/plugins/conformance/conformance.go @@ -17,10 +17,11 @@ limitations under the License. 
package conformance import ( - "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" - "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework" "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/apis/scheduling" + + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework" ) type conformancePlugin struct { @@ -44,6 +45,7 @@ func (pp *conformancePlugin) OnSessionOpen(ssn *framework.Session) { if className == scheduling.SystemClusterCritical || className == scheduling.SystemNodeCritical || evictee.Namespace == v1.NamespaceSystem { + continue } diff --git a/pkg/scheduler/plugins/factory.go b/pkg/scheduler/plugins/factory.go index f747a0baa..e7c17c76c 100644 --- a/pkg/scheduler/plugins/factory.go +++ b/pkg/scheduler/plugins/factory.go @@ -19,6 +19,7 @@ package plugins import ( "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework" + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/conformance" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/drf" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/gang" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/predicates" @@ -34,6 +35,7 @@ func init() { framework.RegisterPluginBuilder("predicates", predicates.New) framework.RegisterPluginBuilder("priority", priority.New) framework.RegisterPluginBuilder("prioritize", prioritize.New) + framework.RegisterPluginBuilder("conformance", conformance.New) // Plugins for Queues framework.RegisterPluginBuilder("proportion", proportion.New) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index fb5505631..d822df5d3 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -43,13 +43,13 @@ func NewScheduler( schedulerName string, conf string, period string, - nsAsQueue bool, + defaultQueue string, ) (*Scheduler, error) { sp, _ := time.ParseDuration(period) scheduler := &Scheduler{ config: config, schedulerConf: conf, - cache: schedcache.New(config, schedulerName, nsAsQueue), + cache: schedcache.New(config, schedulerName, defaultQueue), schedulePeriod: sp, } diff --git a/test/e2e/queue.go b/test/e2e/queue.go index 4fa78bde8..06e511c63 100644 --- a/test/e2e/queue.go +++ b/test/e2e/queue.go @@ -27,6 +27,9 @@ var _ = Describe("Queue E2E Test", func() { context := initTestContext() defer cleanupTestContext(context) + createQueues(context) + defer deleteQueues(context) + slot := oneCPU rep := clusterSize(context, slot) diff --git a/test/e2e/util.go b/test/e2e/util.go index 3247ba1e4..1644e296f 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -22,7 +22,6 @@ import ( "fmt" "os" "path/filepath" - "strconv" "text/template" "time" @@ -69,13 +68,11 @@ type context struct { kubeclient *kubernetes.Clientset kbclient *kbver.Clientset - namespace string - queues []string - enableNamespaceAsQueue bool + namespace string + queues []string } func initTestContext() *context { - enableNamespaceAsQueue, _ := strconv.ParseBool(os.Getenv("ENABLE_NAMESPACES_AS_QUEUE")) cxt := &context{ namespace: "test", queues: []string{"q1", "q2"}, @@ -90,8 +87,6 @@ func initTestContext() *context { cxt.kbclient = kbver.NewForConfigOrDie(config) cxt.kubeclient = kubernetes.NewForConfigOrDie(config) - cxt.enableNamespaceAsQueue = enableNamespaceAsQueue - _, err = cxt.kubeclient.CoreV1().Namespaces().Create(&v1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: cxt.namespace, @@ -99,8 +94,6 @@ func initTestContext() *context { }) checkError(cxt, err) - createQueues(cxt) - _, 
err = cxt.kubeclient.SchedulingV1beta1().PriorityClasses().Create(&schedv1.PriorityClass{ ObjectMeta: metav1.ObjectMeta{ Name: masterPriority, @@ -135,13 +128,7 @@ func namespaceNotExist(ctx *context) wait.ConditionFunc { func queueNotExist(ctx *context) wait.ConditionFunc { return func() (bool, error) { for _, q := range ctx.queues { - var err error - if ctx.enableNamespaceAsQueue { - _, err = ctx.kubeclient.CoreV1().Namespaces().Get(q, metav1.GetOptions{}) - } else { - _, err = ctx.kbclient.Scheduling().Queues().Get(q, metav1.GetOptions{}) - } - + _, err := ctx.kbclient.SchedulingV1alpha1().Queues().Get(q, metav1.GetOptions{}) if !(err != nil && errors.IsNotFound(err)) { return false, err } @@ -159,8 +146,6 @@ func cleanupTestContext(cxt *context) { }) checkError(cxt, err) - deleteQueues(cxt) - err = cxt.kubeclient.SchedulingV1beta1().PriorityClasses().Delete(masterPriority, &metav1.DeleteOptions{ PropagationPolicy: &foreground, }) @@ -174,10 +159,6 @@ func cleanupTestContext(cxt *context) { // Wait for namespace deleted. err = wait.Poll(100*time.Millisecond, oneMinute, namespaceNotExist(cxt)) checkError(cxt, err) - - // Wait for queues deleted - err = wait.Poll(100*time.Millisecond, oneMinute, queueNotExist(cxt)) - checkError(cxt, err) } func clusterInfo(cxt *context) string { @@ -232,7 +213,7 @@ Cluster Info: func checkError(cxt *context, err error) { if err != nil { - Expect(err).NotTo(HaveOccurred(), clusterInfo(cxt)) + ExpectWithOffset(1, err).NotTo(HaveOccurred(), clusterInfo(cxt)) } } @@ -240,30 +221,9 @@ func createQueues(cxt *context) { var err error for _, q := range cxt.queues { - if cxt.enableNamespaceAsQueue { - _, err = cxt.kubeclient.CoreV1().Namespaces().Create(&v1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: q, - }, - }) - } else { - _, err = cxt.kbclient.Scheduling().Queues().Create(&kbv1.Queue{ - ObjectMeta: metav1.ObjectMeta{ - Name: q, - }, - Spec: kbv1.QueueSpec{ - Weight: 1, - }, - }) - } - - checkError(cxt, err) - } - - if !cxt.enableNamespaceAsQueue { - _, err := cxt.kbclient.Scheduling().Queues().Create(&kbv1.Queue{ + _, err = cxt.kbclient.SchedulingV1alpha1().Queues().Create(&kbv1.Queue{ ObjectMeta: metav1.ObjectMeta{ - Name: cxt.namespace, + Name: q, }, Spec: kbv1.QueueSpec{ Weight: 1, @@ -278,28 +238,16 @@ func deleteQueues(cxt *context) { foreground := metav1.DeletePropagationForeground for _, q := range cxt.queues { - var err error - - if cxt.enableNamespaceAsQueue { - err = cxt.kubeclient.CoreV1().Namespaces().Delete(q, &metav1.DeleteOptions{ - PropagationPolicy: &foreground, - }) - } else { - err = cxt.kbclient.Scheduling().Queues().Delete(q, &metav1.DeleteOptions{ - PropagationPolicy: &foreground, - }) - } - - checkError(cxt, err) - } - - if !cxt.enableNamespaceAsQueue { - err := cxt.kbclient.Scheduling().Queues().Delete(cxt.namespace, &metav1.DeleteOptions{ + err := cxt.kbclient.SchedulingV1alpha1().Queues().Delete(q, &metav1.DeleteOptions{ PropagationPolicy: &foreground, }) checkError(cxt, err) } + + // Wait for queues deleted + err := wait.Poll(100*time.Millisecond, oneMinute, queueNotExist(cxt)) + checkError(cxt, err) } type taskSpec struct { @@ -325,12 +273,6 @@ func getNS(context *context, job *jobSpec) string { return job.namespace } - if context.enableNamespaceAsQueue { - if len(job.queue) != 0 { - return job.queue - } - } - return context.namespace }
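
Note (illustrative, not part of the patch): with namespaceAsQueue removed, a job's queue comes from its PodGroup spec.queue, and jobs without one (including the shadow PodGroups the cache now creates for plain pods) fall back to the scheduler's --default-queue flag, which defaults to "default" and is expected to exist as a Queue object (see config/queue/default.yaml). A minimal Go sketch of that resolution follows; the type and function names are hypothetical stand-ins, not kube-batch's own API.

package main

import "fmt"

// podGroupSpec stands in for the Queue field of the v1alpha1 PodGroup spec.
type podGroupSpec struct {
	Queue string
}

// resolveQueue returns the queue a job is placed in: an explicit spec.queue
// wins; otherwise the job lands in the queue configured via --default-queue.
func resolveQueue(spec podGroupSpec, defaultQueue string) string {
	if len(spec.Queue) > 0 {
		return spec.Queue
	}
	return defaultQueue
}

func main() {
	fmt.Println(resolveQueue(podGroupSpec{Queue: "q1"}, "default")) // q1
	fmt.Println(resolveQueue(podGroupSpec{}, "default"))            // default
}

The same fallback is what the e2e setup relies on: hack/run-e2e.sh now creates the "default" Queue before starting kube-batch, so pods and PDB-backed jobs with no explicit queue are still admitted by the proportion plugin.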