diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go
index 2e6f22f3a..f4941cb34 100644
--- a/pkg/scheduler/cache/cache.go
+++ b/pkg/scheduler/cache/cache.go
@@ -74,6 +74,8 @@ type SchedulerCache struct {
 	kbclient *kbver.Clientset
 
 	defaultQueue string
+	// schedulerName is the name of the kube-batch scheduler
+	schedulerName string
 
 	podInformer  infov1.PodInformer
 	nodeInformer infov1.NodeInformer
@@ -189,6 +191,7 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
 		kubeclient:   kubernetes.NewForConfigOrDie(config),
 		kbclient:     kbver.NewForConfigOrDie(config),
 		defaultQueue: defaultQueue,
+		schedulerName: schedulerName,
 	}
 
 	// Prepare event clients.
diff --git a/pkg/scheduler/cache/cache_test.go b/pkg/scheduler/cache/cache_test.go
index 10f53aa1f..06486fd03 100644
--- a/pkg/scheduler/cache/cache_test.go
+++ b/pkg/scheduler/cache/cache_test.go
@@ -257,3 +257,53 @@ func TestAddNode(t *testing.T) {
 		}
 	}
 }
+
+func TestGetOrCreateJob(t *testing.T) {
+	owner1 := buildOwnerReference("j1")
+	owner2 := buildOwnerReference("j2")
+
+	pod1 := buildPod("c1", "p1", "n1", v1.PodRunning, buildResourceList("1000m", "1G"),
+		[]metav1.OwnerReference{owner1}, make(map[string]string))
+	pi1 := api.NewTaskInfo(pod1)
+	pi1.Job = "j1" // The job name is set by the cache.
+
+	pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("1000m", "1G"),
+		[]metav1.OwnerReference{owner2}, make(map[string]string))
+	pod2.Spec.SchedulerName = "kube-batch"
+	pi2 := api.NewTaskInfo(pod2)
+
+	pod3 := buildPod("c3", "p3", "n1", v1.PodRunning, buildResourceList("1000m", "1G"),
+		[]metav1.OwnerReference{owner2}, make(map[string]string))
+	pi3 := api.NewTaskInfo(pod3)
+
+	cache := &SchedulerCache{
+		Nodes:         make(map[string]*api.NodeInfo),
+		Jobs:          make(map[api.JobID]*api.JobInfo),
+		schedulerName: "kube-batch",
+	}
+
+	tests := []struct {
+		task   *api.TaskInfo
+		gotJob bool // whether getOrCreateJob returns a job for the corresponding task
+	}{
+		{
+			task:   pi1,
+			gotJob: true,
+		},
+		{
+			task:   pi2,
+			gotJob: true,
+		},
+		{
+			task:   pi3,
+			gotJob: false,
+		},
+	}
+	for i, test := range tests {
+		result := cache.getOrCreateJob(test.task) != nil
+		if result != test.gotJob {
+			t.Errorf("case %d: expected %t, got %t",
+				i, test.gotJob, result)
+		}
+	}
+}
diff --git a/pkg/scheduler/cache/event_handlers.go b/pkg/scheduler/cache/event_handlers.go
index 6834fd58f..7522be9b3 100644
--- a/pkg/scheduler/cache/event_handlers.go
+++ b/pkg/scheduler/cache/event_handlers.go
@@ -38,8 +38,15 @@ func isTerminated(status kbapi.TaskStatus) bool {
 	return status == kbapi.Succeeded || status == kbapi.Failed
 }
 
+// getOrCreateJob returns the Job corresponding to pi if it already exists; otherwise it creates and returns
+// a Job, provided pi.Pod.Spec.SchedulerName matches the kube-batch scheduler's name, and returns nil if not.
 func (sc *SchedulerCache) getOrCreateJob(pi *kbapi.TaskInfo) *kbapi.JobInfo {
 	if len(pi.Job) == 0 {
+		if pi.Pod.Spec.SchedulerName != sc.schedulerName {
+			glog.V(4).Infof("Pod %s/%s will not be scheduled by %s, skip creating PodGroup and Job for it",
+				pi.Pod.Namespace, pi.Pod.Name, sc.schedulerName)
+			return nil
+		}
 		pb := createShadowPodGroup(pi.Pod)
 		pi.Job = kbapi.JobID(pb.Name)
 
@@ -62,7 +69,9 @@ func (sc *SchedulerCache) getOrCreateJob(pi *kbapi.TaskInfo) *kbapi.JobInfo {
 
 func (sc *SchedulerCache) addTask(pi *kbapi.TaskInfo) error {
 	job := sc.getOrCreateJob(pi)
-	job.AddTaskInfo(pi)
+	if job != nil {
+		job.AddTaskInfo(pi)
+	}
 
 	if len(pi.NodeName) != 0 {
 		if _, found := sc.Nodes[pi.NodeName]; !found {
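
For context, a minimal sketch (not part of the patch) of how the new skip path is expected to behave end to end. It assumes it lives in the cache package's test file so it can set the unexported schedulerName field, reuses the buildPod, buildOwnerReference, and buildResourceList helpers already present in cache_test.go, and uses "default-scheduler" purely as a sample foreign scheduler name:

// Illustrative only: TestSkipForeignPod is a hypothetical test, not part of this patch.
func TestSkipForeignPod(t *testing.T) {
	cache := &SchedulerCache{
		Nodes:         make(map[string]*api.NodeInfo),
		Jobs:          make(map[api.JobID]*api.JobInfo),
		schedulerName: "kube-batch",
	}

	// A pod that asks for a different scheduler than kube-batch.
	pod := buildPod("ns1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"),
		[]metav1.OwnerReference{buildOwnerReference("j1")}, make(map[string]string))
	pod.Spec.SchedulerName = "default-scheduler"
	task := api.NewTaskInfo(pod)

	// No shadow PodGroup/Job should be created for the foreign pod...
	if job := cache.getOrCreateJob(task); job != nil {
		t.Errorf("expected nil job for pod scheduled by %q", pod.Spec.SchedulerName)
	}
	// ...and addTask must tolerate the nil job instead of panicking.
	if err := cache.addTask(task); err != nil {
		t.Errorf("addTask failed: %v", err)
	}
	if len(cache.Jobs) != 0 {
		t.Errorf("expected no Jobs in cache, got %d", len(cache.Jobs))
	}
}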