From 1cbbb7787ea78639e19e9c39b3dc2efe26c1abd9 Mon Sep 17 00:00:00 2001 From: Garrybest Date: Sat, 17 Sep 2022 19:48:40 +0800 Subject: [PATCH] move nrt collection to collector Signed-off-by: Garrybest --- cmd/crane-agent/app/agent.go | 57 ++---- pkg/agent/agent.go | 150 ++++----------- pkg/ensurance/collector/collector.go | 29 ++- .../noderesourcetopology.go | 179 ++++++++++++++++++ pkg/ensurance/collector/types/types.go | 11 +- pkg/features/features.go | 4 + pkg/utils/node.go | 43 ++++- 7 files changed, 305 insertions(+), 168 deletions(-) create mode 100644 pkg/ensurance/collector/noderesourcetopology/noderesourcetopology.go diff --git a/cmd/crane-agent/app/agent.go b/cmd/crane-agent/app/agent.go index 7000e6cf7..d0dbdce13 100644 --- a/cmd/crane-agent/app/agent.go +++ b/cmd/crane-agent/app/agent.go @@ -2,9 +2,7 @@ package app import ( "context" - "encoding/json" "flag" - "fmt" "os" "time" @@ -19,9 +17,6 @@ import ( "k8s.io/client-go/kubernetes" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/klog/v2" - kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" - kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config" - kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme" ctrl "sigs.k8s.io/controller-runtime" ensuranceapi "github.com/gocrane/api/ensurance/v1alpha1" @@ -90,10 +85,6 @@ func Run(ctx context.Context, opts *options.Options) error { if err != nil { return err } - kubeletConfig, err := getKubeletConfig(ctx, kubeClient, hostname) - if err != nil { - return err - } podInformerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, informerSyncPeriod, informers.WithTweakListOptions(func(options *metav1.ListOptions) { @@ -115,8 +106,15 @@ func Run(ctx context.Context, opts *options.Options) error { actionInformer := craneInformerFactory.Ensurance().V1alpha1().AvoidanceActions() tspInformer := craneInformerFactory.Prediction().V1alpha1().TimeSeriesPredictions() - newAgent, err := agent.NewAgent(ctx, hostname, 
opts.RuntimeEndpoint, opts.CgroupDriver, opts.SysPath, kubeClient, craneClient, kubeletConfig, podInformer, nodeInformer, - nodeQOSInformer, podQOSInformer, actionInformer, tspInformer, opts.NodeResourceReserved, opts.Ifaces, healthCheck, opts.CollectInterval, opts.ExecuteExcess) + nrtInformerFactory := craneinformers.NewSharedInformerFactoryWithOptions(craneClient, informerSyncPeriod, + craneinformers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector(nodeNameField, hostname).String() + }), + ) + nrtInformer := nrtInformerFactory.Topology().V1alpha1().NodeResourceTopologies() + + newAgent, err := agent.NewAgent(ctx, hostname, opts.RuntimeEndpoint, opts.CgroupDriver, opts.SysPath, kubeClient, craneClient, podInformer, nodeInformer, + nodeQOSInformer, podQOSInformer, actionInformer, tspInformer, nrtInformer, opts.NodeResourceReserved, opts.Ifaces, healthCheck, opts.CollectInterval, opts.ExecuteExcess) if err != nil { return err @@ -125,10 +123,12 @@ func Run(ctx context.Context, opts *options.Options) error { podInformerFactory.Start(ctx.Done()) nodeInformerFactory.Start(ctx.Done()) craneInformerFactory.Start(ctx.Done()) + nrtInformerFactory.Start(ctx.Done()) podInformerFactory.WaitForCacheSync(ctx.Done()) nodeInformerFactory.WaitForCacheSync(ctx.Done()) craneInformerFactory.WaitForCacheSync(ctx.Done()) + nrtInformerFactory.WaitForCacheSync(ctx.Done()) newAgent.Run(healthCheck, opts.EnableProfiling, opts.BindAddr) return nil @@ -163,38 +163,3 @@ func getHostName(override string) string { } return nodeName } - -func getKubeletConfig(ctx context.Context, c kubernetes.Interface, hostname string) (*kubeletconfiginternal.KubeletConfiguration, error) { - result, err := c.CoreV1().RESTClient().Get(). - Resource("nodes"). - SubResource("proxy"). - Name(hostname). - Suffix("configz"). - Do(ctx). 
- Raw() - if err != nil { - return nil, err - } - - // This hack because /configz reports the following structure: - // {"kubeletconfig": {the JSON representation of kubeletconfigv1beta1.KubeletConfiguration}} - type configzWrapper struct { - ComponentConfig kubeletconfigv1beta1.KubeletConfiguration `json:"kubeletconfig"` - } - configz := configzWrapper{} - - if err = json.Unmarshal(result, &configz); err != nil { - return nil, fmt.Errorf("failed to unmarshal json for kubelet config: %v", err) - } - - scheme, _, err := kubeletscheme.NewSchemeAndCodecs() - if err != nil { - return nil, err - } - cfg := kubeletconfiginternal.KubeletConfiguration{} - if err = scheme.Convert(&configz.ComponentConfig, &cfg, nil); err != nil { - return nil, err - } - - return &cfg, nil -} diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 830a9ee6d..0a7e87ad2 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -10,17 +10,12 @@ import ( "reflect" "time" - jsonpatch "github.com/evanphx/json-patch" - "github.com/jaypipes/ghw" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - apiresource "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/yaml" - quotav1 "k8s.io/apiserver/pkg/quota/v1" "k8s.io/apiserver/pkg/server/mux" "k8s.io/apiserver/pkg/server/routes" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -29,9 +24,6 @@ import ( "k8s.io/client-go/kubernetes/scheme" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/klog/v2" - kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config" - kubeletcpumanager "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" - "k8s.io/kubernetes/pkg/kubelet/stats/pidlimit" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -39,6 +31,7 @@ import ( craneclientset 
"github.com/gocrane/api/pkg/generated/clientset/versioned" "github.com/gocrane/api/pkg/generated/informers/externalversions/ensurance/v1alpha1" predictionv1 "github.com/gocrane/api/pkg/generated/informers/externalversions/prediction/v1alpha1" + topologyinformer "github.com/gocrane/api/pkg/generated/informers/externalversions/topology/v1alpha1" predictionapi "github.com/gocrane/api/prediction/v1alpha1" topologyapi "github.com/gocrane/api/topology/v1alpha1" @@ -46,50 +39,49 @@ import ( "github.com/gocrane/crane/pkg/ensurance/cm" "github.com/gocrane/crane/pkg/ensurance/collector" "github.com/gocrane/crane/pkg/ensurance/collector/cadvisor" + "github.com/gocrane/crane/pkg/ensurance/collector/noderesourcetopology" "github.com/gocrane/crane/pkg/ensurance/executor" "github.com/gocrane/crane/pkg/ensurance/manager" "github.com/gocrane/crane/pkg/features" "github.com/gocrane/crane/pkg/metrics" "github.com/gocrane/crane/pkg/resource" - "github.com/gocrane/crane/pkg/topology" + "github.com/gocrane/crane/pkg/utils" ) type Agent struct { - ctx context.Context - name string - nodeName string - kubeClient kubernetes.Interface - craneClient craneclientset.Interface - managers []manager.Manager - kubeletConfig *kubeletconfiginternal.KubeletConfiguration + ctx context.Context + name string + nodeName string + kubeClient kubernetes.Interface + craneClient craneclientset.Interface + managers []manager.Manager } func NewAgent(ctx context.Context, nodeName, runtimeEndpoint, cgroupDriver, sysPath string, kubeClient kubernetes.Interface, craneClient craneclientset.Interface, - kubeletConfig *kubeletconfiginternal.KubeletConfiguration, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, nodeQOSInformer v1alpha1.NodeQOSInformer, podQOSInformer v1alpha1.PodQOSInformer, actionInformer v1alpha1.AvoidanceActionInformer, tspInformer predictionv1.TimeSeriesPredictionInformer, + nrtInformer topologyinformer.NodeResourceTopologyInformer, nodeResourceReserved 
map[string]string, ifaces []string, healthCheck *metrics.HealthCheck, - CollectInterval time.Duration, + collectInterval time.Duration, executeExcess string, ) (*Agent, error) { var managers []manager.Manager var noticeCh = make(chan executor.AvoidanceExecutor) agent := &Agent{ - ctx: ctx, - name: getAgentName(nodeName), - nodeName: nodeName, - kubeClient: kubeClient, - craneClient: craneClient, - kubeletConfig: kubeletConfig, + ctx: ctx, + name: getAgentName(nodeName), + nodeName: nodeName, + kubeClient: kubeClient, + craneClient: craneClient, } utilruntime.Must(ensuranceapi.AddToScheme(scheme.Scheme)) @@ -101,7 +93,7 @@ func NewAgent(ctx context.Context, exclusiveCPUSet = cpuManager.GetExclusiveCpu managers = appendManagerIfNotNil(managers, cpuManager) } - stateCollector := collector.NewStateCollector(nodeName, nodeQOSInformer.Lister(), podInformer.Lister(), nodeInformer.Lister(), ifaces, healthCheck, CollectInterval, exclusiveCPUSet, cadvisorManager) + stateCollector := collector.NewStateCollector(nodeName, sysPath, kubeClient, craneClient, nodeQOSInformer.Lister(), nrtInformer.Lister(), podInformer.Lister(), nodeInformer.Lister(), ifaces, healthCheck, collectInterval, exclusiveCPUSet, cadvisorManager) managers = appendManagerIfNotNil(managers, stateCollector) analyzerManager := analyzer.NewAnomalyAnalyzer(kubeClient, nodeName, podInformer, nodeInformer, nodeQOSInformer, podQOSInformer, actionInformer, stateCollector.AnalyzerChann, noticeCh) managers = appendManagerIfNotNil(managers, analyzerManager) @@ -122,9 +114,10 @@ func NewAgent(ctx context.Context, managers = appendManagerIfNotNil(managers, podResourceManager) } - _, err := agent.CreateNodeResourceTopology(sysPath) - if err != nil { - return agent, err + if utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResourceTopology) { + if err := agent.CreateNodeResourceTopology(sysPath); err != nil { + return agent, err + } } agent.managers = managers @@ -251,84 +244,39 @@ func (a *Agent) 
CreateNodeResourceTsp() string { return a.GenerateNodeResourceTspName() } -func (a *Agent) CreateNodeResourceTopology(sysPath string) (*topologyapi.NodeResourceTopology, error) { - topo, err := ghw.Topology(ghw.WithPathOverrides(ghw.PathOverrides{ - "/sys": sysPath, - })) - if err != nil { - return nil, fmt.Errorf("failed to detect topology info by GHW: %v", err) - } - klog.InfoS("Get topology info from GHW finished", "info", topo.String()) - - exist := true +func (a *Agent) CreateNodeResourceTopology(sysPath string) error { nrt, err := a.craneClient.TopologyV1alpha1().NodeResourceTopologies().Get(context.TODO(), a.nodeName, metav1.GetOptions{}) if err != nil { if !errors.IsNotFound(err) { klog.Errorf("Failed to get node resource topology: %v", err) - return nil, err + return err } - exist = false + nrt = nil } node, err := a.kubeClient.CoreV1().Nodes().Get(context.TODO(), a.nodeName, metav1.GetOptions{}) if err != nil { klog.Errorf("Failed to get node: %v", err) - return nil, err + return err } - kubeReserved, err := parseResourceList(a.kubeletConfig.KubeReserved) + kubeletConfig, err := utils.GetKubeletConfig(context.TODO(), a.kubeClient, a.nodeName) if err != nil { - return nil, err - } - systemReserved, err := parseResourceList(a.kubeletConfig.SystemReserved) - if err != nil { - return nil, err + klog.Errorf("Failed to get kubelet config: %v", err) + return err } - reserved := quotav1.Add(kubeReserved, systemReserved) - cpuManagerPolicy := topologyapi.CPUManagerPolicyStatic - // If kubelet cpumanager policy is static, we should set the agent cpu manager policy to none. 
- if a.kubeletConfig.CPUManagerPolicy == string(kubeletcpumanager.PolicyStatic) { - cpuManagerPolicy = topologyapi.CPUManagerPolicyNone + newNrt, err := noderesourcetopology.BuildNodeResourceTopology(sysPath, kubeletConfig, node) + if err != nil { + klog.Errorf("Failed to build node resource topology: %v", err) + return err } - nrtBuilder := topology.NewNRTBuilder() - nrtBuilder.WithNode(node) - nrtBuilder.WithReserved(reserved) - nrtBuilder.WithTopologyInfo(topo) - nrtBuilder.WithCPUManagerPolicy(cpuManagerPolicy) - newNrt := nrtBuilder.Build() - _ = controllerutil.SetControllerReference(node, newNrt, scheme.Scheme) - - if exist { - newNrt.TypeMeta = nrt.TypeMeta - newNrt.ObjectMeta = nrt.ObjectMeta - oldData, err := json.Marshal(nrt) - if err != nil { - return nil, err - } - newData, err := json.Marshal(newNrt) - if err != nil { - return nil, err - } - patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) - if err != nil { - return nil, fmt.Errorf("failed to create merge patch: %v", err) - } - nrt, err = a.craneClient.TopologyV1alpha1().NodeResourceTopologies().Patch(context.TODO(), a.nodeName, types.MergePatchType, patchBytes, metav1.PatchOptions{}) - if err != nil { - klog.Errorf("Failed to update node resource topology %s: %v", a.nodeName, err) - return nil, err - } - return nrt, nil - } else { - nrt, err = a.craneClient.TopologyV1alpha1().NodeResourceTopologies().Create(context.TODO(), newNrt, metav1.CreateOptions{}) - if err != nil { - klog.Errorf("Failed to create node resource topology %s: %v", a.nodeName, err) - return nil, err - } - return nrt, nil + if err = noderesourcetopology.CreateOrUpdateNodeResourceTopology(a.craneClient, nrt, newNrt); err != nil { + klog.Errorf("Failed to create or update node resource topology: %v", err) + return err } + return nil } func (a *Agent) DeleteNodeResourceTsp() error { @@ -349,29 +297,3 @@ func appendManagerIfNotNil(managers []manager.Manager, m manager.Manager) []mana } return managers } - -// 
parseResourceList parses the given configuration map into an API -// ResourceList or returns an error. -func parseResourceList(m map[string]string) (corev1.ResourceList, error) { - if len(m) == 0 { - return nil, nil - } - rl := make(corev1.ResourceList) - for k, v := range m { - switch corev1.ResourceName(k) { - // CPU, memory, local storage, and PID resources are supported. - case corev1.ResourceCPU, corev1.ResourceMemory, corev1.ResourceEphemeralStorage, pidlimit.PIDs: - q, err := apiresource.ParseQuantity(v) - if err != nil { - return nil, err - } - if q.Sign() == -1 { - return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v) - } - rl[corev1.ResourceName(k)] = q - default: - return nil, fmt.Errorf("cannot reserve %q resource", k) - } - } - return rl, nil -} diff --git a/pkg/ensurance/collector/collector.go b/pkg/ensurance/collector/collector.go index ef52bb5c2..468a55892 100644 --- a/pkg/ensurance/collector/collector.go +++ b/pkg/ensurance/collector/collector.go @@ -6,15 +6,20 @@ import ( "k8s.io/apimachinery/pkg/labels" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + craneclientset "github.com/gocrane/api/pkg/generated/clientset/versioned" ensuranceListers "github.com/gocrane/api/pkg/generated/listers/ensurance/v1alpha1" + topologylisters "github.com/gocrane/api/pkg/generated/listers/topology/v1alpha1" + "github.com/gocrane/crane/pkg/common" "github.com/gocrane/crane/pkg/ensurance/collector/cadvisor" "github.com/gocrane/crane/pkg/ensurance/collector/nodelocal" "github.com/gocrane/crane/pkg/ensurance/collector/noderesource" + "github.com/gocrane/crane/pkg/ensurance/collector/noderesourcetopology" "github.com/gocrane/crane/pkg/ensurance/collector/types" "github.com/gocrane/crane/pkg/features" "github.com/gocrane/crane/pkg/known" @@ -24,7 +29,11 @@ import ( type StateCollector struct { nodeName string + 
sysPath string + kubeClient kubernetes.Interface + craneClient craneclientset.Interface nodeQOSLister ensuranceListers.NodeQOSLister + nrtLister topologylisters.NodeResourceTopologyLister podLister corelisters.PodLister nodeLister corelisters.NodeLister healthCheck *metrics.HealthCheck @@ -40,15 +49,23 @@ type StateCollector struct { rw sync.RWMutex } -func NewStateCollector(nodeName string, nodeQOSLister ensuranceListers.NodeQOSLister, podLister corelisters.PodLister, - nodeLister corelisters.NodeLister, ifaces []string, healthCheck *metrics.HealthCheck, collectInterval time.Duration, exclusiveCPUSet func() cpuset.CPUSet, manager cadvisor.Manager) *StateCollector { +func NewStateCollector(nodeName, sysPath string, kubeClient kubernetes.Interface, craneClient craneclientset.Interface, + nodeQOSLister ensuranceListers.NodeQOSLister, nrtLister topologylisters.NodeResourceTopologyLister, + podLister corelisters.PodLister, nodeLister corelisters.NodeLister, ifaces []string, + healthCheck *metrics.HealthCheck, collectInterval time.Duration, exclusiveCPUSet func() cpuset.CPUSet, + manager cadvisor.Manager, +) *StateCollector { analyzerChann := make(chan map[string][]common.TimeSeries) nodeResourceChann := make(chan map[string][]common.TimeSeries) podResourceChann := make(chan map[string][]common.TimeSeries) State := make(map[string][]common.TimeSeries) return &StateCollector{ nodeName: nodeName, + sysPath: sysPath, + kubeClient: kubeClient, + craneClient: craneClient, nodeQOSLister: nodeQOSLister, + nrtLister: nrtLister, podLister: podLister, nodeLister: nodeLister, healthCheck: healthCheck, @@ -132,6 +149,8 @@ func (s *StateCollector) Collect() { data[key] = series } s.rw.Unlock() + } else { + klog.ErrorS(err, "Failed to collect data in state collector", "type", c.GetType()) } }(c, s.State) @@ -216,6 +235,12 @@ func (s *StateCollector) UpdateCollectors() { } } } + if utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResourceTopology) { + if _, exists := 
s.collectors.Load(types.NodeResourceTopologyCollectorType); !exists { + s.collectors.Store(types.NodeResourceTopologyCollectorType, + noderesourcetopology.NewNodeResourceTopology(s.nodeName, s.sysPath, s.nrtLister, s.nodeLister, s.kubeClient, s.craneClient)) + } + } return } diff --git a/pkg/ensurance/collector/noderesourcetopology/noderesourcetopology.go b/pkg/ensurance/collector/noderesourcetopology/noderesourcetopology.go new file mode 100644 index 000000000..a3b0b3f49 --- /dev/null +++ b/pkg/ensurance/collector/noderesourcetopology/noderesourcetopology.go @@ -0,0 +1,179 @@ +package noderesourcetopology + +import ( + "context" + "encoding/json" + "fmt" + + jsonpatch "github.com/evanphx/json-patch" + "github.com/jaypipes/ghw" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + apiresource "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + patchtypes "k8s.io/apimachinery/pkg/types" + quotav1 "k8s.io/apiserver/pkg/quota/v1" + kubeclient "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + corelisters "k8s.io/client-go/listers/core/v1" + kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config" + kubeletcpumanager "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + "k8s.io/kubernetes/pkg/kubelet/stats/pidlimit" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + craneclientset "github.com/gocrane/api/pkg/generated/clientset/versioned" + topologylisters "github.com/gocrane/api/pkg/generated/listers/topology/v1alpha1" + topologyapi "github.com/gocrane/api/topology/v1alpha1" + + "github.com/gocrane/crane/pkg/common" + "github.com/gocrane/crane/pkg/ensurance/collector/types" + "github.com/gocrane/crane/pkg/topology" + "github.com/gocrane/crane/pkg/utils" +) + +type NodeResourceTopology struct { + name types.CollectType + nodeName string + sysPath string + nrtLister topologylisters.NodeResourceTopologyLister + nodeLister corelisters.NodeLister + client kubeclient.Interface + 
craneClient craneclientset.Interface +} + +func NewNodeResourceTopology(nodeName, sysPath string, + nrtLister topologylisters.NodeResourceTopologyLister, nodeLister corelisters.NodeLister, + client kubeclient.Interface, craneClient craneclientset.Interface, +) *NodeResourceTopology { + return &NodeResourceTopology{ + nodeName: nodeName, + sysPath: sysPath, + nrtLister: nrtLister, + nodeLister: nodeLister, + client: client, + craneClient: craneClient, + } +} + +func (n *NodeResourceTopology) GetType() types.CollectType { + return types.NodeResourceTopologyCollectorType +} + +func (n *NodeResourceTopology) Collect() (map[string][]common.TimeSeries, error) { + nrt, err := n.nrtLister.Get(n.nodeName) + if err != nil { + return nil, err + } + + node, err := n.nodeLister.Get(n.nodeName) + if err != nil { + return nil, err + } + + kubeletConfig, err := utils.GetKubeletConfig(context.TODO(), n.client, n.nodeName) + if err != nil { + return nil, fmt.Errorf("failed to get config from kubelet endpoint: %v", err) + } + + newNrt, err := BuildNodeResourceTopology(n.sysPath, kubeletConfig, node) + if err != nil { + return nil, fmt.Errorf("failed to build node resource topology: %v", err) + } + + if err = CreateOrUpdateNodeResourceTopology(n.craneClient, nrt, newNrt); err != nil { + return nil, fmt.Errorf("failed to create or update node resource topology: %v", err) + } + return nil, nil +} + +func (n *NodeResourceTopology) Stop() error { + return nil +} + +func BuildNodeResourceTopology(sysPath string, kubeletConfig *kubeletconfiginternal.KubeletConfiguration, + node *corev1.Node) (*topologyapi.NodeResourceTopology, error) { + topo, err := ghw.Topology(ghw.WithPathOverrides(ghw.PathOverrides{ + "/sys": sysPath, + })) + if err != nil { + return nil, fmt.Errorf("failed to detect topology info by GHW: %v", err) + } + kubeReserved, err := parseResourceList(kubeletConfig.KubeReserved) + if err != nil { + return nil, err + } + systemReserved, err := 
parseResourceList(kubeletConfig.SystemReserved) + if err != nil { + return nil, err + } + reserved := quotav1.Add(kubeReserved, systemReserved) + + cpuManagerPolicy := topologyapi.CPUManagerPolicyStatic + // If kubelet cpumanager policy is static, we should set the agent cpu manager policy to none. + if kubeletConfig.CPUManagerPolicy == string(kubeletcpumanager.PolicyStatic) { + cpuManagerPolicy = topologyapi.CPUManagerPolicyNone + } + + nrtBuilder := topology.NewNRTBuilder() + nrtBuilder.WithNode(node) + nrtBuilder.WithReserved(reserved) + nrtBuilder.WithTopologyInfo(topo) + nrtBuilder.WithCPUManagerPolicy(cpuManagerPolicy) + newNrt := nrtBuilder.Build() + _ = controllerutil.SetControllerReference(node, newNrt, scheme.Scheme) + return newNrt, nil +} + +func CreateOrUpdateNodeResourceTopology(craneClient craneclientset.Interface, old, new *topologyapi.NodeResourceTopology) error { + if old == nil { + _, err := craneClient.TopologyV1alpha1().NodeResourceTopologies().Create(context.TODO(), new, metav1.CreateOptions{}) + return err + } + new.TypeMeta = old.TypeMeta + new.ObjectMeta = old.ObjectMeta + + if equality.Semantic.DeepEqual(old, new) { + return nil + } + + oldData, err := json.Marshal(old) + if err != nil { + return err + } + newData, err := json.Marshal(new) + if err != nil { + return err + } + patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return fmt.Errorf("failed to create merge patch: %v", err) + } + _, err = craneClient.TopologyV1alpha1().NodeResourceTopologies().Patch(context.TODO(), new.Name, patchtypes.MergePatchType, patchBytes, metav1.PatchOptions{}) + return err +} + +// parseResourceList parses the given configuration map into an API +// ResourceList or returns an error. 
+func parseResourceList(m map[string]string) (corev1.ResourceList, error) { + if len(m) == 0 { + return nil, nil + } + rl := make(corev1.ResourceList) + for k, v := range m { + switch corev1.ResourceName(k) { + // CPU, memory, local storage, and PID resources are supported. + case corev1.ResourceCPU, corev1.ResourceMemory, corev1.ResourceEphemeralStorage, pidlimit.PIDs: + q, err := apiresource.ParseQuantity(v) + if err != nil { + return nil, err + } + if q.Sign() == -1 { + return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v) + } + rl[corev1.ResourceName(k)] = q + default: + return nil, fmt.Errorf("cannot reserve %q resource", k) + } + } + return rl, nil +} diff --git a/pkg/ensurance/collector/types/types.go b/pkg/ensurance/collector/types/types.go index 681b0367b..17a1b2ca4 100644 --- a/pkg/ensurance/collector/types/types.go +++ b/pkg/ensurance/collector/types/types.go @@ -3,11 +3,12 @@ package types type CollectType string const ( - NodeLocalCollectorType CollectType = "node-local" - CadvisorCollectorType CollectType = "cadvisor" - EbpfCollectorType CollectType = "ebpf" - MetricsServerCollectorType CollectType = "metrics-server" - NodeResourceCollectorType CollectType = "node-resource" + NodeLocalCollectorType CollectType = "node-local" + CadvisorCollectorType CollectType = "cadvisor" + EbpfCollectorType CollectType = "ebpf" + MetricsServerCollectorType CollectType = "metrics-server" + NodeResourceCollectorType CollectType = "node-resource" + NodeResourceTopologyCollectorType CollectType = "node-resource-topology" ) type MetricName string diff --git a/pkg/features/features.go b/pkg/features/features.go index 77a40c5c6..a7026f95c 100644 --- a/pkg/features/features.go +++ b/pkg/features/features.go @@ -16,6 +16,9 @@ const ( // CraneNodeResource enables the node resource features. CraneNodeResource featuregate.Feature = "NodeResource" + // CraneNodeResourceTopology enables node resource topology features. 
+ CraneNodeResourceTopology featuregate.Feature = "NodeResourceTopology" + // CranePodResource enables the pod resource features. CranePodResource featuregate.Feature = "PodResource" @@ -33,6 +36,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ CraneAutoscaling: {Default: true, PreRelease: featuregate.Alpha}, CraneAnalysis: {Default: true, PreRelease: featuregate.Alpha}, CraneNodeResource: {Default: true, PreRelease: featuregate.Alpha}, + CraneNodeResourceTopology: {Default: true, PreRelease: featuregate.Alpha}, CranePodResource: {Default: true, PreRelease: featuregate.Alpha}, CraneClusterNodePrediction: {Default: false, PreRelease: featuregate.Alpha}, CraneTimeSeriesPrediction: {Default: true, PreRelease: featuregate.Alpha}, diff --git a/pkg/utils/node.go b/pkg/utils/node.go index f8670bd72..4ad9d63ab 100644 --- a/pkg/utils/node.go +++ b/pkg/utils/node.go @@ -1,17 +1,23 @@ package utils import ( + "encoding/json" "fmt" "strconv" - topologyapi "github.com/gocrane/api/topology/v1alpha1" "golang.org/x/net/context" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" + kubeclient "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/klog/v2" + kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" + kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config" + kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme" + + topologyapi "github.com/gocrane/api/topology/v1alpha1" ) const defaultRetryTimes = 3 @@ -172,3 +178,38 @@ func IsNodeAwareOfTopology(attr map[string]string) *bool { func BuildZoneName(nodeID int) string { return fmt.Sprintf("node%d", nodeID) } + +func GetKubeletConfig(ctx context.Context, c kubeclient.Interface, hostname string) (*kubeletconfiginternal.KubeletConfiguration, error) { + result, err := c.CoreV1().RESTClient().Get(). + Resource("nodes"). + SubResource("proxy"). 
+ Name(hostname). + Suffix("configz"). + Do(ctx). + Raw() + if err != nil { + return nil, err + } + + // This is a hack because /configz reports the following structure: + // {"kubeletconfig": {the JSON representation of kubeletconfigv1beta1.KubeletConfiguration}} + type configzWrapper struct { + ComponentConfig kubeletconfigv1beta1.KubeletConfiguration `json:"kubeletconfig"` + } + configz := configzWrapper{} + + if err = json.Unmarshal(result, &configz); err != nil { + return nil, fmt.Errorf("failed to unmarshal json for kubelet config: %v", err) + } + + scheme, _, err := kubeletscheme.NewSchemeAndCodecs() + if err != nil { + return nil, err + } + cfg := kubeletconfiginternal.KubeletConfiguration{} + if err = scheme.Convert(&configz.ComponentConfig, &cfg, nil); err != nil { + return nil, err + } + + return &cfg, nil +}