From e893b75e06a9fb698178eca8b80139571b33fb46 Mon Sep 17 00:00:00 2001 From: ecosysbin <14729934+ecosysbin@user.noreply.gitee.com> Date: Tue, 24 Dec 2024 20:18:29 +0800 Subject: [PATCH] Add network-topology-aware plugin and hyperNode score callback Signed-off-by: ecosysbin <14729934+ecosysbin@user.noreply.gitee.com> --- pkg/scheduler/actions/allocate/allocate.go | 9 + pkg/scheduler/api/well_known_labels.go | 3 + pkg/scheduler/cache/cache.go | 6 +- pkg/scheduler/conf/scheduler_conf.go | 2 + pkg/scheduler/framework/job_updater.go | 4 +- pkg/scheduler/framework/session.go | 22 +- .../networktopologyaware.go | 151 +++++ .../networktopologyaware_test.go | 580 ++++++++++++++++++ pkg/scheduler/util/scheduler_helper.go | 85 +++ pkg/scheduler/util/scheduler_helper_test.go | 163 ++++- 10 files changed, 1013 insertions(+), 12 deletions(-) create mode 100644 pkg/scheduler/plugins/networktopologyaware/networktopologyaware.go create mode 100644 pkg/scheduler/plugins/networktopologyaware/networktopologyaware_test.go diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 205b3fbf0b..5c95370647 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -233,6 +233,8 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu hyperNodesWithLeftTasks := make(map[string]*util.PriorityQueue) ssn := alloc.session selectedTier := 0 + jobNewHyperNodeMap := map[string]string{} + jobHyperNode := job.PodGroup.Annotations[api.TopologyAllocateLCAHyperNode] // Find a suitable hyperNode in one tier from down to top everytime to ensure that the selected hyperNode spans the least tier. for index, tier := range alloc.hyperNodesTiers { @@ -245,6 +247,9 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu break } for _, hyperNodeName := range ssn.HyperNodesListByTier[tier] { + jobNewHyperNode, _ := util.FindLCAHyperNode(hyperNodeName, jobHyperNode, nil) + jobNewHyperNodeMap[hyperNodeName] = jobNewHyperNode + nodes, ok := ssn.HyperNodes[hyperNodeName] if !ok { klog.ErrorS(nil, "HyperNode not exists.", "jobName", job.UID, "name", hyperNodeName, "tier", tier) @@ -286,6 +291,10 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu klog.V(4).InfoS("Find available hyperNodes for job", "jobName", job.UID, "tier", selectedTier, "hyperNodes", hyperNodes) } stmt, hyperNode := alloc.selectBestHyperNode(jobStmtsByTier[selectedTier], job) + + // set the LCA hyperNode for job,next scheduling will use this hyperNode as the base for selecting hyperNode. + jobNewHyperNode := jobNewHyperNodeMap[hyperNode] + job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = jobNewHyperNode return stmt, hyperNodesWithLeftTasks[hyperNode] } diff --git a/pkg/scheduler/api/well_known_labels.go b/pkg/scheduler/api/well_known_labels.go index 2e1db71c74..5801c5b6ce 100644 --- a/pkg/scheduler/api/well_known_labels.go +++ b/pkg/scheduler/api/well_known_labels.go @@ -42,4 +42,7 @@ const ( // topologyDecisionAnnotation is the key of topology decision about pod request resource topologyDecisionAnnotation = "volcano.sh/topology-decision" + + // TopologyAllocateLCAHyperNode is the key to the lowest common ancestor of the network topology to which the tasks assigned to a job belong. + TopologyAllocateLCAHyperNode = "volcano.sh/allocate-lca-hypernode" ) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 6ba27cc3a8..4e340647ad 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -64,6 +64,7 @@ import ( vcinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1" "volcano.sh/volcano/cmd/scheduler/app/options" "volcano.sh/volcano/pkg/features" + "volcano.sh/volcano/pkg/scheduler/api" schedulingapi "volcano.sh/volcano/pkg/scheduler/api" volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding" "volcano.sh/volcano/pkg/scheduler/metrics" @@ -1536,9 +1537,12 @@ func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG b if err != nil { return nil, err } + sc.Mutex.Lock() + sc.Jobs[job.UID].PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] + sc.Mutex.Unlock() job.PodGroup = pg } - + sc.Jobs[job.UID] = job sc.RecordJobStatusEvent(job, updatePG) return job, nil diff --git a/pkg/scheduler/conf/scheduler_conf.go b/pkg/scheduler/conf/scheduler_conf.go index e911e0b69e..182501ed8f 100644 --- a/pkg/scheduler/conf/scheduler_conf.go +++ b/pkg/scheduler/conf/scheduler_conf.go @@ -87,6 +87,8 @@ type PluginOption struct { EnabledOverused *bool `yaml:"enabledOverused"` // EnabledAllocatable defines whether allocatable is enabled EnabledAllocatable *bool `yaml:"enabledAllocatable"` + // EnabledNetworkTopology defines whether network topology is enabled + EnabledNetworkTopology *bool `yaml:"enabledNetworkTopology"` // Arguments defines the different arguments that can be given to different plugins Arguments map[string]interface{} `yaml:"arguments"` } diff --git a/pkg/scheduler/framework/job_updater.go b/pkg/scheduler/framework/job_updater.go index 80ba52987d..cb10c39822 100644 --- a/pkg/scheduler/framework/job_updater.go +++ b/pkg/scheduler/framework/job_updater.go @@ -91,10 +91,12 @@ func isPodGroupStatusUpdated(newStatus, oldStatus scheduling.PodGroupStatus) boo func (ju *jobUpdater) updateJob(index int) { job := ju.jobQueue[index] ssn := ju.ssn + oldHyperNode := ssn.podGroupAnnotations[job.UID][api.TopologyAllocateLCAHyperNode] job.PodGroup.Status = jobStatus(ssn, job) oldStatus, found := ssn.podGroupStatus[job.UID] - updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus) + updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus) || oldHyperNode != job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] + if _, err := ssn.cache.UpdateJobStatus(job, updatePG); err != nil { klog.Errorf("Failed to update job <%s/%s>: %v", job.Namespace, job.Name, err) diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index ebacefb696..1c929d04f5 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -60,6 +60,9 @@ type Session struct { // This should not be mutated after initiated podGroupStatus map[api.JobID]scheduling.PodGroupStatus + // recored old annotations for podgroup, used to detect changes + podGroupAnnotations map[api.JobID]map[string]string + Jobs map[api.JobID]*api.JobInfo Nodes map[string]*api.NodeInfo CSINodesStatus map[string]*api.CSINodeStatusInfo @@ -125,15 +128,15 @@ func openSession(cache cache.Cache) *Session { cache: cache, informerFactory: cache.SharedInformerFactory(), - TotalResource: api.EmptyResource(), - TotalGuarantee: api.EmptyResource(), - podGroupStatus: map[api.JobID]scheduling.PodGroupStatus{}, - - Jobs: map[api.JobID]*api.JobInfo{}, - Nodes: map[string]*api.NodeInfo{}, - CSINodesStatus: map[string]*api.CSINodeStatusInfo{}, - RevocableNodes: map[string]*api.NodeInfo{}, - Queues: map[api.QueueID]*api.QueueInfo{}, + TotalResource: api.EmptyResource(), + TotalGuarantee: api.EmptyResource(), + podGroupStatus: map[api.JobID]scheduling.PodGroupStatus{}, + podGroupAnnotations: map[api.JobID]map[string]string{}, + Jobs: map[api.JobID]*api.JobInfo{}, + Nodes: map[string]*api.NodeInfo{}, + CSINodesStatus: map[string]*api.CSINodeStatusInfo{}, + RevocableNodes: map[string]*api.NodeInfo{}, + Queues: map[api.QueueID]*api.QueueInfo{}, plugins: map[string]Plugin{}, jobOrderFns: map[string]api.CompareFn{}, @@ -171,6 +174,7 @@ func openSession(cache cache.Cache) *Session { for _, job := range ssn.Jobs { if job.PodGroup != nil { ssn.podGroupStatus[job.UID] = *job.PodGroup.Status.DeepCopy() + ssn.podGroupAnnotations[job.UID] = job.PodGroup.GetAnnotations() } if vjr := ssn.JobValid(job); vjr != nil { diff --git a/pkg/scheduler/plugins/networktopologyaware/networktopologyaware.go b/pkg/scheduler/plugins/networktopologyaware/networktopologyaware.go new file mode 100644 index 0000000000..b018800129 --- /dev/null +++ b/pkg/scheduler/plugins/networktopologyaware/networktopologyaware.go @@ -0,0 +1,151 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package networktopologyaware + +import ( + "math" + + "k8s.io/klog/v2" + + "volcano.sh/volcano/pkg/scheduler/api" + "volcano.sh/volcano/pkg/scheduler/framework" + "volcano.sh/volcano/pkg/scheduler/util" +) + +const ( + // PluginName indicates name of volcano scheduler plugin. + PluginName = "networktopologyaware" + BaseScore = 100 + TaskBaseScore = 10 + NetworkTopologyWeight = "weight" +) + +// HyperNodeTree is the hypernode tree of all hypernodes in the cluster. +// currentJobLCAHyperNode is the hypernode of the job's LCAHyperNode. +var ( + HyperNodeTree []map[string][]string +) + +type networkTopologyAwarePlugin struct { + // Arguments given for the plugin + pluginArguments framework.Arguments +} + +// New function returns prioritizePlugin object +func New(arguments framework.Arguments) framework.Plugin { + return &networkTopologyAwarePlugin{ + pluginArguments: arguments, + } +} + +func (nta *networkTopologyAwarePlugin) Name() string { + return PluginName +} + +func calculateWeight(args framework.Arguments) int { + weight := 1 + args.GetInt(&weight, NetworkTopologyWeight) + return weight +} + +func (nta *networkTopologyAwarePlugin) OnSessionOpen(ssn *framework.Session) { + klog.V(5).Infof("Enter networkTopologyAwarePlugin plugin ...") + defer func() { + klog.V(5).Infof("Leaving networkTopologyAware plugin ...") + }() + + weight := calculateWeight(nta.pluginArguments) + hyperNodeFn := func(job *api.JobInfo, hyperNodes map[string][]*api.NodeInfo) (map[string]float64, error) { + hyperNodeScores := make(map[string]float64) + for hyperNode := range hyperNodes { + score := networkTopologyAwareScore(hyperNode, job, HyperNodeTree) + score *= float64(weight) + hyperNodeScores[hyperNode] = score + } + + klog.V(1).Infof("networkTopologyAware score is: %v", hyperNodeScores) + return hyperNodeScores, nil + } + + nodeFn := func(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) { + taskJob := ssn.Jobs[task.Job] + nodeScores := make(map[string]float64) + for _, node := range nodes { + hyperNode := util.FindHyperNodeOfNode(node.Name, HyperNodeTree) + score := networkTopologyAwareScore(hyperNode, taskJob, HyperNodeTree) + score *= float64(weight) + nodeScores[node.Name] = score + } + klog.V(1).Infof("networkTopologyAware score is: %v", nodeScores) + return nodeScores, nil + } + + ssn.AddHyperNodeOrederFn(nta.Name(), hyperNodeFn) + ssn.AddBatchNodeOrderFn(nta.Name(), nodeFn) +} + +func (bp *networkTopologyAwarePlugin) OnSessionClose(ssn *framework.Session) { +} + +// networkTopologyAwareScore use the best fit polices during scheduling. + +// Explanation: +// The currentJobLCAHyperNode is a property of job and that serves as the smallest root in the hypernode tree of job. +// A job has multiple tasks, each belonging to a hypernode. This LCAHyperNode is the topmost and lowest common ancestor among the hypernodes of all tasks within the job. + +// Goals: +// - The tier index to which the LCAHyperNode of a job belongs should be as low as possible. +// - Tasks under a job should be scheduled to one hyperNode as much as possible. +func networkTopologyAwareScore(hyperNodeName string, job *api.JobInfo, hyperNodeTree []map[string][]string) float64 { + jobHyperNode := job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] + // job fist first scheduler. + if jobHyperNode == "" { + return BaseScore + } + + if jobHyperNode == hyperNodeName { + return BaseScore + } + + // Calculate hyperNode tier index score. + _, index := util.FindLCAHyperNode(hyperNodeName, jobHyperNode, hyperNodeTree) + if index <= 0 { + klog.V(4).Infof("find LCAhyperNode failed wtih %s in hyperNodeTree", hyperNodeName) + return 0.0 + } + tierIndexScore := BaseScore * scoreHyperNodeWithIndex(index, 1, len(hyperNodeTree)) + + // Calculate tasks num score. + taskNum := util.FindJobTaskNumOfHyperNode(hyperNodeName, job, hyperNodeTree) + taskNumScore := 0.0 + if len(job.Tasks) > 0 { + taskNumScore = TaskBaseScore * scoreHyperNodeWithTaskNum(taskNum, len(job.Tasks)) + } + // aggregate scores + return tierIndexScore + taskNumScore +} + +func scoreHyperNodeWithIndex(index int, minIndex int, maxIndex int) float64 { + // Use logarithmic operations to calculate scores and map the original values to the range between 0 and 1. + // Adjust the formula to ensure that the scores meet the requirements (excluding 0 and 1). + return (math.Log(float64(maxIndex)) - math.Log(float64(index))) / (math.Log(float64(maxIndex)) - math.Log(float64(minIndex))) +} + +func scoreHyperNodeWithTaskNum(taskNum int, maxTaskNum int) float64 { + // Calculate task distribution rate as score and make sure the score to the range between 0 and 1. + return float64(taskNum) / float64(maxTaskNum) +} diff --git a/pkg/scheduler/plugins/networktopologyaware/networktopologyaware_test.go b/pkg/scheduler/plugins/networktopologyaware/networktopologyaware_test.go new file mode 100644 index 0000000000..985e41342b --- /dev/null +++ b/pkg/scheduler/plugins/networktopologyaware/networktopologyaware_test.go @@ -0,0 +1,580 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package networktopologyaware + +import ( + "math" + "testing" + + "volcano.sh/volcano/pkg/scheduler/api" + "volcano.sh/volcano/pkg/scheduler/conf" + "volcano.sh/volcano/pkg/scheduler/framework" + "volcano.sh/volcano/pkg/scheduler/uthelper" +) + +const ( + eps = 1e-1 +) + +func TestArguments(t *testing.T) { + framework.RegisterPluginBuilder(PluginName, New) + defer framework.CleanupPluginBuilders() + + arguments := framework.Arguments{ + "weight": 2, + } + + builder, ok := framework.GetPluginBuilder(PluginName) + if !ok { + t.Fatalf("should have plugin named %s", PluginName) + } + + plugin := builder(arguments) + networkTopologyAware, ok := plugin.(*networkTopologyAwarePlugin) + if !ok { + t.Fatalf("plugin should be %T, but not %T", networkTopologyAware, plugin) + } + weight := calculateWeight(networkTopologyAware.pluginArguments) + if weight != 2 { + t.Errorf("weight should be 2, but get %v", weight) + } +} + +func TestNetworkTopologyAwareHyperNodeScore(t *testing.T) { + tests := []struct { + name string + uthelper.TestCommonStruct + arguments framework.Arguments + hyperNodes map[string][]*api.NodeInfo + hyperNodeTree []map[string][]string + jobHyperNode string + tasks map[string]string + expected map[string]float64 + }{ + // test case 1:Job first scheduler when the `LCAHyperNode` of the job is empty and we are looking for the Lowest Common Ancestor (LCA) of job with the hyperNode, + // now the LCA hyperNode is the hyperNode self, so it is expected to return 100.0 for the score of the hyperNode. + { + name: "Job LCAHyperNode empty for leaf node, hyperNode score should be 100.0", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + hyperNodes: map[string][]*api.NodeInfo{ + "hyperNode3": nil, + "hyperNode4": nil, + "hyperNode5": nil, + }, + jobHyperNode: "", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{"node1", "node2"}, + "hyperNode4": []string{"node3", "node4"}, + "hyperNode5": []string{"node5", "node6"}, + "hyperNode6": []string{"node7", "node8"}, + }, + }, + expected: map[string]float64{ + "hyperNode3": 100.0, + "hyperNode4": 100.0, + "hyperNode5": 100.0, + }, + }, + // test case 2:Job is not first scheduler when the `LCAHyperNode` of the job is not empty and the currentLCAHyperNode hyperNode3, + // for the hyperNode3, it is equls to the LCAHyperNode, it is the best choice for the job, so it is expected to return 100.0 for the score. + // for the hyperNode4, find the LCA hyperNode is the hyperNode1 of tier index 2, it is a not good choice. according to calculate to return 36.9 for the score. + // for the hyperNode5, find the LCA hyperNode is the hyperNode0 of tier index 3, it is a worst choice. so it is expected to return 0.0 for the score. + { + name: "Normal LCA hyperNode to score for hyperNodes", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + hyperNodes: map[string][]*api.NodeInfo{ + "hyperNode3": nil, + "hyperNode4": nil, + "hyperNode5": nil, + }, + jobHyperNode: "hyperNode3", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{"node1", "node2"}, + "hyperNode4": []string{"node3", "node4"}, + "hyperNode5": []string{"node5", "node6"}, + "hyperNode6": []string{"node7", "node8"}, + }, + }, + expected: map[string]float64{ + "hyperNode3": 100.0, + "hyperNode4": 36.9, + "hyperNode5": 0.0, + }, + }, + { + // test case 3:Job is not first scheduler and the plugin weight is 2. + name: "Normal LCA hyperNode to score for hyperNodes with weight 2", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 2, + }, + hyperNodes: map[string][]*api.NodeInfo{ + "hyperNode3": nil, + "hyperNode4": nil, + "hyperNode5": nil, + }, + jobHyperNode: "hyperNode3", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{"node1", "node2"}, + "hyperNode4": []string{"node3", "node4"}, + "hyperNode5": []string{"node5", "node6"}, + "hyperNode6": []string{"node7", "node8"}, + }, + }, + expected: map[string]float64{ + "hyperNode3": 200.0, + "hyperNode4": 73.8, + "hyperNode5": 0.0, + }, + }, + { + name: "HyperNode has same LCAhyperNode for the job and the hyperNode will has the same score", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + hyperNodes: map[string][]*api.NodeInfo{ + "hyperNode3": nil, + "hyperNode4": nil, + "hyperNode5": nil, + }, + jobHyperNode: "hyperNode1", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{"node1", "node2"}, + "hyperNode4": []string{"node3", "node4"}, + "hyperNode5": []string{"node5", "node6"}, + "hyperNode6": []string{"node7", "node8"}, + }, + }, + expected: map[string]float64{ + "hyperNode3": 36.9, + "hyperNode4": 36.9, + "hyperNode5": 0.0, + }, + }, + { + name: "HyperNode has same LCAhyperNode for the job and the hyperNode will has the same score", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + hyperNodes: map[string][]*api.NodeInfo{ + "hyperNode3": nil, + "hyperNode4": nil, + "hyperNode5": nil, + }, + jobHyperNode: "hyperNode1", + tasks: map[string]string{ + "task1": "node1", + "task2": "node2", + "task3": "node3", + "test4": "", + }, + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{"node1", "node2"}, + "hyperNode4": []string{"node3", "node4"}, + "hyperNode5": []string{"node5", "node6"}, + "hyperNode6": []string{"node7", "node8"}, + }, + }, + expected: map[string]float64{ + "hyperNode3": 41.9, + "hyperNode4": 39.4, + "hyperNode5": 0.0, + }, + }, + } + + trueValue := true + for i, test := range tests { + tiers := []conf.Tier{ + { + Plugins: []conf.PluginOption{ + { + Name: PluginName, + EnabledNetworkTopology: &trueValue, + Arguments: test.arguments, + }, + }, + }, + } + ssn := test.RegisterSession(tiers, nil) + ssn.HyperNodes = test.hyperNodes + // mock for test + HyperNodeTree = test.hyperNodeTree + job := &api.JobInfo{ + Name: "test-job", + PodGroup: &api.PodGroup{}, + } + job.PodGroup.SetAnnotations(map[string]string{api.TopologyAllocateLCAHyperNode: test.jobHyperNode}) + job.Tasks = make(map[api.TaskID]*api.TaskInfo) + for name, node := range test.tasks { + taskInfo := &api.TaskInfo{ + UID: api.TaskID(name), + Name: name, + Job: job.UID, + } + job.Tasks[taskInfo.UID] = taskInfo + taskInfo.NodeName = node + } + scores, err := ssn.HyperNodeOrderMapFn(job, ssn.HyperNodes) + if err != nil { + t.Errorf("case%d: task %s has err %v", i, test.Name, err) + continue + } + hyperNodesScore := scores[PluginName] + for hypernode, expected := range test.expected { + if math.Abs(hyperNodesScore[hypernode]-expected) > eps { + t.Errorf("case%d: task %s on hypernode %s expect have score %v, but get %v", i+1, test.name, hypernode, expected, hyperNodesScore[hypernode]) + } + } + } +} + +func TestNetworkTopologyAwareNodeScore(t *testing.T) { + tests := []struct { + name string + uthelper.TestCommonStruct + arguments framework.Arguments + nodes []*api.NodeInfo + hyperNodeTree []map[string][]string + jobHyperNode string + tasks map[string]string + expected map[string]float64 + }{ + { + name: "job first scheduler when the `LCAHyperNode` of the job is empty when node score is 100.0", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + nodes: []*api.NodeInfo{ + { + Name: "node1", + }, + { + Name: "node3", + }, + { + Name: "node5", + }, + }, + jobHyperNode: "", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{"node1", "node2"}, + "hyperNode4": []string{"node3", "node4"}, + "hyperNode5": []string{"node5", "node6"}, + "hyperNode6": []string{"node7", "node8"}, + }, + }, + expected: map[string]float64{ + "node1": 100.0, + "node3": 100.0, + "node5": 100.0, + }, + }, + { + name: "Normal LCA hyperNode to score for nodes", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + nodes: []*api.NodeInfo{ + { + Name: "node1", + }, + { + Name: "node3", + }, + { + Name: "node5", + }, + }, + jobHyperNode: "hyperNode3", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{"node1", "node2"}, + "hyperNode4": []string{"node3", "node4"}, + "hyperNode5": []string{"node5", "node6"}, + "hyperNode6": []string{"node7", "node8"}, + }, + }, + expected: map[string]float64{ + "node1": 100.0, + "node3": 36.9, + "node5": 0.0, + }, + }, + { + name: "Normal LCA hyperNode to score for nodes with weight 2", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 2, + }, + nodes: []*api.NodeInfo{ + { + Name: "node1", + }, + { + Name: "node3", + }, + { + Name: "node5", + }, + }, + jobHyperNode: "hyperNode3", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{"node1", "node2"}, + "hyperNode4": []string{"node3", "node4"}, + "hyperNode5": []string{"node5", "node6"}, + "hyperNode6": []string{"node7", "node8"}, + }, + }, + expected: map[string]float64{ + "node1": 200.0, + "node3": 73.8, + "node5": 0.0, + }, + }, + { + name: "If the calculated LCAHyperNode is the same, the scores will be the same.", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + nodes: []*api.NodeInfo{ + { + Name: "node1", + }, + { + Name: "node3", + }, + { + Name: "node5", + }, + }, + jobHyperNode: "hyperNode1", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{"node1", "node2"}, + "hyperNode4": []string{"node3", "node4"}, + "hyperNode5": []string{"node5", "node6"}, + "hyperNode6": []string{"node7", "node8"}, + }, + }, + expected: map[string]float64{ + "node1": 36.9, + "node3": 36.9, + "node5": 0.0, + }, + }, + { + name: "If the calculated LCAHyperNode is the same, but the hyperNode has more tasks running, the score will be higher.", + TestCommonStruct: uthelper.TestCommonStruct{ + Plugins: map[string]framework.PluginBuilder{PluginName: New}, + }, + arguments: framework.Arguments{ + "weight": 1, + }, + nodes: []*api.NodeInfo{ + { + Name: "node1", + }, + { + Name: "node3", + }, + { + Name: "node5", + }, + }, + jobHyperNode: "hyperNode1", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{"node1", "node2"}, + "hyperNode4": []string{"node3", "node4"}, + "hyperNode5": []string{"node5", "node6"}, + "hyperNode6": []string{"node7", "node8"}, + }, + }, + tasks: map[string]string{ + "task1": "node1", + "task2": "node2", + "task3": "node3", + "test4": "", + }, + expected: map[string]float64{ + "node1": 41.9, + "node3": 39.4, + "node5": 0.0, + }, + }, + } + + trueValue := true + for i, test := range tests { + tiers := []conf.Tier{ + { + Plugins: []conf.PluginOption{ + { + Name: PluginName, + EnabledNodeOrder: &trueValue, + Arguments: test.arguments, + }, + }, + }, + } + ssn := test.RegisterSession(tiers, nil) + // mock for test + HyperNodeTree = test.hyperNodeTree + // mock job + job := &api.JobInfo{ + UID: "test-job", + Name: "test-job", + PodGroup: &api.PodGroup{}, + } + job.PodGroup.SetAnnotations(map[string]string{api.TopologyAllocateLCAHyperNode: test.jobHyperNode}) + ssn.Jobs = map[api.JobID]*api.JobInfo{ + job.UID: job, + } + job.Tasks = make(map[api.TaskID]*api.TaskInfo) + for name, node := range test.tasks { + taskInfo := &api.TaskInfo{ + UID: api.TaskID(name), + Name: name, + Job: job.UID, + } + job.Tasks[taskInfo.UID] = taskInfo + taskInfo.NodeName = node + } + // mock task + task := &api.TaskInfo{ + Name: "test4", + Job: job.UID, + } + scores, err := ssn.BatchNodeOrderFn(task, test.nodes) + if err != nil { + t.Errorf("case%d: task %s has err %v", i, test.Name, err) + continue + } + for node, expected := range test.expected { + if math.Abs(scores[node]-expected) > eps { + t.Errorf("case%d: task %s on node %s expect have score %v, but get %v", i+1, test.name, node, scores[node], expected) + } + } + } +} diff --git a/pkg/scheduler/util/scheduler_helper.go b/pkg/scheduler/util/scheduler_helper.go index f8a80e98b2..6c551bd274 100644 --- a/pkg/scheduler/util/scheduler_helper.go +++ b/pkg/scheduler/util/scheduler_helper.go @@ -222,6 +222,91 @@ func SelectBestHyperNode(hyperNodeScores map[float64][]string) string { return bestHyperNodes[rand.Intn(len(bestHyperNodes))] } +// Find the hyperNode to which the node belongs. +func FindHyperNodeOfNode(nodeName string, hyperNodeTree []map[string][]string) string { + for hyperNode, nodes := range hyperNodeTree[len(hyperNodeTree)-1] { + for _, node := range nodes { + if node == nodeName { + return hyperNode + } + } + } + return "" +} + +func FindJobTaskNumOfHyperNode(hyperNodeName string, job *api.JobInfo, hyperNodeTree []map[string][]string) int { + revertHyperNodeTree := make([]map[string][]string, len(hyperNodeTree)) + for i := len(hyperNodeTree) - 1; i >= 0; i-- { + revertHyperNodeTree[len(hyperNodeTree)-1-i] = hyperNodeTree[i] + } + + hyperNodesMap := make(map[string]sets.Set[string]) + for i := 0; i < len(revertHyperNodeTree); i++ { + for name, children := range revertHyperNodeTree[i] { + hyperNodesMap[name] = sets.Set[string]{} + hyperNodesMap[name].Insert(name) + for _, child := range children { + hyperNodesMap[name].Insert(child) + if v, ok := hyperNodesMap[child]; ok { + hyperNodesMap[name] = hyperNodesMap[name].Union(v) + } + } + } + } + // find the hyperNodeMap of the hyperNodeName + hyperNodeMap := hyperNodesMap[hyperNodeName] + taskCount := 0 + for _, task := range job.Tasks { + if hyperNodeMap.Has(task.NodeName) { + taskCount++ + } + } + return taskCount +} + +// FindOutRootHyperNode find out the root hypernode of the job when the hypernode join the job. +func FindLCAHyperNode(hyperNodeName string, jobHyperNode string, hyperNodeTree []map[string][]string) (string, int) { + revertHyperNodeTree := make([]map[string][]string, len(hyperNodeTree)) + for i := len(hyperNodeTree) - 1; i >= 0; i-- { + revertHyperNodeTree[len(hyperNodeTree)-1-i] = hyperNodeTree[i] + } + + hyperNodesMap := make(map[string]sets.Set[string]) + for i := 0; i < len(revertHyperNodeTree); i++ { + for name, children := range revertHyperNodeTree[i] { + hyperNodesMap[name] = sets.Set[string]{} + hyperNodesMap[name].Insert(name) + for _, child := range children { + hyperNodesMap[name].Insert(child) + if v, ok := hyperNodesMap[child]; ok { + hyperNodesMap[name] = hyperNodesMap[name].Union(v) + } + } + } + } + + hyperNodesListByTier := [][]string{} + for i := 0; i < len(revertHyperNodeTree); i++ { + hyperNodes := []string{} + for name := range revertHyperNodeTree[i] { + hyperNodes = append(hyperNodes, name) + } + hyperNodesListByTier = append(hyperNodesListByTier, hyperNodes) + } + + for index, tierHyperNodes := range hyperNodesListByTier { + for _, hyperNode := range tierHyperNodes { + hyperNodeSet := hyperNodesMap[hyperNode] + if hyperNodeSet.Has(hyperNodeName) { + if jobHyperNode == "" || hyperNodeSet.Has(jobHyperNode) { + return hyperNode, index + 1 + } + } + } + } + return "", -1 +} + // GetNodeList returns values of the map 'nodes' func GetNodeList(nodes map[string]*api.NodeInfo, nodeList []string) []*api.NodeInfo { result := make([]*api.NodeInfo, 0, len(nodeList)) diff --git a/pkg/scheduler/util/scheduler_helper_test.go b/pkg/scheduler/util/scheduler_helper_test.go index a61f625fdb..0c247a7fee 100644 --- a/pkg/scheduler/util/scheduler_helper_test.go +++ b/pkg/scheduler/util/scheduler_helper_test.go @@ -17,9 +17,10 @@ limitations under the License. package util import ( - "k8s.io/apimachinery/pkg/util/sets" "testing" + "k8s.io/apimachinery/pkg/util/sets" + "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/api/equality" @@ -240,3 +241,163 @@ func TestGetHyperNodeList(t *testing.T) { }) } } + +func TestFindLCAHyperNode(t *testing.T) { + testCases := []struct { + name string + hyperNodeName string + currentJobLCAHyperNode string + hyperNodeTree []map[string][]string + expectedNode string + expectedLevel int + }{ + // test case 1:When the `LCAHyperNode` of the job is empty and we are looking for the Lowest Common Ancestor (LCA) of a leaf node, it is expected to return the corresponding node and the level should be 1. + { + name: "Job LCAHyperNode empty for leaf node", + hyperNodeName: "hyperNode3", + currentJobLCAHyperNode: "", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expectedNode: "hyperNode3", + expectedLevel: 1, + }, + // Test case 2: When the `LCAHyperNode` of the job is the same as the name of the input hypernode, it is expected to return that hypernode and the level should be 1. + { + name: "Job LCAHyperNode equals input hyperNodeName", + hyperNodeName: "hyperNode0", + currentJobLCAHyperNode: "", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expectedNode: "hyperNode0", + expectedLevel: 3, + }, + // Test case 3: In the situation of normally looking for the Lowest Common Ancestor (LCA) hypernode of a non-leaf node, it is expected to return the correct LCA hypernode and its corresponding level. + { + name: "Normal LCA find for non-leaf node", + hyperNodeName: "hyperNode4", + currentJobLCAHyperNode: "", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expectedNode: "hyperNode0", + expectedLevel: 3, + }, + // test case4:Look for the Lowest Common Ancestor (LCA) hypernode of nodes on different branches, and it is expected to return the correct result and its corresponding level. + { + name: "LCA find for nodes in different branches", + hyperNodeName: "hyperNode5", + currentJobLCAHyperNode: "", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expectedNode: "hyperNode0", + expectedLevel: 3, + }, + // test case5:Look for the Lowest Common Ancestor (LCA) hypernode of nodes on different branches, and it is expected to return the correct result and its corresponding level. + { + name: "LCA find for nodes in different branches", + hyperNodeName: "hyperNode1", + currentJobLCAHyperNode: "", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expectedNode: "hyperNode0", + expectedLevel: 3, + }, + // test case6:In the case where the LCA hypernode is not found (when a non-existent node name is passed in), it is expected to return an empty string and -1. + { + name: "No LCA found for non-existent node", + hyperNodeName: "nonExistentNode", + currentJobLCAHyperNode: "", + hyperNodeTree: []map[string][]string{ + { + "hyperNode0": []string{"hyperNode1", "hyperNode2"}, + }, + { + "hyperNode1": []string{"hyperNode3", "hyperNode4"}, + "hyperNode2": []string{"hyperNode5", "hyperNode6"}, + }, + { + "hyperNode3": []string{}, + "hyperNode4": []string{}, + "hyperNode5": []string{}, + "hyperNode6": []string{}, + }, + }, + expectedNode: "", + expectedLevel: -1, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + resultNode, resultLevel := FindLCAHyperNode(tc.hyperNodeName, tc.currentJobLCAHyperNode, tc.hyperNodeTree) + if resultNode != tc.expectedNode || resultLevel != tc.expectedLevel { + t.Errorf("Test case '%s' failed. Expected node: %s, level: %d. Got node: %s, level: %d", + tc.name, tc.expectedNode, tc.expectedLevel, resultNode, resultLevel) + } + }) + } +}