Skip to content

Commit

Permalink
adapt to nodeqos and podqos api
Browse files Browse the repository at this point in the history
  • Loading branch information
mfanjie committed Aug 19, 2022
1 parent 01ea2e7 commit 9cef689
Show file tree
Hide file tree
Showing 22 changed files with 1,426 additions and 293 deletions.
5 changes: 3 additions & 2 deletions cmd/crane-agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,13 @@ func Run(ctx context.Context, opts *options.Options) error {
nodeInformer := nodeInformerFactory.Core().V1().Nodes()

craneInformerFactory := craneinformers.NewSharedInformerFactory(craneClient, informerSyncPeriod)
nepInformer := craneInformerFactory.Ensurance().V1alpha1().NodeQOSEnsurancePolicies()
nodeQOSInformer := craneInformerFactory.Ensurance().V1alpha1().NodeQOSs()
podQOSInformer := craneInformerFactory.Ensurance().V1alpha1().PodQOSs()
actionInformer := craneInformerFactory.Ensurance().V1alpha1().AvoidanceActions()
tspInformer := craneInformerFactory.Prediction().V1alpha1().TimeSeriesPredictions()

newAgent, err := agent.NewAgent(ctx, hostname, opts.RuntimeEndpoint, opts.CgroupDriver, kubeClient, craneClient, podInformer, nodeInformer,
nepInformer, actionInformer, tspInformer, opts.NodeResourceReserved, opts.Ifaces, healthCheck, opts.CollectInterval, opts.ExecuteExcess)
nodeQOSInformer, podQOSInformer, actionInformer, tspInformer, opts.NodeResourceReserved, opts.Ifaces, healthCheck, opts.CollectInterval, opts.ExecuteExcess)

if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions cmd/crane-agent/app/options/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Options struct {
// Ifaces is the network devices to collect metric
Ifaces []string
NodeResourceReserved map[string]string
// ExecuteExcess is the percentage of executions that exceed the gap between current usage and waterlines
// ExecuteExcess is the percentage of executions that exceed the gap between current usage and watermarks
ExecuteExcess string
}

Expand Down Expand Up @@ -56,5 +56,5 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.StringArrayVar(&o.Ifaces, "ifaces", []string{"eth0"}, "The network devices to collect metric, use comma to separated, default: eth0")
flags.Var(cliflag.NewMapStringString(&o.NodeResourceReserved), "node-resource-reserved", "A set of ResourceName=Percent (e.g. cpu=40%,memory=40%)")
flags.DurationVar(&o.MaxInactivity, "max-inactivity", 5*time.Minute, "Maximum time from last recorded activity before automatic restart, default: 5min")
flags.StringVar(&o.ExecuteExcess, "execute-excess", "10%", "The percentage of executions that exceed the gap between current usage and waterlines, default: 10%.")
flags.StringVar(&o.ExecuteExcess, "execute-excess", "10%", "The percentage of executions that exceed the gap between current usage and watermarks, default: 10%.")
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ require (
)

