Commit 0dea480

simplify function of calculating gaps

mfanjie committed Aug 29, 2022
1 parent 4f0deec commit 0dea480
Showing 10 changed files with 116 additions and 136 deletions.
34 changes: 17 additions & 17 deletions pkg/ensurance/analyzer/analyzer.go
@@ -287,7 +287,7 @@ func (s *AnomalyAnalyzer) computeActionContext(aboveThreshold bool, key string,
s.restored[key] = uint64(object.RestoreThreshold)
}
// only do restore action after trigger x times and restore y times
-if s.triggered[key] == uint64(object.AvoidanceThreshold) &&
+if s.triggered[key] >= uint64(object.AvoidanceThreshold) &&
s.restored[key] >= uint64(object.RestoreThreshold) {
// reset trigger count when restored
s.triggered[key] = 0
@@ -300,13 +300,13 @@ func (s *AnomalyAnalyzer) computeActionContext(aboveThreshold bool, key string,
}
}

-func (s *AnomalyAnalyzer) filterDryRun(acs []ecache.ActionContext) []ecache.ActionContext {
+func (s *AnomalyAnalyzer) filterDryRun(actionContexts []ecache.ActionContext) []ecache.ActionContext {
var dcsFiltered []ecache.ActionContext
now := time.Now()
-for _, ac := range acs {
-s.logEvent(ac, now)
-if !(ac.Strategy == ensuranceapi.AvoidanceActionStrategyPreview) {
-dcsFiltered = append(dcsFiltered, ac)
+for _, actionContext := range actionContexts {
+s.logEvent(actionContext, now)
+if !(actionContext.Strategy == ensuranceapi.AvoidanceActionStrategyPreview) {
+dcsFiltered = append(dcsFiltered, actionContext)
}
}
return dcsFiltered
@@ -323,29 +323,29 @@ func (s *AnomalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, actionM
//step2 do DisableScheduled merge
s.mergeSchedulingActions(filteredActionContext, actionMap, &executor)

-for _, actionCtx := range filteredActionContext {
-action, ok := actionMap[actionCtx.ActionName]
+for _, context := range filteredActionContext {
+action, ok := actionMap[context.ActionName]
if !ok {
-klog.Warningf("Action %s is triggered, but the AvoidanceAction is not defined.", actionCtx.ActionName)
+klog.Warningf("Action %s is triggered, but the AvoidanceAction is not defined.", context.ActionName)
continue
}

//step3 get and deduplicate throttlePods, throttleUpPods
if action.Spec.Throttle != nil {
-throttlePods, throttleUpPods := s.getThrottlePods(actionCtx, action, stateMap)
+throttlePods, throttleUpPods := s.getThrottlePods(context, action, stateMap)

// combine the throttle watermark
-combineThrottleWatermark(&executor.ThrottleExecutor, actionCtx)
+combineThrottleWatermark(&executor.ThrottleExecutor, context)
// combine the replicated pod
combineThrottleDuplicate(&executor.ThrottleExecutor, throttlePods, throttleUpPods)
}

//step4 get and deduplicate evictPods
if action.Spec.Eviction != nil {
-evictPods := s.getEvictPods(actionCtx.Triggered, action, stateMap)
+evictPods := s.getEvictPods(context.Triggered, action, stateMap)

// combine the evict watermark
-combineEvictWatermark(&executor.EvictExecutor, actionCtx)
+combineEvictWatermark(&executor.EvictExecutor, context)
// combine the replicated pod
combineEvictDuplicate(&executor.EvictExecutor, evictPods)
}
@@ -512,13 +512,13 @@ func contains(s []string, e string) bool {
return false
}

-func (s *AnomalyAnalyzer) mergeSchedulingActions(actionContexts []ecache.ActionContext, actionMap map[string]*ensuranceapi.AvoidanceAction, ae *executor.AvoidanceExecutor) {
+func (s *AnomalyAnalyzer) mergeSchedulingActions(actionContexts []ecache.ActionContext, actionMap map[string]*ensuranceapi.AvoidanceAction, avoidanceExecutor *executor.AvoidanceExecutor) {
var now = time.Now()

// If the ensurance rules are empty, it must be recovered soon.
// So we set enableScheduling true
if len(actionContexts) == 0 {
-s.ToggleScheduleSetting(ae, false)
+s.ToggleScheduleSetting(avoidanceExecutor, false)
} else {
for _, ac := range actionContexts {
klog.V(4).Infof("actionContext %+v", ac)
@@ -531,13 +531,13 @@ func (s *AnomalyAnalyzer) mergeSchedulingActions(actionContexts []ecache.ActionC

if ac.Triggered {
metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeEnableScheduling, float64(0))
-s.ToggleScheduleSetting(ae, true)
+s.ToggleScheduleSetting(avoidanceExecutor, true)
}

if ac.Restored {
if now.After(s.lastTriggeredTime.Add(time.Duration(action.Spec.CoolDownSeconds) * time.Second)) {
metrics.UpdateAnalyzerStatus(metrics.AnalyzeTypeEnableScheduling, float64(1))
-s.ToggleScheduleSetting(ae, false)
+s.ToggleScheduleSetting(avoidanceExecutor, false)
}
}
}
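The one behavioral change in this file is the switch from `==` to `>=` on the trigger counter; the rest is renaming for readability. With strict equality, a counter that overshoots `AvoidanceThreshold` before enough restore ticks accumulate can never satisfy the restore condition again. A minimal sketch of the corrected predicate, with a hypothetical helper name and the counters simplified from `s.triggered[key]` and `s.restored[key]`:

```go
package main

import "fmt"

// canRestore is a hypothetical distillation of computeActionContext's restore
// check; triggered and restored mirror s.triggered[key] and s.restored[key].
func canRestore(triggered, restored, avoidanceThreshold, restoreThreshold uint64) bool {
	// With the old strict equality (triggered == avoidanceThreshold), a counter
	// that had already climbed past the threshold could never satisfy the
	// restore condition again; >= makes overshoot harmless.
	return triggered >= avoidanceThreshold && restored >= restoreThreshold
}

func main() {
	// The anomaly fired 5 times against a threshold of 3, then recovered twice.
	fmt.Println(canRestore(5, 2, 3, 2)) // true; the old == check would say false
}
```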
4 changes: 2 additions & 2 deletions pkg/ensurance/analyzer/podqos-fetcher.go
@@ -101,7 +101,7 @@ func match(pod *v1.Pod, podQOS *ensuranceapi.PodQOS) bool {
return false
}
if !match {
-klog.V(6).Infof("SvcQOS %s namespace selector not match pod %s/%s", podQOS.Name, pod.Namespace, pod.Name)
+klog.V(6).Infof("PodQOS %s namespace selector not match pod %s/%s", podQOS.Name, pod.Namespace, pod.Name)
return false
}
}
@@ -173,7 +173,7 @@ func match(pod *v1.Pod, podQOS *ensuranceapi.PodQOS) bool {
qosClassMatch = false
}
if !match {
-klog.V(6).Infof("SvcQOS %s qosclass selector not match pod %s/%s", podQOS.Name, pod.Namespace, pod.Name)
+klog.V(6).Infof("PodQOS %s qosclass selector not match pod %s/%s", podQOS.Name, pod.Namespace, pod.Name)
qosClassMatch = false
}
}
18 changes: 9 additions & 9 deletions pkg/ensurance/executor/cpu_usage.go
@@ -31,7 +31,7 @@ var cpuUsage = metric{

Evictable: true,
EvictQuantified: true,
-EvictFunc: evictOnePodCpu,
+EvictFunc: evictPod,
}

func throttleOnePodCpu(ctx *ExecuteContext, index int, ThrottleDownPods ThrottlePods, totalReleasedResource *ReleaseResource) (errPodKeys []string, released ReleaseResource) {
@@ -43,15 +43,15 @@ func throttleOnePodCpu(ctx *ExecuteContext, index int, ThrottleDownPods Throttle

// Throttle for CPU metrics

-klog.V(6).Infof("index %d, containerusage is %#v", index, ThrottleDownPods[index].ContainerCPUUsages)
+klog.V(6).Infof("index %d, ContainerCPUUsages is %#v", index, ThrottleDownPods[index].ContainerCPUUsages)

for _, v := range ThrottleDownPods[index].ContainerCPUUsages {
-// pause container to skip
+// skip pause container
if v.ContainerName == "" {
continue
}

-klog.V(4).Infof("ThrottleExecutor begin to avoid container %s/%s", klog.KObj(pod), v.ContainerName)
+klog.V(4).Infof("Begin to avoid container %s/%s", klog.KObj(pod), v.ContainerName)

containerCPUQuota, err := podinfo.GetUsageById(ThrottleDownPods[index].ContainerCPUQuotas, v.ContainerId)
if err != nil {
@@ -102,7 +102,7 @@ func throttleOnePodCpu(ctx *ExecuteContext, index int, ThrottleDownPods Throttle
klog.V(4).Infof("ThrottleExecutor avoid pod %s, container %s, set cpu quota %.2f.",
klog.KObj(pod), v.ContainerName, containerCPUQuotaNew*containerCPUPeriod.Value)

-released = ConstructCpuUsageRelease(ThrottleDownPods[index], containerCPUQuotaNew, v.Value)
+released = releaseCPUUsage(ThrottleDownPods[index], containerCPUQuotaNew, v.Value)
klog.V(6).Infof("For pod %s, container %s, release %f cpu usage", ThrottleDownPods[index].Key.String(), container.Name, released[CpuUsage])

totalReleasedResource.Add(released)
@@ -187,7 +187,7 @@ func restoreOnePodCpu(ctx *ExecuteContext, index int, ThrottleUpPods ThrottlePod
}
klog.V(4).Infof("ThrottleExecutor restore pod %s, container %s, set cpu quota %.2f, .",
klog.KObj(pod), v.ContainerName, containerCPUQuotaNew*containerCPUPeriod.Value)
-released = ConstructCpuUsageRelease(ThrottleUpPods[index], containerCPUQuotaNew, v.Value)
+released = releaseCPUUsage(ThrottleUpPods[index], containerCPUQuotaNew, v.Value)
klog.V(6).Infof("For pod %s, container %s, restore %f cpu usage", ThrottleUpPods[index].Key, container.Name, released[CpuUsage])

totalReleasedResource.Add(released)
@@ -198,11 +198,11 @@ func restoreOnePodCpu(ctx *ExecuteContext, index int, ThrottleUpPods ThrottlePod
return
}

-func evictOnePodCpu(wg *sync.WaitGroup, ctx *ExecuteContext, index int, totalReleasedResource *ReleaseResource, EvictPods EvictPods) (errPodKeys []string, released ReleaseResource) {
+func evictPod(wg *sync.WaitGroup, ctx *ExecuteContext, index int, totalReleasedResource *ReleaseResource, EvictPods EvictPods) (errPodKeys []string, released ReleaseResource) {
wg.Add(1)

// Calculate release resources
-released = ConstructCpuUsageRelease(EvictPods[index], 0.0, 0.0)
+released = releaseCPUUsage(EvictPods[index], 0.0, 0.0)
totalReleasedResource.Add(released)

go func(evictPod podinfo.PodContext) {
@@ -227,7 +227,7 @@ func evictOnePodCpu(wg *sync.WaitGroup, ctx *ExecuteContext, index int, totalRel
return
}

-func ConstructCpuUsageRelease(pod podinfo.PodContext, containerCPUQuotaNew, currentContainerCpuUsage float64) ReleaseResource {
+func releaseCPUUsage(pod podinfo.PodContext, containerCPUQuotaNew, currentContainerCpuUsage float64) ReleaseResource {
if pod.ActionType == podinfo.Evict {
return ReleaseResource{
CpuUsage: pod.PodCPUUsage * CpuQuotaCoefficient,
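The `EvictFunc: evictPod` entry above is how this file plugs into the executors: cpu_usage.go registers its callbacks in a metric table, and evict.go later dispatches through `metricMap[m].EvictFunc(...)` by metric name. A rough sketch of that registry pattern, with stand-in types and a deliberately simplified callback signature (the real one takes `wg, ctx, index, totalReleasedResource, EvictPods`):

```go
package main

import "fmt"

// Stand-in types; the real definitions live in pkg/ensurance/executor.
type WatermarkMetric string

type metric struct {
	Name            WatermarkMetric
	Evictable       bool
	EvictQuantified bool
	EvictFunc       func(podKey string) float64 // simplified callback
}

var metricMap = map[WatermarkMetric]metric{}

func registerMetricMap(m metric) { metricMap[m.Name] = m }

func main() {
	registerMetricMap(metric{
		Name:            "cpu_total_usage",
		Evictable:       true,
		EvictQuantified: true,
		EvictFunc: func(podKey string) float64 {
			return 0.5 // CPU this eviction would release, in cores
		},
	})
	// Executors never call evictPod directly; they look it up by metric name.
	fmt.Println(metricMap["cpu_total_usage"].EvictFunc("default/pod-a"))
}
```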
31 changes: 15 additions & 16 deletions pkg/ensurance/executor/evict.go
@@ -60,32 +60,32 @@ func (e *EvictExecutor) Avoid(ctx *ExecuteContext) error {
Then evict sorted pods one by one util there is no gap to watermark
*/

-metricsEvictQuantified, MetricsNotEvcitQuantified := e.EvictWatermark.DivideMetricsByEvictQuantified()
+quantified, notQuantified := e.EvictWatermark.DivideMetricsByEvictQuantified()

// There is a metric that can't be EvictQuantified, so evict all selected pods
-if len(MetricsNotEvcitQuantified) != 0 {
+if len(notQuantified) != 0 {
klog.V(6).Info("There is a metric that can't be EvcitQuantified")

-highestPriorityMetric := e.EvictWatermark.GetHighestPriorityEvictAbleMetric()
+highestPriorityMetric := e.EvictWatermark.GetHighestPriorityEvictableMetric()
if highestPriorityMetric != "" {
klog.V(6).Infof("The highestPriorityMetric is %s", highestPriorityMetric)
errPodKeys = e.evictPods(ctx, &totalReleased, highestPriorityMetric)
}
} else {
-_, _, ctx.EvictGapToWatermarks = buildGapToWatermark(ctx.stateMap, ThrottleExecutor{}, *e, ctx.executeExcessPercent)
+ctx.ToBeEvict = calculateGaps(ctx.stateMap, nil, e, ctx.executeExcessPercent)

-if ctx.EvictGapToWatermarks.HasUsageMissedMetric() {
+if ctx.ToBeEvict.HasUsageMissedMetric() {
klog.V(6).Infof("There is a metric usage missed")
-highestPriorityMetric := e.EvictWatermark.GetHighestPriorityEvictAbleMetric()
+highestPriorityMetric := e.EvictWatermark.GetHighestPriorityEvictableMetric()
if highestPriorityMetric != "" {
errPodKeys = e.evictPods(ctx, &totalReleased, highestPriorityMetric)
}
} else {
-// The metrics in EvictGapToWatermarks are can be EvictQuantified and has current usage, then evict precisely
+// The metrics in ToBeEvict are can be EvictQuantified and has current usage, then evict precisely
var released ReleaseResource
wg := sync.WaitGroup{}
-for _, m := range metricsEvictQuantified {
-klog.V(6).Infof("Evict precisely on metric %s", m)
+for _, m := range quantified {
+klog.V(6).Infof("Evict precisely on metric %s, and current gaps are %+v", m, ctx.ToBeEvict)
if metricMap[m].Sortable {
metricMap[m].SortFunc(e.EvictPods)
} else {
@@ -96,16 +96,15 @@ func (e *EvictExecutor) Avoid(ctx *ExecuteContext) error {
for _, pc := range e.EvictPods {
klog.V(6).Info(pc.Key.String())
}
-
-for !ctx.EvictGapToWatermarks.TargetGapsRemoved(m) {
-klog.V(6).Infof("For metric %s, there is still gap to watermarks: %f", m, ctx.EvictGapToWatermarks[m])
-if podinfo.HasNoExecutedPod(e.EvictPods) {
-index := podinfo.GetFirstNoExecutedPod(e.EvictPods)
+for !ctx.ToBeEvict.TargetGapsRemoved(m) {
+klog.V(2).Infof("For metric %s, there is more gap to watermarks: %f of %s", m, ctx.ToBeEvict[m], m)
+if podinfo.ContainsPendingPod(e.EvictPods) {
+index := podinfo.GetFirstPendingPod(e.EvictPods)
errKeys, released = metricMap[m].EvictFunc(&wg, ctx, index, &totalReleased, e.EvictPods)
errPodKeys = append(errPodKeys, errKeys...)
-klog.V(6).Infof("Evict pods %s, released %f resource", e.EvictPods[index].Key, released[m])
+klog.Warningf("Evicted pods %s, released %f of %s", e.EvictPods[index].Key, released[m], m)
e.EvictPods[index].Executed = true
-ctx.EvictGapToWatermarks[m] -= released[m]
+ctx.ToBeEvict[m] -= released[m]
} else {
klog.V(6).Info("There is no pod that can be evicted")
break
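The simplification named in the commit title is visible here only through call sites: the old three-result `buildGapToWatermark(ctx.stateMap, ThrottleExecutor{}, *e, ...)` becomes a single-result `calculateGaps(ctx.stateMap, nil, e, ...)` that takes nilable executor pointers, so callers no longer unpack three maps and discard two. A sketch inferred purely from the call sites on this page (the function's own file is not shown, so the types and body below are assumptions):

```go
package main

import "fmt"

// Stand-in types; inferred shape only, not the project's definitions.
type WatermarkMetric string
type Gaps map[WatermarkMetric]float64
type ThrottleExecutor struct{}
type EvictExecutor struct{}

// calculateGaps returns (actual usage) - (lowest configured watermark) per
// metric, padded by executeExcessPercent, for whichever executor is non-nil.
func calculateGaps(stateMap map[string][]float64,
	throttle *ThrottleExecutor, evict *EvictExecutor,
	executeExcessPercent float64) Gaps {
	gaps := Gaps{}
	for name, series := range stateMap {
		if len(series) == 0 {
			continue // usage sample missing; HasUsageMissedMetric covers this case
		}
		usage := series[len(series)-1]
		watermark := 0.0 // placeholder; the real body reads the executor's watermarks
		gaps[WatermarkMetric(name)] = (usage - watermark) * (1 + executeExcessPercent)
	}
	_, _ = throttle, evict // selection between the two is elided in this sketch
	return gaps
}

func main() {
	e := EvictExecutor{}
	fmt.Println(calculateGaps(map[string][]float64{"cpu_total_usage": {3.2}}, nil, &e, 0.1))
}
```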
6 changes: 3 additions & 3 deletions pkg/ensurance/executor/interface.go
@@ -31,11 +31,11 @@ type ExecuteContext struct {

// Gap for metrics Evictable/ThrottleAble
// Key is the metric name, value is (actual used)-(the lowest watermark for NodeQOSEnsurancePolicies which use throttleDown action)
-ThrottoleDownGapToWatermarks GapToWatermarks
+ToBeThrottleDown Gaps
// Key is the metric name, value is (actual used)-(the lowest watermark for NodeQOSEnsurancePolicies which use throttleUp action)
-ThrottoleUpGapToWatermarks GapToWatermarks
+TOBEThrottleUp Gaps
// key is the metric name, value is (actual used)-(the lowest watermark for NodeQOSEnsurancePolicies which use evict action)
-EvictGapToWatermarks GapToWatermarks
+ToBeEvict Gaps

stateMap map[string][]common.TimeSeries
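The three renamed fields share the new `Gaps` type, which the executors treat as a metric-keyed map with two predicates, `HasUsageMissedMetric` and `TargetGapsRemoved`, plus direct indexing. A sketch of that shape as this diff exercises it; the method bodies, including NaN as the "no usage sample" sentinel, are assumptions, since the type's own file is not part of this page:

```go
package main

import (
	"fmt"
	"math"
)

type WatermarkMetric string

// Gaps is keyed by metric name; a value is how far usage still sits above the
// lowest watermark. (Assumed layout.)
type Gaps map[WatermarkMetric]float64

// HasUsageMissedMetric reports that some watermarked metric had no usage
// sample, in which case the executors fall back to the highest-priority metric.
func (g Gaps) HasUsageMissedMetric() bool {
	for _, gap := range g {
		if math.IsNaN(gap) { // sentinel chosen for this sketch only
			return true
		}
	}
	return false
}

// TargetGapsRemoved reports whether metric m no longer exceeds its watermark.
func (g Gaps) TargetGapsRemoved(m WatermarkMetric) bool {
	return g[m] <= 0
}

func main() {
	gaps := Gaps{"cpu_total_usage": 1.5}
	for !gaps.TargetGapsRemoved("cpu_total_usage") {
		gaps["cpu_total_usage"] -= 1.0 // mirrors ctx.ToBeEvict[m] -= released[m]
	}
	fmt.Println(gaps.HasUsageMissedMetric(), gaps["cpu_total_usage"])
}
```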
18 changes: 0 additions & 18 deletions pkg/ensurance/executor/metric.go
@@ -35,21 +35,3 @@ var metricMap = make(map[WatermarkMetric]metric)
func registerMetricMap(m metric) {
metricMap[m.Name] = m
}
-
-func GetThrottleAbleMetricName() (throttleAbleMetricList []WatermarkMetric) {
-for _, m := range metricMap {
-if m.Throttleable {
-throttleAbleMetricList = append(throttleAbleMetricList, m.Name)
-}
-}
-return
-}
-
-func GetEvictAbleMetricName() (evictAbleMetricList []WatermarkMetric) {
-for _, m := range metricMap {
-if m.Evictable {
-evictAbleMetricList = append(evictAbleMetricList, m.Name)
-}
-}
-return
-}
4 changes: 2 additions & 2 deletions pkg/ensurance/executor/podinfo/pod_info.go
@@ -94,7 +94,7 @@ type PodContext struct {
Executed bool
}

-func HasNoExecutedPod(pods []PodContext) bool {
+func ContainsPendingPod(pods []PodContext) bool {
for _, p := range pods {
if p.Executed == false {
return true
@@ -103,7 +103,7 @@ func HasNoExecutedPod(pods []PodContext) bool {
return false
}

-func GetFirstNoExecutedPod(pods []PodContext) int {
+func GetFirstPendingPod(pods []PodContext) int {
for index, p := range pods {
if p.Executed == false {
return index
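Renamed for what they mean rather than how they are implemented, this pair drives evict.go's loop: keep evicting while a target gap remains and some pod is still pending. A self-contained toy driver showing that pairing, where the `gap` bookkeeping stands in for `ctx.ToBeEvict[m]`:

```go
package main

import "fmt"

// PodContext is trimmed to the fields the helpers touch.
type PodContext struct {
	Key      string
	Executed bool
}

func ContainsPendingPod(pods []PodContext) bool {
	for _, p := range pods {
		if !p.Executed {
			return true
		}
	}
	return false
}

func GetFirstPendingPod(pods []PodContext) int {
	for i, p := range pods {
		if !p.Executed {
			return i
		}
	}
	return -1
}

func main() {
	pods := []PodContext{{Key: "default/a"}, {Key: "default/b"}}
	gap := 1.5 // stand-in for ctx.ToBeEvict[m]
	for gap > 0 && ContainsPendingPod(pods) {
		i := GetFirstPendingPod(pods)
		fmt.Println("evicting", pods[i].Key)
		pods[i].Executed = true // mark done so the next lookup moves on
		gap -= 1.0              // stand-in for released[m]
	}
}
```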
26 changes: 13 additions & 13 deletions pkg/ensurance/executor/throttle.go
@@ -86,17 +86,17 @@ func (t *ThrottleExecutor) Avoid(ctx *ExecuteContext) error {
errPodKeys = t.throttlePods(ctx, &totalReleased, highestPriorityMetric)
}
} else {
-ctx.ThrottoleDownGapToWatermarks, _, _ = buildGapToWatermark(ctx.stateMap, *t, EvictExecutor{}, ctx.executeExcessPercent)
+ctx.ToBeThrottleDown = calculateGaps(ctx.stateMap, t, nil, ctx.executeExcessPercent)

-if ctx.ThrottoleDownGapToWatermarks.HasUsageMissedMetric() {
+if ctx.ToBeThrottleDown.HasUsageMissedMetric() {
klog.V(6).Info("There is a metric usage missed")
// todo remove highest priority
highestPriorityMetric := t.ThrottleDownWatermark.GetHighestPriorityThrottleAbleMetric()
if highestPriorityMetric != "" {
errPodKeys = t.throttlePods(ctx, &totalReleased, highestPriorityMetric)
}
} else {
-// The metrics in ThrottoleDownGapToWatermarks are all in WatermarkMetricsCanBeQuantified and has current usage, then throttle precisely
+// The metrics in ToBeThrottleDown are all in WatermarkMetricsCanBeQuantified and has current usage, then throttle precisely
var released ReleaseResource
for _, m := range metricsThrottleQuantified {
klog.V(6).Infof("ThrottleDown precisely on metric %s", m)
@@ -111,14 +111,14 @@ func (t *ThrottleExecutor) Avoid(ctx *ExecuteContext) error {
klog.V(6).Info(pc.Key.String(), pc.ContainerCPUUsages)
}

-for index := 0; !ctx.ThrottoleDownGapToWatermarks.TargetGapsRemoved(m) && index < len(t.ThrottleDownPods); index++ {
-klog.V(6).Infof("For metric %s, there is still gap to watermarks: %f", m, ctx.ThrottoleDownGapToWatermarks[m])
+for index := 0; !ctx.ToBeThrottleDown.TargetGapsRemoved(m) && index < len(t.ThrottleDownPods); index++ {
+klog.V(6).Infof("For metric %s, there is still gap to watermarks: %f", m, ctx.ToBeThrottleDown[m])

errKeys, released = metricMap[m].ThrottleFunc(ctx, index, t.ThrottleDownPods, &totalReleased)
klog.V(6).Infof("ThrottleDown pods %s, released %f resource", t.ThrottleDownPods[index].Key, released[m])
errPodKeys = append(errPodKeys, errKeys...)

-ctx.ThrottoleDownGapToWatermarks[m] -= released[m]
+ctx.ToBeThrottleDown[m] -= released[m]
}
}
}
@@ -144,7 +144,7 @@ func (t *ThrottleExecutor) Restore(ctx *ExecuteContext) error {
metrics.UpdateLastTimeWithSubComponent(string(known.ModuleActionExecutor), string(metrics.SubComponentThrottle), metrics.StepRestore, start)
defer metrics.UpdateDurationFromStartWithSubComponent(string(known.ModuleActionExecutor), string(metrics.SubComponentThrottle), metrics.StepRestore, start)

-klog.V(6).Info("ThrottleExecutor restore, %v", *t)
+klog.V(6).Infof("ThrottleExecutor restore, %v", *t)

if len(t.ThrottleUpPods) == 0 {
metrics.UpdateExecutorStatus(metrics.SubComponentThrottle, metrics.StepRestore, 0)
@@ -177,16 +177,16 @@ func (t *ThrottleExecutor) Restore(ctx *ExecuteContext) error {
errPodKeys = t.restorePods(ctx, &totalReleased, highestPrioriyMetric)
}
} else {
-_, ctx.ThrottoleUpGapToWatermarks, _ = buildGapToWatermark(ctx.stateMap, *t, EvictExecutor{}, ctx.executeExcessPercent)
+ctx.TOBEThrottleUp = calculateGaps(ctx.stateMap, t, nil, ctx.executeExcessPercent)

-if ctx.ThrottoleUpGapToWatermarks.HasUsageMissedMetric() {
+if ctx.TOBEThrottleUp.HasUsageMissedMetric() {
klog.V(6).Info("There is a metric usage missed")
highestPrioriyMetric := t.ThrottleUpWatermark.GetHighestPriorityThrottleAbleMetric()
if highestPrioriyMetric != "" {
errPodKeys = t.restorePods(ctx, &totalReleased, highestPrioriyMetric)
}
} else {
-// The metrics in ThrottoleUpGapToWatermarks are all in WatermarkMetricsCanBeQuantified and has current usage, then throttle precisely
+// The metrics in TOBEThrottleUp are all in WatermarkMetricsCanBeQuantified and has current usage, then throttle precisely
var released ReleaseResource
for _, m := range metricsThrottleQuantified {
klog.V(6).Infof("ThrottleUp precisely on metric %s", m)
@@ -202,14 +202,14 @@ func (t *ThrottleExecutor) Restore(ctx *ExecuteContext) error {
klog.V(6).Info(pc.Key.String())
}

-for index := 0; !ctx.ThrottoleUpGapToWatermarks.TargetGapsRemoved(m) && index < len(t.ThrottleUpPods); index++ {
-klog.V(6).Infof("For metric %s, there is still gap to watermarks: %f", m, ctx.ThrottoleUpGapToWatermarks[m])
+for index := 0; !ctx.TOBEThrottleUp.TargetGapsRemoved(m) && index < len(t.ThrottleUpPods); index++ {
+klog.V(6).Infof("For metric %s, there is still gap to watermarks: %f", m, ctx.TOBEThrottleUp[m])

errKeys, released = metricMap[m].RestoreFunc(ctx, index, t.ThrottleUpPods, &totalReleased)
klog.V(6).Infof("ThrottleUp pods %s, released %f resource", t.ThrottleUpPods[index].Key, released[m])
errPodKeys = append(errPodKeys, errKeys...)

-ctx.ThrottoleUpGapToWatermarks[m] -= released[m]
+ctx.TOBEThrottleUp[m] -= released[m]
}
}
}