Pod sorting enhancement and precise execution
kaiyuechen authored and chenkaiyue committed Jun 13, 2022
1 parent a0f1148 commit 64f6342
Showing 28 changed files with 1,502 additions and 461 deletions.
4 changes: 2 additions & 2 deletions cmd/crane-agent/app/agent.go
@@ -99,8 +99,8 @@ func Run(ctx context.Context, opts *options.Options) error {
actionInformer := craneInformerFactory.Ensurance().V1alpha1().AvoidanceActions()
tspInformer := craneInformerFactory.Prediction().V1alpha1().TimeSeriesPredictions()

newAgent, err := agent.NewAgent(ctx, hostname, opts.RuntimeEndpoint, opts.CgroupDriver, kubeClient, craneClient,
podInformer, nodeInformer, nepInformer, actionInformer, tspInformer, opts.NodeResourceReserved, opts.Ifaces, healthCheck, opts.CollectInterval)
newAgent, err := agent.NewAgent(ctx, hostname, opts.RuntimeEndpoint, opts.CgroupDriver, kubeClient, craneClient, podInformer, nodeInformer,
nepInformer, actionInformer, tspInformer, opts.NodeResourceReserved, opts.Ifaces, healthCheck, opts.CollectInterval, opts.ExecuteExcess)

if err != nil {
return err
3 changes: 3 additions & 0 deletions cmd/crane-agent/app/options/option.go
@@ -26,6 +26,8 @@ type Options struct {
// Ifaces is the network devices to collect metric
Ifaces []string
NodeResourceReserved map[string]string
// ExecuteExcess is the percentage by which the executed action exceeds the gap between current usage and the waterlines
ExecuteExcess string
}

// NewOptions builds an empty options.
@@ -54,4 +56,5 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.StringArrayVar(&o.Ifaces, "ifaces", []string{"eth0"}, "The network devices to collect metrics from, comma separated, default: eth0")
flags.Var(cliflag.NewMapStringString(&o.NodeResourceReserved), "node-resource-reserved", "A set of ResourceName=Percent (e.g. cpu=40%,memory=40%)")
flags.DurationVar(&o.MaxInactivity, "max-inactivity", 5*time.Minute, "Maximum time from last recorded activity before automatic restart, default: 5min")
flags.StringVar(&o.ExecuteExcess, "execute-excess", "10%", "The percentage by which the executed action exceeds the gap between current usage and the waterlines, default: 10%.")
}
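
The new execute-excess flag lets the avoidance executor act on slightly more than the measured gap between current usage and the waterline, so a single round of throttling or eviction is more likely to pull the node back under the line. A minimal sketch of how such a percentage value could be parsed and applied; parseExecuteExcess and the surrounding code are illustrative assumptions, not the functions this commit adds:

package main

import (
    "fmt"
    "strconv"
    "strings"
)

// parseExecuteExcess converts a flag value such as "10%" into a multiplier (0.10).
// Hypothetical helper, shown only to illustrate the flag's meaning.
func parseExecuteExcess(s string) (float64, error) {
    v, err := strconv.ParseFloat(strings.TrimSuffix(strings.TrimSpace(s), "%"), 64)
    if err != nil {
        return 0, fmt.Errorf("invalid execute-excess %q: %v", s, err)
    }
    return v / 100, nil
}

func main() {
    excess, _ := parseExecuteExcess("10%")
    gap := 2.0 // cores between current usage and the waterline
    // Overshoot the gap by the configured excess when deciding how much to release.
    fmt.Printf("release %.2f cores\n", gap*(1+excess))
}
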
4 changes: 3 additions & 1 deletion pkg/agent/agent.go
@@ -65,6 +65,7 @@ func NewAgent(ctx context.Context,
ifaces []string,
healthCheck *metrics.HealthCheck,
CollectInterval time.Duration,
executeExcess string,
) (*Agent, error) {
var managers []manager.Manager
var noticeCh = make(chan executor.AvoidanceExecutor)
@@ -88,8 +89,9 @@ func NewAgent(ctx context.Context,
managers = appendManagerIfNotNil(managers, stateCollector)
analyzerManager := analyzer.NewAnormalyAnalyzer(kubeClient, nodeName, podInformer, nodeInformer, nepInformer, actionInformer, stateCollector.AnalyzerChann, noticeCh)
managers = appendManagerIfNotNil(managers, analyzerManager)
avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint)
avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint, stateCollector.State, executeExcess)
managers = appendManagerIfNotNil(managers, avoidanceManager)

if nodeResource := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResource {
tspName, err := agent.CreateNodeResourceTsp()
if err != nil {
147 changes: 97 additions & 50 deletions pkg/ensurance/analyzer/analyzer.go
@@ -1,15 +1,14 @@
package analyzer

import (
"container/heap"
"fmt"
"sort"
"strings"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
@@ -25,8 +24,8 @@ import (
"github.com/gocrane/crane/pkg/common"
"github.com/gocrane/crane/pkg/ensurance/analyzer/evaluator"
ecache "github.com/gocrane/crane/pkg/ensurance/cache"
stypes "github.com/gocrane/crane/pkg/ensurance/collector/types"
"github.com/gocrane/crane/pkg/ensurance/executor"
podinfo "github.com/gocrane/crane/pkg/ensurance/executor/pod-info"
"github.com/gocrane/crane/pkg/known"
"github.com/gocrane/crane/pkg/metrics"
"github.com/gocrane/crane/pkg/utils"
@@ -74,6 +73,7 @@ func NewAnormalyAnalyzer(kubeClient *kubernetes.Clientset,
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "crane-agent"})

return &AnormalyAnalyzer{
nodeName: nodeName,
evaluator: expressionEvaluator,
@@ -180,7 +180,7 @@ func (s *AnormalyAnalyzer) Analyze(state map[string][]common.TimeSeries) {
}
}

klog.V(6).Infof("Analyze actionContexts: %v", actionContexts)
klog.V(6).Infof("Analyze actionContexts: %#v", actionContexts)

//step 3 : merge
avoidanceAction := s.merge(state, avoidanceMaps, actionContexts)
@@ -189,7 +189,7 @@ func (s *AnormalyAnalyzer) Analyze(state map[string][]common.TimeSeries) {
return
}

//step 4 :notice the enforcer manager
//step 4 : notice the enforcer manager
s.notify(avoidanceAction)

return
@@ -208,7 +208,7 @@ func (s *AnormalyAnalyzer) trigger(series []common.TimeSeries, object ensurancea
for _, ts := range series {
triggered = s.evaluator.EvalWithMetric(object.MetricRule.Name, float64(object.MetricRule.Value.Value()), ts.Samples[0].Value)

klog.V(6).Infof("Anormaly detection result %v, Name: %s, Value: %.2f, %s/%s", triggered,
klog.V(4).Infof("Anormaly detection result %v, Name: %s, Value: %.2f, %s/%s", triggered,
object.MetricRule.Name,
ts.Samples[0].Value,
common.GetValueByName(ts.Labels, common.LabelNamePodNamespace),
@@ -238,7 +238,7 @@ func (s *AnormalyAnalyzer) analyze(key string, object ensuranceapi.ObjectiveEnsu
//step2: check if triggered for NodeQOSEnsurance
threshold := s.trigger(series, object)

klog.V(4).Infof("for NodeQOS %s, metrics reach the threshold: %v", key, threshold)
klog.V(4).Infof("For NodeQOS %s, metrics reach the threshold: %v", key, threshold)

//step3: check is triggered action or restored, set the detection
s.computeActionContext(threshold, key, object, &ac)
@@ -297,24 +297,26 @@ func (s *AnormalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, avoida
//step3 get and deduplicate throttlePods, throttleUpPods
if action.Spec.Throttle != nil {
throttlePods, throttleUpPods := s.getThrottlePods(enableSchedule, ac, action, stateMap)

// combine the throttle waterline
combineThrottleWaterLine(&ae.ThrottleExecutor, ac, enableSchedule)
// combine the replicated pod
combineThrottleDuplicate(&ae.ThrottleExecutor, throttlePods, throttleUpPods)
}

//step4 get and deduplicate evictPods
if action.Spec.Eviction != nil {
evictPods := s.getEvictPods(ac.Triggered, action)
evictPods := s.getEvictPods(ac.Triggered, action, stateMap)

// combine the evict waterline
combineEvictWaterLine(&ae.EvictExecutor, ac)
// combine the replicated pod
combineEvictDuplicate(&ae.EvictExecutor, evictPods)
}
}
ae.StateMap = stateMap

// sort the throttle executor by pod qos priority
sort.Sort(ae.ThrottleExecutor.ThrottleDownPods)
sort.Sort(sort.Reverse(ae.ThrottleExecutor.ThrottleUpPods))

// sort the evict executor by pod qos priority
sort.Sort(ae.EvictExecutor.EvictPods)
klog.V(6).Infof("ThrottleExecutor is %#v, EvictExecutor is %#v", ae.ThrottleExecutor, ae.EvictExecutor)

return ae
}
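
With this change, merge sorts ThrottleDownPods in ascending QoS-priority order and ThrottleUpPods in the reverse order, so the least important pods are throttled down first and the most important pods are restored first. A minimal sketch of the kind of sort.Interface such an ordering relies on, assuming a simplified pod context with only QoS class and priority (the commit's real comparison in the executor's pod-info types weighs more dimensions):

package main

import (
    "fmt"
    "sort"

    v1 "k8s.io/api/core/v1"
)

// podCtx is a simplified stand-in for podinfo.PodContext.
type podCtx struct {
    Name     string
    QOSClass v1.PodQOSClass
    Priority int32
}

var qosRank = map[v1.PodQOSClass]int{v1.PodQOSBestEffort: 0, v1.PodQOSBurstable: 1, v1.PodQOSGuaranteed: 2}

// byQOSPriority orders BestEffort before Burstable before Guaranteed,
// breaking ties with the pod priority value (lower first).
type byQOSPriority []podCtx

func (p byQOSPriority) Len() int      { return len(p) }
func (p byQOSPriority) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p byQOSPriority) Less(i, j int) bool {
    if qosRank[p[i].QOSClass] != qosRank[p[j].QOSClass] {
        return qosRank[p[i].QOSClass] < qosRank[p[j].QOSClass]
    }
    return p[i].Priority < p[j].Priority
}

func main() {
    pods := byQOSPriority{
        {"guaranteed-db", v1.PodQOSGuaranteed, 1000},
        {"besteffort-batch", v1.PodQOSBestEffort, 0},
        {"burstable-web", v1.PodQOSBurstable, 100},
    }
    sort.Sort(pods)               // throttle-down order: least important first
    fmt.Println(pods)
    sort.Sort(sort.Reverse(pods)) // throttle-up order: most important first
    fmt.Println(pods)
}
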
@@ -367,9 +369,7 @@ func (s *AnormalyAnalyzer) getTimeSeriesFromMap(state []common.TimeSeries, selec
}

func (s *AnormalyAnalyzer) notify(as executor.AvoidanceExecutor) {
//step1: check need to notice enforcer manager

//step2: notice by channel
//step1: notice by channel
s.actionCh <- as
return
}
@@ -389,9 +389,13 @@ func (s *AnormalyAnalyzer) actionTriggered(ac ecache.ActionContext) bool {
}

func (s *AnormalyAnalyzer) getThrottlePods(enableSchedule bool, ac ecache.ActionContext,
action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) ([]executor.ThrottlePod, []executor.ThrottlePod) {
action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) ([]podinfo.PodContext, []podinfo.PodContext) {

throttlePods, throttleUpPods := []podinfo.PodContext{}, []podinfo.PodContext{}

throttlePods, throttleUpPods := []executor.ThrottlePod{}, []executor.ThrottlePod{}
if !(ac.Triggered || (enableSchedule && ac.Restored)) {
return throttlePods, throttleUpPods
}

allPods, err := s.podLister.List(labels.Everything())
if err != nil {
@@ -401,18 +405,18 @@ func (s *AnormalyAnalyzer) getThrottlePods(enableSchedule bool, ac ecache.Action

for _, pod := range allPods {
if ac.Triggered {
throttlePods = append(throttlePods, throttlePodConstruct(pod, stateMap, action))
throttlePods = append(throttlePods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.ThrottleDown))
}
if enableSchedule && ac.Restored {
throttleUpPods = append(throttleUpPods, throttlePodConstruct(pod, stateMap, action))
throttleUpPods = append(throttleUpPods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.ThrottleUp))
}
}

return throttlePods, throttleUpPods
}

func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.AvoidanceAction) []executor.EvictPod {
evictPods := []executor.EvictPod{}
func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.AvoidanceAction, stateMap map[string][]common.TimeSeries) []podinfo.PodContext {
evictPods := []podinfo.PodContext{}

if triggered {
allPods, err := s.podLister.List(labels.Everything())
@@ -421,10 +425,8 @@ func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.Avo
return evictPods
}

for _, v := range allPods {
var classAndPriority = executor.ClassAndPriority{PodQOSClass: v.Status.QOSClass, PriorityClassValue: utils.GetInt32withDefault(v.Spec.Priority, 0)}
evictPods = append(evictPods, executor.EvictPod{DeletionGracePeriodSeconds: action.Spec.Eviction.TerminationGracePeriodSeconds,
PodKey: types.NamespacedName{Name: v.Name, Namespace: v.Namespace}, ClassAndPriority: classAndPriority})
for _, pod := range allPods {
evictPods = append(evictPods, podinfo.BuildPodBasicInfo(pod, stateMap, action, podinfo.Evict))
}
}
return evictPods
@@ -435,7 +437,7 @@ func (s *AnormalyAnalyzer) disableSchedulingMerge(acsFiltered []ecache.ActionCon

// If any rules are triggered, the avoidance is true,otherwise the avoidance is false.
// If all rules are not triggered and some rules are restored, the restore is true,otherwise the restore is false.
// If the restore is true and the cool downtime reached, the enableScheduling is true,otherwise the enableScheduling is false.
// If the restore is true and the cool downtime reached, the enableScheduling is true,otherwise the enableScheduling is false.
var enableScheduling, avoidance, restore bool

defer func() {
@@ -472,36 +474,19 @@

if avoidance {
s.lastTriggeredTime = now
ae.ScheduleExecutor.DisableClassAndPriority = &executor.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0}
ae.ScheduleExecutor.DisableClassAndPriority = &podinfo.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0}
}

if enableScheduling {
ae.ScheduleExecutor.RestoreClassAndPriority = &executor.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0}
ae.ScheduleExecutor.RestoreClassAndPriority = &podinfo.ClassAndPriority{PodQOSClass: v1.PodQOSBestEffort, PriorityClassValue: 0}
}

return enableScheduling
}

func throttlePodConstruct(pod *v1.Pod, stateMap map[string][]common.TimeSeries, action *ensuranceapi.AvoidanceAction) executor.ThrottlePod {
var throttlePod executor.ThrottlePod
var qosPriority = executor.ClassAndPriority{PodQOSClass: pod.Status.QOSClass, PriorityClassValue: utils.GetInt32withDefault(pod.Spec.Priority, 0)}

throttlePod.PodTypes = types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
throttlePod.CPUThrottle.MinCPURatio = uint64(action.Spec.Throttle.CPUThrottle.MinCPURatio)
throttlePod.CPUThrottle.StepCPURatio = uint64(action.Spec.Throttle.CPUThrottle.StepCPURatio)

throttlePod.PodCPUUsage, throttlePod.ContainerCPUUsages = executor.GetPodUsage(string(stypes.MetricNameContainerCpuTotalUsage), stateMap, pod)
throttlePod.PodCPUShare, throttlePod.ContainerCPUShares = executor.GetPodUsage(string(stypes.MetricNameContainerCpuLimit), stateMap, pod)
throttlePod.PodCPUQuota, throttlePod.ContainerCPUQuotas = executor.GetPodUsage(string(stypes.MetricNameContainerCpuQuota), stateMap, pod)
throttlePod.PodCPUPeriod, throttlePod.ContainerCPUPeriods = executor.GetPodUsage(string(stypes.MetricNameContainerCpuPeriod), stateMap, pod)
throttlePod.PodQOSPriority = qosPriority

return throttlePod
}

func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, throttleUpPods executor.ThrottlePods) {
for _, t := range throttlePods {
if i := e.ThrottleDownPods.Find(t.PodTypes); i == -1 {
if i := e.ThrottleDownPods.Find(t.PodKey); i == -1 {
e.ThrottleDownPods = append(e.ThrottleDownPods, t)
} else {
if t.CPUThrottle.MinCPURatio > e.ThrottleDownPods[i].CPUThrottle.MinCPURatio {
@@ -513,9 +498,9 @@ func combineThrottleDuplicate(e *executor.ThrottleExecutor, throttlePods, thrott
}
}
}
for _, t := range throttleUpPods {

if i := e.ThrottleUpPods.Find(t.PodTypes); i == -1 {
for _, t := range throttleUpPods {
if i := e.ThrottleUpPods.Find(t.PodKey); i == -1 {
e.ThrottleUpPods = append(e.ThrottleUpPods, t)
} else {
if t.CPUThrottle.MinCPURatio > e.ThrottleUpPods[i].CPUThrottle.MinCPURatio {
@@ -541,3 +526,65 @@ func combineEvictDuplicate(e *executor.EvictExecutor, evictPods executor.EvictPo
}
}
}
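
combineThrottleDuplicate and combineEvictDuplicate fold the pod lists produced by every triggered rule into a single list keyed by namespace/name; when the same pod appears twice, the larger CPU throttle ratios are kept. A minimal sketch of that find-then-merge pattern, assuming a simplified entry type and an illustrative find helper rather than the executor's own Find method:

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/types"
)

// throttleEntry is a simplified stand-in for a throttle pod context.
type throttleEntry struct {
    PodKey      types.NamespacedName
    MinCPURatio uint64
}

type throttleEntries []throttleEntry

// find returns the index of key, or -1 when the pod is not in the list yet.
func (t throttleEntries) find(key types.NamespacedName) int {
    for i := range t {
        if t[i].PodKey == key {
            return i
        }
    }
    return -1
}

func main() {
    merged := throttleEntries{}
    incoming := []throttleEntry{
        {types.NamespacedName{Namespace: "default", Name: "web"}, 10},
        {types.NamespacedName{Namespace: "default", Name: "web"}, 20}, // same pod selected by another rule
    }
    for _, e := range incoming {
        if i := merged.find(e.PodKey); i == -1 {
            merged = append(merged, e)
        } else if e.MinCPURatio > merged[i].MinCPURatio {
            merged[i].MinCPURatio = e.MinCPURatio // keep the larger ratio
        }
    }
    fmt.Println(merged) // one merged entry with MinCPURatio 20
}
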

func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionContext, enableSchedule bool) {
if !(ac.Triggered || (enableSchedule && ac.Restored)) {
return
}
if ac.Triggered {
for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances {
if ensurance.Name == ac.ObjectiveEnsuranceName {
if e.ThrottleDownWaterLine == nil {
e.ThrottleDownWaterLine = make(map[executor.WaterLineMetric]*executor.WaterLine)
}
// Use a heap here, so we don't need to use <nepName>-<MetricRuleName> as value, just use <MetricRuleName>
if e.ThrottleDownWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] == nil {
e.ThrottleDownWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] = &executor.WaterLine{}
}
heap.Push(e.ThrottleDownWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value)
}
}
}

for waterLineMetric, waterlines := range e.ThrottleDownWaterLine {
klog.V(6).Infof("ThrottleDownWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines)
}

if enableSchedule && ac.Restored {
for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances {
if ensurance.Name == ac.ObjectiveEnsuranceName {
if e.ThrottleUpWaterLine == nil {
e.ThrottleUpWaterLine = make(map[executor.WaterLineMetric]*executor.WaterLine)
}
if e.ThrottleUpWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] == nil {
e.ThrottleUpWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] = &executor.WaterLine{}
}
heap.Push(e.ThrottleUpWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value)
}
}
}

for waterLineMetric, waterlines := range e.ThrottleUpWaterLine {
klog.V(6).Infof("ThrottleUpWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines)
}
}

func combineEvictWaterLine(e *executor.EvictExecutor, ac ecache.ActionContext) {
if ac.Triggered {
for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances {
if ensurance.Name == ac.ObjectiveEnsuranceName {
if e.EvictWaterLine == nil {
e.EvictWaterLine = make(map[executor.WaterLineMetric]*executor.WaterLine)
}
if e.EvictWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] == nil {
e.EvictWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] = &executor.WaterLine{}
}
heap.Push(e.EvictWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value)
}
}

for waterLineMetric, waterlines := range e.EvictWaterLine {
klog.V(6).Infof("EvictWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines)
}
}
}
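
combineThrottleWaterLine and combineEvictWaterLine gather, per metric name, every triggered MetricRule value into a heap (hence the container/heap import), so the executor can later read the strictest waterline without tracking which NodeQOSEnsurancePolicy it came from. A minimal sketch of a resource.Quantity heap that satisfies heap.Interface; this is an assumption about how executor.WaterLine roughly behaves, not its actual definition:

package main

import (
    "container/heap"
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
)

// waterLine is a min-heap of quantities; the smallest (strictest) waterline
// sits at index 0.
type waterLine []resource.Quantity

func (w waterLine) Len() int            { return len(w) }
func (w waterLine) Less(i, j int) bool  { return w[i].Cmp(w[j]) < 0 }
func (w waterLine) Swap(i, j int)       { w[i], w[j] = w[j], w[i] }
func (w *waterLine) Push(x interface{}) { *w = append(*w, x.(resource.Quantity)) }
func (w *waterLine) Pop() interface{} {
    old := *w
    n := len(old)
    item := old[n-1]
    *w = old[:n-1]
    return item
}

func main() {
    w := &waterLine{}
    heap.Push(w, resource.MustParse("6"))
    heap.Push(w, resource.MustParse("4")) // a stricter waterline from another rule
    fmt.Println((*w)[0].String())         // 4
}
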
18 changes: 16 additions & 2 deletions pkg/ensurance/collector/cadvisor/cadvisor_linux.go
@@ -4,7 +4,9 @@
package cadvisor

import (
"math"
"net/http"
"reflect"
"strconv"
"time"

@@ -127,6 +129,16 @@ func (c *CadvisorCollector) Collect() (map[string][]common.TimeSeries, error) {
for key, v := range containers {
containerId := utils.GetContainerIdFromKey(key)
containerName := utils.GetContainerNameFromPod(pod, containerId)
klog.V(6).Infof("Key is %s, containerId is %s, containerName is %s", key, containerId, containerName)

if reflect.DeepEqual(cadvisorapiv2.ContainerInfo{}, v) {
klog.Warning("ContainerInfo is nil")
} else {
if len(v.Stats) == 0 {
klog.Warning("ContainerInfo.Stats is empty")
}
}

// Filter the sandbox container
if (containerId != "") && (containerName == "") {
continue
@@ -145,10 +157,11 @@
}

if state, ok := c.latestContainersStates[key]; ok {
klog.V(8).Infof("For key %s, LatestContainersStates exist", key)
var containerLabels = GetContainerLabels(pod, containerId, containerName, hasExtRes)

cpuUsageSample, schedRunqueueTime := caculateCPUUsage(&v, &state)
if cpuUsageSample == 0 && schedRunqueueTime == 0 {
if cpuUsageSample == 0 && schedRunqueueTime == 0 || math.IsNaN(cpuUsageSample) {
continue
}
if hasExtRes {
@@ -160,7 +173,7 @@
addSampleToStateMap(types.MetricNameContainerCpuQuota, composeSample(containerLabels, float64(containerInfoV1.Spec.Cpu.Quota), now), stateMap)
addSampleToStateMap(types.MetricNameContainerCpuPeriod, composeSample(containerLabels, float64(containerInfoV1.Spec.Cpu.Period), now), stateMap)

klog.V(10).Infof("Pod: %s, containerName: %s, key %s, scheduler run queue time %.2f", klog.KObj(pod), containerName, key, schedRunqueueTime)
klog.V(8).Infof("Pod: %s, containerName: %s, key %s, scheduler run queue time %.2f, container_cpu_total_usage %#v", klog.KObj(pod), containerName, key, schedRunqueueTime, cpuUsageSample)
}
containerStates[key] = ContainerState{stat: v, timestamp: now}
}
@@ -202,6 +215,7 @@ func caculateCPUUsage(info *cadvisorapiv2.ContainerInfo, state *ContainerState)
cpuUsageIncrease := info.Stats[0].Cpu.Usage.Total - state.stat.Stats[0].Cpu.Usage.Total
schedRunqueueTimeIncrease := info.Stats[0].Cpu.Schedstat.RunqueueTime - state.stat.Stats[0].Cpu.Schedstat.RunqueueTime
timeIncrease := info.Stats[0].Timestamp.UnixNano() - state.stat.Stats[0].Timestamp.UnixNano()

cpuUsageSample := float64(cpuUsageIncrease) / float64(timeIncrease)
schedRunqueueTime := float64(schedRunqueueTimeIncrease) * 1000 * 1000 / float64(timeIncrease)
return cpuUsageSample, schedRunqueueTime
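
caculateCPUUsage divides the growth of the cumulative CPU time counter (nanoseconds of CPU consumed) by the wall-clock nanoseconds between two samples, which yields usage in cores; when cAdvisor returns the same sample twice, both deltas are zero and 0/0 produces NaN, which the new math.IsNaN guard in Collect skips. A rough worked example under those assumptions:

package main

import (
    "fmt"
    "math"
)

func main() {
    // 50ms of CPU time consumed during a 100ms sampling window ~ 0.5 cores.
    cpuDelta := 50e6   // nanoseconds of CPU time
    timeDelta := 100e6 // nanoseconds of wall clock
    fmt.Println(cpuDelta / timeDelta) // 0.5

    // Duplicate samples: both deltas are zero, so the division yields NaN.
    zeroTime := 0.0
    fmt.Println(math.IsNaN(0.0 / zeroTime)) // true
}
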