Add network-topology-aware plugin and hyperNode score callback #3894

Open · wants to merge 3 commits into base: network-topology
25 changes: 25 additions & 0 deletions pkg/scheduler/actions/allocate/allocate.go
@@ -233,6 +233,8 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
hyperNodesWithLeftTasks := make(map[string]*util.PriorityQueue)
ssn := alloc.session
selectedTier := 0
jobNewHyperNodeMap := map[string]string{}
jobHyperNode := job.PodGroup.Annotations[api.TopologyAllocateLCAHyperNode]

// Find a suitable hyperNode in one tier from down to top everytime to ensure that the selected hyperNode spans the least tier.
for index, tier := range alloc.hyperNodesTiers {
@@ -245,6 +247,15 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
break
}
for _, hyperNodeName := range ssn.HyperNodesListByTier[tier] {
// Default: the job is scheduled for the first time, so record the candidate hyperNode itself.
jobNewHyperNodeMap[hyperNodeName] = hyperNodeName
if jobHyperNode != "" {
jobNewHyperNode, _ := util.FindLCAHyperNode(hyperNodeName, jobHyperNode, nil)
// TODO: check whether the hyperNode meets the requirements of the topology hard tier.
jobNewHyperNodeMap[hyperNodeName] = jobNewHyperNode
}

nodes, ok := ssn.HyperNodes[hyperNodeName]
if !ok {
klog.ErrorS(nil, "HyperNode not exists.", "jobName", job.UID, "name", hyperNodeName, "tier", tier)
@@ -286,6 +297,8 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
klog.V(4).InfoS("Find available hyperNodes for job", "jobName", job.UID, "tier", selectedTier, "hyperNodes", hyperNodes)
}
stmt, hyperNode := alloc.selectBestHyperNode(jobStmtsByTier[selectedTier], job)
jobNewHyperNode := jobNewHyperNodeMap[hyperNode]
job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = jobNewHyperNode
return stmt, hyperNodesWithLeftTasks[hyperNode]
}

@@ -390,6 +403,18 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
continue
}

// record the hyperNode of the job
if hyperNode == "" {
jobHyperNode := job.PodGroup.Annotations[api.TopologyAllocateLCAHyperNode]
hyperNodeOfNode := util.FindHyperNodeOfNode(bestNode.Name, ssn.HyperNodes)
newJobHyperNode := hyperNodeOfNode
if jobHyperNode != "" {
// job is not scheduled for the first time
newJobHyperNode, _ = util.FindLCAHyperNode(hyperNodeOfNode, jobHyperNode, ssn.HyperNodeTree)
}
job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = newJobHyperNode
}

alloc.sumNodeScoresInHyperNode(string(job.UID), hyperNode, highestScore)
alloc.allocateResourcesForTask(stmt, task, bestNode, job)

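Note: util.FindLCAHyperNode and util.FindHyperNodeOfNode are not shown in this diff. Below is a minimal sketch of the LCA lookup the hunks above rely on, assuming the hyperNode tree is indexed by tier (lowest tier first) with each entry mapping a parent hyperNode to its child hyperNodes; the real helper may differ in representation and return values.

package main

import "fmt"

// findLCAHyperNodeSketch returns the lowest common ancestor of two hyperNodes
// and its 1-based tier index, or ("", -1) if no common ancestor exists.
func findLCAHyperNodeSketch(a, b string, hyperNodeTree []map[string][]string) (string, int) {
	parent := map[string]string{} // child hyperNode -> parent hyperNode
	tierOf := map[string]int{}    // parent hyperNode -> 1-based tier it sits on
	for tier, level := range hyperNodeTree {
		for p, children := range level {
			tierOf[p] = tier + 1
			for _, c := range children {
				parent[c] = p
			}
		}
	}
	// Collect a and all of its ancestors.
	ancestors := map[string]bool{}
	for cur := a; cur != ""; cur = parent[cur] {
		ancestors[cur] = true
	}
	// Climb from b until a shared ancestor is found.
	for cur := b; cur != ""; cur = parent[cur] {
		if ancestors[cur] {
			return cur, tierOf[cur]
		}
	}
	return "", -1
}

func main() {
	// Two tiers: s0 and s1 sit under s4, s2 and s3 sit under s5; s4 and s5 sit under s6.
	tree := []map[string][]string{
		{"s4": {"s0", "s1"}, "s5": {"s2", "s3"}},
		{"s6": {"s4", "s5"}},
	}
	fmt.Println(findLCAHyperNodeSketch("s0", "s1", tree)) // s4 1
	fmt.Println(findLCAHyperNodeSketch("s0", "s2", tree)) // s6 2
}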
10 changes: 10 additions & 0 deletions pkg/scheduler/api/node_info.go
@@ -25,6 +25,7 @@ import (
"k8s.io/klog/v2"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"

"volcano.sh/apis/pkg/apis/scheduling"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"

"volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/gpushare"
@@ -90,6 +91,15 @@ type NodeInfo struct {
ImageStates map[string]*k8sframework.ImageStateSummary
}

// PodGroupOldState records the old state of a podgroup before the scheduling cycle.
type PodGroupOldState struct {
// Status caches the podgroup status during scheduling.
// This should not be mutated after initialization.
Status map[JobID]scheduling.PodGroupStatus
// Annotations records the old annotations of a podgroup, used to detect changes.
Annotations map[JobID]map[string]string
}

// FutureIdle returns resources that will be idle in the future:
//
// That is current idle resources plus released resources minus pipelined resources.
3 changes: 3 additions & 0 deletions pkg/scheduler/api/well_known_labels.go
@@ -42,4 +42,7 @@ const (

// topologyDecisionAnnotation is the key of topology decision about pod request resource
topologyDecisionAnnotation = "volcano.sh/topology-decision"

// TopologyAllocateLCAHyperNode is the annotation key recording the lowest common ancestor hyperNode, in the network topology, of the tasks already allocated to a job.
TopologyAllocateLCAHyperNode = "volcano.sh/allocate-lca-hypernode"
)
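For illustration, a small runnable sketch of the intended annotation flow; the hyperNode names and the LCA value are made up, not taken from this diff.

package main

import "fmt"

// Mirrors the exported constant above.
const TopologyAllocateLCAHyperNode = "volcano.sh/allocate-lca-hypernode"

func main() {
	// Podgroup annotations; a missing value means the job has not been scheduled yet.
	annotations := map[string]string{}

	// The first task of the job lands under hyperNode "s0": record it directly.
	annotations[TopologyAllocateLCAHyperNode] = "s0"

	// A later task lands under "s1": the value is replaced by the LCA of the old
	// value and the new hyperNode ("s4" here is purely illustrative).
	annotations[TopologyAllocateLCAHyperNode] = "s4"

	fmt.Println(annotations[TopologyAllocateLCAHyperNode]) // s4
}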
5 changes: 4 additions & 1 deletion pkg/scheduler/cache/cache.go
@@ -64,6 +64,7 @@ import (
vcinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"
"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/features"
"volcano.sh/volcano/pkg/scheduler/api"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"
"volcano.sh/volcano/pkg/scheduler/metrics"
@@ -1536,9 +1537,11 @@ func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG b
if err != nil {
return nil, err
}
sc.Mutex.Lock()

Member: I think there is no need to add a lock here. session --> UpdateJobStatus still runs in the same goroutine, and no other goroutine reads or writes this map.

Author: According to the discussion, locks need to be added for all operations on the cache.

Member: This function updates the podgroup to the apiserver. If you only want to update the cache, it should not be done here.

sc.Jobs[job.UID].PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]

Member: Please move this to the event handler that handles the podgroup update event.

Member: And if we want to store results produced during the scheduling cycle in the cache, do it in a separate function when closing the session. Keep UpdateJobStatus clean.

sc.Mutex.Unlock()
job.PodGroup = pg
}

sc.RecordJobStatusEvent(job, updatePG)

return job, nil
2 changes: 2 additions & 0 deletions pkg/scheduler/conf/scheduler_conf.go
@@ -87,6 +87,8 @@ type PluginOption struct {
EnabledOverused *bool `yaml:"enabledOverused"`
// EnabledAllocatable defines whether allocatable is enabled
EnabledAllocatable *bool `yaml:"enabledAllocatable"`
// EnabledNetworkTopology defines whether network topology is enabled
EnabledNetworkTopology *bool `yaml:"enabledNetworkTopology"`
// Arguments defines the different arguments that can be given to different plugins
Arguments map[string]interface{} `yaml:"arguments"`
}
6 changes: 4 additions & 2 deletions pkg/scheduler/framework/job_updater.go
@@ -91,10 +91,12 @@ func isPodGroupStatusUpdated(newStatus, oldStatus scheduling.PodGroupStatus) boo
func (ju *jobUpdater) updateJob(index int) {
job := ju.jobQueue[index]
ssn := ju.ssn
oldHyperNode := ssn.PodGroupOldState.Annotations[job.UID][api.TopologyAllocateLCAHyperNode]

job.PodGroup.Status = jobStatus(ssn, job)
oldStatus, found := ssn.podGroupStatus[job.UID]
updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus)
oldStatus, found := ssn.PodGroupOldState.Status[job.UID]
updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus) || oldHyperNode != job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]

if _, err := ssn.cache.UpdateJobStatus(job, updatePG); err != nil {
klog.Errorf("Failed to update job <%s/%s>: %v",
job.Namespace, job.Name, err)
16 changes: 11 additions & 5 deletions pkg/scheduler/framework/session.go
@@ -56,9 +56,10 @@ type Session struct {

TotalResource *api.Resource
TotalGuarantee *api.Resource
// podGroupStatus cache podgroup status during schedule

// PodGroupOldState contains podgroup status and annotations during schedule
// This should not be mutated after initialization
podGroupStatus map[api.JobID]scheduling.PodGroupStatus
api.PodGroupOldState

Jobs map[api.JobID]*api.JobInfo
Nodes map[string]*api.NodeInfo
@@ -81,6 +82,8 @@ type Session struct {
HyperNodesListByTier map[int][]string
// HyperNodes maps hyperNode Name -> nodes under the hyperNode.
HyperNodes map[string][]*api.NodeInfo
// HyperNodeTree is the hypernode tree of all hypernodes in the cluster.
HyperNodeTree []map[string][]string

plugins map[string]Plugin
eventHandlers []*EventHandler
@@ -127,8 +130,10 @@ func openSession(cache cache.Cache) *Session {

TotalResource: api.EmptyResource(),
TotalGuarantee: api.EmptyResource(),
podGroupStatus: map[api.JobID]scheduling.PodGroupStatus{},

PodGroupOldState: api.PodGroupOldState{
Status: map[api.JobID]scheduling.PodGroupStatus{},
Annotations: map[api.JobID]map[string]string{},
Member: Please remove Annotations, because it already exists in the job's deep-copied podgroup in the snapshot. The reason we record the old podGroupStatus is that it is changed during the scheduling cycle; the annotations do not need to be stored here anymore.
},
Jobs: map[api.JobID]*api.JobInfo{},
Nodes: map[string]*api.NodeInfo{},
CSINodesStatus: map[string]*api.CSINodeStatusInfo{},
@@ -170,7 +175,8 @@ func openSession(cache cache.Cache) *Session {
ssn.Jobs = snapshot.Jobs
for _, job := range ssn.Jobs {
if job.PodGroup != nil {
ssn.podGroupStatus[job.UID] = *job.PodGroup.Status.DeepCopy()
ssn.PodGroupOldState.Status[job.UID] = *job.PodGroup.Status.DeepCopy()
ssn.PodGroupOldState.Annotations[job.UID] = job.PodGroup.GetAnnotations()
}

if vjr := ssn.JobValid(job); vjr != nil {
198 changes: 198 additions & 0 deletions pkg/scheduler/plugins/networktopologyaware/networktopologyaware.go
@@ -0,0 +1,198 @@
/*
Copyright 2019 The Volcano Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package networktopologyaware

import (
"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/util"
)

const (
// PluginName indicates name of volcano scheduler plugin.
PluginName = "networktopologyaware"
BaseScore = 100.0
TaskBaseScore = 10.0
ZeroScore = 0.0
NetworkTopologyWeight = "weight"
)

type networkTopologyAwarePlugin struct {
// Arguments given for the plugin
pluginArguments framework.Arguments
}

// New returns a networkTopologyAwarePlugin object
func New(arguments framework.Arguments) framework.Plugin {
return &networkTopologyAwarePlugin{
pluginArguments: arguments,
}
}

func (nta *networkTopologyAwarePlugin) Name() string {
return PluginName
}

func calculateWeight(args framework.Arguments) int {
weight := 1
args.GetInt(&weight, NetworkTopologyWeight)
return weight
}

func (nta *networkTopologyAwarePlugin) OnSessionOpen(ssn *framework.Session) {
klog.V(5).Infof("Enter networkTopologyAware plugin ...")
defer func() {
klog.V(5).Infof("Leaving networkTopologyAware plugin ...")
}()

weight := calculateWeight(nta.pluginArguments)
hyperNodeFn := func(job *api.JobInfo, hyperNodes map[string][]*api.NodeInfo) (map[string]float64, error) {
hyperNodeScores := make(map[string]float64)
jobHyperNode := job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]
// The job is scheduled for the first time; all hyperNodes get the same score.
if jobHyperNode == "" {
for hyperNode := range hyperNodes {
hyperNodeScores[hyperNode] = ZeroScore
}
return hyperNodeScores, nil
}
// The job has been scheduled before; calculate the score based on the hyperNode tree.
maxScore := ZeroScore
scoreHyperNode := map[float64][]string{}
for hyperNode := range hyperNodes {
score := networkTopologyAwareScore(hyperNode, job, ssn.HyperNodeTree)
score *= float64(weight)
hyperNodeScores[hyperNode] = score
if score >= maxScore {
maxScore = score
scoreHyperNode[score] = append(scoreHyperNode[score], hyperNode)
}
}
// If more than one hyperNode has the max score, break the tie by task count.
if len(scoreHyperNode[maxScore]) > 1 {
for hyperNode, score := range hyperNodeScores {
if score == maxScore {
taskNumScore := networkTopologyAwareScoreWithTaskNum(hyperNode, job, ssn.HyperNodes)
taskNumScore *= float64(weight)
hyperNodeScores[hyperNode] += taskNumScore
}
}
}

klog.V(1).Infof("networkTopologyAware score is: %v", hyperNodeScores)
return hyperNodeScores, nil
}

nodeFn := func(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) {
nodeScores := make(map[string]float64)

taskJob := ssn.Jobs[task.Job]
jobHyperNode := taskJob.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]
// The job is scheduled for the first time; all nodes get the same score.
if jobHyperNode == "" {
for _, node := range nodes {
nodeScores[node.Name] = ZeroScore
}
return nodeScores, nil
}
// The job has been scheduled before; calculate the score based on the hyperNode tree.
maxScore := ZeroScore
scoreNodes := map[float64][]string{}
for _, node := range nodes {
hyperNode := util.FindHyperNodeOfNode(node.Name, ssn.HyperNodes)
score := networkTopologyAwareScore(hyperNode, taskJob, ssn.HyperNodeTree)
score *= float64(weight)
nodeScores[node.Name] = score
if score >= maxScore {
maxScore = score
scoreNodes[score] = append(scoreNodes[score], node.Name)
}
}
// If more than one node has the max score, break the tie by the task count of its hyperNode.
if len(scoreNodes[maxScore]) > 1 {
for node, score := range nodeScores {
if score == maxScore {
hyperNode := util.FindHyperNodeOfNode(node, ssn.HyperNodes)
taskNumScore := networkTopologyAwareScoreWithTaskNum(hyperNode, taskJob, ssn.HyperNodes)
taskNumScore *= float64(weight)
nodeScores[node] += taskNumScore
}
}
}

klog.V(1).Infof("networkTopologyAware score is: %v", nodeScores)
return nodeScores, nil
}

ssn.AddHyperNodeOrederFn(nta.Name(), hyperNodeFn)
ssn.AddBatchNodeOrderFn(nta.Name(), nodeFn)
}

func (nta *networkTopologyAwarePlugin) OnSessionClose(ssn *framework.Session) {
}

// networkTopologyAwareScore uses the best-fit policies during scheduling.

// Goals:
// - The tier index to which the LCAHyperNode of a job belongs should be as low as possible.
func networkTopologyAwareScore(hyperNodeName string, job *api.JobInfo, hyperNodeTree []map[string][]string) float64 {
jobHyperNode := job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]

if jobHyperNode == hyperNodeName {
return BaseScore
}
// Calculate hyperNode tier index score.
_, index := util.FindLCAHyperNode(hyperNodeName, jobHyperNode, hyperNodeTree)
if index <= 0 {
klog.V(4).Infof("failed to find LCA hyperNode for %s in hyperNodeTree", hyperNodeName)
return 0.0
}
tierIndexScore := BaseScore * scoreHyperNodeWithIndex(index, 1, len(hyperNodeTree))

return tierIndexScore
}

// Goals:
// - Tasks under a job should be scheduled to one hyperNode as much as possible.
func networkTopologyAwareScoreWithTaskNum(hyperNodeName string, job *api.JobInfo, hyperNodes map[string][]*api.NodeInfo) float64 {
// Calculate tasks num score.
taskNum := util.FindJobTaskNumOfHyperNode(hyperNodeName, job, hyperNodes)
taskNumScore := ZeroScore
if len(job.Tasks) > 0 {
taskNumScore = TaskBaseScore * scoreHyperNodeWithTaskNum(taskNum, len(job.Tasks))
}
return taskNumScore
}

func scoreHyperNodeWithIndex(index int, minIndex int, maxIndex int) float64 {
// Use tier index to calculate scores and map the original score to the range between 0 and 1.
return float64(maxIndex-index) / float64(maxIndex-minIndex)
}

func scoreHyperNodeWithTaskNum(taskNum int, maxTaskNum int) float64 {
// Calculate the task distribution rate as the score and map it to the range between 0 and 1.
return float64(taskNum) / float64(maxTaskNum)
}
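For intuition, a short runnable example of the two normalization helpers above, with an illustrative 3-tier tree and a job of four tasks; the helpers are copied here only so the snippet is self-contained.

package main

import "fmt"

const (
	BaseScore     = 100.0
	TaskBaseScore = 10.0
)

func scoreHyperNodeWithIndex(index, minIndex, maxIndex int) float64 {
	return float64(maxIndex-index) / float64(maxIndex-minIndex)
}

func scoreHyperNodeWithTaskNum(taskNum, maxTaskNum int) float64 {
	return float64(taskNum) / float64(maxTaskNum)
}

func main() {
	// With 3 tiers, an LCA at tier 1 (lowest) scores highest and an LCA at tier 3 (top) scores zero.
	fmt.Println(BaseScore * scoreHyperNodeWithIndex(1, 1, 3)) // 100
	fmt.Println(BaseScore * scoreHyperNodeWithIndex(2, 1, 3)) // 50
	fmt.Println(BaseScore * scoreHyperNodeWithIndex(3, 1, 3)) // 0

	// Tie-break: 3 of the job's 4 tasks already sit under the candidate hyperNode.
	fmt.Println(TaskBaseScore * scoreHyperNodeWithTaskNum(3, 4)) // 7.5
}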