Skip to content

Commit

Permalink
Merge pull request kubernetes-retired#549 from k82cn/kb_484_1
Browse files Browse the repository at this point in the history
Added name when register plugin.
  • Loading branch information
k8s-ci-robot authored Jan 12, 2019
2 parents a5c043a + 1bb4c30 commit 906cfff
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 73 deletions.
3 changes: 3 additions & 0 deletions pkg/scheduler/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ type Action interface {
}

type Plugin interface {
// The unique name of Plugin.
Name() string

OnSessionOpen(ssn *Session)
OnSessionClose(ssn *Session)
}
Expand Down
96 changes: 58 additions & 38 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ type Session struct {

plugins []Plugin
eventHandlers []*EventHandler
jobOrderFns []api.CompareFn
queueOrderFns []api.CompareFn
taskOrderFns []api.CompareFn
predicateFns []api.PredicateFn
preemptableFns []api.PreemptableFn
reclaimableFns []api.ReclaimableFn
overusedFns []api.ValidateFn
jobReadyFns []api.ValidateFn
jobOrderFns map[string]api.CompareFn
queueOrderFns map[string]api.CompareFn
taskOrderFns map[string]api.CompareFn
predicateFns map[string]api.PredicateFn
preemptableFns map[string]api.PreemptableFn
reclaimableFns map[string]api.ReclaimableFn
overusedFns map[string]api.ValidateFn
jobReadyFns map[string]api.ValidateFn
}

