From 5decc43a6cc4080e6d937076ed266d7b46a2c5d2 Mon Sep 17 00:00:00 2001 From: chenkaiyue Date: Thu, 9 Jun 2022 15:46:00 +0800 Subject: [PATCH] add log and debug --- pkg/agent/agent.go | 2 +- pkg/ensurance/analyzer/analyzer.go | 23 +++- .../collector/cadvisor/cadvisor_linux.go | 18 ++- pkg/ensurance/collector/collector.go | 23 ++-- .../collector/nodelocal/nodelocal.go | 2 +- pkg/ensurance/executor/cpu_usage.go | 43 ++++--- pkg/ensurance/executor/evict.go | 23 +++- pkg/ensurance/executor/executor.go | 8 +- pkg/ensurance/executor/interface.go | 3 +- pkg/ensurance/executor/metric.go | 3 +- pkg/ensurance/executor/pod-info/pod_info.go | 6 +- pkg/ensurance/executor/release_resource.go | 3 + pkg/ensurance/executor/throttle.go | 66 +++++++--- pkg/ensurance/executor/waterline.go | 121 ++++++++++-------- pkg/utils/ref.go | 2 +- 15 files changed, 218 insertions(+), 128 deletions(-) diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 2c9a34915..3af477e15 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -89,7 +89,7 @@ func NewAgent(ctx context.Context, managers = appendManagerIfNotNil(managers, stateCollector) analyzerManager := analyzer.NewAnormalyAnalyzer(kubeClient, nodeName, podInformer, nodeInformer, nepInformer, actionInformer, stateCollector.AnalyzerChann, noticeCh) managers = appendManagerIfNotNil(managers, analyzerManager) - avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint, stateCollector.GetStateFunc(), executeExcess) + avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint, stateCollector.State, executeExcess) managers = appendManagerIfNotNil(managers, avoidanceManager) if nodeResource := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResource { diff --git a/pkg/ensurance/analyzer/analyzer.go b/pkg/ensurance/analyzer/analyzer.go index d80659fc8..14dd374b2 100644 --- a/pkg/ensurance/analyzer/analyzer.go +++ b/pkg/ensurance/analyzer/analyzer.go @@ -180,7 +180,7 @@ func (s *AnormalyAnalyzer) Analyze(state map[string][]common.TimeSeries) { } } - klog.V(6).Infof("Analyze actionContexts: %v", actionContexts) + klog.V(6).Infof("Analyze actionContexts: %#v", actionContexts) //step 3 : merge avoidanceAction := s.merge(state, avoidanceMaps, actionContexts) @@ -208,7 +208,7 @@ func (s *AnormalyAnalyzer) trigger(series []common.TimeSeries, object ensurancea for _, ts := range series { triggered = s.evaluator.EvalWithMetric(object.MetricRule.Name, float64(object.MetricRule.Value.Value()), ts.Samples[0].Value) - klog.V(6).Infof("Anormaly detection result %v, Name: %s, Value: %.2f, %s/%s", triggered, + klog.V(4).Infof("Anormaly detection result %v, Name: %s, Value: %.2f, %s/%s", triggered, object.MetricRule.Name, ts.Samples[0].Value, common.GetValueByName(ts.Labels, common.LabelNamePodNamespace), @@ -238,7 +238,7 @@ func (s *AnormalyAnalyzer) analyze(key string, object ensuranceapi.ObjectiveEnsu //step2: check if triggered for NodeQOSEnsurance threshold := s.trigger(series, object) - klog.V(4).Infof("for NodeQOS %s, metrics reach the threshold: %v", key, threshold) + klog.V(4).Infof("For NodeQOS %s, metrics reach the threshold: %v", key, threshold) //step3: check is triggered action or restored, set the detection s.computeActionContext(threshold, key, object, &ac) @@ -314,6 +314,9 @@ func (s *AnormalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, avoida combineEvictDuplicate(&ae.EvictExecutor, evictPods) } } + 
ae.StateMap = stateMap + + klog.V(6).Infof("ThrottleExecutor is %#v, EvictExecutor is %#v", ae.ThrottleExecutor, ae.EvictExecutor) return ae } @@ -495,8 +498,8 @@ func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, thrott } } } - for _, t := range throttleUpPods { + for _, t := range throttleUpPods { if i := e.ThrottleUpPods.Find(t.PodKey); i == -1 { e.ThrottleUpPods = append(e.ThrottleUpPods, t) } else { @@ -543,6 +546,10 @@ func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionCont } } + for waterLineMetric, waterlines := range e.ThrottleDownWaterLine { + klog.V(6).Infof("ThrottleDownWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines) + } + if enableSchedule && ac.Restored { for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances { if ensurance.Name == ac.ObjectiveEnsuranceName { @@ -556,6 +563,10 @@ func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionCont } } } + + for waterLineMetric, waterlines := range e.ThrottleUpWaterLine { + klog.V(6).Infof("ThrottleUpWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines) + } } func combineEvictWaterLine(e *executor.EvictExecutor, ac ecache.ActionContext) { @@ -571,5 +582,9 @@ func combineEvictWaterLine(e *executor.EvictExecutor, ac ecache.ActionContext) { heap.Push(e.EvictWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value) } } + + for waterLineMetric, waterlines := range e.EvictWaterLine { + klog.V(6).Infof("EvictWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines) + } } } diff --git a/pkg/ensurance/collector/cadvisor/cadvisor_linux.go b/pkg/ensurance/collector/cadvisor/cadvisor_linux.go index 2c3fe7805..ad97e844b 100644 --- a/pkg/ensurance/collector/cadvisor/cadvisor_linux.go +++ b/pkg/ensurance/collector/cadvisor/cadvisor_linux.go @@ -4,7 +4,9 @@ package cadvisor import ( + "math" "net/http" + "reflect" "strconv" "time" @@ -127,6 +129,16 @@ func (c *CadvisorCollector) Collect() (map[string][]common.TimeSeries, error) { for key, v := range containers { containerId := utils.GetContainerIdFromKey(key) containerName := utils.GetContainerNameFromPod(pod, containerId) + klog.V(6).Infof("Key is %s, containerId is %s, containerName is %s", key, containerId, containerName) + + if reflect.DeepEqual(cadvisorapiv2.ContainerInfo{}, v) { + klog.Warning("ContainerInfo is nil") + } else { + if len(v.Stats) == 0 { + klog.Warning("ContainerInfo.Stats is empty") + } + } + // Filter the sandbox container if (containerId != "") && (containerName == "") { continue @@ -145,10 +157,11 @@ func (c *CadvisorCollector) Collect() (map[string][]common.TimeSeries, error) { } if state, ok := c.latestContainersStates[key]; ok { + klog.V(8).Infof("For key %s, LatestContainersStates exist", key) var containerLabels = GetContainerLabels(pod, containerId, containerName, hasExtRes) cpuUsageSample, schedRunqueueTime := caculateCPUUsage(&v, &state) - if cpuUsageSample == 0 && schedRunqueueTime == 0 { + if cpuUsageSample == 0 && schedRunqueueTime == 0 || math.IsNaN(cpuUsageSample) { continue } if hasExtRes { @@ -160,7 +173,7 @@ func (c *CadvisorCollector) Collect() (map[string][]common.TimeSeries, error) { addSampleToStateMap(types.MetricNameContainerCpuQuota, composeSample(containerLabels, float64(containerInfoV1.Spec.Cpu.Quota), now), stateMap) addSampleToStateMap(types.MetricNameContainerCpuPeriod, composeSample(containerLabels, float64(containerInfoV1.Spec.Cpu.Period), now), stateMap) - klog.V(10).Infof("Pod: %s, 
containerName: %s, key %s, scheduler run queue time %.2f", klog.KObj(pod), containerName, key, schedRunqueueTime) + klog.V(8).Infof("Pod: %s, containerName: %s, key %s, scheduler run queue time %.2f, container_cpu_total_usage %#v", klog.KObj(pod), containerName, key, schedRunqueueTime, cpuUsageSample) } containerStates[key] = ContainerState{stat: v, timestamp: now} } @@ -202,6 +215,7 @@ func caculateCPUUsage(info *cadvisorapiv2.ContainerInfo, state *ContainerState) cpuUsageIncrease := info.Stats[0].Cpu.Usage.Total - state.stat.Stats[0].Cpu.Usage.Total schedRunqueueTimeIncrease := info.Stats[0].Cpu.Schedstat.RunqueueTime - state.stat.Stats[0].Cpu.Schedstat.RunqueueTime timeIncrease := info.Stats[0].Timestamp.UnixNano() - state.stat.Stats[0].Timestamp.UnixNano() + cpuUsageSample := float64(cpuUsageIncrease) / float64(timeIncrease) schedRunqueueTime := float64(schedRunqueueTimeIncrease) * 1000 * 1000 / float64(timeIncrease) return cpuUsageSample, schedRunqueueTime diff --git a/pkg/ensurance/collector/collector.go b/pkg/ensurance/collector/collector.go index 528281504..e21024d0d 100644 --- a/pkg/ensurance/collector/collector.go +++ b/pkg/ensurance/collector/collector.go @@ -36,7 +36,7 @@ type StateCollector struct { AnalyzerChann chan map[string][]common.TimeSeries NodeResourceChann chan map[string][]common.TimeSeries PodResourceChann chan map[string][]common.TimeSeries - state map[string][]common.TimeSeries + State map[string][]common.TimeSeries rw sync.RWMutex } @@ -45,6 +45,7 @@ func NewStateCollector(nodeName string, nepLister ensuranceListers.NodeQOSEnsura analyzerChann := make(chan map[string][]common.TimeSeries) nodeResourceChann := make(chan map[string][]common.TimeSeries) podResourceChann := make(chan map[string][]common.TimeSeries) + State := make(map[string][]common.TimeSeries) return &StateCollector{ nodeName: nodeName, nepLister: nepLister, @@ -59,6 +60,7 @@ func NewStateCollector(nodeName string, nepLister ensuranceListers.NodeQOSEnsura collectors: &sync.Map{}, cadvisorManager: manager, exclusiveCPUSet: exclusiveCPUSet, + State: State, } } @@ -113,8 +115,6 @@ func (s *StateCollector) Collect(waterLine bool) { wg := sync.WaitGroup{} start := time.Now() - var data = make(map[string][]common.TimeSeries) - s.collectors.Range(func(key, value interface{}) bool { c := value.(Collector) @@ -132,26 +132,21 @@ func (s *StateCollector) Collect(waterLine bool) { } s.rw.Unlock() } - }(c, data) + }(c, s.State) return true }) wg.Wait() - // If Collect is not called by waterline related logic but StateCollector.Run, AnalyzerChann should not get update, which will trigger recursive analyzes and executes - if !waterLine { - s.AnalyzerChann <- data - } - - s.state = data + s.AnalyzerChann <- s.State if nodeResource := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResource { - s.NodeResourceChann <- data + s.NodeResourceChann <- s.State } if podResource := utilfeature.DefaultFeatureGate.Enabled(features.CranePodResource); podResource { - s.PodResourceChann <- data + s.PodResourceChann <- s.State } } @@ -255,8 +250,6 @@ func CheckMetricNameExist(name string) bool { func (s *StateCollector) GetStateFunc() func() map[string][]common.TimeSeries { return func() map[string][]common.TimeSeries { s.Collect(true) - s.rw.RLock() - defer s.rw.RUnlock() - return s.state + return s.State } } diff --git a/pkg/ensurance/collector/nodelocal/nodelocal.go b/pkg/ensurance/collector/nodelocal/nodelocal.go index d4857d752..c11092864 100644 --- a/pkg/ensurance/collector/nodelocal/nodelocal.go 
+++ b/pkg/ensurance/collector/nodelocal/nodelocal.go @@ -81,7 +81,7 @@ func (n *NodeLocal) Collect() (map[string][]common.TimeSeries, error) { } } - klog.V(10).Info("Node local collecting, status: %v", status) + klog.V(6).Info("Node local collecting, status: %#v", status) return status, nil } diff --git a/pkg/ensurance/executor/cpu_usage.go b/pkg/ensurance/executor/cpu_usage.go index 69bff0d27..1d890b00b 100644 --- a/pkg/ensurance/executor/cpu_usage.go +++ b/pkg/ensurance/executor/cpu_usage.go @@ -43,13 +43,15 @@ func throttleOnePodCpu(ctx *ExecuteContext, index int, ThrottleDownPods Throttle // Throttle for CPU metrics + klog.V(6).Infof("index %d, containerusage is %#v", index, ThrottleDownPods[index].ContainerCPUUsages) + for _, v := range ThrottleDownPods[index].ContainerCPUUsages { // pause container to skip if v.ContainerName == "" { continue } - klog.V(4).Infof("ThrottleExecutor1 avoid container %s/%s", klog.KObj(pod), v.ContainerName) + klog.V(4).Infof("ThrottleExecutor begin to avoid container %s/%s", klog.KObj(pod), v.ContainerName) containerCPUQuota, err := podinfo.GetUsageById(ThrottleDownPods[index].ContainerCPUQuotas, v.ContainerId) if err != nil { @@ -88,8 +90,8 @@ func throttleOnePodCpu(ctx *ExecuteContext, index int, ThrottleDownPods Throttle } } - klog.V(6).Infof("Prior update container resources containerCPUQuotaNew %.2f, containerCPUQuota.Value %.2f,containerCPUPeriod %.2f", - containerCPUQuotaNew, containerCPUQuota.Value, containerCPUPeriod.Value) + klog.V(6).Infof("Prior update container resources containerCPUQuotaNew %.2f, containerCPUQuota.Value %.2f,containerCPUPeriod %.2f,ContainerCPUUsages %.2f", + containerCPUQuotaNew, containerCPUQuota.Value, containerCPUPeriod.Value, v.Value) if !utils.AlmostEqual(containerCPUQuotaNew*containerCPUPeriod.Value, containerCPUQuota.Value) { err = cruntime.UpdateContainerResources(ctx.RuntimeClient, v.ContainerId, cruntime.UpdateOptions{CPUQuota: int64(containerCPUQuotaNew * containerCPUPeriod.Value)}) @@ -98,11 +100,14 @@ func throttleOnePodCpu(ctx *ExecuteContext, index int, ThrottleDownPods Throttle continue } else { klog.V(4).Infof("ThrottleExecutor avoid pod %s, container %s, set cpu quota %.2f.", - klog.KObj(pod), v.ContainerName, containerCPUQuotaNew) + klog.KObj(pod), v.ContainerName, containerCPUQuotaNew*containerCPUPeriod.Value) + + released = ConstructCpuUsageRelease(ThrottleDownPods[index], containerCPUQuotaNew, v.Value) + klog.V(6).Infof("For pod %s, container %s, release %f cpu usage", ThrottleDownPods[index].PodKey.String(), container.Name, released[CpuUsage]) + + totalReleasedResource.Add(released) } } - released = ConstructCpuUsageRelease(ThrottleDownPods[index], containerCPUQuotaNew, v.Value) - totalReleasedResource.Add(released) } return } @@ -164,8 +169,8 @@ func restoreOnePodCpu(ctx *ExecuteContext, index int, ThrottleUpPods ThrottlePod } - klog.V(6).Infof("Prior update container resources containerCPUQuotaNew %.2f,containerCPUQuota %.2f,containerCPUPeriod %.2f", - klog.KObj(pod), containerCPUQuotaNew, containerCPUQuota.Value, containerCPUPeriod.Value) + klog.V(6).Infof("Prior update container resources containerCPUQuotaNew %.2f,containerCPUQuota %.2f,containerCPUPeriod %.2f,ContainerCPUUsages %.2f", + containerCPUQuotaNew, containerCPUQuota.Value, containerCPUPeriod.Value, v.Value) if !utils.AlmostEqual(containerCPUQuotaNew*containerCPUPeriod.Value, containerCPUQuota.Value) { @@ -182,10 +187,14 @@ func restoreOnePodCpu(ctx *ExecuteContext, index int, ThrottleUpPods ThrottlePod errPodKeys = append(errPodKeys, 
fmt.Sprintf("Failed to updateResource, err %s", err.Error()), ThrottleUpPods[index].PodKey.String()) continue } + klog.V(4).Infof("ThrottleExecutor restore pod %s, container %s, set cpu quota %.2f, .", + klog.KObj(pod), v.ContainerName, containerCPUQuotaNew*containerCPUPeriod.Value) + released = ConstructCpuUsageRelease(ThrottleUpPods[index], containerCPUQuotaNew, v.Value) + klog.V(6).Infof("For pod %s, container %s, restore %f cpu usage", ThrottleUpPods[index].PodKey, container.Name, released[CpuUsage]) + + totalReleasedResource.Add(released) } } - released = ConstructCpuUsageRelease(ThrottleUpPods[index], containerCPUQuotaNew, v.Value) - totalReleasedResource.Add(released) } return @@ -194,6 +203,10 @@ func restoreOnePodCpu(ctx *ExecuteContext, index int, ThrottleUpPods ThrottlePod func evictOnePodCpu(wg *sync.WaitGroup, ctx *ExecuteContext, index int, totalReleasedResource *ReleaseResource, EvictPods EvictPods) (errPodKeys []string, released ReleaseResource) { wg.Add(1) + // Calculate release resources + released = ConstructCpuUsageRelease(EvictPods[index], 0.0, 0.0) + totalReleasedResource.Add(released) + go func(evictPod podinfo.PodContext) { defer wg.Done() @@ -213,10 +226,6 @@ func evictOnePodCpu(wg *sync.WaitGroup, ctx *ExecuteContext, index int, totalRel metrics.ExecutorEvictCountsInc() klog.V(4).Infof("Pod %s is evicted", klog.KObj(pod)) - - // Calculate release resources - released = ConstructCpuUsageRelease(evictPod, 0.0, 0.0) - totalReleasedResource.Add(released) }(EvictPods[index]) return } @@ -224,11 +233,11 @@ func evictOnePodCpu(wg *sync.WaitGroup, ctx *ExecuteContext, index int, totalRel func ConstructCpuUsageRelease(pod podinfo.PodContext, containerCPUQuotaNew, currentContainerCpuUsage float64) ReleaseResource { if pod.PodType == podinfo.Evict { return ReleaseResource{ - CpuUsage: pod.PodCPUUsage, + CpuUsage: pod.PodCPUUsage * CpuQuotaCoefficient, } } if pod.PodType == podinfo.ThrottleDown { - reduction := currentContainerCpuUsage - containerCPUQuotaNew + reduction := (currentContainerCpuUsage - containerCPUQuotaNew) * CpuQuotaCoefficient if reduction > 0 { return ReleaseResource{ CpuUsage: reduction, @@ -237,7 +246,7 @@ func ConstructCpuUsageRelease(pod podinfo.PodContext, containerCPUQuotaNew, curr return ReleaseResource{} } if pod.PodType == podinfo.ThrottleUp { - reduction := containerCPUQuotaNew - currentContainerCpuUsage + reduction := (containerCPUQuotaNew - currentContainerCpuUsage) * CpuQuotaCoefficient if reduction > 0 { return ReleaseResource{ CpuUsage: reduction, diff --git a/pkg/ensurance/executor/evict.go b/pkg/ensurance/executor/evict.go index 55583490f..181fde695 100644 --- a/pkg/ensurance/executor/evict.go +++ b/pkg/ensurance/executor/evict.go @@ -50,7 +50,7 @@ func (e *EvictExecutor) Avoid(ctx *ExecuteContext) error { var errPodKeys, errKeys []string // TODO: totalReleasedResource used for prom metrics - var totalReleased ReleaseResource + totalReleased := ReleaseResource{} /* The step to evict: 1. 
If EvictWaterLine has metrics that can't be quantified, select a evictable metric which has the highest action priority, use its EvictFunc to evict all selected pods, then return @@ -62,39 +62,54 @@ func (e *EvictExecutor) Avoid(ctx *ExecuteContext) error { metricsEvictQuantified, MetricsNotEvcitQuantified := e.EvictWaterLine.DivideMetricsByEvictQuantified() - // There is a metric that can't be ThrottleQuantified, so throttle all selected pods + // There is a metric that can't be EvictQuantified, so evict all selected pods if len(MetricsNotEvcitQuantified) != 0 { + klog.V(6).Info("There is a metric that can't be EvcitQuantified") + highestPrioriyMetric := e.EvictWaterLine.GetHighestPriorityEvictAbleMetric() if highestPrioriyMetric != "" { + klog.V(6).Infof("The highestPrioriyMetric is %s", highestPrioriyMetric) errPodKeys = e.evictPods(ctx, &totalReleased, highestPrioriyMetric) } } else { - _, _, ctx.EvictGapToWaterLines = buildGapToWaterLine(ctx.getStateFunc(), ThrottleExecutor{}, *e, ctx.executeExcessPercent) + _, _, ctx.EvictGapToWaterLines = buildGapToWaterLine(ctx.stateMap, ThrottleExecutor{}, *e, ctx.executeExcessPercent) if ctx.EvictGapToWaterLines.HasUsageMissedMetric() { + klog.V(6).Infof("There is a metric usage missed") highestPrioriyMetric := e.EvictWaterLine.GetHighestPriorityEvictAbleMetric() if highestPrioriyMetric != "" { errPodKeys = e.evictPods(ctx, &totalReleased, highestPrioriyMetric) } } else { - // The metrics in ThrottoleDownGapToWaterLines are all in WaterLineMetricsCanBeQuantified and has current usage, then throttle precisely + // The metrics in EvictGapToWaterLines are can be EvictQuantified and has current usage, then evict precisely var released ReleaseResource wg := sync.WaitGroup{} for _, m := range metricsEvictQuantified { + klog.V(6).Infof("Evict precisely on metric %s", m) if MetricMap[m].SortAble { MetricMap[m].SortFunc(e.EvictPods) } else { execsort.GeneralSorter(e.EvictPods) } + klog.V(6).Info("After sort, the sequence to evict is ") + for _, pc := range e.EvictPods { + klog.V(6).Info(pc.PodKey.String()) + } + for !ctx.EvictGapToWaterLines.TargetGapsRemoved(m) { + klog.V(6).Infof("For metric %s, there is still gap to waterlines: %f", m, ctx.EvictGapToWaterLines[m]) if podinfo.HasNoExecutedPod(e.EvictPods) { index := podinfo.GetFirstNoExecutedPod(e.EvictPods) errKeys, released = MetricMap[m].EvictFunc(&wg, ctx, index, &totalReleased, e.EvictPods) errPodKeys = append(errPodKeys, errKeys...) 
+ klog.V(6).Infof("Evict pods %s, released %f resource", e.EvictPods[index].PodKey, released[m]) e.EvictPods[index].HasBeenActioned = true ctx.EvictGapToWaterLines[m] -= released[m] + } else { + klog.V(6).Info("There is no pod that can be evicted") + break } } } diff --git a/pkg/ensurance/executor/executor.go b/pkg/ensurance/executor/executor.go index fa9f8a103..8dd5b7944 100644 --- a/pkg/ensurance/executor/executor.go +++ b/pkg/ensurance/executor/executor.go @@ -32,14 +32,14 @@ type ActionExecutor struct { runtimeClient pb.RuntimeServiceClient runtimeConn *grpc.ClientConn - getStateFunc func() map[string][]common.TimeSeries + stateMap map[string][]common.TimeSeries executeExcessPercent float64 } // NewActionExecutor create enforcer manager func NewActionExecutor(client clientset.Interface, nodeName string, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, - noticeCh <-chan AvoidanceExecutor, runtimeEndpoint string, getStateFunc func() map[string][]common.TimeSeries, executeExcess string) *ActionExecutor { + noticeCh <-chan AvoidanceExecutor, runtimeEndpoint string, stateMap map[string][]common.TimeSeries, executeExcess string) *ActionExecutor { runtimeClient, runtimeConn, err := cruntime.GetRuntimeClient(runtimeEndpoint) if err != nil { @@ -63,7 +63,7 @@ func NewActionExecutor(client clientset.Interface, nodeName string, podInformer nodeSynced: nodeInformer.Informer().HasSynced, runtimeClient: runtimeClient, runtimeConn: runtimeConn, - getStateFunc: getStateFunc, + stateMap: stateMap, executeExcessPercent: executeExcessPercent, } } @@ -117,7 +117,7 @@ func (a *ActionExecutor) execute(ae AvoidanceExecutor, _ <-chan struct{}) error NodeLister: a.nodeLister, RuntimeClient: a.runtimeClient, RuntimeConn: a.runtimeConn, - getStateFunc: a.getStateFunc, + stateMap: ae.StateMap, executeExcessPercent: a.executeExcessPercent, } diff --git a/pkg/ensurance/executor/interface.go b/pkg/ensurance/executor/interface.go index 748c2d0dc..b51016271 100644 --- a/pkg/ensurance/executor/interface.go +++ b/pkg/ensurance/executor/interface.go @@ -18,6 +18,7 @@ type AvoidanceExecutor struct { ScheduleExecutor ScheduleExecutor ThrottleExecutor ThrottleExecutor EvictExecutor EvictExecutor + StateMap map[string][]common.TimeSeries } type ExecuteContext struct { @@ -36,7 +37,7 @@ type ExecuteContext struct { // key is the metric name, value is (actual used)-(the lowest waterline for NodeQOSEnsurancePolicies which use evict action) EvictGapToWaterLines GapToWaterLines - getStateFunc func() map[string][]common.TimeSeries + stateMap map[string][]common.TimeSeries executeExcessPercent float64 } diff --git a/pkg/ensurance/executor/metric.go b/pkg/ensurance/executor/metric.go index d4aed516f..7c1cb972c 100644 --- a/pkg/ensurance/executor/metric.go +++ b/pkg/ensurance/executor/metric.go @@ -26,7 +26,8 @@ type metric struct { EvictAble bool EvictQuantified bool - EvictFunc func(wg *sync.WaitGroup, ctx *ExecuteContext, index int, totalReleasedResource *ReleaseResource, EvictPods EvictPods) (errPodKeys []string, released ReleaseResource) + // If use goroutine to evcit, make sure to calculate release resources outside the goroutine + EvictFunc func(wg *sync.WaitGroup, ctx *ExecuteContext, index int, totalReleasedResource *ReleaseResource, EvictPods EvictPods) (errPodKeys []string, released ReleaseResource) } var MetricMap = make(map[WaterLineMetric]metric) diff --git a/pkg/ensurance/executor/pod-info/pod_info.go b/pkg/ensurance/executor/pod-info/pod_info.go index dfe76720b..067181e86 100644 --- 
a/pkg/ensurance/executor/pod-info/pod_info.go +++ b/pkg/ensurance/executor/pod-info/pod_info.go @@ -137,8 +137,10 @@ func BuildPodBasicInfo(pod *v1.Pod, stateMap map[string][]common.TimeSeries, act podContext.ExtCpuBeUsed, podContext.ExtCpuLimit, podContext.ExtCpuRequest = utils.ExtResourceAllocated(pod, v1.ResourceCPU) podContext.StartTime = pod.Status.StartTime - podContext.CPUThrottle.MinCPURatio = uint64(action.Spec.Throttle.CPUThrottle.MinCPURatio) - podContext.CPUThrottle.StepCPURatio = uint64(action.Spec.Throttle.CPUThrottle.StepCPURatio) + if action.Spec.Throttle != nil { + podContext.CPUThrottle.MinCPURatio = uint64(action.Spec.Throttle.CPUThrottle.MinCPURatio) + podContext.CPUThrottle.StepCPURatio = uint64(action.Spec.Throttle.CPUThrottle.StepCPURatio) + } podContext.PodType = podType diff --git a/pkg/ensurance/executor/release_resource.go b/pkg/ensurance/executor/release_resource.go index 9466441c6..fe245b5ce 100644 --- a/pkg/ensurance/executor/release_resource.go +++ b/pkg/ensurance/executor/release_resource.go @@ -4,6 +4,9 @@ type ReleaseResource map[WaterLineMetric]float64 func (r ReleaseResource) Add(new ReleaseResource) { for metric, value := range new { + if _, ok := r[metric]; !ok { + r[metric] = 0.0 + } r[metric] += value } } diff --git a/pkg/ensurance/executor/throttle.go b/pkg/ensurance/executor/throttle.go index 9525b78dd..d38f04fa7 100644 --- a/pkg/ensurance/executor/throttle.go +++ b/pkg/ensurance/executor/throttle.go @@ -54,7 +54,7 @@ func (t *ThrottleExecutor) Avoid(ctx *ExecuteContext) error { metrics.UpdateLastTimeWithSubComponent(string(known.ModuleActionExecutor), string(metrics.SubComponentThrottle), metrics.StepAvoid, start) defer metrics.UpdateDurationFromStartWithSubComponent(string(known.ModuleActionExecutor), string(metrics.SubComponentThrottle), metrics.StepAvoid, start) - klog.V(6).Info("ThrottleExecutor avoid, %v", *t) + klog.V(6).Infof("ThrottleExecutor avoid, %#v", *t) if len(t.ThrottleDownPods) == 0 { metrics.UpdateExecutorStatus(metrics.SubComponentThrottle, metrics.StepAvoid, 0) @@ -65,7 +65,7 @@ func (t *ThrottleExecutor) Avoid(ctx *ExecuteContext) error { var errPodKeys, errKeys []string // TODO: totalReleasedResource used for prom metrics - var totalReleased ReleaseResource + totalReleased := ReleaseResource{} /* The step to throttle: 1. 
If ThrottleDownWaterLine has metrics that can't be quantified, select a throttleable metric which has the highest action priority, use its throttlefunc to throttle all ThrottleDownPods, then return @@ -78,14 +78,18 @@ func (t *ThrottleExecutor) Avoid(ctx *ExecuteContext) error { // There is a metric that can't be ThrottleQuantified, so throttle all selected pods if len(MetricsNotThrottleQuantified) != 0 { + klog.V(6).Info("ThrottleDown: There is a metric that can't be ThrottleQuantified") + highestPrioriyMetric := t.ThrottleDownWaterLine.GetHighestPriorityThrottleAbleMetric() if highestPrioriyMetric != "" { + klog.V(6).Infof("The highestPrioriyMetric is %s", highestPrioriyMetric) errPodKeys = t.throttlePods(ctx, &totalReleased, highestPrioriyMetric) } } else { - ctx.ThrottoleDownGapToWaterLines, _, _ = buildGapToWaterLine(ctx.getStateFunc(), *t, EvictExecutor{}, ctx.executeExcessPercent) + ctx.ThrottoleDownGapToWaterLines, _, _ = buildGapToWaterLine(ctx.stateMap, *t, EvictExecutor{}, ctx.executeExcessPercent) if ctx.ThrottoleDownGapToWaterLines.HasUsageMissedMetric() { + klog.V(6).Info("There is a metric usage missed") highestPrioriyMetric := t.ThrottleDownWaterLine.GetHighestPriorityThrottleAbleMetric() if highestPrioriyMetric != "" { errPodKeys = t.throttlePods(ctx, &totalReleased, highestPrioriyMetric) @@ -94,18 +98,28 @@ func (t *ThrottleExecutor) Avoid(ctx *ExecuteContext) error { // The metrics in ThrottoleDownGapToWaterLines are all in WaterLineMetricsCanBeQuantified and has current usage, then throttle precisely var released ReleaseResource for _, m := range metricsThrottleQuantified { + klog.V(6).Infof("ThrottleDown precisely on metric %s", m) if MetricMap[m].SortAble { MetricMap[m].SortFunc(t.ThrottleDownPods) } else { execsort.GeneralSorter(t.ThrottleDownPods) } - for !ctx.ThrottoleDownGapToWaterLines.TargetGapsRemoved(m) { - for index := range t.ThrottleDownPods { - errKeys, released = MetricMap[m].ThrottleFunc(ctx, index, t.ThrottleDownPods, &totalReleased) - errPodKeys = append(errPodKeys, errKeys...) - ctx.ThrottoleDownGapToWaterLines[m] -= released[m] - } + klog.V(6).Info("After sort, the sequence to throttle is ") + for _, pc := range t.ThrottleDownPods { + klog.V(6).Info(pc.PodKey.String(), pc.ContainerCPUUsages) + } + + index := 0 + for !ctx.ThrottoleDownGapToWaterLines.TargetGapsRemoved(m) && index < len(t.ThrottleDownPods) { + klog.V(6).Infof("For metric %s, there is still gap to waterlines: %f", m, ctx.ThrottoleDownGapToWaterLines[m]) + + errKeys, released = MetricMap[m].ThrottleFunc(ctx, index, t.ThrottleDownPods, &totalReleased) + klog.V(6).Infof("ThrottleDown pods %s, released %f resource", t.ThrottleDownPods[index].PodKey, released[m]) + errPodKeys = append(errPodKeys, errKeys...) + + ctx.ThrottoleDownGapToWaterLines[m] -= released[m] + index = index + 1 } } } @@ -143,7 +157,7 @@ func (t *ThrottleExecutor) Restore(ctx *ExecuteContext) error { var errPodKeys, errKeys []string // TODO: totalReleasedResource used for prom metrics - var totalReleased ReleaseResource + totalReleased := ReleaseResource{} /* The step to restore: 1. 
If ThrottleUpWaterLine has metrics that can't be quantified, select a throttleable metric which has the highest action priority, use its RestoreFunc to restore all ThrottleUpPods, then return @@ -156,14 +170,18 @@ func (t *ThrottleExecutor) Restore(ctx *ExecuteContext) error { // There is a metric that can't be ThrottleQuantified, so restore all selected pods if len(MetricsNotThrottleQuantified) != 0 { + klog.V(6).Info("ThrottleUp: There is a metric that can't be ThrottleQuantified") + highestPrioriyMetric := t.ThrottleUpWaterLine.GetHighestPriorityThrottleAbleMetric() if highestPrioriyMetric != "" { + klog.V(6).Infof("The highestPrioriyMetric is %s", highestPrioriyMetric) errPodKeys = t.restorePods(ctx, &totalReleased, highestPrioriyMetric) } } else { - _, ctx.ThrottoleUpGapToWaterLines, _ = buildGapToWaterLine(ctx.getStateFunc(), *t, EvictExecutor{}, ctx.executeExcessPercent) + _, ctx.ThrottoleUpGapToWaterLines, _ = buildGapToWaterLine(ctx.stateMap, *t, EvictExecutor{}, ctx.executeExcessPercent) if ctx.ThrottoleUpGapToWaterLines.HasUsageMissedMetric() { + klog.V(6).Info("There is a metric usage missed") highestPrioriyMetric := t.ThrottleUpWaterLine.GetHighestPriorityThrottleAbleMetric() if highestPrioriyMetric != "" { errPodKeys = t.restorePods(ctx, &totalReleased, highestPrioriyMetric) @@ -172,19 +190,29 @@ func (t *ThrottleExecutor) Restore(ctx *ExecuteContext) error { // The metrics in ThrottoleUpGapToWaterLines are all in WaterLineMetricsCanBeQuantified and has current usage, then throttle precisely var released ReleaseResource for _, m := range metricsThrottleQuantified { + klog.V(6).Infof("ThrottleUp precisely on metric %s", m) if MetricMap[m].SortAble { MetricMap[m].SortFunc(t.ThrottleUpPods) } else { execsort.GeneralSorter(t.ThrottleUpPods) } - t.ThrottleUpPods = Reverse(t.ThrottleUpPods) - - for !ctx.ThrottoleUpGapToWaterLines.TargetGapsRemoved(m) { - for index := range t.ThrottleUpPods { - errKeys, released = MetricMap[m].RestoreFunc(ctx, index, t.ThrottleUpPods, &totalReleased) - errPodKeys = append(errPodKeys, errKeys...) - ctx.ThrottoleUpGapToWaterLines[m] -= released[m] - } + //t.ThrottleUpPods = Reverse(t.ThrottleUpPods) + + klog.V(6).Info("After sort, the sequence to throttle is ") + for _, pc := range t.ThrottleUpPods { + klog.V(6).Info(pc.PodKey.String()) + } + + index := 0 + for !ctx.ThrottoleUpGapToWaterLines.TargetGapsRemoved(m) && index < len(t.ThrottleUpPods) { + klog.V(6).Infof("For metric %s, there is still gap to waterlines: %f", m, ctx.ThrottoleUpGapToWaterLines[m]) + + errKeys, released = MetricMap[m].RestoreFunc(ctx, index, t.ThrottleUpPods, &totalReleased) + klog.V(6).Infof("ThrottleUp pods %s, released %f resource", t.ThrottleUpPods[index].PodKey, released[m]) + errPodKeys = append(errPodKeys, errKeys...) 
+ + ctx.ThrottoleUpGapToWaterLines[m] -= released[m] + index = index + 1 } } } diff --git a/pkg/ensurance/executor/waterline.go b/pkg/ensurance/executor/waterline.go index 7d0a3a9a2..107302012 100644 --- a/pkg/ensurance/executor/waterline.go +++ b/pkg/ensurance/executor/waterline.go @@ -2,6 +2,7 @@ package executor import ( "math" + "reflect" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/klog/v2" @@ -132,71 +133,78 @@ func buildGapToWaterLine(stateMap map[string][]common.TimeSeries, throttleDownGapToWaterLines, throttleUpGapToWaterLines, eviceGapToWaterLines = make(map[WaterLineMetric]float64), make(map[WaterLineMetric]float64), make(map[WaterLineMetric]float64) - // Traverse EvictAbleMetric but not evictExecutor.EvictWaterLine can make it easier when users use the wrong metric name in NEP, cause this limit metrics - // must come from EvictAbleMetrics - for _, m := range GetEvictAbleMetricName() { - // Get the series for each metric - series, ok := stateMap[string(m)] - if !ok { - klog.Warningf("Metric %s not found from collector stateMap", string(m)) - // Can't get current usage, so can not do actions precisely, just evict every evictedPod; - eviceGapToWaterLines[m] = MissedCurrentUsage - continue - } + if !reflect.DeepEqual(evictExecutor, EvictExecutor{}) { + // Traverse EvictAbleMetric but not evictExecutor.EvictWaterLine can make it easier when users use the wrong metric name in NEP, cause this limit metrics + // must come from EvictAbleMetrics + for _, m := range GetEvictAbleMetricName() { + // Get the series for each metric + series, ok := stateMap[string(m)] + if !ok { + klog.Warningf("BuildEvictWaterLineGap: Evict Metric %s not found from collector stateMap", string(m)) + // Can't get current usage, so can not do actions precisely, just evict every evictedPod; + eviceGapToWaterLines[m] = MissedCurrentUsage + continue + } - // Find the biggest used value - var maxUsed float64 - if series[0].Samples[0].Value > maxUsed { - maxUsed = series[0].Samples[0].Value - } + // Find the biggest used value + var maxUsed float64 + if series[0].Samples[0].Value > maxUsed { + maxUsed = series[0].Samples[0].Value + } - // Get the waterLine for each metric in WaterLineMetricsCanBeQuantified - evictWaterLine, evictExist := evictExecutor.EvictWaterLine[m] + // Get the waterLine for each metric in WaterLineMetricsCanBeQuantified + evictWaterLine, evictExist := evictExecutor.EvictWaterLine[m] - // If metric not exist in EvictWaterLine, eviceGapToWaterLines of metric will can't be calculated - if !evictExist { - delete(eviceGapToWaterLines, m) - } else { - eviceGapToWaterLines[m] = executeExcessPercent * (maxUsed - float64(evictWaterLine.PopSmallest().Value())) + // If metric not exist in EvictWaterLine, eviceGapToWaterLines of metric will can't be calculated + if !evictExist { + delete(eviceGapToWaterLines, m) + } else { + klog.V(6).Infof("BuildEvictWaterLineGap: For metrics %s, maxUsed is %f, waterline is %f", m, maxUsed, float64(evictWaterLine.PopSmallest().Value())) + eviceGapToWaterLines[m] = (1 + executeExcessPercent) * (maxUsed - float64(evictWaterLine.PopSmallest().Value())) + } } } - // Traverse ThrottleAbleMetricName but not throttleExecutor.ThrottleDownWaterLine can make it easier when users use the wrong metric name in NEP, cause this limit metrics - // must come from ThrottleAbleMetrics - for _, m := range GetThrottleAbleMetricName() { - // Get the series for each metric - series, ok := stateMap[string(m)] - if !ok { - klog.Warningf("Metric %s not found from collector stateMap", string(m)) - 
// Can't get current usage, so can not do actions precisely, just evict every evictedPod; - throttleDownGapToWaterLines[m] = MissedCurrentUsage - throttleUpGapToWaterLines[m] = MissedCurrentUsage - continue - } + if !reflect.DeepEqual(throttleExecutor, ThrottleExecutor{}) { + // Traverse ThrottleAbleMetricName but not throttleExecutor.ThrottleDownWaterLine can make it easier when users use the wrong metric name in NEP, cause this limit metrics + // must come from ThrottleAbleMetrics + for _, m := range GetThrottleAbleMetricName() { + // Get the series for each metric + series, ok := stateMap[string(m)] + if !ok { + klog.Warningf("BuildThrottleWaterLineGap: Metric %s not found from collector stateMap", string(m)) + // Can't get current usage, so can not do actions precisely, just evict every evictedPod; + throttleDownGapToWaterLines[m] = MissedCurrentUsage + throttleUpGapToWaterLines[m] = MissedCurrentUsage + continue + } - // Find the biggest used value - var maxUsed float64 - if series[0].Samples[0].Value > maxUsed { - maxUsed = series[0].Samples[0].Value - } + // Find the biggest used value + var maxUsed float64 + if series[0].Samples[0].Value > maxUsed { + maxUsed = series[0].Samples[0].Value + } - // Get the waterLine for each metric in WaterLineMetricsCanBeQuantified - throttleDownWaterLine, throttleDownExist := throttleExecutor.ThrottleDownWaterLine[m] - throttleUpWaterLine, throttleUpExist := throttleExecutor.ThrottleUpWaterLine[m] + // Get the waterLine for each metric in WaterLineMetricsCanBeQuantified + throttleDownWaterLine, throttleDownExist := throttleExecutor.ThrottleDownWaterLine[m] + throttleUpWaterLine, throttleUpExist := throttleExecutor.ThrottleUpWaterLine[m] - // If a metric does not exist in ThrottleDownWaterLine, throttleDownGapToWaterLines of this metric will can't be calculated - if !throttleDownExist { - delete(throttleDownGapToWaterLines, m) - } else { - throttleDownGapToWaterLines[m] = executeExcessPercent * (maxUsed - float64(throttleDownWaterLine.PopSmallest().Value())) - } + // If a metric does not exist in ThrottleDownWaterLine, throttleDownGapToWaterLines of this metric will can't be calculated + if !throttleDownExist { + delete(throttleDownGapToWaterLines, m) + } else { + klog.V(6).Infof("BuildThrottleDownWaterLineGap: For metrics %s, maxUsed is %f, waterline is %f", m, maxUsed, float64(throttleDownWaterLine.PopSmallest().Value())) + throttleDownGapToWaterLines[m] = (1 + executeExcessPercent) * (maxUsed - float64(throttleDownWaterLine.PopSmallest().Value())) + } - // If metric not exist in ThrottleUpWaterLine, throttleUpGapToWaterLines of metric will can't be calculated - if !throttleUpExist { - delete(throttleUpGapToWaterLines, m) - } else { - // Attention: different with throttleDown and evict - throttleUpGapToWaterLines[m] = executeExcessPercent * (float64(throttleUpWaterLine.PopSmallest().Value()) - maxUsed) + // If metric not exist in ThrottleUpWaterLine, throttleUpGapToWaterLines of metric will can't be calculated + if !throttleUpExist { + delete(throttleUpGapToWaterLines, m) + } else { + klog.V(6).Infof("BuildThrottleUpWaterLineGap: For metrics %s, maxUsed is %f, waterline is %f", m, maxUsed, float64(throttleUpWaterLine.PopSmallest().Value())) + // Attention: different with throttleDown and evict, use waterline - used + throttleUpGapToWaterLines[m] = (1 + executeExcessPercent) * (float64(throttleUpWaterLine.PopSmallest().Value()) - maxUsed) + } } } return @@ -226,8 +234,9 @@ func (g GapToWaterLines) TargetGapsRemoved(metric WaterLineMetric) bool { // 
Whether there is a metric that can't get usage in GapToWaterLines
 func (g GapToWaterLines) HasUsageMissedMetric() bool {
-	for _, v := range g {
+	for m, v := range g {
 		if v == MissedCurrentUsage {
+			klog.V(6).Infof("Metric %s usage missed", m)
 			return true
 		}
 	}
diff --git a/pkg/utils/ref.go b/pkg/utils/ref.go
index 1222fc246..00f342768 100644
--- a/pkg/utils/ref.go
+++ b/pkg/utils/ref.go
@@ -25,7 +25,7 @@ func GetContainerIdFromKey(key string) string {
 	subPaths := strings.Split(key, "/")
 	if len(subPaths) > 0 {
-		// if the latest sub path is pod-xxx-xxx, we regard as it od path
+		// if the latest sub path is pod-xxx-xxx, we regard as it pod path
 		// if not we used the latest sub path as the containerId
 		if strings.HasPrefix(subPaths[len(subPaths)-1], CgroupPodPrefix) {
 			return ""
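
Note on the reworked gap calculation (illustration only, not part of the patch): buildGapToWaterLine now scales the usage-to-waterline distance by (1 + executeExcessPercent) rather than executeExcessPercent alone, and the throttle-up direction is reversed (waterline minus usage). A minimal standalone Go sketch of that arithmetic follows; the helper names and the sample values are hypothetical, chosen only to show the formula.

package main

import "fmt"

// gapToWaterLine mirrors the throttle-down/evict case: the remaining gap is
// the distance from the highest observed usage down to the smallest configured
// waterline, inflated by the execute-excess percentage so actions overshoot slightly.
func gapToWaterLine(maxUsed, waterline, executeExcessPercent float64) float64 {
	return (1 + executeExcessPercent) * (maxUsed - waterline)
}

// gapToThrottleUpWaterLine is the throttle-up (restore) direction, which the
// patch comments call out as reversed: waterline minus usage.
func gapToThrottleUpWaterLine(maxUsed, waterline, executeExcessPercent float64) float64 {
	return (1 + executeExcessPercent) * (waterline - maxUsed)
}

func main() {
	// Hypothetical numbers: 6.5 cores used, throttle-down waterline at 5 cores,
	// 10% execute excess -> 1.65 cores still have to be released.
	fmt.Printf("throttle-down gap: %.2f\n", gapToWaterLine(6.5, 5.0, 0.1))
	// Throttle-up: usage 3 cores, restore waterline 4 cores -> 1.10 cores of headroom.
	fmt.Printf("throttle-up gap:   %.2f\n", gapToThrottleUpWaterLine(3.0, 4.0, 0.1))
}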