replace (
github.com/gocrane/api => github.com/chenkaiyue/api v0.9.1-0.20220816093454-5331b34c05f2
github.com/grafana-tools/sdk => github.com/csmarchbanks/sdk v0.0.0-20220120205302-870d00a83f4e
golang.org/x/net => github.com/golang/net v0.0.0-20210825183410-e898025ed96a
k8s.io/api => k8s.io/api v0.22.3
Expand Down
788 changes: 784 additions & 4 deletions go.sum

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func NewAgent(ctx context.Context,
craneClient *craneclientset.Clientset,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
nepInformer v1alpha1.NodeQOSEnsurancePolicyInformer,
nodeQOSInformer v1alpha1.NodeQOSInformer,
podQOSInformer v1alpha1.PodQOSInformer,
actionInformer v1alpha1.AvoidanceActionInformer,
tspInformer predictionv1.TimeSeriesPredictionInformer,
nodeResourceReserved map[string]string,
Expand All @@ -85,9 +86,9 @@ func NewAgent(ctx context.Context,
exclusiveCPUSet = cpuManager.GetExclusiveCpu
managers = appendManagerIfNotNil(managers, cpuManager)
}
stateCollector := collector.NewStateCollector(nodeName, nepInformer.Lister(), podInformer.Lister(), nodeInformer.Lister(), ifaces, healthCheck, CollectInterval, exclusiveCPUSet, cadvisorManager)
stateCollector := collector.NewStateCollector(nodeName, nodeQOSInformer.Lister(), podInformer.Lister(), nodeInformer.Lister(), ifaces, healthCheck, CollectInterval, exclusiveCPUSet, cadvisorManager)
managers = appendManagerIfNotNil(managers, stateCollector)
analyzerManager := analyzer.NewAnormalyAnalyzer(kubeClient, nodeName, podInformer, nodeInformer, nepInformer, actionInformer, stateCollector.AnalyzerChann, noticeCh)
analyzerManager := analyzer.NewAnormalyAnalyzer(kubeClient, nodeName, podInformer, nodeInformer, nodeQOSInformer, podQOSInformer, actionInformer, stateCollector.AnalyzerChann, noticeCh)
managers = appendManagerIfNotNil(managers, analyzerManager)
avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint, stateCollector.State, executeExcess)
managers = appendManagerIfNotNil(managers, avoidanceManager)
Expand Down
159 changes: 96 additions & 63 deletions pkg/ensurance/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ type AnormalyAnalyzer struct {
nodeLister corelisters.NodeLister
nodeSynced cache.InformerSynced

nodeQOSLister ensurancelisters.NodeQOSEnsurancePolicyLister
nodeQOSLister ensurancelisters.NodeQOSLister
nodeQOSSynced cache.InformerSynced

podQOSLister ensurancelisters.PodQOSLister
podQOSSynced cache.InformerSynced

avoidanceActionLister ensurancelisters.AvoidanceActionLister
avoidanceActionSynced cache.InformerSynced

Expand All @@ -62,7 +65,8 @@ func NewAnormalyAnalyzer(kubeClient *kubernetes.Clientset,
nodeName string,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
nepInformer v1alpha1.NodeQOSEnsurancePolicyInformer,
nodeQOSInformer v1alpha1.NodeQOSInformer,
podQOSInformer v1alpha1.PodQOSInformer,
actionInformer v1alpha1.AvoidanceActionInformer,
stateChann chan map[string][]common.TimeSeries,
noticeCh chan<- executor.AvoidanceExecutor,
Expand All @@ -83,8 +87,10 @@ func NewAnormalyAnalyzer(kubeClient *kubernetes.Clientset,
podSynced: podInformer.Informer().HasSynced,
nodeLister: nodeInformer.Lister(),
nodeSynced: nodeInformer.Informer().HasSynced,
nodeQOSLister: nepInformer.Lister(),
nodeQOSSynced: nepInformer.Informer().HasSynced,
nodeQOSLister: nodeQOSInformer.Lister(),
nodeQOSSynced: nodeQOSInformer.Informer().HasSynced,
podQOSLister: podQOSInformer.Lister(),
podQOSSynced: podQOSInformer.Informer().HasSynced,
avoidanceActionLister: actionInformer.Lister(),
avoidanceActionSynced: actionInformer.Informer().HasSynced,
stateChann: stateChann,
Expand Down Expand Up @@ -137,18 +143,18 @@ func (s *AnormalyAnalyzer) Analyze(state map[string][]common.TimeSeries) {
return
}

var neps []*ensuranceapi.NodeQOSEnsurancePolicy
allNeps, err := s.nodeQOSLister.List(labels.Everything())
var nodeQOSs []*ensuranceapi.NodeQOS
allNodeQOSs, err := s.nodeQOSLister.List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list NodeQOS: %v", err)
return
}

for _, nep := range allNeps {
if matched, err := utils.LabelSelectorMatched(node.Labels, nep.Spec.Selector); err != nil || !matched {
for _, nodeQOS := range allNodeQOSs {
if matched, err := utils.LabelSelectorMatched(node.Labels, nodeQOS.Spec.Selector); err != nil || !matched {
continue
}
neps = append(neps, nep.DeepCopy())
nodeQOSs = append(nodeQOSs, nodeQOS.DeepCopy())
}

var avoidanceMaps = make(map[string]*ensuranceapi.AvoidanceAction)
Expand All @@ -162,10 +168,10 @@ func (s *AnormalyAnalyzer) Analyze(state map[string][]common.TimeSeries) {
avoidanceMaps[a.Name] = a
}

// step 2: do analyze for neps
// step 2: do analyze for nodeQOSs
var actionContexts []ecache.ActionContext
for _, n := range neps {
for _, v := range n.Spec.ObjectiveEnsurances {
for _, n := range nodeQOSs {
for _, v := range n.Spec.Rules {
var key = strings.Join([]string{n.Name, v.Name}, ".")
actionContext, err := s.analyze(key, v, state)
if err != nil {
Expand All @@ -175,7 +181,7 @@ func (s *AnormalyAnalyzer) Analyze(state map[string][]common.TimeSeries) {
metrics.UpdateAnalyzerWithKeyStatus(metrics.AnalyzeTypeAvoidance, key, float64(utils.Bool2Int32(actionContext.Triggered)))
metrics.UpdateAnalyzerWithKeyStatus(metrics.AnalyzeTypeRestore, key, float64(utils.Bool2Int32(actionContext.Restored)))

actionContext.Nep = n
actionContext.NodeQOS = n
actionContexts = append(actionContexts, actionContext)
}
}
Expand Down Expand Up @@ -203,7 +209,7 @@ func (s *AnormalyAnalyzer) getSeries(state []common.TimeSeries, selector *metav1
return series, nil
}

func (s *AnormalyAnalyzer) trigger(series []common.TimeSeries, object ensuranceapi.ObjectiveEnsurance) bool {
func (s *AnormalyAnalyzer) trigger(series []common.TimeSeries, object ensuranceapi.Rule) bool {
var triggered, threshold bool
for _, ts := range series {
triggered = s.evaluator.EvalWithMetric(object.MetricRule.Name, float64(object.MetricRule.Value.Value()), ts.Samples[0].Value)
Expand All @@ -221,32 +227,31 @@ func (s *AnormalyAnalyzer) trigger(series []common.TimeSeries, object ensurancea
return threshold
}

func (s *AnormalyAnalyzer) analyze(key string, object ensuranceapi.ObjectiveEnsurance, stateMap map[string][]common.TimeSeries) (ecache.ActionContext, error) {
var actionContext = ecache.ActionContext{Strategy: object.Strategy, ObjectiveEnsuranceName: object.Name, ActionName: object.AvoidanceActionName}
func (s *AnormalyAnalyzer) analyze(key string, rule ensuranceapi.Rule, stateMap map[string][]common.TimeSeries) (ecache.ActionContext, error) {
var actionContext = ecache.ActionContext{Strategy: rule.Strategy, RuleName: rule.Name, ActionName: rule.AvoidanceActionName}

state, ok := stateMap[object.MetricRule.Name]
state, ok := stateMap[rule.MetricRule.Name]
if !ok {
return actionContext, fmt.Errorf("metric %s not found", object.MetricRule.Name)
return actionContext, fmt.Errorf("metric %s not found", rule.MetricRule.Name)
}

//step1: get series from value
series, err := s.getSeries(state, object.MetricRule.Selector, object.MetricRule.Name)
series, err := s.getSeries(state, rule.MetricRule.Selector, rule.MetricRule.Name)
if err != nil {
return actionContext, err
}

//step2: check if triggered for NodeQOSEnsurance
threshold := s.trigger(series, object)

threshold := s.trigger(series, rule)
klog.V(4).Infof("For NodeQOS %s, metrics reach the threshold: %v", key, threshold)

//step3: check is triggered action or restored, set the detection
s.computeActionContext(threshold, key, object, &actionContext)
s.computeActionContext(threshold, key, rule, &actionContext)

return actionContext, nil
}

func (s *AnormalyAnalyzer) computeActionContext(threshold bool, key string, object ensuranceapi.ObjectiveEnsurance, ac *ecache.ActionContext) {
func (s *AnormalyAnalyzer) computeActionContext(threshold bool, key string, object ensuranceapi.Rule, ac *ecache.ActionContext) {
if threshold {
s.restored[key] = 0
triggered := utils.GetUint64FromMaps(key, s.triggered)
Expand Down Expand Up @@ -298,8 +303,8 @@ func (s *AnormalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, action
if action.Spec.Throttle != nil {
throttlePods, throttleUpPods := s.getThrottlePods(actionCtx, action, stateMap)

// combine the throttle waterline
combineThrottleWaterLine(&executor.ThrottleExecutor, actionCtx)
// combine the throttle watermark
combineThrottleWatermark(&executor.ThrottleExecutor, actionCtx)
// combine the replicated pod
combineThrottleDuplicate(&executor.ThrottleExecutor, throttlePods, throttleUpPods)
}
Expand All @@ -308,8 +313,8 @@ func (s *AnormalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, action
if action.Spec.Eviction != nil {
evictPods := s.getEvictPods(actionCtx.Triggered, action, stateMap)

// combine the evict waterline
combineEvictWaterLine(&executor.EvictExecutor, actionCtx)
// combine the evict watermark
combineEvictWatermark(&executor.EvictExecutor, actionCtx)
// combine the replicated pod
combineEvictDuplicate(&executor.EvictExecutor, evictPods)
}
Expand All @@ -322,7 +327,7 @@ func (s *AnormalyAnalyzer) merge(stateMap map[string][]common.TimeSeries, action
}

func (s *AnormalyAnalyzer) logEvent(ac ecache.ActionContext, now time.Time) {
var key = strings.Join([]string{ac.Nep.Name, ac.ObjectiveEnsuranceName}, "/")
var key = strings.Join([]string{ac.NodeQOS.Name, ac.RuleName}, "/")

if !(ac.Triggered || ac.Restored) {
return
Expand Down Expand Up @@ -375,7 +380,7 @@ func (s *AnormalyAnalyzer) notify(as executor.AvoidanceExecutor) {
}

func (s *AnormalyAnalyzer) actionTriggered(ac ecache.ActionContext) bool {
var key = strings.Join([]string{ac.Nep.Name, ac.ObjectiveEnsuranceName}, "/")
var key = strings.Join([]string{ac.NodeQOS.Name, ac.RuleName}, "/")

if v, ok := s.actionEventStatus[key]; ok {
if ac.Restored {
Expand All @@ -402,7 +407,12 @@ func (s *AnormalyAnalyzer) getThrottlePods(actionCtx ecache.ActionContext, actio
return throttlePods, throttleUpPods
}

for _, pod := range allPods {
filteredPods, err := s.filterPodQOSMatches(allPods)
if err != nil {
klog.Errorf("Failed to filter all pods: %v.", err)
return throttlePods, throttleUpPods
}
for _, pod := range filteredPods {
if actionCtx.Triggered {
throttlePods = append(throttlePods, podinfo.BuildPodActionContext(pod, stateMap, action, podinfo.ThrottleDown))
}
Expand All @@ -423,14 +433,37 @@ func (s *AnormalyAnalyzer) getEvictPods(triggered bool, action *ensuranceapi.Avo
klog.Errorf("Failed to list all pods: %v.", err)
return evictPods
}

for _, pod := range allPods {
filteredPods, err := s.filterPodQOSMatches(allPods)
if err != nil {
klog.Errorf("Failed to filter all pods: %v.", err)
return evictPods
}
for _, pod := range filteredPods {
evictPods = append(evictPods, podinfo.BuildPodActionContext(pod, stateMap, action, podinfo.Evict))

}
}
return evictPods
}

func (s *AnormalyAnalyzer) filterPodQOSMatches(pods []*v1.Pod) ([]*v1.Pod, error) {
filteredPods := []*v1.Pod{}
podQOSList, err := s.podQOSLister.List(labels.Everything())
// todo: not found error should be ignored
if err != nil {
klog.Errorf("Failed to list NodeQOS: %v", err)
return filteredPods, err
}
for _, qos := range podQOSList {
for _, pod := range pods {
if match(pod, qos) {
filteredPods = append(filteredPods, pod)
}
}
}
return filteredPods, nil
}

func (s *AnormalyAnalyzer) mergeSchedulingActions(actionContexts []ecache.ActionContext, avoidanceMaps map[string]*ensuranceapi.AvoidanceAction, ae *executor.AvoidanceExecutor) {
var now = time.Now()

Expand All @@ -442,7 +475,7 @@ func (s *AnormalyAnalyzer) mergeSchedulingActions(actionContexts []ecache.Action
for _, ac := range actionContexts {
action, ok := avoidanceMaps[ac.ActionName]
if !ok {
klog.Warningf("DoMerge for detection,but the action %s not found", ac.ActionName)
klog.Warningf("Action %s defined in nodeQOS %s is not found", ac.ActionName, ac.NodeQOS.Name)
continue
}

Expand Down Expand Up @@ -512,67 +545,67 @@ func combineEvictDuplicate(e *executor.EvictExecutor, evictPods executor.EvictPo
}
}

func combineThrottleWaterLine(e *executor.ThrottleExecutor, ac ecache.ActionContext) {
func combineThrottleWatermark(e *executor.ThrottleExecutor, ac ecache.ActionContext) {
if !ac.Triggered && !ac.Restored {
return
}

if ac.Triggered {
for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances {
if ensurance.Name == ac.ObjectiveEnsuranceName {
if e.ThrottleDownWaterLine == nil {
e.ThrottleDownWaterLine = make(map[executor.WaterLineMetric]*executor.WaterLine)
for _, ensurance := range ac.NodeQOS.Spec.Rules {
if ensurance.Name == ac.RuleName {
if e.ThrottleDownWatermark == nil {
e.ThrottleDownWatermark = make(map[executor.WatermarkMetric]*executor.Watermark)
}
// Use a heap here, so we don't need to use <nepName>-<MetricRuleName> as value, just use <MetricRuleName>
if e.ThrottleDownWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] == nil {
e.ThrottleDownWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] = &executor.WaterLine{}
// Use a heap here, so we don't need to use <nodeQOSName>-<MetricRuleName> as value, just use <MetricRuleName>
if e.ThrottleDownWatermark[executor.WatermarkMetric(ensurance.MetricRule.Name)] == nil {
e.ThrottleDownWatermark[executor.WatermarkMetric(ensurance.MetricRule.Name)] = &executor.Watermark{}
}
heap.Push(e.ThrottleDownWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value)
heap.Push(e.ThrottleDownWatermark[executor.WatermarkMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value)
}
}
}

for waterLineMetric, waterlines := range e.ThrottleDownWaterLine {
klog.V(6).Infof("ThrottleDownWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines)
for watermarkMetric, watermarks := range e.ThrottleDownWatermark {
klog.V(6).Infof("ThrottleDownWatermark info: metric: %s, value: %#v", watermarkMetric, watermarks)
}

if ac.Restored {
for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances {
if ensurance.Name == ac.ObjectiveEnsuranceName {
if e.ThrottleUpWaterLine == nil {
e.ThrottleUpWaterLine = make(map[executor.WaterLineMetric]*executor.WaterLine)
for _, ensurance := range ac.NodeQOS.Spec.Rules {
if ensurance.Name == ac.RuleName {
if e.ThrottleUpWatermark == nil {
e.ThrottleUpWatermark = make(map[executor.WatermarkMetric]*executor.Watermark)
}
if e.ThrottleUpWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] == nil {
e.ThrottleUpWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] = &executor.WaterLine{}
if e.ThrottleUpWatermark[executor.WatermarkMetric(ensurance.MetricRule.Name)] == nil {
e.ThrottleUpWatermark[executor.WatermarkMetric(ensurance.MetricRule.Name)] = &executor.Watermark{}
}
heap.Push(e.ThrottleUpWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value)
heap.Push(e.ThrottleUpWatermark[executor.WatermarkMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value)
}
}
}

for waterLineMetric, waterlines := range e.ThrottleUpWaterLine {
klog.V(6).Infof("ThrottleUpWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines)
for watermarkMetric, watermarks := range e.ThrottleUpWatermark {
klog.V(6).Infof("ThrottleUpWatermark info: metric: %s, value: %#v", watermarkMetric, watermarks)
}
}

func combineEvictWaterLine(e *executor.EvictExecutor, ac ecache.ActionContext) {
func combineEvictWatermark(e *executor.EvictExecutor, ac ecache.ActionContext) {
if !ac.Triggered {
return
}

for _, ensurance := range ac.Nep.Spec.ObjectiveEnsurances {
if ensurance.Name == ac.ObjectiveEnsuranceName {
if e.EvictWaterLine == nil {
e.EvictWaterLine = make(map[executor.WaterLineMetric]*executor.WaterLine)
for _, ensurance := range ac.NodeQOS.Spec.Rules {
if ensurance.Name == ac.RuleName {
if e.EvictWatermark == nil {
e.EvictWatermark = make(map[executor.WatermarkMetric]*executor.Watermark)
}
if e.EvictWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] == nil {
e.EvictWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)] = &executor.WaterLine{}
if e.EvictWatermark[executor.WatermarkMetric(ensurance.MetricRule.Name)] == nil {
e.EvictWatermark[executor.WatermarkMetric(ensurance.MetricRule.Name)] = &executor.Watermark{}
}
heap.Push(e.EvictWaterLine[executor.WaterLineMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value)
heap.Push(e.EvictWatermark[executor.WatermarkMetric(ensurance.MetricRule.Name)], ensurance.MetricRule.Value)
}
}

for waterLineMetric, waterlines := range e.EvictWaterLine {
klog.V(6).Infof("EvictWaterLine info: metric: %s, value: %#v", waterLineMetric, waterlines)
for watermarkMetric, watermarks := range e.EvictWatermark {
klog.V(6).Infof("EvictWatermark info: metric: %s, value: %#v", watermarkMetric, watermarks)
}
}
Loading

0 comments on commit 9cef689

Please sign in to comment.