Skip to content

Commit

Permalink
add nodegroup plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
wuyueandrew committed Sep 20, 2023
1 parent 015f86b commit 5651c3f
Show file tree
Hide file tree
Showing 3 changed files with 429 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/scheduler/plugins/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"volcano.sh/volcano/pkg/scheduler/plugins/drf"
"volcano.sh/volcano/pkg/scheduler/plugins/extender"
"volcano.sh/volcano/pkg/scheduler/plugins/gang"
"volcano.sh/volcano/pkg/scheduler/plugins/nodegroup"
"volcano.sh/volcano/pkg/scheduler/plugins/nodeorder"
"volcano.sh/volcano/pkg/scheduler/plugins/numaaware"
"volcano.sh/volcano/pkg/scheduler/plugins/overcommit"
Expand Down Expand Up @@ -55,6 +56,7 @@ func init() {
framework.RegisterPluginBuilder(cdp.PluginName, cdp.New)
framework.RegisterPluginBuilder(rescheduling.PluginName, rescheduling.New)
framework.RegisterPluginBuilder(usage.PluginName, usage.New)
framework.RegisterPluginBuilder(nodegroup.PluginName, nodegroup.New)

// Plugins for Queues
framework.RegisterPluginBuilder(proportion.PluginName, proportion.New)
Expand Down
228 changes: 228 additions & 0 deletions pkg/scheduler/plugins/nodegroup/nodegroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodegroup

import (
"errors"
"fmt"
"strings"

"k8s.io/klog"
batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
sch "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/framework"
)

const (
// PluginName indicates name of volcano scheduler plugin.
PluginName = "nodegroup"
NodeGroupNameKey = "volcano.sh/nodegroup-name"
)

type nodeGroupPlugin struct {
// Arguments given for the plugin
pluginArguments framework.Arguments
}

// New function returns prioritize plugin object.
func New(arguments framework.Arguments) framework.Plugin {
return &nodeGroupPlugin{pluginArguments: arguments}
}

func (pp *nodeGroupPlugin) Name() string {
return PluginName
}

type queueGroupAffinity struct {
queueGroupAntiAffinityRequired map[string][]string
queueGroupAntiAffinityPreferred map[string][]string
queueGroupAffinityRequired map[string][]string
queueGroupAffinityPreferred map[string][]string
}

func NewQueueGroupAffinity() queueGroupAffinity {
return queueGroupAffinity{
queueGroupAntiAffinityRequired: make(map[string][]string, 0),
queueGroupAntiAffinityPreferred: make(map[string][]string, 0),
queueGroupAffinityRequired: make(map[string][]string, 0),
queueGroupAffinityPreferred: make(map[string][]string, 0),
}
}
func (q queueGroupAffinity) predicate(queue, group string) error {
if len(queue) == 0 {
return nil
}
if q.queueGroupAffinityRequired != nil {
if groups, ok := q.queueGroupAffinityRequired[queue]; ok {
if !contains(groups, group) {
return errors.New("NodeGroupAffinityRequired")
}
}
}
if q.queueGroupAntiAffinityRequired != nil {
if groups, ok := q.queueGroupAntiAffinityRequired[queue]; ok {
if contains(groups, group) {
return errors.New("NodeGroupAntiAffinityRequired")
}
}
}
return nil
}

func (q queueGroupAffinity) score(queue string, group string) float64 {
nodeScore := 0.0
if len(queue) == 0 {
return nodeScore
}
if q.queueGroupAffinityPreferred != nil {
if groups, ok := q.queueGroupAffinityPreferred[queue]; ok {
if !contains(groups, group) {
return 1.0
}
}
}
if q.queueGroupAntiAffinityPreferred != nil {
if groups, ok := q.queueGroupAntiAffinityPreferred[queue]; ok {
if contains(groups, group) {
return -1.0
}
}
}
return nodeScore
}

func contains(slice []string, item string) bool {
if len(slice) == 0 {
return false
}
for _, s := range slice {
if strings.Compare(s, item) == 0 {
return true
}
}
return false
}

//
// User should specify arguments in the config in this format:
//
// actions: "reclaim, allocate, backfill, preempt"
// tiers:
// - plugins:
// - name: priority
// - name: gang
// - name: conformance
// - plugins:
// - name: drf
// - name: predicates
// - name: proportion
// - name: nodegroup
// enablePredicate: true
// enableNodeOrder: true

func calculateArguments(ssn *framework.Session, args framework.Arguments) queueGroupAffinity {
queueGroupAffinity := NewQueueGroupAffinity()
for _, queue := range ssn.Queues {
affinity := queue.Queue.Spec.Affinity
if nil == affinity {
continue
}
nodeGroupAffinity := affinity.NodeGroupAffinity
if nil != nodeGroupAffinity {
preferreds := nodeGroupAffinity.PreferredDuringSchedulingIgnoredDuringExecution
if len(preferreds) > 0 {
queueGroupAffinity.queueGroupAffinityPreferred[queue.Name] = preferreds
}
requireds := nodeGroupAffinity.RequiredDuringSchedulingIgnoredDuringExecution
if len(requireds) > 0 {
queueGroupAffinity.queueGroupAffinityRequired[queue.Name] = requireds
}
}
nodeGroupAntiAffinity := affinity.NodeGroupAntiAffinity
if nil != nodeGroupAntiAffinity {
preferreds := nodeGroupAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
if len(preferreds) > 0 {
queueGroupAffinity.queueGroupAntiAffinityPreferred[queue.Name] = preferreds
}
requireds := nodeGroupAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
if len(requireds) > 0 {
queueGroupAffinity.queueGroupAntiAffinityRequired[queue.Name] = requireds
}
}
}
return queueGroupAffinity
}

// There are 3 ways to assign pod to queue for now:
// scheduling.volcano.sh/queue-name support only annotation
// volcano.sh/queue-name support both labels & annotation
// the key should be unified, maybe volcano.sh/queue-name is better
func GetPodQueue(task *api.TaskInfo) string {
if _, ok := task.Pod.Labels[batch.QueueNameKey]; ok {
return task.Pod.Labels[batch.QueueNameKey]
}
if _, ok := task.Pod.Annotations[batch.QueueNameKey]; ok {
return task.Pod.Annotations[batch.QueueNameKey]
}
if _, ok := task.Pod.Annotations[sch.QueueNameAnnotationKey]; ok {
return task.Pod.Annotations[sch.QueueNameAnnotationKey]
}
return ""
}

func (np *nodeGroupPlugin) OnSessionOpen(ssn *framework.Session) {
queueGroupAffinity := calculateArguments(ssn, np.pluginArguments)
klog.V(3).Infof("queueGroupAffinity queueGroupAntiAffinityRequired <%v> queueGroupAntiAffinityPreferred <%v> queueGroupAffinityRequired <%v> queueGroupAffinityPreferred <%v> groupLabelName <%v>",
queueGroupAffinity.queueGroupAntiAffinityRequired, queueGroupAffinity.queueGroupAntiAffinityPreferred,
queueGroupAffinity.queueGroupAffinityRequired, queueGroupAffinity.queueGroupAffinityPreferred, NodeGroupNameKey)
nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) {
group := node.Node.Labels[NodeGroupNameKey]
queue := GetPodQueue(task)
score := queueGroupAffinity.score(queue, group)
klog.V(3).Infof("task <%s>/<%s> queue %s on node %s of nodegroup %s, score %v", task.Namespace, task.Name, queue, node.Name, group, score)
return score, nil
}
ssn.AddNodeOrderFn(np.Name(), nodeOrderFn)

predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
predicateStatus := make([]*api.Status, 0)

group := node.Node.Labels[NodeGroupNameKey]
queue := GetPodQueue(task)
if err := queueGroupAffinity.predicate(queue, group); err != nil {
nodeStatus := &api.Status{
Code: api.UnschedulableAndUnresolvable,
Reason: "node not satisfy",
}
predicateStatus = append(predicateStatus, nodeStatus)
return predicateStatus, fmt.Errorf("<%s> predicates Task <%s/%s> on Node <%s> of nodegroup <%v> failed <%v>", np.Name(), task.Namespace, task.Name, node.Name, group, err)
}
klog.V(3).Infof("task <%s>/<%s> queue %s on node %s of nodegroup %v", task.Namespace, task.Name, queue, node.Name, group)
nodeStatus := &api.Status{
Code: api.Success,
Reason: "node satisfy task",
}
predicateStatus = append(predicateStatus, nodeStatus)
return predicateStatus, nil
}

ssn.AddPredicateFn(np.Name(), predicateFn)
}

func (np *nodeGroupPlugin) OnSessionClose(ssn *framework.Session) {
}
Loading

0 comments on commit 5651c3f

Please sign in to comment.