diff --git a/cmd/crane-agent/app/agent.go b/cmd/crane-agent/app/agent.go
index 6dd1c682d..1daeaa1b6 100644
--- a/cmd/crane-agent/app/agent.go
+++ b/cmd/crane-agent/app/agent.go
@@ -100,7 +100,7 @@ func Run(ctx context.Context, opts *options.Options) error {
 	tspInformer := craneInformerFactory.Prediction().V1alpha1().TimeSeriesPredictions()
 
 	newAgent, err := agent.NewAgent(ctx, hostname, opts.RuntimeEndpoint, kubeClient, craneClient,
-		podInformer, nodeInformer, nepInformer, actionInformer, tspInformer, opts.NodeResourceOptions, opts.Ifaces, healthCheck, opts.CollectInterval)
+		podInformer, nodeInformer, nepInformer, actionInformer, tspInformer, opts.NodeResourceReserved, opts.Ifaces, healthCheck, opts.CollectInterval)
 
 	if err != nil {
 		return err
diff --git a/cmd/crane-agent/app/options/option.go b/cmd/crane-agent/app/options/option.go
index d657cf9c9..a222fd456 100644
--- a/cmd/crane-agent/app/options/option.go
+++ b/cmd/crane-agent/app/options/option.go
@@ -4,6 +4,7 @@ import (
 	"time"
 
 	"github.com/spf13/pflag"
+	cliflag "k8s.io/component-base/cli/flag"
 )
 
 // Options hold the command-line options about crane manager
@@ -21,21 +22,13 @@ type Options struct {
 	// MaxInactivity is the maximum time from last recorded activity before automatic restart
 	MaxInactivity time.Duration
 	// Ifaces is the network devices to collect metric
-	Ifaces []string
-	//NodeResourceOptions is the options of nodeResource
-	NodeResourceOptions NodeResourceOptions
-}
-
-type NodeResourceOptions struct {
-	ReserveCpuPercentStr    string
-	ReserveMemoryPercentStr string
+	Ifaces               []string
+	NodeResourceReserved map[string]string
 }
 
 // NewOptions builds an empty options.
 func NewOptions() *Options {
-	return &Options{
-		NodeResourceOptions: NodeResourceOptions{},
-	}
+	return &Options{}
 }
 
 // Complete completes all the required options.
@@ -56,7 +49,6 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
 	flags.StringVar(&o.BindAddr, "bind-address", "0.0.0.0:8081", "The address the agent binds to for metrics, health-check and pprof, default: 0.0.0.0:8081.")
 	flags.DurationVar(&o.CollectInterval, "collect-interval", 10*time.Second, "Period for the state collector to collect metrics, default: 10s")
 	flags.StringArrayVar(&o.Ifaces, "ifaces", []string{"eth0"}, "The network devices to collect metric, use comma to separated, default: eth0")
-	flags.StringVar(&o.NodeResourceOptions.ReserveCpuPercentStr, "reserve-cpu-percent", "", "reserve cpu percentage of node.")
-	flags.StringVar(&o.NodeResourceOptions.ReserveMemoryPercentStr, "reserve-memory-percent", "", "reserve memory percentage of node.")
+	flags.Var(cliflag.NewMapStringString(&o.NodeResourceReserved), "node-resource-reserved", "A set of ResourceName=Percent (e.g. cpu=40%,memory=40%)")
 	flags.DurationVar(&o.MaxInactivity, "max-inactivity", 5*time.Minute, "Maximum time from last recorded activity before automatic restart, default: 5min")
 }
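For reference, a minimal sketch (not part of the patch) of how the replacement flag behaves: `cliflag.NewMapStringString` wraps a `map[string]string` as a `pflag.Value`, splitting the argument on commas and each pair on the first `=`. The flag-set name below is illustrative.

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"
	cliflag "k8s.io/component-base/cli/flag"
)

func main() {
	var reserved map[string]string
	fs := pflag.NewFlagSet("crane-agent-demo", pflag.ContinueOnError)
	fs.Var(cliflag.NewMapStringString(&reserved), "node-resource-reserved",
		"A set of ResourceName=Percent (e.g. cpu=40%,memory=40%)")

	// Equivalent to: crane-agent --node-resource-reserved=cpu=40%,memory=40%
	if err := fs.Parse([]string{"--node-resource-reserved=cpu=40%,memory=40%"}); err != nil {
		panic(err)
	}
	fmt.Println(reserved) // map[cpu:40% memory:40%]
}
```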
diff --git a/examples/noderesource-tsp-template.yaml b/examples/noderesource-tsp-template.yaml
index cf47619bd..f8ffdfe9c 100644
--- a/examples/noderesource-tsp-template.yaml
+++ b/examples/noderesource-tsp-template.yaml
@@ -17,7 +17,7 @@ data:
       resourceIdentifier: cpu
       type: ExpressionQuery
       expressionQuery:
-        expression: 'node_cpu_cannot_be_reclaimed_seconds{node=~"({{nodename}})(:\\d+)?"}'
+        expression: 'node_cpu_cannot_be_reclaimed_seconds{node=~"({{ .metadata.name }})(:\\d+)?"}'
     predictionWindowSeconds: 180
 kind: ConfigMap
 metadata:
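The template change above works because `CreateNodeResourceTsp` (in the agent diff below) now renders the ConfigMap spec as a Go template, with the agent's Node object as the template context, round-tripped through JSON so that fields carry their lower-case JSON names. A minimal sketch of that rendering with a hypothetical node; the patch imports `html/template`, which behaves the same as `text/template` for this plain-text input:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"html/template"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	n := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}

	// JSON round-trip: map keys follow the JSON tags, so the template can
	// say {{ .metadata.name }} rather than {{ .ObjectMeta.Name }}.
	raw, _ := json.Marshal(n)
	var data interface{}
	_ = json.Unmarshal(raw, &data)

	tpl := template.Must(template.New("spec").Parse(
		`node_cpu_cannot_be_reclaimed_seconds{node=~"({{ .metadata.name }})(:\\d+)?"}`))
	var buf bytes.Buffer
	if err := tpl.Execute(&buf, data); err != nil {
		panic(err)
	}
	fmt.Println(buf.String()) // node_cpu_cannot_be_reclaimed_seconds{node=~"(node-1)(:\\d+)?"}
}
```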
diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go
index 3051f1fbf..98d4409ba 100644
--- a/pkg/agent/agent.go
+++ b/pkg/agent/agent.go
@@ -1,13 +1,17 @@
 package agent
 
 import (
+	"bytes"
 	"context"
+	"encoding/json"
 	"fmt"
+	"html/template"
 	"net/http"
-	"strings"
+	"reflect"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/uuid"
@@ -28,7 +32,6 @@ import (
 	"github.com/gocrane/api/pkg/generated/informers/externalversions/ensurance/v1alpha1"
 	predictionv1 "github.com/gocrane/api/pkg/generated/informers/externalversions/prediction/v1alpha1"
 	v1alpha12 "github.com/gocrane/api/prediction/v1alpha1"
-	"github.com/gocrane/crane/cmd/crane-agent/app/options"
 	"github.com/gocrane/crane/pkg/ensurance/analyzer"
 	"github.com/gocrane/crane/pkg/ensurance/cm"
 	"github.com/gocrane/crane/pkg/ensurance/collector"
@@ -58,7 +61,7 @@ func NewAgent(ctx context.Context,
 	nepInformer v1alpha1.NodeQOSEnsurancePolicyInformer,
 	actionInformer v1alpha1.AvoidanceActionInformer,
 	tspInformer predictionv1.TimeSeriesPredictionInformer,
-	nodeResourceOptions options.NodeResourceOptions,
+	nodeResourceReserved map[string]string,
 	ifaces []string,
 	healthCheck *metrics.HealthCheck,
 	CollectInterval time.Duration,
@@ -88,7 +91,14 @@ func NewAgent(ctx context.Context,
 	avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint)
 	managers = appendManagerIfNotNil(managers, avoidanceManager)
 	if nodeResource := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResource {
-		nodeResourceManager := resource.NewNodeResourceManager(kubeClient, nodeName, nodeResourceOptions.ReserveCpuPercentStr, nodeResourceOptions.ReserveMemoryPercentStr, agent.CreateNodeResourceTsp(), nodeInformer, tspInformer, stateCollector.NodeResourceChann)
+		tspName, err := agent.CreateNodeResourceTsp()
+		if err != nil {
+			return agent, err
+		}
+		nodeResourceManager, err := resource.NewNodeResourceManager(kubeClient, nodeName, nodeResourceReserved, tspName, nodeInformer, tspInformer, stateCollector.NodeResourceChann)
+		if err != nil {
+			return agent, err
+		}
 		managers = appendManagerIfNotNil(managers, nodeResourceManager)
 	}
 
@@ -127,61 +137,97 @@ func (a *Agent) Run(healthCheck *metrics.HealthCheck, enableProfiling bool, bindAddr string) {
 	}()
 
 	<-a.ctx.Done()
-
-	_ = a.DeleteNodeResourceTsp()
 }
 
 func getAgentName(nodeName string) string {
 	return nodeName + "." + string(uuid.NewUUID())
 }
 
-func (a *Agent) CreateNodeResourceTsp() string {
+func (a *Agent) CreateNodeResourceTsp() (string, error) {
 	tsp, err := a.craneClient.PredictionV1alpha1().TimeSeriesPredictions(resource.TspNamespace).Get(context.TODO(), a.GenerateNodeResourceTspName(), metav1.GetOptions{})
-	if err == nil {
-		klog.V(4).Infof("Found old tsp %s in namespace %s", a.GenerateNodeResourceTspName(), resource.TspNamespace)
-		err := a.DeleteNodeResourceTsp()
-		if err != nil {
-			klog.Exitf("Delete old tsp %s with error: %v", a.GenerateNodeResourceTspName(), err)
+	if err != nil {
+		if !errors.IsNotFound(err) {
+			klog.Errorf("Failed to get noderesource tsp: %v", err)
+			return "", err
 		}
+		// The typed client returns a non-nil empty object alongside a NotFound
+		// error, so reset tsp to make the create branch below take effect.
+		tsp = nil
 	}
 	config, err := a.kubeClient.CoreV1().ConfigMaps(resource.TspNamespace).Get(context.TODO(), "noderesource-tsp-template", metav1.GetOptions{})
 	if err != nil {
-		klog.Exitf("Get noderesource tsp configmap noderesource-tsp-template with error: %v", err)
+		klog.Errorf("Failed to get noderesource tsp configmap: %v", err)
+		return "", err
 	}
 	if config == nil {
-		klog.Exitf("Can't get noderesource tsp configmap noderesource-tsp-template")
+		klog.Errorf("Can't get noderesource tsp configmap noderesource-tsp-template")
+		return "", fmt.Errorf("configmap noderesource-tsp-template not found")
 	}
-	spec := v1alpha12.TimeSeriesPredictionSpec{}
-	err = yaml.Unmarshal([]byte(strings.Replace(config.Data["spec"], "{{nodename}}", a.nodeName, -1)), &spec)
+	n, err := a.kubeClient.CoreV1().Nodes().Get(context.TODO(), a.nodeName, metav1.GetOptions{})
 	if err != nil {
-		klog.Exitf("Convert spec template error: %v", err)
+		klog.Errorf("Failed to get node: %v", err)
+		return "", err
 	}
 
-	n, err := a.kubeClient.CoreV1().Nodes().Get(context.TODO(), a.nodeName, metav1.GetOptions{})
+	spec := v1alpha12.TimeSeriesPredictionSpec{}
+	tpl, err := template.New("").Parse(config.Data["spec"])
 	if err != nil {
-		klog.Exitf("Get node error: %v", err)
+		klog.Errorf("Failed to parse spec template: %v", err)
+		return "", err
+	}
+	var buf bytes.Buffer
+	raw, _ := json.Marshal(n)
+	var data interface{}
+	_ = json.Unmarshal(raw, &data)
+	err = tpl.Execute(&buf, data)
+	if err != nil {
+		klog.Errorf("Failed to render spec template: %v", err)
+		return "", err
+	}
+	err = yaml.Unmarshal(buf.Bytes(), &spec)
+	if err != nil {
+		klog.Errorf("Failed to unmarshal rendered spec: %v", err)
+		return "", err
 	}
 
-	tsp = &v1alpha12.TimeSeriesPrediction{}
-
-	tsp.Name = a.GenerateNodeResourceTspName()
-	tsp.Namespace = resource.TspNamespace
 	gvk, _ := apiutil.GVKForObject(n, scheme.Scheme)
 	spec.TargetRef = v1.ObjectReference{
 		Kind:       gvk.Kind,
 		APIVersion: gvk.GroupVersion().String(),
 		Name:       a.nodeName,
 	}
-	tsp.Spec = spec
-	_ = controllerutil.SetControllerReference(n, tsp, scheme.Scheme)
-	_, err = a.craneClient.PredictionV1alpha1().TimeSeriesPredictions(tsp.Namespace).Create(context.TODO(), tsp, metav1.CreateOptions{})
-	if err != nil {
-		klog.Exitf("Create noderesource tsp %s with error: %v", a.GenerateNodeResourceTspName(), err)
+
+	if tsp != nil {
+		klog.V(4).Infof("Found existing noderesource tsp %s, comparing specs", a.GenerateNodeResourceTspName())
+		if reflect.DeepEqual(tsp.Spec, spec) {
+			return a.GenerateNodeResourceTspName(), nil
+		}
+		klog.V(4).Infof("Spec of noderesource tsp %s has changed, updating it", a.GenerateNodeResourceTspName())
+		tsp.Spec = spec
+		_, err := a.craneClient.PredictionV1alpha1().TimeSeriesPredictions(tsp.Namespace).Update(context.TODO(), tsp, metav1.UpdateOptions{})
+		if err != nil {
+			klog.Errorf("Failed to update noderesource tsp %s: %v", a.GenerateNodeResourceTspName(), err)
+			return "", err
+		}
+		klog.V(4).Infof("The noderesource tsp is updated successfully: %s", a.GenerateNodeResourceTspName())
+	} else {
+		klog.V(4).Infof("The noderesource tsp does not exist, try to create a new one: %s", a.GenerateNodeResourceTspName())
+		tsp = &v1alpha12.TimeSeriesPrediction{}
+		tsp.Name = a.GenerateNodeResourceTspName()
+		tsp.Namespace = resource.TspNamespace
+		tsp.Spec = spec
+		_ = controllerutil.SetControllerReference(n, tsp, scheme.Scheme)
+		_, err = a.craneClient.PredictionV1alpha1().TimeSeriesPredictions(tsp.Namespace).Create(context.TODO(), tsp, metav1.CreateOptions{})
+		if err != nil {
+			klog.Errorf("Failed to create noderesource tsp %s: %v", a.GenerateNodeResourceTspName(), err)
+			return "", err
+		}
+		klog.V(4).Infof("The noderesource tsp is created successfully: %s", a.GenerateNodeResourceTspName())
 	}
-	return a.GenerateNodeResourceTspName()
+
+	return a.GenerateNodeResourceTspName(), nil
 }
 
 func (a *Agent) DeleteNodeResourceTsp() error {
@@ -193,14 +235,7 @@ func (a *Agent) DeleteNodeResourceTsp() error {
 }
 
 func (a *Agent) GenerateNodeResourceTspName() string {
-	return fmt.Sprintf("noderesource-%s", a.name)
-}
-
-func appendManagerIfNotNil(managers []manager.Manager, m manager.Manager) []manager.Manager {
-	if m != nil {
-		return append(managers, m)
-	}
-	return managers
+	return fmt.Sprintf("noderesource-%s", a.nodeName)
 }
 
 func appendManagerIfNotNil(managers []manager.Manager, m manager.Manager) []manager.Manager {
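Worth noting why `Run` no longer calls `DeleteNodeResourceTsp` on shutdown: the created TSP is controller-owned by the Node via `SetControllerReference`, so (assuming standard owner-reference garbage collection, where a namespaced dependent may name a cluster-scoped owner) the TSP is cleaned up when the Node object is deleted, and an agent restart now updates the existing TSP instead of deleting and recreating it. A sketch of the ownership wiring; the package and function names here are hypothetical:

```go
package agentsketch

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	predictionapi "github.com/gocrane/api/prediction/v1alpha1"
)

// ownTspByNode marks the Node as controller-owner of the TSP. Once the
// owner reference is set, deleting the Node garbage-collects the TSP,
// which is why an explicit delete on agent shutdown is unnecessary.
func ownTspByNode(n *v1.Node, tsp *predictionapi.TimeSeriesPrediction) error {
	return controllerutil.SetControllerReference(n, tsp, scheme.Scheme)
}
```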
diff --git a/pkg/ensurance/cm/advanced_cpu_manager.go b/pkg/ensurance/cm/advanced_cpu_manager.go
index c6911ae9a..1c8e8fb08 100644
--- a/pkg/ensurance/cm/advanced_cpu_manager.go
+++ b/pkg/ensurance/cm/advanced_cpu_manager.go
@@ -67,6 +67,8 @@ type AdvancedCpuManager struct {
 	// stateFileDirectory holds the directory where the state file for checkpoints is held.
 	stateFileDirectory string
 
+	exclusiveCPUSet cpuset.CPUSet
+
 	cadvisor.Manager
 }
 
@@ -255,6 +257,8 @@ func (m *AdvancedCpuManager) syncState(doAllocate bool) {
 			}
 		}
 	}
+
+	m.exclusiveCPUSet = m.getExclusiveCpu()
 }
 
 func (m *AdvancedCpuManager) policyRemoveContainerByRef(podUID string, containerName string) error {
@@ -299,7 +303,7 @@ func (m *AdvancedCpuManager) getSharedCpu() cpuset.CPUSet {
 	return sharedCPUSet
 }
 
-func (m *AdvancedCpuManager) GetExclusiveCpu() cpuset.CPUSet {
+func (m *AdvancedCpuManager) getExclusiveCpu() cpuset.CPUSet {
 	exclusiveCPUSet := cpuset.NewCPUSet()
 	for _, pod := range m.activepods() {
 		for _, container := range pod.Spec.Containers {
@@ -313,6 +317,10 @@ func (m *AdvancedCpuManager) getExclusiveCpu() cpuset.CPUSet {
 	return exclusiveCPUSet
 }
 
+func (m *AdvancedCpuManager) GetExclusiveCpu() cpuset.CPUSet {
+	return m.exclusiveCPUSet
+}
+
 func (m *AdvancedCpuManager) activepods() []*v1.Pod {
 	allPods, _ := m.podLister.List(labels.Everything())
 	activePods := make([]*v1.Pod, 0, len(allPods))
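The `AdvancedCpuManager` change turns `GetExclusiveCpu` from an O(pods) recomputation into a read of a snapshot refreshed by `syncState`. A hedged sketch of a consumer (hypothetical, same package assumed); note that the new field is written by `syncState` and read by callers without a lock, which is safe only if the manager's sync loop already serializes those accesses:

```go
package cm // hypothetical addition alongside advanced_cpu_manager.go

import "k8s.io/klog/v2"

// logExclusiveCpus shows the new accessor: a cheap read of the snapshot
// cached by syncState instead of a walk over all active pods.
func logExclusiveCpus(m *AdvancedCpuManager) {
	set := m.GetExclusiveCpu()
	klog.V(4).Infof("exclusive cpus: %s (count=%d)", set.String(), set.Size())
}
```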
diff --git a/pkg/ensurance/collector/cadvisor/cadvisor_unsupported.go b/pkg/ensurance/collector/cadvisor/cadvisor_unsupported.go
index b53cfce86..9b20d808e 100644
--- a/pkg/ensurance/collector/cadvisor/cadvisor_unsupported.go
+++ b/pkg/ensurance/collector/cadvisor/cadvisor_unsupported.go
@@ -4,6 +4,8 @@
 package cadvisor
 
 import (
+	"errors"
+
 	info "github.com/google/cadvisor/info/v1"
 	cadvisorapiv2 "github.com/google/cadvisor/info/v2"
 	corelisters "k8s.io/client-go/listers/core/v1"
@@ -11,13 +13,14 @@ import (
 	"github.com/gocrane/crane/pkg/common"
 	"github.com/gocrane/crane/pkg/ensurance/collector/types"
 )
+
+var errUnsupported = errors.New("cAdvisor is unsupported in this build")
 
 type CadvisorCollectorUnsupport struct {
 	Manager Manager
 }
 
-type CadvisorManagerUnsupport struct {}
+type CadvisorManagerUnsupport struct{}
 
 func NewCadvisorManager() Manager {
 	return &CadvisorManagerUnsupport{}
diff --git a/pkg/ensurance/collector/collector.go b/pkg/ensurance/collector/collector.go
index 98e2ebcac..3024caac7 100644
--- a/pkg/ensurance/collector/collector.go
+++ b/pkg/ensurance/collector/collector.go
@@ -42,7 +42,6 @@ func NewStateCollector(nodeName string, nepLister ensuranceListers.NodeQOSEnsurancePolicyLister,
 	analyzerChann := make(chan map[string][]common.TimeSeries)
 	nodeResourceChann := make(chan map[string][]common.TimeSeries)
 	podResourceChann := make(chan map[string][]common.TimeSeries)
-	c := cadvisor.NewCadvisorManager()
 	return &StateCollector{
 		nodeName:  nodeName,
 		nepLister: nepLister,
diff --git a/pkg/ensurance/collector/nodelocal/cpu.go b/pkg/ensurance/collector/nodelocal/cpu.go
index 1b350bd5a..8fa7446c1 100644
--- a/pkg/ensurance/collector/nodelocal/cpu.go
+++ b/pkg/ensurance/collector/nodelocal/cpu.go
@@ -11,6 +11,7 @@ import (
 	"github.com/shirou/gopsutil/cpu"
 	"github.com/shirou/gopsutil/load"
 	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
 
 	"github.com/gocrane/crane/pkg/common"
 	"github.com/gocrane/crane/pkg/ensurance/collector/types"
@@ -85,7 +86,10 @@ func collectCPU(nodeLocalContext *nodeLocalContext) (map[string][]common.TimeSeries, error) {
 	usagePercent := calculateBusy(nodeState.latestCpuState.stat, currentCpuState.stat)
 	usageCore := usagePercent * float64(nodeState.cpuCoreNumbers) * 1000 / types.MaxPercentage
 
-	cpuSet := nodeLocalContext.exclusiveCPUSet()
+	cpuSet := cpuset.NewCPUSet()
+	if nodeLocalContext.exclusiveCPUSet != nil {
+		cpuSet = nodeLocalContext.exclusiveCPUSet()
+	}
 	var exclusiveCPUIdle float64 = 0
 
 	for cpuId, stat := range currentCpuState.perStat {
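The collector-side counterpart: `exclusiveCPUSet` on `nodeLocalContext` is a function value that is only wired up when a CPU manager exists, so `collectCPU` now guards against `nil` before calling it. The shape of that guard in isolation, with type and field names abridged from the patch:

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)

type nodeLocalContext struct {
	// exclusiveCPUSet stays nil when no CPU manager is wired in,
	// e.g. with the unsupported cAdvisor build.
	exclusiveCPUSet func() cpuset.CPUSet
}

func exclusiveCpus(c *nodeLocalContext) cpuset.CPUSet {
	cpuSet := cpuset.NewCPUSet() // empty set: assume no exclusive CPUs
	if c.exclusiveCPUSet != nil {
		cpuSet = c.exclusiveCPUSet()
	}
	return cpuSet
}

func main() {
	fmt.Println(exclusiveCpus(&nodeLocalContext{}).Size()) // 0
}
```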
diff --git a/pkg/resource/node_resource_manager.go b/pkg/resource/node_resource_manager.go
index 3870365c1..e6e7d31d7 100644
--- a/pkg/resource/node_resource_manager.go
+++ b/pkg/resource/node_resource_manager.go
@@ -60,7 +60,7 @@ type NodeResourceManager struct {
 	tspLister predictionlisters.TimeSeriesPredictionLister
 	tspSynced cache.InformerSynced
 
-	recorder record.EventRecorder
+	recorder   record.EventRecorder
 
 	stateChann chan map[string][]common.TimeSeries
@@ -73,11 +73,16 @@ type NodeResourceManager struct {
 	tspName string
 }
 
-func NewNodeResourceManager(client clientset.Interface, nodeName string, reserveCpuPercentStr string,
-	reserveMemoryPercentStr string, tspName string, nodeInformer coreinformers.NodeInformer,
-	tspInformer predictionv1.TimeSeriesPredictionInformer, stateChann chan map[string][]common.TimeSeries) *NodeResourceManager {
-	reserveCpuPercent, _ := utils.ParsePercentage(reserveCpuPercentStr)
-	reserveMemoryPercent, _ := utils.ParsePercentage(reserveMemoryPercentStr)
+func NewNodeResourceManager(client clientset.Interface, nodeName string, op map[string]string, tspName string, nodeInformer coreinformers.NodeInformer,
+	tspInformer predictionv1.TimeSeriesPredictionInformer, stateChann chan map[string][]common.TimeSeries) (*NodeResourceManager, error) {
+	reserveCpuPercent, err := utils.ParsePercentage(op[v1.ResourceCPU.String()])
+	if err != nil {
+		return nil, err
+	}
+	reserveMemoryPercent, err := utils.ParsePercentage(op[v1.ResourceMemory.String()])
+	if err != nil {
+		return nil, err
+	}
 
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartStructuredLogging(0)
@@ -99,7 +104,7 @@ func NewNodeResourceManager(client clientset.Interface, nodeName string, reserveCpuPercentStr string,
 		},
 		tspName: tspName,
 	}
-	return o
+	return o, nil
 }
 
 func (o *NodeResourceManager) Name() string {
@@ -153,7 +158,7 @@ func (o *NodeResourceManager) UpdateNodeResource() {
 	if !equality.Semantic.DeepEqual(&node.Status, &nodeCopy.Status) {
 		// Update Node status extend-resource info
 		// TODO fix: strategic merge patch kubernetes
-		if _, err := o.client.CoreV1().Nodes().Update(context.TODO(), nodeCopy, metav1.UpdateOptions{}); err != nil {
+		if _, err := o.client.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{}); err != nil {
 			klog.Errorf("Failed to update node %s's status extend-resource, %v", nodeCopy.Name, err)
 			return
 		}
@@ -188,7 +193,7 @@ func (o *NodeResourceManager) FindTargetNode(tsp *predictionapi.TimeSeriesPrediction) (bool, error) {
 	return false, nil
 }
 
-func (o *NodeResourceManager) BuildNodeStatus(node *v1.Node) map[v1.ResourceName]string{
+func (o *NodeResourceManager) BuildNodeStatus(node *v1.Node) map[v1.ResourceName]string {
 	tspCanNotBeReclaimedResource := o.GetCanNotBeReclaimedResourceFromTsp(node)
 	localCanNotBeReclaimedResource := o.GetCanNotBeReclaimedResourceFromLocal()
 	reserveCpuPercent := o.reserveResource.CpuPercent
@@ -205,14 +210,14 @@ func (o *NodeResourceManager) BuildNodeStatus(node *v1.Node) map[v1.ResourceName]string {
 			maxUsage = localCanNotBeReclaimedResource[resourceName]
 			resourceFrom = "local"
 		}
+
 		var nextRecommendation float64
 		switch resourceName {
 		case v1.ResourceCPU:
-			// cpu need to be scaled to m as ext resource cannot be decimal
 			if reserveCpuPercent != nil {
-				nextRecommendation = (float64(node.Status.Allocatable.Cpu().Value())**reserveCpuPercent - maxUsage) * 1000
+				nextRecommendation = float64(node.Status.Allocatable.Cpu().Value()) - float64(node.Status.Allocatable.Cpu().Value())*(*reserveCpuPercent) - maxUsage/1000
 			} else {
-				nextRecommendation = (float64(node.Status.Allocatable.Cpu().Value()) - maxUsage) * 1000
+				nextRecommendation = float64(node.Status.Allocatable.Cpu().Value()) - maxUsage/1000
 			}
 		case v1.ResourceMemory:
 			// unit of memory in prometheus is in Ki, need to be converted to byte
@@ -313,7 +318,7 @@ func (o *NodeResourceManager) GetCpuCoreCanNotBeReclaimedFromLocal() float64 {
 	var extResContainerCpuUsageTotal float64 = 0
 	extResContainerCpuUsageTotalTimeSeries, ok := o.state[string(types.MetricNameExtResContainerCpuTotalUsage)]
 	if ok {
-		extResContainerCpuUsageTotal = extResContainerCpuUsageTotalTimeSeries[0].Samples[0].Value
+		extResContainerCpuUsageTotal = extResContainerCpuUsageTotalTimeSeries[0].Samples[0].Value * 1000
 	} else {
 		klog.V(1).Infof("Can't get %s from NodeResourceManager local state", types.MetricNameExtResContainerCpuTotalUsage)
 	}
@@ -326,6 +331,8 @@ func (o *NodeResourceManager) GetCpuCoreCanNotBeReclaimedFromLocal() float64 {
 		klog.V(1).Infof("Can't get %s from NodeResourceManager local state", types.MetricNameExclusiveCPUIdle)
 	}
 
+	klog.V(6).Infof("nodeCpuUsageTotal: %v, exclusiveCPUIdle: %v, extResContainerCpuUsageTotal: %v", nodeCpuUsageTotal, exclusiveCPUIdle, extResContainerCpuUsageTotal)
+
 	// 1. Exclusive tethered CPU cannot be reclaimed even if the free part is free, so add the exclusive CPUIdle to the CanNotBeReclaimed CPU
 	// 2. The CPU used by extRes-container needs to be reclaimed, otherwise it will be double-counted due to the allotted mechanism of k8s, so the extResContainerCpuUsageTotal is subtracted from the CanNotBeReclaimedCpu
 	return nodeCpuUsageTotal + exclusiveCPUIdle - extResContainerCpuUsageTotal
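A worked example of the corrected CPU recommendation, which now stays in cores: `maxUsage` arrives in millicores (hence the division by 1000), and the reserved share is subtracted from allocatable rather than multiplied into it. With hypothetical numbers (8 allocatable cores, `cpu=40%` reserved, 2500m that cannot be reclaimed):

```go
package main

import "fmt"

func main() {
	allocatableCores := 8.0   // node.Status.Allocatable.Cpu().Value()
	reserveCpuPercent := 0.40 // ParsePercentage("40%")
	maxUsageMilli := 2500.0   // max(tsp, local) canNotBeReclaimed, in millicores

	next := allocatableCores - allocatableCores*reserveCpuPercent - maxUsageMilli/1000
	fmt.Printf("%.1f cores\n", next) // 8 - 3.2 - 2.5 = 2.3 cores
}
```

Consistent with this, the `* 1000` added in `GetCpuCoreCanNotBeReclaimedFromLocal` appears to bring the ext-res container usage onto the same millicore scale as the other local CPU metrics before the subtraction.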
diff --git a/pkg/utils/pod.go b/pkg/utils/pod.go
index c08758e36..836f7a8b4 100644
--- a/pkg/utils/pod.go
+++ b/pkg/utils/pod.go
@@ -159,7 +159,7 @@ func GetContainerNameFromPod(pod *v1.Pod, containerId string) string {
 }
 
 func GetContainerFromPod(pod *v1.Pod, containerName string) *v1.Container {
-	if containerName == ""{
+	if containerName == "" {
 		return nil
 	}
 	for _, v := range pod.Spec.Containers {
diff --git a/pkg/utils/string.go b/pkg/utils/string.go
index 5ec868f56..9010814be 100644
--- a/pkg/utils/string.go
+++ b/pkg/utils/string.go
@@ -31,4 +31,4 @@ func ParsePercentage(input string) (float64, error) {
 		return 0, err
 	}
 	return value / 100, nil
-}
\ No newline at end of file
+}
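Finally, a usage sketch of `ParsePercentage` as exercised by the new flag values (assuming it accepts the `40%` form shown in the flag's help text and returns the fraction, per the `value / 100` above):

```go
package main

import (
	"fmt"

	"github.com/gocrane/crane/pkg/utils"
)

func main() {
	// Values arrive from --node-resource-reserved=cpu=40%,memory=40%.
	p, err := utils.ParsePercentage("40%")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%.2f\n", p) // 0.40
}
```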