Skip to content

Commit

Permalink
Add network-topology-aware plugin and hyperNode score callback
Browse files Browse the repository at this point in the history
Signed-off-by: ecosysbin <14729934+ecosysbin@user.noreply.gitee.com>
  • Loading branch information
ecosysbin committed Dec 24, 2024
1 parent c9bf5f3 commit e893b75
Show file tree
Hide file tree
Showing 10 changed files with 1,013 additions and 12 deletions.
9 changes: 9 additions & 0 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
hyperNodesWithLeftTasks := make(map[string]*util.PriorityQueue)
ssn := alloc.session
selectedTier := 0
jobNewHyperNodeMap := map[string]string{}
jobHyperNode := job.PodGroup.Annotations[api.TopologyAllocateLCAHyperNode]

// Find a suitable hyperNode one tier at a time, from bottom to top, to ensure that the selected hyperNode spans the fewest tiers.
for index, tier := range alloc.hyperNodesTiers {
Expand All @@ -245,6 +247,9 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
break
}
for _, hyperNodeName := range ssn.HyperNodesListByTier[tier] {
jobNewHyperNode, _ := util.FindLCAHyperNode(hyperNodeName, jobHyperNode, nil)
jobNewHyperNodeMap[hyperNodeName] = jobNewHyperNode

nodes, ok := ssn.HyperNodes[hyperNodeName]
if !ok {
klog.ErrorS(nil, "HyperNode not exists.", "jobName", job.UID, "name", hyperNodeName, "tier", tier)
Expand Down Expand Up @@ -286,6 +291,10 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
klog.V(4).InfoS("Find available hyperNodes for job", "jobName", job.UID, "tier", selectedTier, "hyperNodes", hyperNodes)
}
stmt, hyperNode := alloc.selectBestHyperNode(jobStmtsByTier[selectedTier], job)

// Set the LCA hyperNode for the job; the next scheduling cycle will use this hyperNode as the base for selecting a hyperNode.
jobNewHyperNode := jobNewHyperNodeMap[hyperNode]
job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = jobNewHyperNode
return stmt, hyperNodesWithLeftTasks[hyperNode]
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/api/well_known_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,7 @@ const (

// topologyDecisionAnnotation is the key of topology decision about pod request resource
topologyDecisionAnnotation = "volcano.sh/topology-decision"

// TopologyAllocateLCAHyperNode is the key to the lowest common ancestor of the network topology to which the tasks assigned to a job belong.
TopologyAllocateLCAHyperNode = "volcano.sh/allocate-lca-hypernode"
)
6 changes: 5 additions & 1 deletion pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
vcinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"
"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/features"
"volcano.sh/volcano/pkg/scheduler/api"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"
"volcano.sh/volcano/pkg/scheduler/metrics"
Expand Down Expand Up @@ -1536,9 +1537,12 @@ func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG b
if err != nil {
return nil, err
}
sc.Mutex.Lock()
sc.Jobs[job.UID].PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]
sc.Mutex.Unlock()
job.PodGroup = pg
}

sc.Jobs[job.UID] = job
sc.RecordJobStatusEvent(job, updatePG)

return job, nil
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/conf/scheduler_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ type PluginOption struct {
EnabledOverused *bool `yaml:"enabledOverused"`
// EnabledAllocatable defines whether allocatable is enabled
EnabledAllocatable *bool `yaml:"enabledAllocatable"`
// EnabledNetworkTopology defines whether network topology is enabled
EnabledNetworkTopology *bool `yaml:"enabledNetworkTopology"`
// Arguments defines the different arguments that can be given to different plugins
Arguments map[string]interface{} `yaml:"arguments"`
}
4 changes: 3 additions & 1 deletion pkg/scheduler/framework/job_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,12 @@ func isPodGroupStatusUpdated(newStatus, oldStatus scheduling.PodGroupStatus) boo
func (ju *jobUpdater) updateJob(index int) {
job := ju.jobQueue[index]
ssn := ju.ssn
oldHyperNode := ssn.podGroupAnnotations[job.UID][api.TopologyAllocateLCAHyperNode]

job.PodGroup.Status = jobStatus(ssn, job)
oldStatus, found := ssn.podGroupStatus[job.UID]
updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus)
updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus) || oldHyperNode != job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]

