From 927a38155870bb3721ef493a6131e8abde3b3ff8 Mon Sep 17 00:00:00 2001 From: "Da K. Ma" Date: Sat, 12 Jan 2019 15:50:24 +0800 Subject: [PATCH] Added name when register plugin. Signed-off-by: Da K. Ma --- pkg/scheduler/framework/interface.go | 3 + pkg/scheduler/framework/session.go | 96 +++++++++++-------- pkg/scheduler/plugins/drf/drf.go | 10 +- pkg/scheduler/plugins/gang/gang.go | 20 ++-- .../plugins/predicates/predicates.go | 14 ++- pkg/scheduler/plugins/priority/priority.go | 13 ++- .../plugins/proportion/proportion.go | 10 +- 7 files changed, 93 insertions(+), 73 deletions(-) diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index cf079dc13..a92d0287c 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -32,6 +32,9 @@ type Action interface { } type Plugin interface { + // The unique name of Plugin. + Name() string + OnSessionOpen(ssn *Session) OnSessionClose(ssn *Session) } diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 46d3d7ecb..aabe4b68c 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -45,14 +45,14 @@ type Session struct { plugins []Plugin eventHandlers []*EventHandler - jobOrderFns []api.CompareFn - queueOrderFns []api.CompareFn - taskOrderFns []api.CompareFn - predicateFns []api.PredicateFn - preemptableFns []api.PreemptableFn - reclaimableFns []api.ReclaimableFn - overusedFns []api.ValidateFn - jobReadyFns []api.ValidateFn + jobOrderFns map[string]api.CompareFn + queueOrderFns map[string]api.CompareFn + taskOrderFns map[string]api.CompareFn + predicateFns map[string]api.PredicateFn + preemptableFns map[string]api.PreemptableFn + reclaimableFns map[string]api.ReclaimableFn + overusedFns map[string]api.ValidateFn + jobReadyFns map[string]api.ValidateFn } func openSession(cache cache.Cache) *Session { @@ -62,6 +62,15 @@ func openSession(cache cache.Cache) *Session { JobIndex: map[api.JobID]*api.JobInfo{}, NodeIndex: map[string]*api.NodeInfo{}, QueueIndex: map[api.QueueID]*api.QueueInfo{}, + + jobOrderFns: map[string]api.CompareFn{}, + queueOrderFns: map[string]api.CompareFn{}, + taskOrderFns: map[string]api.CompareFn{}, + predicateFns: map[string]api.PredicateFn{}, + preemptableFns: map[string]api.PreemptableFn{}, + reclaimableFns: map[string]api.ReclaimableFn{}, + overusedFns: map[string]api.ValidateFn{}, + jobReadyFns: map[string]api.ValidateFn{}, } snapshot := cache.Snapshot() @@ -213,13 +222,15 @@ func (ssn *Session) dispatch(task *api.TaskInfo) error { func (ssn *Session) Reclaimable(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo { var victims []*api.TaskInfo + var init bool for _, rf := range ssn.reclaimableFns { candidates := rf(reclaimer, reclaimees) - if victims == nil { + if !init { victims = candidates + init = true } else { - intersection := []*api.TaskInfo{} + var intersection []*api.TaskInfo // Get intersection of victims and candidates. for _, v := range victims { for _, c := range candidates { @@ -256,7 +267,10 @@ func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error { // Update task in node. if node, found := ssn.NodeIndex[reclaimee.NodeName]; found { - node.UpdateTask(reclaimee) + if err := node.UpdateTask(reclaimee); err != nil { + glog.Errorf("Failed to update task <%v/%v> in Session <%v>: %v", + reclaimee.Namespace, reclaimee.Name, ssn.UID, err) + } } for _, eh := range ssn.eventHandlers { @@ -275,22 +289,28 @@ func (ssn *Session) Preemptable(preemptor *api.TaskInfo, preemptees []*api.TaskI return nil } - victims := ssn.preemptableFns[0](preemptor, preemptees) - for _, pf := range ssn.preemptableFns[1:] { - intersection := []*api.TaskInfo{} + var victims []*api.TaskInfo + var init bool + for _, pf := range ssn.preemptableFns { candidates := pf(preemptor, preemptees) - // Get intersection of victims and candidates. - for _, v := range victims { - for _, c := range candidates { - if v.UID == c.UID { - intersection = append(intersection, v) + if !init { + victims = candidates + init = true + } else { + var intersection []*api.TaskInfo + // Get intersection of victims and candidates. + for _, v := range victims { + for _, c := range candidates { + if v.UID == c.UID { + intersection = append(intersection, v) + } } } - } - // Update victims to intersection - victims = intersection + // Update victims to intersection + victims = intersection + } } return victims @@ -324,36 +344,36 @@ func (ssn *Session) AddEventHandler(eh *EventHandler) { ssn.eventHandlers = append(ssn.eventHandlers, eh) } -func (ssn *Session) AddJobOrderFn(cf api.CompareFn) { - ssn.jobOrderFns = append(ssn.jobOrderFns, cf) +func (ssn *Session) AddJobOrderFn(name string, cf api.CompareFn) { + ssn.jobOrderFns[name] = cf } -func (ssn *Session) AddQueueOrderFn(qf api.CompareFn) { - ssn.queueOrderFns = append(ssn.queueOrderFns, qf) +func (ssn *Session) AddQueueOrderFn(name string, qf api.CompareFn) { + ssn.queueOrderFns[name] = qf } -func (ssn *Session) AddTaskOrderFn(cf api.CompareFn) { - ssn.taskOrderFns = append(ssn.taskOrderFns, cf) +func (ssn *Session) AddTaskOrderFn(name string, cf api.CompareFn) { + ssn.taskOrderFns[name] = cf } -func (ssn *Session) AddPreemptableFn(cf api.PreemptableFn) { - ssn.preemptableFns = append(ssn.preemptableFns, cf) +func (ssn *Session) AddPreemptableFn(name string, cf api.PreemptableFn) { + ssn.preemptableFns[name] = cf } -func (ssn *Session) AddReclaimableFn(rf api.ReclaimableFn) { - ssn.reclaimableFns = append(ssn.reclaimableFns, rf) +func (ssn *Session) AddReclaimableFn(name string, rf api.ReclaimableFn) { + ssn.reclaimableFns[name] = rf } -func (ssn *Session) AddJobReadyFn(vf api.ValidateFn) { - ssn.jobReadyFns = append(ssn.jobReadyFns, vf) +func (ssn *Session) AddJobReadyFn(name string, vf api.ValidateFn) { + ssn.jobReadyFns[name] = vf } -func (ssn *Session) AddPredicateFn(pf api.PredicateFn) { - ssn.predicateFns = append(ssn.predicateFns, pf) +func (ssn *Session) AddPredicateFn(name string, pf api.PredicateFn) { + ssn.predicateFns[name] = pf } -func (ssn *Session) AddOverusedFn(fn api.ValidateFn) { - ssn.overusedFns = append(ssn.overusedFns, fn) +func (ssn *Session) AddOverusedFn(name string, fn api.ValidateFn) { + ssn.overusedFns[name] = fn } func (ssn *Session) Overused(queue *api.QueueInfo) bool { diff --git a/pkg/scheduler/plugins/drf/drf.go b/pkg/scheduler/plugins/drf/drf.go index 223beeac0..47f51801f 100644 --- a/pkg/scheduler/plugins/drf/drf.go +++ b/pkg/scheduler/plugins/drf/drf.go @@ -107,10 +107,7 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { return victims } - if drf.args.PreemptableFnEnabled { - // Add Preemptable function. - ssn.AddPreemptableFn(preemptableFn) - } + ssn.AddPreemptableFn(drf.Name(), preemptableFn) jobOrderFn := func(l interface{}, r interface{}) int { lv := l.(*api.JobInfo) @@ -130,10 +127,7 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { return 1 } - if drf.args.JobOrderFnEnabled { - // Add Job Order function. - ssn.AddJobOrderFn(jobOrderFn) - } + ssn.AddJobOrderFn(drf.Name(), jobOrderFn) // Register event handlers. ssn.AddEventHandler(&framework.EventHandler{ diff --git a/pkg/scheduler/plugins/gang/gang.go b/pkg/scheduler/plugins/gang/gang.go index 6cf4fc0f3..7842e9794 100644 --- a/pkg/scheduler/plugins/gang/gang.go +++ b/pkg/scheduler/plugins/gang/gang.go @@ -36,6 +36,10 @@ func New(args *framework.PluginArgs) framework.Plugin { } } +func (gp *gangPlugin) Name() string { + return "gang" +} + // readyTaskNum return the number of tasks that are ready to run. func readyTaskNum(job *api.JobInfo) int32 { occupid := 0 @@ -102,11 +106,8 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) { } // TODO(k82cn): Support preempt/reclaim batch job. - ssn.AddReclaimableFn(preemptableFn) - - if gp.args.PreemptableFnEnabled { - ssn.AddPreemptableFn(preemptableFn) - } + ssn.AddReclaimableFn(gp.Name(), preemptableFn) + ssn.AddPreemptableFn(gp.Name(), preemptableFn) jobOrderFn := func(l, r interface{}) int { lv := l.(*api.JobInfo) @@ -144,13 +145,8 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) { return 0 } - if gp.args.JobOrderFnEnabled { - ssn.AddJobOrderFn(jobOrderFn) - } - - if gp.args.JobReadyFnEnabled { - ssn.AddJobReadyFn(jobReady) - } + ssn.AddJobOrderFn(gp.Name(), jobOrderFn) + ssn.AddJobReadyFn(gp.Name(), jobReady) } func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) { diff --git a/pkg/scheduler/plugins/predicates/predicates.go b/pkg/scheduler/plugins/predicates/predicates.go index 77fce0f1b..845a3bdd4 100644 --- a/pkg/scheduler/plugins/predicates/predicates.go +++ b/pkg/scheduler/plugins/predicates/predicates.go @@ -31,16 +31,20 @@ import ( "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework" ) -type nodeAffinityPlugin struct { +type predicatesPlugin struct { args *framework.PluginArgs } func New(args *framework.PluginArgs) framework.Plugin { - return &nodeAffinityPlugin{ + return &predicatesPlugin{ args: args, } } +func (pp *predicatesPlugin) Name() string { + return "predicates" +} + type podLister struct { session *framework.Session } @@ -108,7 +112,7 @@ func CheckNodeUnschedulable(pod *v1.Pod, nodeInfo *cache.NodeInfo) (bool, []algo return true, nil, nil } -func (pp *nodeAffinityPlugin) OnSessionOpen(ssn *framework.Session) { +func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { pl := &podLister{ session: ssn, } @@ -117,7 +121,7 @@ func (pp *nodeAffinityPlugin) OnSessionOpen(ssn *framework.Session) { session: ssn, } - ssn.AddPredicateFn(func(task *api.TaskInfo, node *api.NodeInfo) error { + ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error { nodeInfo := cache.NewNodeInfo(node.Pods()...) nodeInfo.SetNode(node.Node) @@ -200,4 +204,4 @@ func (pp *nodeAffinityPlugin) OnSessionOpen(ssn *framework.Session) { }) } -func (pp *nodeAffinityPlugin) OnSessionClose(ssn *framework.Session) {} +func (pp *predicatesPlugin) OnSessionClose(ssn *framework.Session) {} diff --git a/pkg/scheduler/plugins/priority/priority.go b/pkg/scheduler/plugins/priority/priority.go index 1731f8518..9a09c076c 100644 --- a/pkg/scheduler/plugins/priority/priority.go +++ b/pkg/scheduler/plugins/priority/priority.go @@ -32,6 +32,10 @@ func New(args *framework.PluginArgs) framework.Plugin { } } +func (pp *priorityPlugin) Name() string { + return "priority" +} + func (pp *priorityPlugin) OnSessionOpen(ssn *framework.Session) { taskOrderFn := func(l interface{}, r interface{}) int { lv := l.(*api.TaskInfo) @@ -52,9 +56,7 @@ func (pp *priorityPlugin) OnSessionOpen(ssn *framework.Session) { } // Add Task Order function - if pp.args.TaskOrderFnEnabled { - ssn.AddTaskOrderFn(taskOrderFn) - } + ssn.AddTaskOrderFn(pp.Name(), taskOrderFn) jobOrderFn := func(l, r interface{}) int { lv := l.(*api.JobInfo) @@ -74,10 +76,7 @@ func (pp *priorityPlugin) OnSessionOpen(ssn *framework.Session) { return 0 } - if pp.args.JobOrderFnEnabled { - // Add Job Order function - ssn.AddJobOrderFn(jobOrderFn) - } + ssn.AddJobOrderFn(pp.Name(), jobOrderFn) } func (pp *priorityPlugin) OnSessionClose(ssn *framework.Session) {} diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go index 1755b47c6..b88c50fe5 100644 --- a/pkg/scheduler/plugins/proportion/proportion.go +++ b/pkg/scheduler/plugins/proportion/proportion.go @@ -49,6 +49,10 @@ func New(args *framework.PluginArgs) framework.Plugin { } } +func (pp *proportionPlugin) Name() string { + return "proportion" +} + func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { // Prepare scheduling data for this session. for _, n := range ssn.Nodes { @@ -141,7 +145,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { } } - ssn.AddQueueOrderFn(func(l, r interface{}) int { + ssn.AddQueueOrderFn(pp.Name(), func(l, r interface{}) int { lv := l.(*api.QueueInfo) rv := r.(*api.QueueInfo) @@ -156,7 +160,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { return 1 }) - ssn.AddReclaimableFn(func(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo { + ssn.AddReclaimableFn(pp.Name(), func(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo { var victims []*api.TaskInfo allocations := map[api.QueueID]*api.Resource{} @@ -183,7 +187,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { return victims }) - ssn.AddOverusedFn(func(obj interface{}) bool { + ssn.AddOverusedFn(pp.Name(), func(obj interface{}) bool { queue := obj.(*api.QueueInfo) attr := pp.queueOpts[queue.UID]