Update topologyspreadconstraint.go
nitindagar0 authored Jul 26, 2023
1 parent f7b7f50 commit ba4b275
Showing 1 changed file with 71 additions and 10 deletions.
@@ -28,6 +28,8 @@ import (
 	"k8s.io/klog/v2"
 	utilpointer "k8s.io/utils/pointer"
 
+	v1helper "k8s.io/component-helpers/scheduling/corev1"
+	nodeaffinity "k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
 	"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
 	"sigs.k8s.io/descheduler/pkg/descheduler/node"
 	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
@@ -79,6 +81,52 @@ func New(args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plug
 	}, nil
 }
 
+type topologyConstraintSet struct {
+	constraint      v1.TopologySpreadConstraint
+	podNodeAffinity nodeaffinity.RequiredNodeAffinity
+	podTolerations  []v1.Toleration
+}
+
+// DoNotScheduleTaintsFilterFunc returns the filter function that can
+// filter out the node taints that reject scheduling Pod on a Node.
+func DoNotScheduleTaintsFilterFunc() func(t *v1.Taint) bool {
+	return func(t *v1.Taint) bool {
+		// PodToleratesNodeTaints is only interested in NoSchedule and NoExecute taints.
+		return t.Effect == v1.TaintEffectNoSchedule || t.Effect == v1.TaintEffectNoExecute
+	}
+}
+
+func filterEligibleNodes(nodes []*v1.Node, constraintSet topologyConstraintSet) []*v1.Node {
+	constraint := constraintSet.constraint
+	nodeAffinity := constraintSet.podNodeAffinity
+	tolerations := constraintSet.podTolerations
+	var eligibleNodes []*v1.Node
+	for _, node := range nodes {
+		if matchNodeInclusionPolicies(&constraint, tolerations, node, nodeAffinity) {
+			eligibleNodes = append(eligibleNodes, node)
+		}
+	}
+	return eligibleNodes
+}
+
+func matchNodeInclusionPolicies(tsc *v1.TopologySpreadConstraint, tolerations []v1.Toleration, node *v1.Node, require nodeaffinity.RequiredNodeAffinity) bool {
+	// Nil is equivalent to honor
+	if tsc.NodeAffinityPolicy == nil || *tsc.NodeAffinityPolicy == v1.NodeInclusionPolicyHonor {
+		// We ignore parsing errors here for backwards compatibility.
+		if match, _ := require.Match(node); !match {
+			return false
+		}
+	}
+
+	// Nil is equivalent to ignore
+	if tsc.NodeTaintsPolicy != nil && *tsc.NodeTaintsPolicy == v1.NodeInclusionPolicyHonor {
+		if _, untolerated := v1helper.FindMatchingUntoleratedTaint(node.Spec.Taints, tolerations, DoNotScheduleTaintsFilterFunc()); untolerated {
+			return false
+		}
+	}
+	return true
+}
+
 // Name retrieves the plugin name
 func (d *RemovePodsViolatingTopologySpreadConstraint) Name() string {
 	return PluginName
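
The new helpers decide whether a node counts as an eligible topology domain for a pod's spread constraint: a nil NodeAffinityPolicy is treated as Honor, while a nil NodeTaintsPolicy is treated as Ignore. Below is a minimal usage sketch, not part of this commit; the function name exampleNodeInclusionPolicies is invented, and it assumes same-package access to the unexported helpers. A pod with no tolerations, checked against a NoSchedule-tainted node under an explicit NodeTaintsPolicy of Honor, is rejected as a domain.

// Hypothetical same-package sketch, not part of the commit.
func exampleNodeInclusionPolicies() bool {
	honor := v1.NodeInclusionPolicyHonor
	constraint := v1.TopologySpreadConstraint{
		MaxSkew:           1,
		TopologyKey:       "topology.kubernetes.io/zone",
		WhenUnsatisfiable: v1.DoNotSchedule,
		// Explicit Honor: untolerated NoSchedule/NoExecute taints exclude a node.
		NodeTaintsPolicy: &honor,
	}
	pod := &v1.Pod{} // no tolerations and no required node affinity
	node := &v1.Node{Spec: v1.NodeSpec{
		Taints: []v1.Taint{{Key: "dedicated", Effect: v1.TaintEffectNoSchedule}},
	}}
	affinity := nodeaffinity.GetRequiredNodeAffinity(pod)
	// Returns false: NodeAffinityPolicy is nil (honored, trivially satisfied here),
	// but the node's NoSchedule taint is untolerated, so the node is dropped.
	return matchNodeInclusionPolicies(&constraint, pod.Spec.Tolerations, node, affinity)
}

filterEligibleNodes applies the same check across a node slice, so under this policy the tainted node would also be excluded from the eligible set that balanceDomains later uses.
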
@@ -132,7 +180,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
 		}
 
 		// ...where there is a topology constraint
-		namespaceTopologySpreadConstraints := []v1.TopologySpreadConstraint{}
+		namespaceTopologySpreadConstraints := []topologyConstraintSet{}
 		for _, pod := range namespacedPods[namespace] {
 			for _, constraint := range pod.Spec.TopologySpreadConstraints {
 				// Ignore topology constraints if they are not included
@@ -142,24 +190,35 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
 				// Need to check v1.TopologySpreadConstraint deepEquality because
 				// v1.TopologySpreadConstraint has pointer fields
 				// and we don't need to go over duplicated constraints later on
-				if hasIdenticalConstraints(constraint, namespaceTopologySpreadConstraints) {
+				requiredSchedulingTerm := nodeaffinity.GetRequiredNodeAffinity(pod)
+				namespaceTopologySpreadConstraint := topologyConstraintSet{
+					constraint:      constraint,
+					podNodeAffinity: requiredSchedulingTerm,
+					podTolerations:  pod.Spec.Tolerations,
+				}
+				if hasIdenticalConstraints(namespaceTopologySpreadConstraint, namespaceTopologySpreadConstraints) {
 					continue
 				}
-				namespaceTopologySpreadConstraints = append(namespaceTopologySpreadConstraints, constraint)
+				namespaceTopologySpreadConstraints = append(namespaceTopologySpreadConstraints, namespaceTopologySpreadConstraint)
 			}
 		}
 		if len(namespaceTopologySpreadConstraints) == 0 {
 			continue
 		}
 
 		// 2. for each topologySpreadConstraint in that namespace
-		for _, constraint := range namespaceTopologySpreadConstraints {
+		for _, constraintSet := range namespaceTopologySpreadConstraints {
+			constraint := constraintSet.constraint
+			nodeAffinity := constraintSet.podNodeAffinity
+			tolerations := constraintSet.podTolerations
 			constraintTopologies := make(map[topologyPair][]*v1.Pod)
 			// pre-populate the topologyPair map with all the topologies available from the nodeMap
 			// (we can't just build it from existing pods' nodes because a topology may have 0 pods)
 			for _, node := range nodeMap {
 				if val, ok := node.Labels[constraint.TopologyKey]; ok {
-					constraintTopologies[topologyPair{key: constraint.TopologyKey, value: val}] = make([]*v1.Pod, 0)
+					if matchNodeInclusionPolicies(&constraint, tolerations, node, nodeAffinity) {
+						constraintTopologies[topologyPair{key: constraint.TopologyKey, value: val}] = make([]*v1.Pod, 0)
+					}
 				}
 			}
 
@@ -202,7 +261,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
 				klog.V(2).InfoS("Skipping topology constraint because it is already balanced", "constraint", constraint)
 				continue
 			}
-			d.balanceDomains(podsForEviction, constraint, constraintTopologies, sumPods, nodes)
+			d.balanceDomains(podsForEviction, constraintSet, constraintTopologies, sumPods, nodes)
 		}
 	}
 
@@ -227,9 +286,9 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
 }
 
 // hasIdenticalConstraints checks if we already had an identical TopologySpreadConstraint in namespaceTopologySpreadConstraints slice
-func hasIdenticalConstraints(newConstraint v1.TopologySpreadConstraint, namespaceTopologySpreadConstraints []v1.TopologySpreadConstraint) bool {
+func hasIdenticalConstraints(newConstraint topologyConstraintSet, namespaceTopologySpreadConstraints []topologyConstraintSet) bool {
 	for _, constraint := range namespaceTopologySpreadConstraints {
-		if reflect.DeepEqual(newConstraint, constraint) {
+		if reflect.DeepEqual(newConstraint.constraint, constraint.constraint) && reflect.DeepEqual(newConstraint.podNodeAffinity, constraint.podNodeAffinity) && reflect.DeepEqual(newConstraint.podTolerations, constraint.podTolerations) {
 			return true
 		}
 	}
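
Because deduplication now compares the whole topologyConstraintSet, two pods that share an identical v1.TopologySpreadConstraint but differ in tolerations or required node affinity yield separate entries, since node eligibility can differ per pod once those are honored. A rough same-package sketch, not part of the commit; the helper name exampleDedup is invented.

// Hypothetical sketch: identical constraints with different pod tolerations
// are no longer collapsed into one entry.
func exampleDedup() bool {
	constraint := v1.TopologySpreadConstraint{
		MaxSkew:           1,
		TopologyKey:       "topology.kubernetes.io/zone",
		WhenUnsatisfiable: v1.DoNotSchedule,
	}
	affinity := nodeaffinity.GetRequiredNodeAffinity(&v1.Pod{})
	existing := []topologyConstraintSet{{constraint: constraint, podNodeAffinity: affinity}}

	candidate := topologyConstraintSet{
		constraint:      constraint,
		podNodeAffinity: affinity,
		podTolerations:  []v1.Toleration{{Key: "dedicated", Operator: v1.TolerationOpExists}},
	}
	// Returns false: the constraint matches, but the tolerations differ, so the
	// candidate is kept and evaluated as its own constraint set.
	return hasIdenticalConstraints(candidate, existing)
}
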
@@ -277,18 +336,20 @@ func topologyIsBalanced(topology map[topologyPair][]*v1.Pod, constraint v1.Topol
 // (assuming even distribution by the scheduler of the evicted pods)
 func (d *RemovePodsViolatingTopologySpreadConstraint) balanceDomains(
 	podsForEviction map[*v1.Pod]struct{},
-	constraint v1.TopologySpreadConstraint,
+	constraintSet topologyConstraintSet,
 	constraintTopologies map[topologyPair][]*v1.Pod,
 	sumPods float64,
 	nodes []*v1.Node,
 ) {
+	constraint := constraintSet.constraint
 	idealAvg := sumPods / float64(len(constraintTopologies))
 	isEvictable := d.handle.Evictor().Filter
 	sortedDomains := sortDomains(constraintTopologies, isEvictable)
 	getPodsAssignedToNode := d.handle.GetPodsAssignedToNodeFunc()
 	topologyBalanceNodeFit := utilpointer.BoolDeref(d.args.TopologyBalanceNodeFit, true)
 
-	nodesBelowIdealAvg := filterNodesBelowIdealAvg(nodes, sortedDomains, constraint.TopologyKey, idealAvg)
+	eligibleNodes := filterEligibleNodes(nodes, constraintSet)
+	nodesBelowIdealAvg := filterNodesBelowIdealAvg(eligibleNodes, sortedDomains, constraint.TopologyKey, idealAvg)
 
 	// i is the index for belowOrEqualAvg
 	// j is the index for aboveAvg
