Skip to content

Commit

Permalink
Do not create PodGroup and Job for task whose scheduler is not kube-b…
Browse files Browse the repository at this point in the history
…atch
  • Loading branch information
hex108 committed Mar 28, 2019
1 parent 5c60f9c commit 2d211aa
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 1 deletion.
3 changes: 3 additions & 0 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type SchedulerCache struct {
kbclient *kbver.Clientset

defaultQueue string
// schedulerName is the name for kube-batch scheduler
schedulerName string

podInformer infov1.PodInformer
nodeInformer infov1.NodeInformer
Expand Down Expand Up @@ -189,6 +191,7 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
kubeclient: kubernetes.NewForConfigOrDie(config),
kbclient: kbver.NewForConfigOrDie(config),
defaultQueue: defaultQueue,
schedulerName: schedulerName,
}

// Prepare event clients.
Expand Down
50 changes: 50 additions & 0 deletions pkg/scheduler/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,53 @@ func TestAddNode(t *testing.T) {
}
}
}

func TestGetOrCreateJob(t *testing.T) {
owner1 := buildOwnerReference("j1")
owner2 := buildOwnerReference("j2")

pod1 := buildPod("c1", "p1", "n1", v1.PodRunning, buildResourceList("1000m", "1G"),
[]metav1.OwnerReference{owner1}, make(map[string]string))
pi1 := api.NewTaskInfo(pod1)
pi1.Job = "j1" // The job name is set by cache.

pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("1000m", "1G"),
[]metav1.OwnerReference{owner2}, make(map[string]string))
pod2.Spec.SchedulerName = "kube-batch"
pi2 := api.NewTaskInfo(pod2)

pod3 := buildPod("c3", "p3", "n1", v1.PodRunning, buildResourceList("1000m", "1G"),
[]metav1.OwnerReference{owner2}, make(map[string]string))
pi3 := api.NewTaskInfo(pod3)

cache := &SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Jobs: make(map[api.JobID]*api.JobInfo),
schedulerName: "kube-batch",
}

tests := []struct {
task *api.TaskInfo
gotJob bool // whether getOrCreateJob will return job for corresponding task
}{
{
task: pi1,
gotJob: true,
},
{
task: pi2,
gotJob: true,
},
{
task: pi3,
gotJob: false,
},
}
for i, test := range tests {
result := cache.getOrCreateJob(test.task) != nil
if result != test.gotJob {
t.Errorf("case %d: \n expected %t, \n got %t \n",
i, test.gotJob, result)
}
}
}
11 changes: 10 additions & 1 deletion pkg/scheduler/cache/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,15 @@ func isTerminated(status kbapi.TaskStatus) bool {
return status == kbapi.Succeeded || status == kbapi.Failed
}

// getOrCreateJob will return corresponding Job for pi if it exists, or it will create a Job and return it if
// pi.Pod.Spec.SchedulerName is same as kube-batch scheduler's name, otherwise it will return nil.
func (sc *SchedulerCache) getOrCreateJob(pi *kbapi.TaskInfo) *kbapi.JobInfo {
if len(pi.Job) == 0 {
if pi.Pod.Spec.SchedulerName != sc.schedulerName {
glog.V(4).Infof("Pod %s/%s will not not scheduled by %s, skip creating PodGroup and Job for it",
pi.Pod.Namespace, pi.Pod.Name, sc.schedulerName)
return nil
}
pb := createShadowPodGroup(pi.Pod)
pi.Job = kbapi.JobID(pb.Name)

Expand All @@ -62,7 +69,9 @@ func (sc *SchedulerCache) getOrCreateJob(pi *kbapi.TaskInfo) *kbapi.JobInfo {

func (sc *SchedulerCache) addTask(pi *kbapi.TaskInfo) error {
job := sc.getOrCreateJob(pi)
job.AddTaskInfo(pi)
if job != nil {
job.AddTaskInfo(pi)
}

if len(pi.NodeName) != 0 {
if _, found := sc.Nodes[pi.NodeName]; !found {
Expand Down

0 comments on commit 2d211aa

Please sign in to comment.