Merge pull request #539 from chenkaiyue/schedule-enable
Update schedule action determines logic & Add label selector in PodQOS
mfanjie authored Aug 30, 2022
2 parents b655937 + ffba2f4 commit a741f08
Showing 5 changed files with 83 additions and 19 deletions.
24 changes: 13 additions & 11 deletions pkg/ensurance/analyzer/analyzer.go
@@ -520,27 +520,29 @@ func (s *AnomalyAnalyzer) mergeSchedulingActions(actionContexts []ecache.ActionC
if len(actionContexts) == 0 {
s.ToggleScheduleSetting(avoidanceExecutor, false)
} else {
// If any actionContext is Triggered, or is Restored but still within the CoolDownSeconds period, scheduling should be disabled;
// otherwise, scheduling should be enabled.
for _, ac := range actionContexts {
klog.V(4).Infof("actionContext %+v", ac)
klog.V(4).Infof("trigger times %+v, restore times %+v", s.triggered, s.restored)
if ac.Triggered {
metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeEnableScheduling, float64(0))
s.ToggleScheduleSetting(avoidanceExecutor, true)
break
}

action, ok := actionMap[ac.ActionName]
if !ok {
klog.Warningf("Action %s defined in nodeQOS %s is not found", ac.ActionName, ac.NodeQOS.Name)
continue
}

if ac.Triggered {
if ac.Restored && !now.After(s.lastTriggeredTime.Add(time.Duration(action.Spec.CoolDownSeconds)*time.Second)) {
metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeEnableScheduling, float64(0))
s.ToggleScheduleSetting(avoidanceExecutor, true)
}

if ac.Restored {
if now.After(s.lastTriggeredTime.Add(time.Duration(action.Spec.CoolDownSeconds) * time.Second)) {
metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeEnableScheduling, float64(1))
s.ToggleScheduleSetting(avoidanceExecutor, false)
}
break
}
}

metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeEnableScheduling, float64(1))
s.ToggleScheduleSetting(avoidanceExecutor, false)
}
}

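Read as a whole, the reworked loop disables scheduling as soon as any action context is Triggered, keeps it disabled while a Restored context is still inside its action's CoolDownSeconds window, and re-enables scheduling only when neither case applies. A minimal, self-contained sketch of that decision follows; the types and field names are simplified stand-ins rather than the crane API, and because the diff above interleaves old and new lines, the sketch mirrors the intended post-change behaviour rather than reproducing the file verbatim.

package main

import (
    "fmt"
    "time"
)

// actionCtx is a simplified stand-in for the crane action context; it only
// carries the fields this hunk actually reads.
type actionCtx struct {
    Triggered       bool
    Restored        bool
    CoolDownSeconds int64
}

// scheduleDisabled reports whether scheduling should stay disabled, following
// the same order of checks as the updated mergeSchedulingActions loop.
func scheduleDisabled(ctxs []actionCtx, lastTriggered, now time.Time) bool {
    for _, ac := range ctxs {
        if ac.Triggered {
            return true // an active anomaly always disables scheduling
        }
        coolDown := time.Duration(ac.CoolDownSeconds) * time.Second
        if ac.Restored && !now.After(lastTriggered.Add(coolDown)) {
            return true // restored, but still inside the cool-down window
        }
    }
    return false // nothing triggered and every cool-down has expired
}

func main() {
    now := time.Now()
    ctxs := []actionCtx{{Restored: true, CoolDownSeconds: 300}}
    fmt.Println(scheduleDisabled(ctxs, now.Add(-100*time.Second), now)) // true: inside cool-down
    fmt.Println(scheduleDisabled(ctxs, now.Add(-400*time.Second), now)) // false: cool-down over
}
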
62 changes: 62 additions & 0 deletions pkg/ensurance/analyzer/podqos-fetcher.go
@@ -2,10 +2,12 @@ package analyzer

import (
"fmt"
"reflect"
"strconv"
"strings"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/klog/v2"
@@ -21,6 +23,52 @@ type ObjectIdentity struct {
Labels map[string]string
}

func labelMatch(labelSelector metav1.LabelSelector, matchLabels map[string]string) bool {
for k, v := range labelSelector.MatchLabels {
if matchLabels[k] != v {
return false
}
}

for _, expr := range labelSelector.MatchExpressions {
switch expr.Operator {
case metav1.LabelSelectorOpExists:
if _, exists := matchLabels[expr.Key]; !exists {
return false
}
case metav1.LabelSelectorOpDoesNotExist:
if _, exists := matchLabels[expr.Key]; exists {
return false
}
case metav1.LabelSelectorOpIn:
if v, exists := matchLabels[expr.Key]; !exists {
return false
} else {
var found bool
for i := range expr.Values {
if expr.Values[i] == v {
found = true
break
}
}
if !found {
return false
}
}
case metav1.LabelSelectorOpNotIn:
if v, exists := matchLabels[expr.Key]; exists {
for i := range expr.Values {
if expr.Values[i] == v {
return false
}
}
}
}
}

return true
}

func match(pod *v1.Pod, podQOS *ensuranceapi.PodQOS) bool {

if podQOS.Spec.ScopeSelector == nil &&
Expand All @@ -29,6 +77,20 @@ func match(pod *v1.Pod, podQOS *ensuranceapi.PodQOS) bool {
return false
}

if !reflect.DeepEqual(podQOS.Spec.LabelSelector, metav1.LabelSelector{}) {
matchLabels := map[string]string{}
for k, v := range pod.Labels {
matchLabels[k] = v
}
if !labelMatch(podQOS.Spec.LabelSelector, matchLabels) {
return false
}
}

if podQOS.Spec.ScopeSelector == nil {
return true
}

// AND of the selectors
var nameSpaceSelectors, prioritySelectors, qosClassSelectors []ensuranceapi.ScopedResourceSelectorRequirement
for _, ss := range podQOS.Spec.ScopeSelector.MatchExpressions {
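The labelMatch helper added above hand-rolls the standard Kubernetes MatchLabels / MatchExpressions semantics (In, NotIn, Exists, DoesNotExist), and match() now applies it whenever PodQOS.Spec.LabelSelector is non-empty. For comparison only, here is a small sketch of the same kind of check built on the upstream apimachinery helper; the pod labels are invented for illustration, and the crane code does not call this helper.

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
)

func main() {
    // A selector of the shape PodQOS.Spec.LabelSelector now accepts.
    sel := &metav1.LabelSelector{
        MatchLabels: map[string]string{"app": "nginx"},
        MatchExpressions: []metav1.LabelSelectorRequirement{
            {Key: "tier", Operator: metav1.LabelSelectorOpIn, Values: []string{"frontend", "edge"}},
        },
    }

    // LabelSelectorAsSelector compiles MatchLabels and MatchExpressions into a
    // labels.Selector with the same semantics labelMatch implements by hand.
    s, err := metav1.LabelSelectorAsSelector(sel)
    if err != nil {
        panic(err)
    }

    podLabels := labels.Set{"app": "nginx", "tier": "frontend"}
    fmt.Println(s.Matches(podLabels)) // true: both requirements are satisfied

    podLabels["tier"] = "backend"
    fmt.Println(s.Matches(podLabels)) // false: "backend" is not among the In values
}

Note that match() guards the new check with reflect.DeepEqual against an empty selector, so an unset LabelSelector keeps matching every pod; that lines up with the helper above, which compiles an empty selector into one that matches everything.
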
2 changes: 1 addition & 1 deletion pkg/ensurance/collector/collector.go
@@ -185,7 +185,7 @@ func (s *StateCollector) UpdateCollectors() {

break
}
// if node resource controller is enabled, it indicates local metrics need to be collected no matter nep is defined or not
// if node resource controller is enabled, it indicates local metrics need to be collected no matter nodeqos is defined or not
if nodeResourceGate := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResourceGate {
if _, exists := s.collectors.Load(types.NodeLocalCollectorType); !exists {
nc := nodelocal.NewNodeLocal(s.ifaces, s.exclusiveCPUSet)
2 changes: 1 addition & 1 deletion pkg/ensurance/executor/interface.go
@@ -33,7 +33,7 @@ type ExecuteContext struct {
// Key is the metric name, value is (actual used)-(the lowest watermark for NodeQOSEnsurancePolicies which use throttleDown action)
ToBeThrottleDown Gaps
// Key is the metric name, value is (actual used)-(the lowest watermark for NodeQOSEnsurancePolicies which use throttleUp action)
TOBEThrottleUp Gaps
ToBeThrottleUp Gaps
// key is the metric name, value is (actual used)-(the lowest watermark for NodeQOSEnsurancePolicies which use evict action)
ToBeEvict Gaps

12 changes: 6 additions & 6 deletions pkg/ensurance/executor/throttle.go
@@ -177,16 +177,16 @@ func (t *ThrottleExecutor) Restore(ctx *ExecuteContext) error {
errPodKeys = t.restorePods(ctx, &totalReleased, highestPrioriyMetric)
}
} else {
ctx.TOBEThrottleUp = calculateGaps(ctx.stateMap, t, nil, ctx.executeExcessPercent)
ctx.ToBeThrottleUp = calculateGaps(ctx.stateMap, t, nil, ctx.executeExcessPercent)

if ctx.TOBEThrottleUp.HasUsageMissedMetric() {
if ctx.ToBeThrottleUp.HasUsageMissedMetric() {
klog.V(6).Info("There is a metric usage missed")
highestPrioriyMetric := t.ThrottleUpWatermark.GetHighestPriorityThrottleAbleMetric()
if highestPrioriyMetric != "" {
errPodKeys = t.restorePods(ctx, &totalReleased, highestPrioriyMetric)
}
} else {
// The metrics in TOBEThrottleUp are all in WatermarkMetricsCanBeQuantified and has current usage, then throttle precisely
// The metrics in ToBeThrottleUp are all in WatermarkMetricsCanBeQuantified and has current usage, then throttle precisely
var released ReleaseResource
for _, m := range metricsThrottleQuantified {
klog.V(6).Infof("ThrottleUp precisely on metric %s", m)
@@ -202,14 +202,14 @@
klog.V(6).Info(pc.Key.String())
}

for index := 0; !ctx.TOBEThrottleUp.TargetGapsRemoved(m) && index < len(t.ThrottleUpPods); index++ {
klog.V(6).Infof("For metric %s, there is still gap to watermarks: %f", m, ctx.TOBEThrottleUp[m])
for index := 0; !ctx.ToBeThrottleUp.TargetGapsRemoved(m) && index < len(t.ThrottleUpPods); index++ {
klog.V(6).Infof("For metric %s, there is still gap to watermarks: %f", m, ctx.ToBeThrottleUp[m])

errKeys, released = metricMap[m].RestoreFunc(ctx, index, t.ThrottleUpPods, &totalReleased)
klog.V(6).Infof("ThrottleUp pods %s, released %f resource", t.ThrottleUpPods[index].Key, released[m])
errPodKeys = append(errPodKeys, errKeys...)

ctx.TOBEThrottleUp[m] -= released[m]
ctx.ToBeThrottleUp[m] -= released[m]
}
}
}
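Beyond the TOBEThrottleUp to ToBeThrottleUp rename, the Restore path shown here keeps lifting throttling pod by pod until the per-metric gap to the watermark is closed or the candidate list is exhausted. A compact sketch of that loop shape with made-up numbers follows; the real code consults TargetGapsRemoved and a per-metric RestoreFunc, both of which are simplified here.

package main

import "fmt"

// gaps maps a metric name to how far current usage still is from the target
// watermark; a value <= 0 means the gap for that metric has been closed.
type gaps map[string]float64

// restorePod stands in for the per-pod RestoreFunc: it returns how much of the
// metric one pod gives back when its throttling is lifted (a made-up constant).
func restorePod(index int) float64 {
    return 0.4
}

func main() {
    toBeThrottleUp := gaps{"cpu": 1.0}
    pods := 5 // number of throttled pods eligible for restore

    // Mirror of the loop shape in Restore: keep restoring pods for this metric
    // until the remaining gap is removed or every candidate pod has been tried.
    for index := 0; toBeThrottleUp["cpu"] > 0 && index < pods; index++ {
        released := restorePod(index)
        toBeThrottleUp["cpu"] -= released
        fmt.Printf("restored pod %d, released %.1f, remaining gap %.1f\n",
            index, released, toBeThrottleUp["cpu"])
    }
}
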
