From 889afed2d9f964c754c7937e15ff9d4516395f69 Mon Sep 17 00:00:00 2001 From: ecosysbin <14729934+ecosysbin@user.noreply.gitee.com> Date: Tue, 24 Dec 2024 20:18:29 +0800 Subject: [PATCH] Add network-topology-aware plugin and hyperNode score callback Signed-off-by: ecosysbin <14729934+ecosysbin@user.noreply.gitee.com> combine PodGroupStatus and PodGroupAnnotations Signed-off-by: ecosysbin <14729934+ecosysbin@user.noreply.gitee.com> rebase with pr:8704 Signed-off-by: ecosysbin <14729934+ecosysbin@user.noreply.gitee.com> rebase with pr:8704 Signed-off-by: ecosysbin <14729934+ecosysbin@user.noreply.gitee.com> update networktopologyaware score Signed-off-by: ecosysbin <14729934+ecosysbin@user.noreply.gitee.com> --- go.mod | 1 - go.sum | 4 +- pkg/scheduler/actions/allocate/allocate.go | 45 +- pkg/scheduler/api/node_info.go | 10 + pkg/scheduler/api/well_known_labels.go | 3 + pkg/scheduler/cache/cache.go | 6 +- pkg/scheduler/conf/scheduler_conf.go | 2 + pkg/scheduler/framework/job_updater.go | 6 +- pkg/scheduler/framework/session.go | 14 +- .../networktopologyaware.go | 217 +++++++ .../networktopologyaware_test.go | 580 ++++++++++++++++++ pkg/scheduler/uthelper/helper.go | 8 - pkg/scheduler/util/scheduler_helper.go | 115 +++- pkg/scheduler/util/scheduler_helper_test.go | 162 ++++- 14 files changed, 1096 insertions(+), 77 deletions(-) create mode 100644 pkg/scheduler/plugins/networktopologyaware/networktopologyaware.go create mode 100644 pkg/scheduler/plugins/networktopologyaware/networktopologyaware_test.go diff --git a/go.mod b/go.mod index 8f6f6ac902..af4d2e14d9 100644 --- a/go.mod +++ b/go.mod @@ -190,5 +190,4 @@ replace ( k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.31.1 k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.31.1 k8s.io/sample-controller => k8s.io/sample-controller v0.31.1 - volcano.sh/apis => github.com/Monokaix/apis v0.0.0-20241205070443-2cc3b54d83c7 ) diff --git a/go.sum b/go.sum index 42f182df38..7791857a2e 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2yDvg= github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE= -github.com/Monokaix/apis v0.0.0-20241205070443-2cc3b54d83c7 h1:Fb5bCm+ZygQ4Rrqt9x+uTG60kVlm41prlOpWwuWyHA4= -github.com/Monokaix/apis v0.0.0-20241205070443-2cc3b54d83c7/go.mod h1:XHIjTlHDMZTLRg2Y2JAkj85iP0iiet2tv+HfPQZrsHs= github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I= github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/agiledragon/gomonkey/v2 v2.11.0 h1:5oxSgA+tC1xuGsrIorR+sYiziYltmJyEZ9qA25b6l5U= @@ -512,3 +510,5 @@ sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c= stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0= +volcano.sh/apis v1.10.0-alpha.0.0.20241218081838-e5d361b6bfbe h1:iHd1Xt36a7S47IFksuF0h9W9J4LKzhBEz0C9XbkBvB8= +volcano.sh/apis v1.10.0-alpha.0.0.20241218081838-e5d361b6bfbe/go.mod h1:XHIjTlHDMZTLRg2Y2JAkj85iP0iiet2tv+HfPQZrsHs= diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 0c344bc523..eed8c46dc6 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ 
b/pkg/scheduler/actions/allocate/allocate.go
@@ -233,6 +233,8 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
 	hyperNodesWithLeftTasks := make(map[string]*util.PriorityQueue)
 	ssn := alloc.session
 	selectedTier := 0
+	jobNewHyperNodeMap := map[string]string{}
+	jobHyperNode := job.PodGroup.Annotations[api.TopologyAllocateLCAHyperNode]
 
 	// Find a suitable hyperNode in one tier from bottom to top every time to ensure that the selected hyperNode spans the fewest tiers.
 	for index, tier := range alloc.hyperNodesTiers {
@@ -245,6 +247,14 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
 			break
 		}
 		for _, hyperNodeName := range ssn.HyperNodesListByTier[tier] {
+			if jobHyperNode == "" {
+				// The job is scheduled for the first time; the candidate hyperNode itself would become the job's LCA hyperNode.
+				jobNewHyperNodeMap[hyperNodeName] = hyperNodeName
+			} else {
+				jobNewHyperNode, _ := util.FindLCAHyperNode(hyperNodeName, jobHyperNode, nil)
+				jobNewHyperNodeMap[hyperNodeName] = jobNewHyperNode
+			}
+
 			nodes, ok := ssn.HyperNodes[hyperNodeName]
 			if !ok {
 				klog.ErrorS(nil, "HyperNode not exists.", "jobName", job.UID, "name", hyperNodeName, "tier", tier)
@@ -286,6 +296,8 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
 		klog.V(4).InfoS("Find available hyperNodes for job", "jobName", job.UID, "tier", selectedTier, "hyperNodes", hyperNodes)
 	}
 	stmt, hyperNode := alloc.selectBestHyperNode(jobStmtsByTier[selectedTier], job)
+	jobNewHyperNode := jobNewHyperNodeMap[hyperNode]
+	job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = jobNewHyperNode
 	return stmt, hyperNodesWithLeftTasks[hyperNode]
 }
 
@@ -480,6 +492,7 @@ func (alloc *Action) allocateResourcesForTask(stmt *framework.Statement, task *a
 	if task.InitResreq.LessEqual(node.Idle, api.Zero) {
 		klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
 		if err := stmt.Allocate(task, node); err != nil {
+			klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v", task.UID, node.Name, alloc.session.UID, err)
 			if rollbackErr := stmt.UnAllocate(task); rollbackErr != nil {
 				klog.Errorf("Failed to unallocate Task %v on %v in Session %v for %v.",
 					task.UID, node.Name, alloc.session.UID, err)
@@ -492,38 +505,6 @@ func (alloc *Action) allocateResourcesForTask(stmt *framework.Statement, task *a
 		return
 	}
 
-	klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources",
-		task.Namespace, task.Name, node.Name)
-
-	// Allocate releasing resource to the task if any.
-	if task.InitResreq.LessEqual(node.FutureIdle(), api.Zero) {
-		klog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
-			task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing)
-		if err := stmt.Pipeline(task, node.Name, false); err != nil {
-			klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.",
-				task.UID, node.Name, alloc.session.UID, err)
-		} else {
-			metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
-			metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
-=======
-	if ssn.JobReady(job) {
-		klog.V(3).InfoS("Job ready, return statement", "jobName", job.UID)
-		return stmt
-	} else {
-		if !ssn.JobPipelined(job) {
-			// Allocate idle resource to the task.
-			if task.InitResreq.LessEqual(node.Idle, api.Zero) {
-				klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
-				if err := stmt.Allocate(task, node); err != nil {
-					klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
-						task.UID, node.Name, alloc.session.UID, err)
-				} else {
-					metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
-					metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
-				}
-				return
-			}
-
 	klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources",
 		task.Namespace, task.Name, node.Name)
diff --git a/pkg/scheduler/api/node_info.go b/pkg/scheduler/api/node_info.go
index f1fdbdfc32..1dd49520b6 100644
--- a/pkg/scheduler/api/node_info.go
+++ b/pkg/scheduler/api/node_info.go
@@ -25,6 +25,7 @@ import (
 	"k8s.io/klog/v2"
 	k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
 
+	"volcano.sh/apis/pkg/apis/scheduling"
 	"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
 
 	"volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/gpushare"
@@ -90,6 +91,15 @@ type NodeInfo struct {
 	ImageStates map[string]*k8sframework.ImageStateSummary
}
 
+// PodGroupCache records podgroup state cached during a scheduling session.
+type PodGroupCache struct {
+	// PodGroupStatus caches podgroup status during scheduling.
+	// This should not be mutated after initialization.
+	PodGroupStatus map[JobID]scheduling.PodGroupStatus
+	// PodGroupAnnotations records the old podgroup annotations, used to detect changes.
+	PodGroupAnnotations map[JobID]map[string]string
+}
+
 // FutureIdle returns resources that will be idle in the future:
 //
 // That is current idle resources plus released resources minus pipelined resources.
diff --git a/pkg/scheduler/api/well_known_labels.go b/pkg/scheduler/api/well_known_labels.go
index 2e1db71c74..5801c5b6ce 100644
--- a/pkg/scheduler/api/well_known_labels.go
+++ b/pkg/scheduler/api/well_known_labels.go
@@ -42,4 +42,7 @@ const (
 
 	// topologyDecisionAnnotation is the key of topology decision about pod request resource
 	topologyDecisionAnnotation = "volcano.sh/topology-decision"
+
+	// TopologyAllocateLCAHyperNode is the annotation key recording the lowest common ancestor hyperNode of the nodes to which a job's tasks have been assigned.
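+	// The value is a hyperNode name, e.g. "hyperNode1" once every task of the job
+	// has been placed on nodes under hyperNode1 (illustrative value).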
+ TopologyAllocateLCAHyperNode = "volcano.sh/allocate-lca-hypernode" ) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 6ba27cc3a8..4e340647ad 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -64,6 +64,7 @@ import ( vcinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1" "volcano.sh/volcano/cmd/scheduler/app/options" "volcano.sh/volcano/pkg/features" + "volcano.sh/volcano/pkg/scheduler/api" schedulingapi "volcano.sh/volcano/pkg/scheduler/api" volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding" "volcano.sh/volcano/pkg/scheduler/metrics" @@ -1536,9 +1537,12 @@ func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG b if err != nil { return nil, err } + sc.Mutex.Lock() + sc.Jobs[job.UID].PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] + sc.Mutex.Unlock() job.PodGroup = pg } - + sc.Jobs[job.UID] = job sc.RecordJobStatusEvent(job, updatePG) return job, nil diff --git a/pkg/scheduler/conf/scheduler_conf.go b/pkg/scheduler/conf/scheduler_conf.go index e911e0b69e..182501ed8f 100644 --- a/pkg/scheduler/conf/scheduler_conf.go +++ b/pkg/scheduler/conf/scheduler_conf.go @@ -87,6 +87,8 @@ type PluginOption struct { EnabledOverused *bool `yaml:"enabledOverused"` // EnabledAllocatable defines whether allocatable is enabled EnabledAllocatable *bool `yaml:"enabledAllocatable"` + // EnabledNetworkTopology defines whether network topology is enabled + EnabledNetworkTopology *bool `yaml:"enabledNetworkTopology"` // Arguments defines the different arguments that can be given to different plugins Arguments map[string]interface{} `yaml:"arguments"` } diff --git a/pkg/scheduler/framework/job_updater.go b/pkg/scheduler/framework/job_updater.go index 80ba52987d..dd535e81cc 100644 --- a/pkg/scheduler/framework/job_updater.go +++ b/pkg/scheduler/framework/job_updater.go @@ -91,10 +91,12 @@ func isPodGroupStatusUpdated(newStatus, oldStatus scheduling.PodGroupStatus) boo func (ju *jobUpdater) updateJob(index int) { job := ju.jobQueue[index] ssn := ju.ssn + oldHyperNode := ssn.PodGroupAnnotations[job.UID][api.TopologyAllocateLCAHyperNode] job.PodGroup.Status = jobStatus(ssn, job) - oldStatus, found := ssn.podGroupStatus[job.UID] - updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus) + oldStatus, found := ssn.PodGroupStatus[job.UID] + updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus) || oldHyperNode != job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] + if _, err := ssn.cache.UpdateJobStatus(job, updatePG); err != nil { klog.Errorf("Failed to update job <%s/%s>: %v", job.Namespace, job.Name, err) diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index ebacefb696..f787e7224a 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -56,9 +56,8 @@ type Session struct { TotalResource *api.Resource TotalGuarantee *api.Resource - // podGroupStatus cache podgroup status during schedule - // This should not be mutated after initiated - podGroupStatus map[api.JobID]scheduling.PodGroupStatus + + api.PodGroupCache Jobs map[api.JobID]*api.JobInfo Nodes map[string]*api.NodeInfo @@ -127,8 +126,10 @@ func openSession(cache cache.Cache) *Session { TotalResource: api.EmptyResource(), TotalGuarantee: api.EmptyResource(), - podGroupStatus: map[api.JobID]scheduling.PodGroupStatus{}, - + 
PodGroupCache: api.PodGroupCache{
+			PodGroupStatus:      map[api.JobID]scheduling.PodGroupStatus{},
+			PodGroupAnnotations: map[api.JobID]map[string]string{},
+		},
 		Jobs:           map[api.JobID]*api.JobInfo{},
 		Nodes:          map[string]*api.NodeInfo{},
 		CSINodesStatus: map[string]*api.CSINodeStatusInfo{},
@@ -170,7 +171,8 @@ func openSession(cache cache.Cache) *Session {
 	ssn.Jobs = snapshot.Jobs
 	for _, job := range ssn.Jobs {
 		if job.PodGroup != nil {
-			ssn.podGroupStatus[job.UID] = *job.PodGroup.Status.DeepCopy()
+			ssn.PodGroupStatus[job.UID] = *job.PodGroup.Status.DeepCopy()
+			ssn.PodGroupAnnotations[job.UID] = job.PodGroup.GetAnnotations()
 		}
 
 		if vjr := ssn.JobValid(job); vjr != nil {
diff --git a/pkg/scheduler/plugins/networktopologyaware/networktopologyaware.go b/pkg/scheduler/plugins/networktopologyaware/networktopologyaware.go
new file mode 100644
index 0000000000..29348cf821
--- /dev/null
+++ b/pkg/scheduler/plugins/networktopologyaware/networktopologyaware.go
@@ -0,0 +1,217 @@
+/*
+Copyright 2019 The Volcano Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package networktopologyaware
+
+import (
+	"math"
+
+	"k8s.io/klog/v2"
+
+	"volcano.sh/volcano/pkg/scheduler/api"
+	"volcano.sh/volcano/pkg/scheduler/framework"
+	"volcano.sh/volcano/pkg/scheduler/util"
+)
+
+const (
+	// PluginName indicates name of volcano scheduler plugin.
+	PluginName            = "networktopologyaware"
+	BaseScore             = 100
+	TaskBaseScore         = 10
+	NetworkTopologyWeight = "weight"
+)
+
+// HyperNodeTree is the hyperNode tree of all hyperNodes in the cluster.
+var (
+	HyperNodeTree []map[string][]string
+)
+
+type networkTopologyAwarePlugin struct {
+	// Arguments given for the plugin
+	pluginArguments framework.Arguments
+}
+
+// New returns a networkTopologyAwarePlugin object.
+func New(arguments framework.Arguments) framework.Plugin {
+	return &networkTopologyAwarePlugin{
+		pluginArguments: arguments,
+	}
+}
+
+func (nta *networkTopologyAwarePlugin) Name() string {
+	return PluginName
+}
+
+func calculateWeight(args framework.Arguments) int {
+	weight := 1
+	args.GetInt(&weight, NetworkTopologyWeight)
+	return weight
+}
+
+func (nta *networkTopologyAwarePlugin) OnSessionOpen(ssn *framework.Session) {
+	klog.V(5).Infof("Enter networkTopologyAware plugin ...")
+	defer func() {
+		klog.V(5).Infof("Leaving networkTopologyAware plugin ...")
+	}()
+
+	weight := calculateWeight(nta.pluginArguments)
+	hyperNodeFn := func(job *api.JobInfo, hyperNodes map[string][]*api.NodeInfo) (map[string]float64, error) {
+		hyperNodeScores := make(map[string]float64)
+		jobHyperNode := job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]
+		// The job is scheduled for the first time; all hyperNodes get the same base score.
+		if jobHyperNode == "" {
+			for hyperNode := range hyperNodes {
+				score := float64(weight * BaseScore)
+				hyperNodeScores[hyperNode] = score
+			}
+			return hyperNodeScores, nil
+		}
+		// The job has been scheduled before; score each hyperNode against the hyperNode tree.
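+		// The candidates sharing the top score are remembered so that ties can be
+		// broken below by how many of the job's tasks each hyperNode already hosts.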
+		maxScore := 0.0
+		scoreHyperNode := map[float64][]string{}
+		for hyperNode := range hyperNodes {
+			score := networkTopologyAwareScore(hyperNode, job, HyperNodeTree)
+			score *= float64(weight)
+			hyperNodeScores[hyperNode] = score
+			if score >= maxScore {
+				maxScore = score
+				scoreHyperNode[score] = append(scoreHyperNode[score], hyperNode)
+			}
+		}
+		// Break the tie by task count when more than one hyperNode holds the max score.
+		if len(scoreHyperNode[maxScore]) > 1 {
+			for hyperNode, score := range hyperNodeScores {
+				if score == maxScore {
+					taskNumScore := networkTopologyAwareScoreWithTaskNum(hyperNode, job, HyperNodeTree)
+					taskNumScore *= float64(weight)
+					hyperNodeScores[hyperNode] += taskNumScore
+				}
+			}
+		}
+
+		klog.V(1).Infof("networkTopologyAware hyperNode scores are: %v", hyperNodeScores)
+		return hyperNodeScores, nil
+	}
+
+	nodeFn := func(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) {
+		nodeScores := make(map[string]float64)
+
+		taskJob := ssn.Jobs[task.Job]
+		jobHyperNode := taskJob.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]
+		// The job is scheduled for the first time; all nodes get the same base score.
+		if jobHyperNode == "" {
+			for _, node := range nodes {
+				score := float64(weight * BaseScore)
+				nodeScores[node.Name] = score
+			}
+			return nodeScores, nil
+		}
+		// The job has been scheduled before; score each node by the hyperNode it belongs to.
+		maxScore := 0.0
+		scoreHyperNode := map[float64][]string{}
+		for _, node := range nodes {
+			hyperNode := util.FindHyperNodeOfNode(node.Name, HyperNodeTree)
+			score := networkTopologyAwareScore(hyperNode, taskJob, HyperNodeTree)
+			score *= float64(weight)
+			nodeScores[node.Name] = score
+			if score >= maxScore {
+				maxScore = score
+				scoreHyperNode[score] = append(scoreHyperNode[score], hyperNode)
+			}
+		}
+		// Break the tie by task count when more than one hyperNode holds the max score.
+		if len(scoreHyperNode[maxScore]) > 1 {
+			for node, score := range nodeScores {
+				if score == maxScore {
+					hyperNode := util.FindHyperNodeOfNode(node, HyperNodeTree)
+					taskNumScore := networkTopologyAwareScoreWithTaskNum(hyperNode, taskJob, HyperNodeTree)
+					taskNumScore *= float64(weight)
+					nodeScores[node] += taskNumScore
+				}
+			}
+		}
+
+		klog.V(1).Infof("networkTopologyAware node scores are: %v", nodeScores)
+		return nodeScores, nil
+	}
+
+	ssn.AddHyperNodeOrederFn(nta.Name(), hyperNodeFn)
+	ssn.AddBatchNodeOrderFn(nta.Name(), nodeFn)
+}
+
+func (nta *networkTopologyAwarePlugin) OnSessionClose(ssn *framework.Session) {
+}
+
+// networkTopologyAwareScore uses a best-fit policy during scheduling.
+
+// Explanation:
+// The currentJobLCAHyperNode is a property of the job that serves as the smallest root of the job in the hyperNode tree.
+// A job has multiple tasks, each belonging to a hyperNode; the LCAHyperNode is the lowest common ancestor of the hyperNodes of all tasks within the job.
+
+// Goals:
+// - The tier index to which the LCAHyperNode of a job belongs should be as low as possible.
+// - Tasks under a job should be scheduled to one hyperNode as much as possible.
+func networkTopologyAwareScore(hyperNodeName string, job *api.JobInfo, hyperNodeTree []map[string][]string) float64 {
+	jobHyperNode := job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]
+	// The job is scheduled for the first time.
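+	// An empty annotation means none of the job's tasks have been placed yet,
+	// so any hyperNode is equally good and receives the full base score.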
+	if jobHyperNode == "" {
+		return BaseScore
+	}
+
+	if jobHyperNode == hyperNodeName {
+		return BaseScore
+	}
+
+	// Calculate the hyperNode tier index score.
+	_, index := util.FindLCAHyperNode(hyperNodeName, jobHyperNode, hyperNodeTree)
+	if index <= 0 {
+		klog.V(4).Infof("failed to find the LCA hyperNode with %s in hyperNodeTree", hyperNodeName)
+		return 0.0
+	}
+	tierIndexScore := BaseScore * scoreHyperNodeWithIndex(index, 1, len(hyperNodeTree))
+
+	return tierIndexScore
+}
+
+// Goals:
+// - Tasks under a job should be scheduled to one hyperNode as much as possible.
+func networkTopologyAwareScoreWithTaskNum(hyperNodeName string, job *api.JobInfo, hyperNodeTree []map[string][]string) float64 {
+	// Calculate the task number score.
+	taskNum := util.FindJobTaskNumOfHyperNode(hyperNodeName, job, hyperNodeTree)
+	taskNumScore := 0.0
+	if len(job.Tasks) > 0 {
+		taskNumScore = TaskBaseScore * scoreHyperNodeWithTaskNum(taskNum, len(job.Tasks))
+	}
+	return taskNumScore
+}
+
+func scoreHyperNodeWithIndex(index int, minIndex int, maxIndex int) float64 {
+	// Map the tier index to the range [0, 1] on a logarithmic curve:
+	// index == minIndex yields 1.0 and index == maxIndex yields 0.0, so lower tiers score higher.
+	return (math.Log(float64(maxIndex)) - math.Log(float64(index))) / (math.Log(float64(maxIndex)) - math.Log(float64(minIndex)))
+}
+
+func scoreHyperNodeWithTaskNum(taskNum int, maxTaskNum int) float64 {
+	// Use the task distribution rate as the score, which is guaranteed to fall within [0, 1].
+	return float64(taskNum) / float64(maxTaskNum)
+}
diff --git a/pkg/scheduler/plugins/networktopologyaware/networktopologyaware_test.go b/pkg/scheduler/plugins/networktopologyaware/networktopologyaware_test.go
new file mode 100644
index 0000000000..985e41342b
--- /dev/null
+++ b/pkg/scheduler/plugins/networktopologyaware/networktopologyaware_test.go
@@ -0,0 +1,580 @@
+/*
+Copyright 2019 The Volcano Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package networktopologyaware
+
+import (
+	"math"
+	"testing"
+
+	"volcano.sh/volcano/pkg/scheduler/api"
+	"volcano.sh/volcano/pkg/scheduler/conf"
+	"volcano.sh/volcano/pkg/scheduler/framework"
+	"volcano.sh/volcano/pkg/scheduler/uthelper"
+)
+
+const (
+	eps = 1e-1
+)
+
+func TestArguments(t *testing.T) {
+	framework.RegisterPluginBuilder(PluginName, New)
+	defer framework.CleanupPluginBuilders()
+
+	arguments := framework.Arguments{
+		"weight": 2,
+	}
+
+	builder, ok := framework.GetPluginBuilder(PluginName)
+	if !ok {
+		t.Fatalf("should have plugin named %s", PluginName)
+	}
+
+	plugin := builder(arguments)
+	networkTopologyAware, ok := plugin.(*networkTopologyAwarePlugin)
+	if !ok {
+		t.Fatalf("plugin should be of type %T, but got %T", networkTopologyAware, plugin)
+	}
+	weight := calculateWeight(networkTopologyAware.pluginArguments)
+	if weight != 2 {
+		t.Errorf("weight should be 2, but got %v", weight)
+	}
+}
+
+func TestNetworkTopologyAwareHyperNodeScore(t *testing.T) {
+	tests := []struct {
+		name string
+		uthelper.TestCommonStruct
+		arguments     framework.Arguments
+		hyperNodes    map[string][]*api.NodeInfo
+		hyperNodeTree []map[string][]string
+		jobHyperNode  string
+		tasks         map[string]string
+		expected      map[string]float64
+	}{
+		// Test case 1: the job is scheduled for the first time (its LCAHyperNode annotation is empty),
+		// so every candidate hyperNode is equally good and scores the base score of 100.0.
+		{
+			name: "Job LCAHyperNode empty for leaf node, hyperNode score should be 100.0",
+			TestCommonStruct: uthelper.TestCommonStruct{
+				Plugins: map[string]framework.PluginBuilder{PluginName: New},
+			},
+			arguments: framework.Arguments{
+				"weight": 1,
+			},
+			hyperNodes: map[string][]*api.NodeInfo{
+				"hyperNode3": nil,
+				"hyperNode4": nil,
+				"hyperNode5": nil,
+			},
+			jobHyperNode: "",
+			hyperNodeTree: []map[string][]string{
+				{
+					"hyperNode0": []string{"hyperNode1", "hyperNode2"},
+				},
+				{
+					"hyperNode1": []string{"hyperNode3", "hyperNode4"},
+					"hyperNode2": []string{"hyperNode5", "hyperNode6"},
+				},
+				{
+					"hyperNode3": []string{"node1", "node2"},
+					"hyperNode4": []string{"node3", "node4"},
+					"hyperNode5": []string{"node5", "node6"},
+					"hyperNode6": []string{"node7", "node8"},
+				},
+			},
+			expected: map[string]float64{
+				"hyperNode3": 100.0,
+				"hyperNode4": 100.0,
+				"hyperNode5": 100.0,
+			},
+		},
+		// Test case 2: the job has been scheduled before and its current LCAHyperNode is hyperNode3.
+		// hyperNode3 equals the job's LCAHyperNode, the best choice, so it scores 100.0.
+		// hyperNode4's LCA with the job is hyperNode1 at tier index 2, a weaker choice; the formula yields 36.9.
+		// hyperNode5's LCA with the job is hyperNode0 at tier index 3, the worst choice, so it scores 0.0.
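+		// With a 3-tier tree, an LCA at tier index 2 scores 100*(ln3-ln2)/(ln3-ln1) ≈ 36.9,
+		// and an LCA at the root tier (index 3) scores 0.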
+ { + name: "Normal LCA hyperNode to score for hyperNodes", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + hyperNodes: map[string][]*api.NodeInfo{ + "hyperNode3": nil, + "hyperNode4": nil, + "hyperNode5": nil, + }, + jobHyperNode: "hyperNode3", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{"node1", "node2"}, + "hyperNode4": []string{"node3", "node4"}, + "hyperNode5": []string{"node5", "node6"}, + "hyperNode6": []string{"node7", "node8"}, + }, + }, + expected: map[string]float64{ + "hyperNode3": 100.0, + "hyperNode4": 36.9, + "hyperNode5": 0.0, + }, + }, + { + // test case 3:Job is not first scheduler and the plugin weight is 2. + name: "Normal LCA hyperNode to score for hyperNodes with weight 2", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 2, + }, + hyperNodes: map[string][]*api.NodeInfo{ + "hyperNode3": nil, + "hyperNode4": nil, + "hyperNode5": nil, + }, + jobHyperNode: "hyperNode3", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{"node1", "node2"}, + "hyperNode4": []string{"node3", "node4"}, + "hyperNode5": []string{"node5", "node6"}, + "hyperNode6": []string{"node7", "node8"}, + }, + }, + expected: map[string]float64{ + "hyperNode3": 200.0, + "hyperNode4": 73.8, + "hyperNode5": 0.0, + }, + }, + { + name: "HyperNode has same LCAhyperNode for the job and the hyperNode will has the same score", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + hyperNodes: map[string][]*api.NodeInfo{ + "hyperNode3": nil, + "hyperNode4": nil, + "hyperNode5": nil, + }, + jobHyperNode: "hyperNode1", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{"node1", "node2"}, + "hyperNode4": []string{"node3", "node4"}, + "hyperNode5": []string{"node5", "node6"}, + "hyperNode6": []string{"node7", "node8"}, + }, + }, + expected: map[string]float64{ + "hyperNode3": 36.9, + "hyperNode4": 36.9, + "hyperNode5": 0.0, + }, + }, + { + name: "HyperNode has same LCAhyperNode for the job and the hyperNode will has the same score", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + hyperNodes: map[string][]*api.NodeInfo{ + "hyperNode3": nil, + "hyperNode4": nil, + "hyperNode5": nil, + }, + jobHyperNode: "hyperNode1", + tasks: map[string]string{ + "task1": "node1", + "task2": "node2", + "task3": "node3", + "test4": "", + }, + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": 
+					"hyperNode3": []string{"node1", "node2"},
+					"hyperNode4": []string{"node3", "node4"},
+					"hyperNode5": []string{"node5", "node6"},
+					"hyperNode6": []string{"node7", "node8"},
+				},
+			},
+			expected: map[string]float64{
+				"hyperNode3": 41.9,
+				"hyperNode4": 39.4,
+				"hyperNode5": 0.0,
+			},
+		},
+	}
+
+	trueValue := true
+	for i, test := range tests {
+		tiers := []conf.Tier{
+			{
+				Plugins: []conf.PluginOption{
+					{
+						Name:                   PluginName,
+						EnabledNetworkTopology: &trueValue,
+						Arguments:              test.arguments,
+					},
+				},
+			},
+		}
+		ssn := test.RegisterSession(tiers, nil)
+		ssn.HyperNodes = test.hyperNodes
+		// mock for test
+		HyperNodeTree = test.hyperNodeTree
+		job := &api.JobInfo{
+			Name:     "test-job",
+			PodGroup: &api.PodGroup{},
+		}
+		job.PodGroup.SetAnnotations(map[string]string{api.TopologyAllocateLCAHyperNode: test.jobHyperNode})
+		job.Tasks = make(map[api.TaskID]*api.TaskInfo)
+		for name, node := range test.tasks {
+			taskInfo := &api.TaskInfo{
+				UID:  api.TaskID(name),
+				Name: name,
+				Job:  job.UID,
+			}
+			job.Tasks[taskInfo.UID] = taskInfo
+			taskInfo.NodeName = node
+		}
+		scores, err := ssn.HyperNodeOrderMapFn(job, ssn.HyperNodes)
+		if err != nil {
+			t.Errorf("case%d: job %s has err %v", i+1, test.name, err)
+			continue
+		}
+		hyperNodesScore := scores[PluginName]
+		for hypernode, expected := range test.expected {
+			if math.Abs(hyperNodesScore[hypernode]-expected) > eps {
+				t.Errorf("case%d: job %s on hypernode %s expected score %v, but got %v", i+1, test.name, hypernode, expected, hyperNodesScore[hypernode])
+			}
+		}
+	}
+}
+
+func TestNetworkTopologyAwareNodeScore(t *testing.T) {
+	tests := []struct {
+		name string
+		uthelper.TestCommonStruct
+		arguments     framework.Arguments
+		nodes         []*api.NodeInfo
+		hyperNodeTree []map[string][]string
+		jobHyperNode  string
+		tasks         map[string]string
+		expected      map[string]float64
+	}{
+		// Test case 1: the job is scheduled for the first time (its LCAHyperNode annotation is empty), so every node scores 100.0.
+		{
+			name: "Job LCAHyperNode empty, node score should be 100.0",
+			TestCommonStruct: uthelper.TestCommonStruct{
+				Plugins: map[string]framework.PluginBuilder{PluginName: New},
+			},
+			arguments: framework.Arguments{
+				"weight": 1,
+			},
+			nodes: []*api.NodeInfo{
+				{
+					Name: "node1",
+				},
+				{
+					Name: "node3",
+				},
+				{
+					Name: "node5",
+				},
+			},
+			jobHyperNode: "",
+			hyperNodeTree: []map[string][]string{
+				{
+					"hyperNode0": []string{"hyperNode1", "hyperNode2"},
+				},
+				{
+					"hyperNode1": []string{"hyperNode3", "hyperNode4"},
+					"hyperNode2": []string{"hyperNode5", "hyperNode6"},
+				},
+				{
+					"hyperNode3": []string{"node1", "node2"},
+					"hyperNode4": []string{"node3", "node4"},
+					"hyperNode5": []string{"node5", "node6"},
+					"hyperNode6": []string{"node7", "node8"},
+				},
+			},
+			expected: map[string]float64{
+				"node1": 100.0,
+				"node3": 100.0,
+				"node5": 100.0,
+			},
+		},
+		{
+			name: "Normal LCA hyperNode to score for nodes",
+			TestCommonStruct: uthelper.TestCommonStruct{
+				Plugins: map[string]framework.PluginBuilder{PluginName: New},
+			},
+			arguments: framework.Arguments{
+				"weight": 1,
+			},
+			nodes: []*api.NodeInfo{
+				{
+					Name: "node1",
+				},
+				{
+					Name: "node3",
+				},
+				{
+					Name: "node5",
+				},
+			},
+			jobHyperNode: "hyperNode3",
+			hyperNodeTree: []map[string][]string{
+				{
+					"hyperNode0": []string{"hyperNode1", "hyperNode2"},
+				},
+				{
+					"hyperNode1": []string{"hyperNode3", "hyperNode4"},
+					"hyperNode2": []string{"hyperNode5", "hyperNode6"},
+				},
+				{
+					"hyperNode3": []string{"node1", "node2"},
+					"hyperNode4": []string{"node3", "node4"},
+					"hyperNode5": []string{"node5", "node6"},
+					"hyperNode6": []string{"node7", "node8"},
+				},
+			},
+			expected: map[string]float64{
+				"node1": 100.0,
+ "node3": 36.9, + "node5": 0.0, + }, + }, + { + name: "Normal LCA hyperNode to score for nodes with weight 2", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 2, + }, + nodes: []*api.NodeInfo{ + { + Name: "node1", + }, + { + Name: "node3", + }, + { + Name: "node5", + }, + }, + jobHyperNode: "hyperNode3", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{"node1", "node2"}, + "hyperNode4": []string{"node3", "node4"}, + "hyperNode5": []string{"node5", "node6"}, + "hyperNode6": []string{"node7", "node8"}, + }, + }, + expected: map[string]float64{ + "node1": 200.0, + "node3": 73.8, + "node5": 0.0, + }, + }, + { + name: "If the calculated LCAHyperNode is the same, the scores will be the same.", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + nodes: []*api.NodeInfo{ + { + Name: "node1", + }, + { + Name: "node3", + }, + { + Name: "node5", + }, + }, + jobHyperNode: "hyperNode1", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{"node1", "node2"}, + "hyperNode4": []string{"node3", "node4"}, + "hyperNode5": []string{"node5", "node6"}, + "hyperNode6": []string{"node7", "node8"}, + }, + }, + expected: map[string]float64{ + "node1": 36.9, + "node3": 36.9, + "node5": 0.0, + }, + }, + { + name: "If the calculated LCAHyperNode is the same, but the hyperNode has more tasks running, the score will be higher.", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + nodes: []*api.NodeInfo{ + { + Name: "node1", + }, + { + Name: "node3", + }, + { + Name: "node5", + }, + }, + jobHyperNode: "hyperNode1", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{"node1", "node2"}, + "hyperNode4": []string{"node3", "node4"}, + "hyperNode5": []string{"node5", "node6"}, + "hyperNode6": []string{"node7", "node8"}, + }, + }, + tasks: map[string]string{ + "task1": "node1", + "task2": "node2", + "task3": "node3", + "test4": "", + }, + expected: map[string]float64{ + "node1": 41.9, + "node3": 39.4, + "node5": 0.0, + }, + }, + } + + trueValue := true + for i, test := range tests { + tiers := []conf.Tier{ + { + Plugins: []conf.PluginOption{ + { + Name: PluginName, + EnabledNodeOrder: &trueValue, + Arguments: test.arguments, + }, + }, + }, + } + ssn := test.RegisterSession(tiers, nil) + // mock for test + HyperNodeTree = test.hyperNodeTree + // mock job + job := &api.JobInfo{ + UID: "test-job", + Name: "test-job", + PodGroup: &api.PodGroup{}, + } + job.PodGroup.SetAnnotations(map[string]string{api.TopologyAllocateLCAHyperNode: test.jobHyperNode}) + ssn.Jobs = map[api.JobID]*api.JobInfo{ + job.UID: job, + } + job.Tasks = make(map[api.TaskID]*api.TaskInfo) + for name, node := range test.tasks { + 
taskInfo := &api.TaskInfo{
+				UID:  api.TaskID(name),
+				Name: name,
+				Job:  job.UID,
+			}
+			job.Tasks[taskInfo.UID] = taskInfo
+			taskInfo.NodeName = node
+		}
+		// mock task
+		task := &api.TaskInfo{
+			Name: "test4",
+			Job:  job.UID,
+		}
+		scores, err := ssn.BatchNodeOrderFn(task, test.nodes)
+		if err != nil {
+			t.Errorf("case%d: task %s has err %v", i+1, test.name, err)
+			continue
+		}
+		for node, expected := range test.expected {
+			if math.Abs(scores[node]-expected) > eps {
+				t.Errorf("case%d: task %s on node %s expected score %v, but got %v", i+1, test.name, node, expected, scores[node])
+			}
+		}
+	}
+}
diff --git a/pkg/scheduler/uthelper/helper.go b/pkg/scheduler/uthelper/helper.go
index a534c0f465..61a2e72d1e 100644
--- a/pkg/scheduler/uthelper/helper.go
+++ b/pkg/scheduler/uthelper/helper.go
@@ -189,14 +189,6 @@ func (test *TestCommonStruct) CheckBind(caseIndex int) error {
 		}
 	}
 
-	if test.MinimalBindCheck {
-		return nil
-	}
-
-	if test.ExpectBindsNum != len(test.ExpectBindMap) {
-		return fmt.Errorf("invalid setting for binding check: want bind count %d, want bind result length %d", test.ExpectBindsNum, len(test.ExpectBindMap))
-	}
-
 	// in case expected test.BindsNum is 0, but actually there is a binding and wait the binding goroutine to run
 	select {
 	case <-time.After(50 * time.Millisecond):
diff --git a/pkg/scheduler/util/scheduler_helper.go b/pkg/scheduler/util/scheduler_helper.go
index 4ef4ce8914..9a1c128f4d 100644
--- a/pkg/scheduler/util/scheduler_helper.go
+++ b/pkg/scheduler/util/scheduler_helper.go
@@ -137,29 +137,31 @@ func PrioritizeHyperNodes(candidateHyperNodes map[string][]*api.NodeInfo, nodeSc
 	if err != nil {
 		return nil, err
 	}
-
 	// plugin scores of hyperNode.
 	for pluginName, scores := range mapScores {
 		for hyperNode, score := range scores {
 			klog.V(5).InfoS("Add plugin score at hyperNode", "jobName", job.UID, "pluginName", pluginName, "hyperNodeName", hyperNode, "score", score)
 			hyperNodesScoreMap[hyperNode] += score
 		}
 	}
+
 	// accumulate node scores in NodeOrder and hyperNode score itself as the final score of each hyperNode.
 	for hyperNodeName, score := range nodeScoresInHyperNode {
 		klog.V(5).InfoS("Add node level scores to final hyperNode score", "jobName", job.UID, "hyperNodeName", hyperNodeName, "score", score)
 		hyperNodesScoreMap[hyperNodeName] += score
 	}
 
+	hyperNodeScores := make(map[float64][]string)
+	hyperNodeScoreMap := make(map[string]float64)
+	for hyperNodeName := range candidateHyperNodes {
+		// If no plugin scored this hyperNode, the default is 0.0.
+		score := 0.0
+		if value, ok := hyperNodesScoreMap[hyperNodeName]; ok {
+			score += value
+		}
+		hyperNodeScores[score] = append(hyperNodeScores[score], hyperNodeName)
 		if klog.V(5).Enabled() {
 			hyperNodeScoreMap[hyperNodeName] = score
 		}
 	}
 
 	klog.V(5).InfoS("Prioritize hyperNode score map for job", "jobName", job.UID, "scoreMap", hyperNodeScoreMap)
 	return hyperNodeScores, nil
 }
@@ -215,24 +217,6 @@ func SelectBestHyperNode(hyperNodeScores map[float64][]string) string {
 	return bestHyperNodes[rand.Intn(len(bestHyperNodes))]
 }
 
-// SelectBestHyperNode return the best hyperNode name whose score is highest, pick one randomly if there are many hyperNodes with same score.
-func SelectBestHyperNode(hyperNodeScores map[float64][]string) string {
-	var bestHyperNodes []string
-	maxScore := -1.0
-	for score, hyperNodes := range hyperNodeScores {
-		if score > maxScore {
-			maxScore = score
-			bestHyperNodes = hyperNodes
-		}
-	}
-
-	if len(bestHyperNodes) == 0 {
-		return ""
-	}
-
-	return bestHyperNodes[rand.Intn(len(bestHyperNodes))]
-}
-
 // GetNodeList returns values of the map 'nodes'
 func GetNodeList(nodes map[string]*api.NodeInfo, nodeList []string) []*api.NodeInfo {
 	result := make([]*api.NodeInfo, 0, len(nodeList))
@@ -306,3 +290,88 @@ func ConvertRes2ResList(res *api.Resource) v1.ResourceList {
 	}
 	return rl
 }
+
+// FindHyperNodeOfNode returns the leaf hyperNode to which the given node belongs.
+func FindHyperNodeOfNode(nodeName string, hyperNodeTree []map[string][]string) string {
+	for hyperNode, nodes := range hyperNodeTree[len(hyperNodeTree)-1] {
+		for _, node := range nodes {
+			if node == nodeName {
+				return hyperNode
+			}
+		}
+	}
+	return ""
+}
+
+// FindJobTaskNumOfHyperNode returns the number of the job's tasks running on nodes under the given hyperNode.
+func FindJobTaskNumOfHyperNode(hyperNodeName string, job *api.JobInfo, hyperNodeTree []map[string][]string) int {
+	revertHyperNodeTree := make([]map[string][]string, len(hyperNodeTree))
+	for i := len(hyperNodeTree) - 1; i >= 0; i-- {
+		revertHyperNodeTree[len(hyperNodeTree)-1-i] = hyperNodeTree[i]
+	}
+
+	hyperNodesMap := make(map[string]sets.Set[string])
+	for i := 0; i < len(revertHyperNodeTree); i++ {
+		for name, children := range revertHyperNodeTree[i] {
+			hyperNodesMap[name] = sets.Set[string]{}
+			hyperNodesMap[name].Insert(name)
+			for _, child := range children {
+				hyperNodesMap[name].Insert(child)
+				if v, ok := hyperNodesMap[child]; ok {
+					hyperNodesMap[name] = hyperNodesMap[name].Union(v)
+				}
+			}
+		}
+	}
+	// find the member set of the hyperNodeName and count the job's tasks in it
+	hyperNodeMap := hyperNodesMap[hyperNodeName]
+	taskCount := 0
+	for _, task := range job.Tasks {
+		if hyperNodeMap.Has(task.NodeName) {
+			taskCount++
+		}
+	}
+	return taskCount
+}
+
+// FindLCAHyperNode returns the lowest common ancestor hyperNode of hyperNodeName and jobHyperNode in the hyperNode tree,
+// together with its tier index counted upward from the leaf tier; it returns ("", -1) if no common ancestor is found.
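+// For example, with hyperNodeTree = [{"spine0": {"s0", "s1"}}, {"s0": {}, "s1": {}}]
+// (tiers ordered from top to leaf, names illustrative), FindLCAHyperNode("s0", "s1", tree)
+// returns ("spine0", 2).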
+func FindLCAHyperNode(hyperNodeName string, jobHyperNode string, hyperNodeTree []map[string][]string) (string, int) {
+	revertHyperNodeTree := make([]map[string][]string, len(hyperNodeTree))
+	for i := len(hyperNodeTree) - 1; i >= 0; i-- {
+		revertHyperNodeTree[len(hyperNodeTree)-1-i] = hyperNodeTree[i]
+	}
+
+	hyperNodesMap := make(map[string]sets.Set[string])
+	for i := 0; i < len(revertHyperNodeTree); i++ {
+		for name, children := range revertHyperNodeTree[i] {
+			hyperNodesMap[name] = sets.Set[string]{}
+			hyperNodesMap[name].Insert(name)
+			for _, child := range children {
+				hyperNodesMap[name].Insert(child)
+				if v, ok := hyperNodesMap[child]; ok {
+					hyperNodesMap[name] = hyperNodesMap[name].Union(v)
+				}
+			}
+		}
+	}
+
+	hyperNodesListByTier := [][]string{}
+	for i := 0; i < len(revertHyperNodeTree); i++ {
+		hyperNodes := []string{}
+		for name := range revertHyperNodeTree[i] {
+			hyperNodes = append(hyperNodes, name)
+		}
+		hyperNodesListByTier = append(hyperNodesListByTier, hyperNodes)
+	}
+
+	for index, tierHyperNodes := range hyperNodesListByTier {
+		for _, hyperNode := range tierHyperNodes {
+			hyperNodeSet := hyperNodesMap[hyperNode]
+			if hyperNodeSet.Has(hyperNodeName) {
+				if jobHyperNode == "" || hyperNodeSet.Has(jobHyperNode) {
+					return hyperNode, index + 1
+				}
+			}
+		}
+	}
+	return "", -1
+}
diff --git a/pkg/scheduler/util/scheduler_helper_test.go b/pkg/scheduler/util/scheduler_helper_test.go
index 3697384a62..dce5cbe84e 100644
--- a/pkg/scheduler/util/scheduler_helper_test.go
+++ b/pkg/scheduler/util/scheduler_helper_test.go
@@ -19,8 +19,6 @@ package util
 import (
 	"testing"
 
-	"k8s.io/apimachinery/pkg/util/sets"
-
 	"github.com/stretchr/testify/assert"
 	"k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -242,3 +240,163 @@ func TestGetHyperNodeList(t *testing.T) {
 		})
 	}
 }
+
+func TestFindLCAHyperNode(t *testing.T) {
+	testCases := []struct {
+		name                   string
+		hyperNodeName          string
+		currentJobLCAHyperNode string
+		hyperNodeTree          []map[string][]string
+		expectedNode           string
+		expectedLevel          int
+	}{
+		// Test case 1: when the job's LCAHyperNode is empty and we look up a leaf hyperNode,
+		// the leaf itself is returned with level 1.
+		{
+			name:                   "Job LCAHyperNode empty for leaf node",
+			hyperNodeName:          "hyperNode3",
+			currentJobLCAHyperNode: "",
+			hyperNodeTree: []map[string][]string{
+				{
+					"hyperNode0": []string{"hyperNode1", "hyperNode2"},
+				},
+				{
+					"hyperNode1": []string{"hyperNode3", "hyperNode4"},
+					"hyperNode2": []string{"hyperNode5", "hyperNode6"},
+				},
+				{
+					"hyperNode3": []string{},
+					"hyperNode4": []string{},
+					"hyperNode5": []string{},
+					"hyperNode6": []string{},
+				},
+			},
+			expectedNode:  "hyperNode3",
+			expectedLevel: 1,
+		},
+		// Test case 2: looking up the root hyperNode with an empty job LCAHyperNode;
+		// hyperNode0 is returned as its own ancestor with level 3.
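+		// (FindLCAHyperNode scans tiers from the leaves upward and returns the first
+		// tier whose member set contains the target, hence tier index 3 here.)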
+ { + name: "Job LCAHyperNode equals input hyperNodeName", + hyperNodeName: "hyperNode0", + currentJobLCAHyperNode: "", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expectedNode: "hyperNode0", + expectedLevel: 3, + }, + // Test case 3: In the situation of normally looking for the Lowest Common Ancestor (LCA) hypernode of a non-leaf node, it is expected to return the correct LCA hypernode and its corresponding level. + { + name: "Normal LCA find for non-leaf node", + hyperNodeName: "hyperNode4", + currentJobLCAHyperNode: "", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expectedNode: "hyperNode0", + expectedLevel: 3, + }, + // test case4:Look for the Lowest Common Ancestor (LCA) hypernode of nodes on different branches, and it is expected to return the correct result and its corresponding level. + { + name: "LCA find for nodes in different branches", + hyperNodeName: "hyperNode5", + currentJobLCAHyperNode: "", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expectedNode: "hyperNode0", + expectedLevel: 3, + }, + // test case5:Look for the Lowest Common Ancestor (LCA) hypernode of nodes on different branches, and it is expected to return the correct result and its corresponding level. + { + name: "LCA find for nodes in different branches", + hyperNodeName: "hyperNode1", + currentJobLCAHyperNode: "", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expectedNode: "hyperNode0", + expectedLevel: 3, + }, + // test case6:In the case where the LCA hypernode is not found (when a non-existent node name is passed in), it is expected to return an empty string and -1. 
+ { + name: "No LCA found for non-existent node", + hyperNodeName: "nonExistentNode", + currentJobLCAHyperNode: "", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expectedNode: "", + expectedLevel: -1, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + resultNode, resultLevel := FindLCAHyperNode(tc.hyperNodeName, tc.currentJobLCAHyperNode, tc.hyperNodeTree) + if resultNode != tc.expectedNode || resultLevel != tc.expectedLevel { + t.Errorf("Test case '%s' failed. Expected node: %s, level: %d. Got node: %s, level: %d", + tc.name, tc.expectedNode, tc.expectedLevel, resultNode, resultLevel) + } + }) + } +}