Skip to content

Commit

Permalink
Merge pull request #479 from mfanjie/main
Browse files Browse the repository at this point in the history
always collect node local metrics when node resource controller is enabled.
  • Loading branch information
mfanjie authored Aug 16, 2022
2 parents 9c12d3f + 0f196e1 commit 5851b67
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 25 deletions.
7 changes: 6 additions & 1 deletion cmd/crane-agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -56,6 +57,11 @@ func NewAgentCommand(ctx context.Context) *cobra.Command {
if err := opts.Validate(); err != nil {
klog.Exitf("Opts validate failed: %v", err)
}

cmd.Flags().VisitAll(func(flag *pflag.Flag) {
klog.Infof("FLAG: --%s=%q\n", flag.Name, flag.Value)
})

if err := Run(ctx, opts); err != nil {
klog.Exit(err)
}
Expand All @@ -71,7 +77,6 @@ func NewAgentCommand(ctx context.Context) *cobra.Command {

func Run(ctx context.Context, opts *options.Options) error {
hostname := getHostName(opts.HostnameOverride)

healthCheck := metrics.NewHealthCheck(opts.MaxInactivity)
metrics.RegisterCraneAgent()

Expand Down
31 changes: 20 additions & 11 deletions pkg/ensurance/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *StateCollector) Run(stop <-chan struct{}) {
start := time.Now()
metrics.UpdateLastTime(string(known.ModuleStateCollector), metrics.StepMain, start)
s.healthCheck.UpdateLastActivity(start)
s.Collect(false)
s.Collect()
metrics.UpdateDurationFromStart(string(known.ModuleStateCollector), metrics.StepMain, start)
case <-stop:
klog.Infof("StateCollector exit")
Expand All @@ -111,7 +111,7 @@ func (s *StateCollector) Run(stop <-chan struct{}) {
return
}

func (s *StateCollector) Collect(waterLine bool) {
func (s *StateCollector) Collect() {
wg := sync.WaitGroup{}
start := time.Now()

Expand Down Expand Up @@ -182,17 +182,26 @@ func (s *StateCollector) UpdateCollectors() {
s.collectors.Store(types.CadvisorCollectorType, cadvisor.NewCadvisorCollector(s.podLister, s.GetCadvisorManager()))
}

if nodeResourceGate := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResourceGate {
if _, exists := s.collectors.Load(types.NodeResourceCollectorType); !exists {
c := noderesource.NewNodeResourceCollector(s.nodeName, s.nodeLister, s.podLister)
if c != nil {
s.collectors.Store(types.NodeResourceCollectorType, c)
}
}
}
break
}
// if node resource controller is enabled, it indicates local metrics need to be collected no matter nep is defined or not
if nodeResourceGate := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResourceGate {
if _, exists := s.collectors.Load(types.NodeLocalCollectorType); !exists {
nc := nodelocal.NewNodeLocal(s.ifaces, s.exclusiveCPUSet)
s.collectors.Store(types.NodeLocalCollectorType, nc)
}

if _, exists := s.collectors.Load(types.CadvisorCollectorType); !exists {
s.collectors.Store(types.CadvisorCollectorType, cadvisor.NewCadvisorCollector(s.podLister, s.GetCadvisorManager()))
}
if _, exists := s.collectors.Load(types.NodeResourceCollectorType); !exists {
c := noderesource.NewNodeResourceCollector(s.nodeName, s.nodeLister, s.podLister)
if c != nil {
s.collectors.Store(types.NodeResourceCollectorType, c)
}
}
nodeLocal = true
}
if !nodeLocal {
stopCollectors := []types.CollectType{types.NodeLocalCollectorType, types.CadvisorCollectorType}

Expand Down Expand Up @@ -249,7 +258,7 @@ func CheckMetricNameExist(name string) bool {

func (s *StateCollector) GetStateFunc() func() map[string][]common.TimeSeries {
return func() map[string][]common.TimeSeries {
s.Collect(true)
s.Collect()
return s.State
}
}
3 changes: 2 additions & 1 deletion pkg/ensurance/collector/nodelocal/nodelocal.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ var nodeLocalMetric = make(map[string][]types.MetricName, 10)
var collectFuncMap = make(map[string]collectFunc, 10)

func registerCollector(collectorName string, metricsNames []types.MetricName, collectorFunc collectFunc) {
klog.Infof("Registering node local metrics collector %s", collectorName)
if _, ok := nodeLocalMetric[collectorName]; ok {
klog.Infof("Warning: node local metrics collectorName %s is registered, not to register again", collectorName)
klog.Infof("Node local metrics collector %s is registered, not to register again", collectorName)
return
}

Expand Down
23 changes: 11 additions & 12 deletions pkg/resource/node_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package resource

import (
"context"
"encoding/json"
"fmt"
"math"
"strconv"
Expand Down Expand Up @@ -193,15 +192,15 @@ func (o *NodeResourceManager) FindTargetNode(tsp *predictionapi.TimeSeriesPredic
return false, nil
}

func (o *NodeResourceManager) BuildNodeStatus(node *v1.Node) map[v1.ResourceName]string {
func (o *NodeResourceManager) BuildNodeStatus(node *v1.Node) map[string]int64 {
tspCanNotBeReclaimedResource := o.GetCanNotBeReclaimedResourceFromTsp(node)
localCanNotBeReclaimedResource := o.GetCanNotBeReclaimedResourceFromLocal()
reserveCpuPercent := o.reserveResource.CpuPercent
if nodeReserveCpuPercent, ok := getReserveResourcePercentFromNodeAnnotations(node.GetAnnotations(), v1.ResourceCPU.String()); ok {
reserveCpuPercent = &nodeReserveCpuPercent
}

extResourceFrom := map[v1.ResourceName]string{}
extResourceFrom := map[string]int64{}

for resourceName, value := range tspCanNotBeReclaimedResource {
resourceFrom := "tsp"
Expand Down Expand Up @@ -241,7 +240,7 @@ func (o *NodeResourceManager) BuildNodeStatus(node *v1.Node) map[v1.ResourceName
node.Status.Allocatable[v1.ResourceName(extResourceName)] =
*resource.NewQuantity(int64(nextRecommendation), resource.DecimalSI)

extResourceFrom[resourceName] = resourceFrom
extResourceFrom[resourceFrom+"-"+resourceName.String()] = int64(nextRecommendation)
}

return extResourceFrom
Expand Down Expand Up @@ -311,7 +310,7 @@ func (o *NodeResourceManager) GetCpuCoreCanNotBeReclaimedFromLocal() float64 {

nodeCpuUsageTotalTimeSeries, ok := o.state[string(types.MetricNameCpuTotalUsage)]
if !ok {
klog.V(1).Infof("Can't get %s from NodeResourceManager local state", types.MetricNameCpuTotalUsage)
klog.V(4).Infof("Can't get %s from NodeResourceManager local state, please make sure cpu metrics collector is defined in NodeQOS.", types.MetricNameCpuTotalUsage)
return 0
}
nodeCpuUsageTotal := nodeCpuUsageTotalTimeSeries[0].Samples[0].Value
Expand All @@ -321,15 +320,15 @@ func (o *NodeResourceManager) GetCpuCoreCanNotBeReclaimedFromLocal() float64 {
if ok {
extResContainerCpuUsageTotal = extResContainerCpuUsageTotalTimeSeries[0].Samples[0].Value * 1000
} else {
klog.V(1).Infof("Can't get %s from NodeResourceManager local state", types.MetricNameExtResContainerCpuTotalUsage)
klog.V(4).Infof("Can't get %s from NodeResourceManager local state", types.MetricNameExtResContainerCpuTotalUsage)
}

var exclusiveCPUIdle float64 = 0
exclusiveCPUIdleTimeSeries, ok := o.state[string(types.MetricNameExclusiveCPUIdle)]
if ok {
exclusiveCPUIdle = exclusiveCPUIdleTimeSeries[0].Samples[0].Value
} else {
klog.V(1).Infof("Can't get %s from NodeResourceManager local state", types.MetricNameExclusiveCPUIdle)
klog.V(4).Infof("Can't get %s from NodeResourceManager local state", types.MetricNameExclusiveCPUIdle)
}

klog.V(6).Infof("nodeCpuUsageTotal: %s, exclusiveCPUIdle: %s, extResContainerCpuUsageTotal: %s", nodeCpuUsageTotal, exclusiveCPUIdle, extResContainerCpuUsageTotal)
Expand Down Expand Up @@ -366,10 +365,10 @@ func getReserveResourcePercentFromNodeAnnotations(annotations map[string]string,
return reserveResourcePercent, ok
}

func generateUpdateEventMessage(resourcesFrom map[v1.ResourceName]string) string {
message, err := json.Marshal(resourcesFrom)
if err != nil {
return ""
func generateUpdateEventMessage(resourcesFrom map[string]int64) string {
message := ""
for k, v := range resourcesFrom {
message = message + fmt.Sprintf("Updating elastic resource %s with %d.", k, v)
}
return string(message)
return message
}

0 comments on commit 5851b67

Please sign in to comment.