Skip to content

Commit

Permalink
Merge pull request volcano-sh#8 from dingtsh1/master
Browse files Browse the repository at this point in the history
The scheduler supports the scheduling algorithm of the map-reduce str…
  • Loading branch information
volcano-sh-bot authored May 9, 2019
2 parents 245bec1 + 5f0c4ea commit fad1182
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
break
}

nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.NodeOrderFn)
nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)

node := util.SelectBestNode(nodeScores)
// Allocate idle resource to the task.
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func preempt(

predicateNodes := util.PredicateNodes(preemptor, allNodes, ssn.PredicateFn)

nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.NodeOrderFn)
nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)

selectedNodes := util.SortNodes(nodeScores)
for _, node := range selectedNodes {
Expand Down
16 changes: 16 additions & 0 deletions pkg/scheduler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ limitations under the License.

package api

import (
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
)

// TaskStatus defines the status of a task/pod.
type TaskStatus int

Expand Down Expand Up @@ -106,3 +110,15 @@ type EvictableFn func(*TaskInfo, []*TaskInfo) []*TaskInfo

// NodeOrderFn is the func declaration a plugin implements to return its
// priority score for a node with respect to a particular task. The score is
// returned directly as a float64; a non-nil error reports that scoring failed.
type NodeOrderFn func(*TaskInfo, *NodeInfo) (float64, error)

// NodeMapFn is the func declaration for the map phase of map-reduce node
// ordering: a plugin returns its raw score for one node with respect to a
// particular task. Unlike NodeOrderFn it has the same signature but is
// registered separately, so its scores can be passed through the plugin's
// NodeReduceFn before they are combined.
type NodeMapFn func(*TaskInfo, *NodeInfo) (float64, error)

// NodeReduceFn is the func declaration for the reduce phase of map-reduce node
// ordering: a plugin receives the task and the list of its own map-phase
// scores for every node. It returns only an error; the session reads the
// scores back from the same list after the call, so an implementation is
// expected to adjust the list entries in place.
type NodeReduceFn func(*TaskInfo, schedulerapi.HostPriorityList) error

// NodeOrderMapFn is the func declaration used to collect the scores of all
// plugins for one node with respect to a particular task. It returns a map
// from plugin name to that plugin's map-phase score, the accumulated
// NodeOrderFn score of the node, and an error if any plugin failed.
type NodeOrderMapFn func(*TaskInfo, *NodeInfo) (map[string]float64, float64, error)

// NodeOrderReduceFn is the func declaration used to reduce the map-phase
// scores of all nodes, plugin by plugin, for a particular task. The input map
// is keyed by plugin name and holds that plugin's score list over the nodes;
// the returned map is keyed by node (host) name and holds the combined score.
type NodeOrderReduceFn func(*TaskInfo, map[string]schedulerapi.HostPriorityList) (map[string]float64, error)
4 changes: 4 additions & 0 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type Session struct {
taskOrderFns map[string]api.CompareFn
predicateFns map[string]api.PredicateFn
nodeOrderFns map[string]api.NodeOrderFn
nodeMapFns map[string]api.NodeMapFn
nodeReduceFns map[string]api.NodeReduceFn
preemptableFns map[string]api.EvictableFn
reclaimableFns map[string]api.EvictableFn
overusedFns map[string]api.ValidateFn
Expand All @@ -75,6 +77,8 @@ func openSession(cache cache.Cache) *Session {
taskOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
nodeOrderFns: map[string]api.NodeOrderFn{},
nodeMapFns: map[string]api.NodeMapFn{},
nodeReduceFns: map[string]api.NodeReduceFn{},
preemptableFns: map[string]api.EvictableFn{},
reclaimableFns: map[string]api.EvictableFn{},
overusedFns: map[string]api.ValidateFn{},
Expand Down
63 changes: 63 additions & 0 deletions pkg/scheduler/framework/session_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package framework

import (
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
)

// AddJobOrderFn add job order function
Expand Down Expand Up @@ -65,6 +66,16 @@ func (ssn *Session) AddNodeOrderFn(name string, pf api.NodeOrderFn) {
ssn.nodeOrderFns[name] = pf
}

// AddNodeMapFn registers pf as the node map function of the plugin called
// name. A function previously registered under the same name is replaced.
func (ssn *Session) AddNodeMapFn(name string, pf api.NodeMapFn) {
ssn.nodeMapFns[name] = pf
}

// AddNodeReduceFn registers pf as the node reduce function of the plugin
// called name. A function previously registered under the same name is
// replaced.
func (ssn *Session) AddNodeReduceFn(name string, pf api.NodeReduceFn) {
ssn.nodeReduceFns[name] = pf
}

// AddOverusedFn add overused function
func (ssn *Session) AddOverusedFn(name string, fn api.ValidateFn) {
ssn.overusedFns[name] = fn
Expand Down Expand Up @@ -375,3 +386,55 @@ func (ssn *Session) NodeOrderFn(task *api.TaskInfo, node *api.NodeInfo) (float64
// isEnabled reports whether an optional boolean switch is turned on.
// An unset switch (nil pointer) counts as disabled.
func isEnabled(enabled *bool) bool {
	if enabled == nil {
		return false
	}
	return *enabled
}

// NodeOrderMapFn runs the map phase of node ordering for one task/node pair.
//
// It walks every plugin of every tier that has node ordering enabled and
//   - adds the result of the plugin's registered node order function, if any,
//     to a running total, and
//   - stores the result of the plugin's registered node map function, if any,
//     under the plugin's name.
//
// It returns the per-plugin map scores, the summed node order score, and the
// first error reported by a plugin function. On error the values gathered up
// to that point are returned alongside it.
func (ssn *Session) NodeOrderMapFn(task *api.TaskInfo, node *api.NodeInfo) (map[string]float64, float64, error) {
	var orderTotal float64
	mapScores := make(map[string]float64)

	for _, tier := range ssn.Tiers {
		for _, plugin := range tier.Plugins {
			if !isEnabled(plugin.EnabledNodeOrder) {
				continue
			}

			// Plain node order functions contribute straight to the total.
			orderFn, hasOrderFn := ssn.nodeOrderFns[plugin.Name]
			if hasOrderFn {
				s, err := orderFn(task, node)
				if err != nil {
					return mapScores, orderTotal, err
				}
				orderTotal += s
			}

			// Map functions are kept per plugin so they can be reduced later.
			mapFn, hasMapFn := ssn.nodeMapFns[plugin.Name]
			if hasMapFn {
				s, err := mapFn(task, node)
				if err != nil {
					return mapScores, orderTotal, err
				}
				mapScores[plugin.Name] = s
			}
		}
	}

	return mapScores, orderTotal, nil
}

// NodeOrderReduceFn runs the reduce phase of node ordering for a task.
//
// pluginNodeScoreMap is keyed by plugin name and holds that plugin's per-host
// priority list. For every plugin that has node ordering enabled and a
// registered reduce function, the function is called with the plugin's list,
// and the scores found in the list after the call are added up per host.
//
// It returns the summed score per host name. If a reduce function fails, its
// error is returned together with the sums accumulated so far.
func (ssn *Session) NodeOrderReduceFn(task *api.TaskInfo, pluginNodeScoreMap map[string]schedulerapi.HostPriorityList) (map[string]float64, error) {
	totals := make(map[string]float64)

	for _, tier := range ssn.Tiers {
		for _, plugin := range tier.Plugins {
			if !isEnabled(plugin.EnabledNodeOrder) {
				continue
			}

			reduce, registered := ssn.nodeReduceFns[plugin.Name]
			if !registered {
				continue
			}

			// The list shares its backing array with the map entry, so
			// in-place changes made by the reduce function are visible below.
			hostScores := pluginNodeScoreMap[plugin.Name]
			if err := reduce(task, hostScores); err != nil {
				return totals, err
			}

			for i := range hostScores {
				totals[hostScores[i].Host] += float64(hostScores[i].Score)
			}
		}
	}

	return totals, nil
}
41 changes: 37 additions & 4 deletions pkg/scheduler/util/scheduler_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package util

import (
"context"
"math"
"math/rand"
"sort"
"sync"
Expand All @@ -26,6 +27,7 @@ import (
"k8s.io/client-go/util/workqueue"

"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
)

// PredicateNodes returns nodes that fit task
Expand Down Expand Up @@ -55,23 +57,54 @@ func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateF
}

// PrioritizeNodes returns a map whose key is node's score and value are corresponding nodes
func PrioritizeNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.NodeOrderFn) map[float64][]*api.NodeInfo {
func PrioritizeNodes(task *api.TaskInfo, nodes []*api.NodeInfo, mapFn api.NodeOrderMapFn, reduceFn api.NodeOrderReduceFn) map[float64][]*api.NodeInfo {
pluginNodeScoreMap := map[string]schedulerapi.HostPriorityList{}
nodeOrderScoreMap := map[string]float64{}
nodeScores := map[float64][]*api.NodeInfo{}

var workerLock sync.Mutex
scoreNode := func(index int) {
node := nodes[index]
score, err := fn(task, node)
mapScores, orderScore, err := mapFn(task, node)
if err != nil {
glog.Errorf("Error in Calculating Priority for the node:%v", err)
return
}

workerLock.Lock()
nodeScores[score] = append(nodeScores[score], node)
for plugin, score := range mapScores {
nodeScoreMap, ok := pluginNodeScoreMap[plugin]
if !ok {
nodeScoreMap = schedulerapi.HostPriorityList{}
}
hp := schedulerapi.HostPriority{}
hp.Host = node.Name
hp.Score = int(math.Floor(score))
pluginNodeScoreMap[plugin] = append(nodeScoreMap, hp)
}
nodeOrderScoreMap[node.Name] = orderScore
workerLock.Unlock()
}
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), scoreNode)
reduceScores, err := reduceFn(task, pluginNodeScoreMap)
if err != nil {
glog.Errorf("Error in Calculating Priority for the node:%v", err)
return nodeScores
}
for _, node := range nodes {
if score, found := reduceScores[node.Name]; found {
if orderScore, ok := nodeOrderScoreMap[node.Name]; ok {
score = score + orderScore
}
nodeScores[score] = append(nodeScores[score], node)
} else {
// If no plugin is applied to this node, the default is 0.0
score = 0.0
if orderScore, ok := nodeOrderScoreMap[node.Name]; ok {
score = score + orderScore
}
nodeScores[score] = append(nodeScores[score], node)
}
}
return nodeScores
}

Expand Down

0 comments on commit fad1182

Please sign in to comment.