func openSession(cache cache.Cache) *Session {
Expand All @@ -62,6 +62,15 @@ func openSession(cache cache.Cache) *Session {
JobIndex: map[api.JobID]*api.JobInfo{},
NodeIndex: map[string]*api.NodeInfo{},
QueueIndex: map[api.QueueID]*api.QueueInfo{},

jobOrderFns: map[string]api.CompareFn{},
queueOrderFns: map[string]api.CompareFn{},
taskOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
preemptableFns: map[string]api.PreemptableFn{},
reclaimableFns: map[string]api.ReclaimableFn{},
overusedFns: map[string]api.ValidateFn{},
jobReadyFns: map[string]api.ValidateFn{},
}

snapshot := cache.Snapshot()
Expand Down Expand Up @@ -213,13 +222,15 @@ func (ssn *Session) dispatch(task *api.TaskInfo) error {

func (ssn *Session) Reclaimable(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo {
var victims []*api.TaskInfo
var init bool

for _, rf := range ssn.reclaimableFns {
candidates := rf(reclaimer, reclaimees)
if victims == nil {
if !init {
victims = candidates
init = true
} else {
intersection := []*api.TaskInfo{}
var intersection []*api.TaskInfo
// Get intersection of victims and candidates.
for _, v := range victims {
for _, c := range candidates {
Expand Down Expand Up @@ -256,7 +267,10 @@ func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error {

// Update task in node.
if node, found := ssn.NodeIndex[reclaimee.NodeName]; found {
node.UpdateTask(reclaimee)
if err := node.UpdateTask(reclaimee); err != nil {
glog.Errorf("Failed to update task <%v/%v> in Session <%v>: %v",
reclaimee.Namespace, reclaimee.Name, ssn.UID, err)
}
}

for _, eh := range ssn.eventHandlers {
Expand All @@ -275,22 +289,28 @@ func (ssn *Session) Preemptable(preemptor *api.TaskInfo, preemptees []*api.TaskI
return nil
}

victims := ssn.preemptableFns[0](preemptor, preemptees)
for _, pf := range ssn.preemptableFns[1:] {
intersection := []*api.TaskInfo{}
var victims []*api.TaskInfo
var init bool

for _, pf := range ssn.preemptableFns {
candidates := pf(preemptor, preemptees)
// Get intersection of victims and candidates.
for _, v := range victims {
for _, c := range candidates {
if v.UID == c.UID {
intersection = append(intersection, v)
if !init {
victims = candidates
init = true
} else {
var intersection []*api.TaskInfo
// Get intersection of victims and candidates.
for _, v := range victims {
for _, c := range candidates {
if v.UID == c.UID {
intersection = append(intersection, v)
}
}
}
}

// Update victims to intersection
victims = intersection
// Update victims to intersection
victims = intersection
}
}

return victims
Expand Down Expand Up @@ -324,36 +344,36 @@ func (ssn *Session) AddEventHandler(eh *EventHandler) {
ssn.eventHandlers = append(ssn.eventHandlers, eh)
}

func (ssn *Session) AddJobOrderFn(cf api.CompareFn) {
ssn.jobOrderFns = append(ssn.jobOrderFns, cf)
func (ssn *Session) AddJobOrderFn(name string, cf api.CompareFn) {
ssn.jobOrderFns[name] = cf
}

func (ssn *Session) AddQueueOrderFn(qf api.CompareFn) {
ssn.queueOrderFns = append(ssn.queueOrderFns, qf)
func (ssn *Session) AddQueueOrderFn(name string, qf api.CompareFn) {
ssn.queueOrderFns[name] = qf
}

func (ssn *Session) AddTaskOrderFn(cf api.CompareFn) {
ssn.taskOrderFns = append(ssn.taskOrderFns, cf)
func (ssn *Session) AddTaskOrderFn(name string, cf api.CompareFn) {
ssn.taskOrderFns[name] = cf
}

func (ssn *Session) AddPreemptableFn(cf api.PreemptableFn) {
ssn.preemptableFns = append(ssn.preemptableFns, cf)
func (ssn *Session) AddPreemptableFn(name string, cf api.PreemptableFn) {
ssn.preemptableFns[name] = cf
}

func (ssn *Session) AddReclaimableFn(rf api.ReclaimableFn) {
ssn.reclaimableFns = append(ssn.reclaimableFns, rf)
func (ssn *Session) AddReclaimableFn(name string, rf api.ReclaimableFn) {
ssn.reclaimableFns[name] = rf
}

func (ssn *Session) AddJobReadyFn(vf api.ValidateFn) {
ssn.jobReadyFns = append(ssn.jobReadyFns, vf)
func (ssn *Session) AddJobReadyFn(name string, vf api.ValidateFn) {
ssn.jobReadyFns[name] = vf
}

func (ssn *Session) AddPredicateFn(pf api.PredicateFn) {
ssn.predicateFns = append(ssn.predicateFns, pf)
func (ssn *Session) AddPredicateFn(name string, pf api.PredicateFn) {
ssn.predicateFns[name] = pf
}

func (ssn *Session) AddOverusedFn(fn api.ValidateFn) {
ssn.overusedFns = append(ssn.overusedFns, fn)
func (ssn *Session) AddOverusedFn(name string, fn api.ValidateFn) {
ssn.overusedFns[name] = fn
}

func (ssn *Session) Overused(queue *api.QueueInfo) bool {
Expand Down
10 changes: 2 additions & 8 deletions pkg/scheduler/plugins/drf/drf.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,7 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) {
return victims
}

if drf.args.PreemptableFnEnabled {
// Add Preemptable function.
ssn.AddPreemptableFn(preemptableFn)
}
ssn.AddPreemptableFn(drf.Name(), preemptableFn)

jobOrderFn := func(l interface{}, r interface{}) int {
lv := l.(*api.JobInfo)
Expand All @@ -130,10 +127,7 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) {
return 1
}

if drf.args.JobOrderFnEnabled {
// Add Job Order function.
ssn.AddJobOrderFn(jobOrderFn)
}
ssn.AddJobOrderFn(drf.Name(), jobOrderFn)

// Register event handlers.
ssn.AddEventHandler(&framework.EventHandler{
Expand Down
20 changes: 8 additions & 12 deletions pkg/scheduler/plugins/gang/gang.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func New(args *framework.PluginArgs) framework.Plugin {
}
}

func (gp *gangPlugin) Name() string {
return "gang"
}

// readyTaskNum return the number of tasks that are ready to run.
func readyTaskNum(job *api.JobInfo) int32 {
occupid := 0
Expand Down Expand Up @@ -102,11 +106,8 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) {
}

// TODO(k82cn): Support preempt/reclaim batch job.
ssn.AddReclaimableFn(preemptableFn)

if gp.args.PreemptableFnEnabled {
ssn.AddPreemptableFn(preemptableFn)
}
ssn.AddReclaimableFn(gp.Name(), preemptableFn)
ssn.AddPreemptableFn(gp.Name(), preemptableFn)

jobOrderFn := func(l, r interface{}) int {
lv := l.(*api.JobInfo)
Expand Down Expand Up @@ -144,13 +145,8 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) {
return 0
}

if gp.args.JobOrderFnEnabled {
ssn.AddJobOrderFn(jobOrderFn)
}

if gp.args.JobReadyFnEnabled {
ssn.AddJobReadyFn(jobReady)
}
ssn.AddJobOrderFn(gp.Name(), jobOrderFn)
ssn.AddJobReadyFn(gp.Name(), jobReady)
}

func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) {
Expand Down
14 changes: 9 additions & 5 deletions pkg/scheduler/plugins/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@ import (
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
)

type nodeAffinityPlugin struct {
type predicatesPlugin struct {
args *framework.PluginArgs
}

func New(args *framework.PluginArgs) framework.Plugin {
return &nodeAffinityPlugin{
return &predicatesPlugin{
args: args,
}
}

func (pp *predicatesPlugin) Name() string {
return "predicates"
}

type podLister struct {
session *framework.Session
}
Expand Down Expand Up @@ -108,7 +112,7 @@ func CheckNodeUnschedulable(pod *v1.Pod, nodeInfo *cache.NodeInfo) (bool, []algo
return true, nil, nil
}

func (pp *nodeAffinityPlugin) OnSessionOpen(ssn *framework.Session) {
func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
pl := &podLister{
session: ssn,
}
Expand All @@ -117,7 +121,7 @@ func (pp *nodeAffinityPlugin) OnSessionOpen(ssn *framework.Session) {
session: ssn,
}

ssn.AddPredicateFn(func(task *api.TaskInfo, node *api.NodeInfo) error {
ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
nodeInfo := cache.NewNodeInfo(node.Pods()...)
nodeInfo.SetNode(node.Node)

Expand Down Expand Up @@ -200,4 +204,4 @@ func (pp *nodeAffinityPlugin) OnSessionOpen(ssn *framework.Session) {
})
}

func (pp *nodeAffinityPlugin) OnSessionClose(ssn *framework.Session) {}
func (pp *predicatesPlugin) OnSessionClose(ssn *framework.Session) {}
13 changes: 6 additions & 7 deletions pkg/scheduler/plugins/priority/priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func New(args *framework.PluginArgs) framework.Plugin {
}
}

func (pp *priorityPlugin) Name() string {
return "priority"
}

func (pp *priorityPlugin) OnSessionOpen(ssn *framework.Session) {
taskOrderFn := func(l interface{}, r interface{}) int {
lv := l.(*api.TaskInfo)
Expand All @@ -52,9 +56,7 @@ func (pp *priorityPlugin) OnSessionOpen(ssn *framework.Session) {
}

// Add Task Order function
if pp.args.TaskOrderFnEnabled {
ssn.AddTaskOrderFn(taskOrderFn)
}
ssn.AddTaskOrderFn(pp.Name(), taskOrderFn)

jobOrderFn := func(l, r interface{}) int {
lv := l.(*api.JobInfo)
Expand All @@ -74,10 +76,7 @@ func (pp *priorityPlugin) OnSessionOpen(ssn *framework.Session) {
return 0
}

if pp.args.JobOrderFnEnabled {
// Add Job Order function
ssn.AddJobOrderFn(jobOrderFn)
}
ssn.AddJobOrderFn(pp.Name(), jobOrderFn)
}

func (pp *priorityPlugin) OnSessionClose(ssn *framework.Session) {}
10 changes: 7 additions & 3 deletions pkg/scheduler/plugins/proportion/proportion.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func New(args *framework.PluginArgs) framework.Plugin {
}
}

func (pp *proportionPlugin) Name() string {
return "proportion"
}

func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
// Prepare scheduling data for this session.
for _, n := range ssn.Nodes {
Expand Down Expand Up @@ -141,7 +145,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
}
}

ssn.AddQueueOrderFn(func(l, r interface{}) int {
ssn.AddQueueOrderFn(pp.Name(), func(l, r interface{}) int {
lv := l.(*api.QueueInfo)
rv := r.(*api.QueueInfo)

Expand All @@ -156,7 +160,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
return 1
})

ssn.AddReclaimableFn(func(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo {
ssn.AddReclaimableFn(pp.Name(), func(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo {
var victims []*api.TaskInfo
allocations := map[api.QueueID]*api.Resource{}

Expand All @@ -183,7 +187,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
return victims
})

ssn.AddOverusedFn(func(obj interface{}) bool {
ssn.AddOverusedFn(pp.Name(), func(obj interface{}) bool {
queue := obj.(*api.QueueInfo)
attr := pp.queueOpts[queue.UID]

Expand Down

0 comments on commit 906cfff

Please sign in to comment.