if _, err := ssn.cache.UpdateJobStatus(job, updatePG); err != nil {
klog.Errorf("Failed to update job <%s/%s>: %v",
job.Namespace, job.Name, err)
Expand Down
22 changes: 13 additions & 9 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type Session struct {
// This should not be mutated after initiated
podGroupStatus map[api.JobID]scheduling.PodGroupStatus

// recorded old annotations for podgroup, used to detect changes
podGroupAnnotations map[api.JobID]map[string]string

Jobs map[api.JobID]*api.JobInfo
Nodes map[string]*api.NodeInfo
CSINodesStatus map[string]*api.CSINodeStatusInfo
Expand Down Expand Up @@ -125,15 +128,15 @@ func openSession(cache cache.Cache) *Session {
cache: cache,
informerFactory: cache.SharedInformerFactory(),

TotalResource: api.EmptyResource(),
TotalGuarantee: api.EmptyResource(),
podGroupStatus: map[api.JobID]scheduling.PodGroupStatus{},

Jobs: map[api.JobID]*api.JobInfo{},
Nodes: map[string]*api.NodeInfo{},
CSINodesStatus: map[string]*api.CSINodeStatusInfo{},
RevocableNodes: map[string]*api.NodeInfo{},
Queues: map[api.QueueID]*api.QueueInfo{},
TotalResource: api.EmptyResource(),
TotalGuarantee: api.EmptyResource(),
podGroupStatus: map[api.JobID]scheduling.PodGroupStatus{},
podGroupAnnotations: map[api.JobID]map[string]string{},
Jobs: map[api.JobID]*api.JobInfo{},
Nodes: map[string]*api.NodeInfo{},
CSINodesStatus: map[string]*api.CSINodeStatusInfo{},
RevocableNodes: map[string]*api.NodeInfo{},
Queues: map[api.QueueID]*api.QueueInfo{},

plugins: map[string]Plugin{},
jobOrderFns: map[string]api.CompareFn{},
Expand Down Expand Up @@ -171,6 +174,7 @@ func openSession(cache cache.Cache) *Session {
for _, job := range ssn.Jobs {
if job.PodGroup != nil {
ssn.podGroupStatus[job.UID] = *job.PodGroup.Status.DeepCopy()
ssn.podGroupAnnotations[job.UID] = job.PodGroup.GetAnnotations()
}

if vjr := ssn.JobValid(job); vjr != nil {
Expand Down
151 changes: 151 additions & 0 deletions pkg/scheduler/plugins/networktopologyaware/networktopologyaware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
Copyright 2019 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package networktopologyaware

import (
"math"

"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/util"
)

const (
	// PluginName indicates name of volcano scheduler plugin.
	PluginName = "networktopologyaware"
	// BaseScore is the score returned for a hyperNode when the job has no LCA
	// hyperNode recorded yet, or when the hyperNode is the job's LCA hyperNode.
	// It also scales the tier-index part of the score.
	BaseScore = 100
	// TaskBaseScore scales the task-distribution part of the score.
	TaskBaseScore = 10
	// NetworkTopologyWeight is the plugin argument key from which the score
	// weight is read.
	NetworkTopologyWeight = "weight"
)

// HyperNodeTree is the hypernode tree of all hypernodes in the cluster.
// The scoring callbacks in this file only read it; nothing visible in this
// file assigns it.
// NOTE(review): presumably the slice is indexed by tier and each map goes from
// a hyperNode name to its children — confirm against the code that fills it.
var (
	HyperNodeTree []map[string][]string
)

// networkTopologyAwarePlugin is the scheduler plugin that registers the
// network-topology-aware hyperNode and node scoring callbacks on a session.
type networkTopologyAwarePlugin struct {
	// Arguments given for the plugin; the score weight is read from them
	// when a session is opened.
	pluginArguments framework.Arguments
}

// New builds a networkTopologyAware plugin that keeps the given arguments for
// use when a session is opened.
func New(arguments framework.Arguments) framework.Plugin {
	plugin := new(networkTopologyAwarePlugin)
	plugin.pluginArguments = arguments
	return plugin
}

// Name returns the name the plugin is registered under, PluginName.
func (nta *networkTopologyAwarePlugin) Name() string {
	pluginName := PluginName
	return pluginName
}

// calculateWeight reads the score weight from the plugin arguments under the
// NetworkTopologyWeight key, starting from a default of 1.
// NOTE(review): presumably GetInt leaves the default untouched when the key is
// missing or not an integer — confirm against framework.Arguments.
func calculateWeight(args framework.Arguments) int {
	var result = 1
	args.GetInt(&result, NetworkTopologyWeight)
	return result
}

// OnSessionOpen registers the plugin's two scoring callbacks on the session:
// a hyperNode order function and a batch node order function. Both score
// through networkTopologyAwareScore against the package-level HyperNodeTree
// and multiply the result by the weight read from the plugin arguments.
func (nta *networkTopologyAwarePlugin) OnSessionOpen(ssn *framework.Session) {
	klog.V(5).Infof("Enter networkTopologyAwarePlugin plugin ...")
	defer func() {
		klog.V(5).Infof("Leaving networkTopologyAware plugin ...")
	}()

	// The weight does not change during the session, so convert it once.
	factor := float64(calculateWeight(nta.pluginArguments))

	// Scores each candidate hyperNode for the given job.
	scoreHyperNodes := func(job *api.JobInfo, hyperNodes map[string][]*api.NodeInfo) (map[string]float64, error) {
		result := map[string]float64{}
		for name := range hyperNodes {
			result[name] = networkTopologyAwareScore(name, job, HyperNodeTree) * factor
		}

		klog.V(1).Infof("networkTopologyAware score is: %v", result)
		return result, nil
	}

	// Scores each candidate node for the task via the hyperNode that
	// util.FindHyperNodeOfNode reports for the node.
	scoreNodes := func(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) {
		owner := ssn.Jobs[task.Job]
		result := map[string]float64{}
		for _, candidate := range nodes {
			parent := util.FindHyperNodeOfNode(candidate.Name, HyperNodeTree)
			result[candidate.Name] = networkTopologyAwareScore(parent, owner, HyperNodeTree) * factor
		}
		klog.V(1).Infof("networkTopologyAware score is: %v", result)
		return result, nil
	}

	ssn.AddHyperNodeOrederFn(nta.Name(), scoreHyperNodes)
	ssn.AddBatchNodeOrderFn(nta.Name(), scoreNodes)
}

// OnSessionClose does nothing; it exists so the plugin provides the
// session-close hook alongside OnSessionOpen.
func (nta *networkTopologyAwarePlugin) OnSessionClose(ssn *framework.Session) {
}

// networkTopologyAwareScore scores hyperNodeName for job using a best-fit
// policy.
//
// Explanation:
// The job's LCA hyperNode is read from the api.TopologyAllocateLCAHyperNode
// annotation of its PodGroup and serves as the smallest root in the hypernode
// tree of the job. A job has multiple tasks, each belonging to a hypernode;
// the LCA hyperNode is the lowest common ancestor among the hypernodes of all
// tasks within the job.
//
// Goals:
//   - The tier index to which the LCA hyperNode of a job belongs should be as
//     low as possible.
//   - Tasks under a job should be scheduled to one hyperNode as much as
//     possible.
//
// Result:
//   - BaseScore when the job (or its PodGroup) is nil, when no LCA hyperNode
//     has been recorded yet, or when hyperNodeName is the recorded LCA
//     hyperNode.
//   - 0 when util.FindLCAHyperNode reports a non-positive tier index.
//   - Otherwise the sum of the tier-index score (scaled by BaseScore) and the
//     task-distribution score (scaled by TaskBaseScore).
func networkTopologyAwareScore(hyperNodeName string, job *api.JobInfo, hyperNodeTree []map[string][]string) float64 {
	// A job that is missing from the session, or that has no PodGroup, carries
	// no LCA annotation. Treat it like a job being scheduled for the first
	// time instead of dereferencing a nil pointer.
	if job == nil || job.PodGroup == nil {
		return BaseScore
	}

	jobHyperNode := job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]
	// Either the job is being scheduled for the first time, or the candidate
	// already is the job's LCA hyperNode.
	if jobHyperNode == "" || jobHyperNode == hyperNodeName {
		return BaseScore
	}

	// Calculate hyperNode tier index score.
	_, index := util.FindLCAHyperNode(hyperNodeName, jobHyperNode, hyperNodeTree)
	if index <= 0 {
		klog.V(4).Infof("find LCAhyperNode failed wtih %s in hyperNodeTree", hyperNodeName)
		return 0.0
	}
	tierIndexScore := BaseScore * scoreHyperNodeWithIndex(index, 1, len(hyperNodeTree))

	// Calculate tasks num score.
	// NOTE(review): presumably FindJobTaskNumOfHyperNode counts the job's tasks
	// already placed under hyperNodeName — confirm against util.
	taskNum := util.FindJobTaskNumOfHyperNode(hyperNodeName, job, hyperNodeTree)
	taskNumScore := 0.0
	// Skip the ratio for a job without tasks to avoid dividing by zero.
	if len(job.Tasks) > 0 {
		taskNumScore = TaskBaseScore * scoreHyperNodeWithTaskNum(taskNum, len(job.Tasks))
	}

	// Aggregate scores.
	return tierIndexScore + taskNumScore
}

// scoreHyperNodeWithIndex maps a tier index onto a score in [0, 1] using a
// logarithmic scale: minIndex scores 1, maxIndex scores 0, and indexes in
// between fall off logarithmically.
//
// Inputs for which the logarithmic formula is undefined are handled
// explicitly so the result is never NaN or infinite:
//   - a non-positive minIndex, or maxIndex <= minIndex (for example a
//     single-tier tree), gives 0 because there is no tier range to compare;
//   - an index at or below minIndex gives 1;
//   - an index at or above maxIndex gives 0.
func scoreHyperNodeWithIndex(index int, minIndex int, maxIndex int) float64 {
	// log(x) is undefined for x <= 0, and equal bounds make the denominator 0.
	if minIndex <= 0 || maxIndex <= minIndex {
		return 0
	}
	if index <= minIndex {
		return 1
	}
	if index >= maxIndex {
		return 0
	}

	logMax := math.Log(float64(maxIndex))
	return (logMax - math.Log(float64(index))) / (logMax - math.Log(float64(minIndex)))
}

// scoreHyperNodeWithTaskNum returns the task distribution rate
// taskNum / maxTaskNum as a score in the range [0, 1].
//
// A non-positive maxTaskNum or taskNum gives 0 (this also avoids dividing by
// zero), and a taskNum at or above maxTaskNum gives 1, so the result always
// stays within the documented range.
func scoreHyperNodeWithTaskNum(taskNum int, maxTaskNum int) float64 {
	if maxTaskNum <= 0 || taskNum <= 0 {
		return 0
	}
	if taskNum >= maxTaskNum {
		return 1
	}
	return float64(taskNum) / float64(maxTaskNum)
}
Loading

0 comments on commit e893b75

Please sign in to comment.