Skip to content

Commit

Permalink
move nrt collection to collector
Browse files Browse the repository at this point in the history
Signed-off-by: Garrybest <garrybest@foxmail.com>
  • Loading branch information
Garrybest committed Sep 17, 2022
1 parent f49e618 commit 1cbbb77
Show file tree
Hide file tree
Showing 7 changed files with 305 additions and 168 deletions.
57 changes: 11 additions & 46 deletions cmd/crane-agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package app

import (
"context"
"encoding/json"
"flag"
"fmt"
"os"
"time"

Expand All @@ -19,9 +17,6 @@ import (
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
ctrl "sigs.k8s.io/controller-runtime"

ensuranceapi "github.com/gocrane/api/ensurance/v1alpha1"
Expand Down Expand Up @@ -90,10 +85,6 @@ func Run(ctx context.Context, opts *options.Options) error {
if err != nil {
return err
}
kubeletConfig, err := getKubeletConfig(ctx, kubeClient, hostname)
if err != nil {
return err
}

podInformerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, informerSyncPeriod,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
Expand All @@ -115,8 +106,15 @@ func Run(ctx context.Context, opts *options.Options) error {
actionInformer := craneInformerFactory.Ensurance().V1alpha1().AvoidanceActions()
tspInformer := craneInformerFactory.Prediction().V1alpha1().TimeSeriesPredictions()

newAgent, err := agent.NewAgent(ctx, hostname, opts.RuntimeEndpoint, opts.CgroupDriver, opts.SysPath, kubeClient, craneClient, kubeletConfig, podInformer, nodeInformer,
nodeQOSInformer, podQOSInformer, actionInformer, tspInformer, opts.NodeResourceReserved, opts.Ifaces, healthCheck, opts.CollectInterval, opts.ExecuteExcess)
nrtInformerFactory := craneinformers.NewSharedInformerFactoryWithOptions(craneClient, informerSyncPeriod,
craneinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector(nodeNameField, hostname).String()
}),
)
nrtInformer := nrtInformerFactory.Topology().V1alpha1().NodeResourceTopologies()

newAgent, err := agent.NewAgent(ctx, hostname, opts.RuntimeEndpoint, opts.CgroupDriver, opts.SysPath, kubeClient, craneClient, podInformer, nodeInformer,
nodeQOSInformer, podQOSInformer, actionInformer, tspInformer, nrtInformer, opts.NodeResourceReserved, opts.Ifaces, healthCheck, opts.CollectInterval, opts.ExecuteExcess)

if err != nil {
return err
Expand All @@ -125,10 +123,12 @@ func Run(ctx context.Context, opts *options.Options) error {
podInformerFactory.Start(ctx.Done())
nodeInformerFactory.Start(ctx.Done())
craneInformerFactory.Start(ctx.Done())
nrtInformerFactory.Start(ctx.Done())

podInformerFactory.WaitForCacheSync(ctx.Done())
nodeInformerFactory.WaitForCacheSync(ctx.Done())
craneInformerFactory.WaitForCacheSync(ctx.Done())
nrtInformerFactory.WaitForCacheSync(ctx.Done())

newAgent.Run(healthCheck, opts.EnableProfiling, opts.BindAddr)
return nil
Expand Down Expand Up @@ -163,38 +163,3 @@ func getHostName(override string) string {
}
return nodeName
}

// getKubeletConfig fetches the live kubelet configuration of the node named
// hostname and returns it as the internal KubeletConfiguration type.
//
// The configuration is requested with c's CoreV1 REST client from the node's
// "proxy" subresource with the "configz" suffix, decoded from its versioned
// (v1beta1) JSON form, and converted to the internal type with the kubelet
// scheme. On any failure a nil configuration is returned together with an
// error that wraps the underlying cause, so callers can still inspect it
// with errors.Is / errors.As.
func getKubeletConfig(ctx context.Context, c kubernetes.Interface, hostname string) (*kubeletconfiginternal.KubeletConfiguration, error) {
	result, err := c.CoreV1().RESTClient().Get().
		Resource("nodes").
		SubResource("proxy").
		Name(hostname).
		Suffix("configz").
		Do(ctx).
		Raw()
	if err != nil {
		return nil, fmt.Errorf("failed to request kubelet configz of node %q: %w", hostname, err)
	}

	// This hack because /configz reports the following structure:
	// {"kubeletconfig": {the JSON representation of kubeletconfigv1beta1.KubeletConfiguration}}
	type configzWrapper struct {
		ComponentConfig kubeletconfigv1beta1.KubeletConfiguration `json:"kubeletconfig"`
	}
	configz := configzWrapper{}

	if err = json.Unmarshal(result, &configz); err != nil {
		return nil, fmt.Errorf("failed to unmarshal json for kubelet config: %w", err)
	}

	scheme, _, err := kubeletscheme.NewSchemeAndCodecs()
	if err != nil {
		return nil, fmt.Errorf("failed to build kubelet config scheme: %w", err)
	}

	// Convert the versioned configuration into the internal representation.
	cfg := kubeletconfiginternal.KubeletConfiguration{}
	if err = scheme.Convert(&configz.ComponentConfig, &cfg, nil); err != nil {
		return nil, fmt.Errorf("failed to convert kubelet config of node %q to internal version: %w", hostname, err)
	}

	return &cfg, nil
}
150 changes: 36 additions & 114 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,12 @@ import (
"reflect"
"time"

jsonpatch "github.com/evanphx/json-patch"
"github.com/jaypipes/ghw"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apiresource "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/yaml"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
"k8s.io/apiserver/pkg/server/mux"
"k8s.io/apiserver/pkg/server/routes"
utilfeature "k8s.io/apiserver/pkg/util/feature"
Expand All @@ -29,67 +24,64 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
kubeletcpumanager "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

ensuranceapi "github.com/gocrane/api/ensurance/v1alpha1"
craneclientset "github.com/gocrane/api/pkg/generated/clientset/versioned"
"github.com/gocrane/api/pkg/generated/informers/externalversions/ensurance/v1alpha1"
predictionv1 "github.com/gocrane/api/pkg/generated/informers/externalversions/prediction/v1alpha1"
topologyinformer "github.com/gocrane/api/pkg/generated/informers/externalversions/topology/v1alpha1"
predictionapi "github.com/gocrane/api/prediction/v1alpha1"
topologyapi "github.com/gocrane/api/topology/v1alpha1"

"github.com/gocrane/crane/pkg/ensurance/analyzer"
"github.com/gocrane/crane/pkg/ensurance/cm"
"github.com/gocrane/crane/pkg/ensurance/collector"
"github.com/gocrane/crane/pkg/ensurance/collector/cadvisor"
"github.com/gocrane/crane/pkg/ensurance/collector/noderesourcetopology"
"github.com/gocrane/crane/pkg/ensurance/executor"
"github.com/gocrane/crane/pkg/ensurance/manager"
"github.com/gocrane/crane/pkg/features"
"github.com/gocrane/crane/pkg/metrics"
"github.com/gocrane/crane/pkg/resource"
"github.com/gocrane/crane/pkg/topology"
"github.com/gocrane/crane/pkg/utils"
)

// Agent carries a context, the agent name and node name, the Kubernetes and
// crane clientsets, and the list of managers.
// NOTE(review): the field list below appears twice, once followed by
// kubeletConfig and once without it — presumably the before/after sides of a
// diff rendered together; confirm which revision is live.
type Agent struct {
ctx context.Context
name string
nodeName string
kubeClient kubernetes.Interface
craneClient craneclientset.Interface
managers []manager.Manager
kubeletConfig *kubeletconfiginternal.KubeletConfiguration
ctx context.Context
name string
nodeName string
kubeClient kubernetes.Interface
craneClient craneclientset.Interface
managers []manager.Manager
}

func NewAgent(ctx context.Context,
nodeName, runtimeEndpoint, cgroupDriver, sysPath string,
kubeClient kubernetes.Interface,
craneClient craneclientset.Interface,
kubeletConfig *kubeletconfiginternal.KubeletConfiguration,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
nodeQOSInformer v1alpha1.NodeQOSInformer,
podQOSInformer v1alpha1.PodQOSInformer,
actionInformer v1alpha1.AvoidanceActionInformer,
tspInformer predictionv1.TimeSeriesPredictionInformer,
nrtInformer topologyinformer.NodeResourceTopologyInformer,
nodeResourceReserved map[string]string,
ifaces []string,
healthCheck *metrics.HealthCheck,
CollectInterval time.Duration,
collectInterval time.Duration,
executeExcess string,
) (*Agent, error) {
var managers []manager.Manager
var noticeCh = make(chan executor.AvoidanceExecutor)
agent := &Agent{
ctx: ctx,
name: getAgentName(nodeName),
nodeName: nodeName,
kubeClient: kubeClient,
craneClient: craneClient,
kubeletConfig: kubeletConfig,
ctx: ctx,
name: getAgentName(nodeName),
nodeName: nodeName,
kubeClient: kubeClient,
craneClient: craneClient,
}

utilruntime.Must(ensuranceapi.AddToScheme(scheme.Scheme))
Expand All @@ -101,7 +93,7 @@ func NewAgent(ctx context.Context,
exclusiveCPUSet = cpuManager.GetExclusiveCpu
managers = appendManagerIfNotNil(managers, cpuManager)
}
stateCollector := collector.NewStateCollector(nodeName, nodeQOSInformer.Lister(), podInformer.Lister(), nodeInformer.Lister(), ifaces, healthCheck, CollectInterval, exclusiveCPUSet, cadvisorManager)
stateCollector := collector.NewStateCollector(nodeName, sysPath, kubeClient, craneClient, nodeQOSInformer.Lister(), nrtInformer.Lister(), podInformer.Lister(), nodeInformer.Lister(), ifaces, healthCheck, collectInterval, exclusiveCPUSet, cadvisorManager)
managers = appendManagerIfNotNil(managers, stateCollector)
analyzerManager := analyzer.NewAnomalyAnalyzer(kubeClient, nodeName, podInformer, nodeInformer, nodeQOSInformer, podQOSInformer, actionInformer, stateCollector.AnalyzerChann, noticeCh)
managers = appendManagerIfNotNil(managers, analyzerManager)
Expand All @@ -122,9 +114,10 @@ func NewAgent(ctx context.Context,
managers = appendManagerIfNotNil(managers, podResourceManager)
}

_, err := agent.CreateNodeResourceTopology(sysPath)
if err != nil {
return agent, err
if utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResourceTopology) {
if err := agent.CreateNodeResourceTopology(sysPath); err != nil {
return agent, err
}
}

agent.managers = managers
Expand Down Expand Up @@ -251,84 +244,39 @@ func (a *Agent) CreateNodeResourceTsp() string {
return a.GenerateNodeResourceTspName()
}

// CreateNodeResourceTopology builds the NodeResourceTopology object for the
// agent's node (a.nodeName) and writes it through craneClient. The existing
// object is fetched first (a NotFound error is tolerated), the Node is read
// through kubeClient, and the freshly built topology is then either created
// or used to update the existing one.
// NOTE(review): this span interleaves two revisions of the function — one
// returning (*NodeResourceTopology, error) that builds and patches inline,
// and one returning only error that delegates to the noderesourcetopology
// package — presumably the before/after sides of a diff; confirm which
// revision is live before relying on the details below.
func (a *Agent) CreateNodeResourceTopology(sysPath string) (*topologyapi.NodeResourceTopology, error) {
topo, err := ghw.Topology(ghw.WithPathOverrides(ghw.PathOverrides{
"/sys": sysPath,
}))
if err != nil {
return nil, fmt.Errorf("failed to detect topology info by GHW: %v", err)
}
klog.InfoS("Get topology info from GHW finished", "info", topo.String())

exist := true
func (a *Agent) CreateNodeResourceTopology(sysPath string) error {
nrt, err := a.craneClient.TopologyV1alpha1().NodeResourceTopologies().Get(context.TODO(), a.nodeName, metav1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("Failed to get node resource topology: %v", err)
return nil, err
return err
}
exist = false
nrt = nil
}

node, err := a.kubeClient.CoreV1().Nodes().Get(context.TODO(), a.nodeName, metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed to get node: %v", err)
return nil, err
return err
}

kubeReserved, err := parseResourceList(a.kubeletConfig.KubeReserved)
kubeletConfig, err := utils.GetKubeletConfig(context.TODO(), a.kubeClient, a.nodeName)
if err != nil {
return nil, err
}
systemReserved, err := parseResourceList(a.kubeletConfig.SystemReserved)
if err != nil {
return nil, err
klog.Errorf("Failed to get kubelet config: %v", err)
return err
}
reserved := quotav1.Add(kubeReserved, systemReserved)

cpuManagerPolicy := topologyapi.CPUManagerPolicyStatic
// If kubelet cpumanager policy is static, we should set the agent cpu manager policy to none.
if a.kubeletConfig.CPUManagerPolicy == string(kubeletcpumanager.PolicyStatic) {
cpuManagerPolicy = topologyapi.CPUManagerPolicyNone
newNrt, err := noderesourcetopology.BuildNodeResourceTopology(sysPath, kubeletConfig, node)
if err != nil {
klog.Errorf("Failed to build node resource topology: %v", err)
return err
}

nrtBuilder := topology.NewNRTBuilder()
nrtBuilder.WithNode(node)
nrtBuilder.WithReserved(reserved)
nrtBuilder.WithTopologyInfo(topo)
nrtBuilder.WithCPUManagerPolicy(cpuManagerPolicy)
newNrt := nrtBuilder.Build()
_ = controllerutil.SetControllerReference(node, newNrt, scheme.Scheme)

if exist {
newNrt.TypeMeta = nrt.TypeMeta
newNrt.ObjectMeta = nrt.ObjectMeta
oldData, err := json.Marshal(nrt)
if err != nil {
return nil, err
}
newData, err := json.Marshal(newNrt)
if err != nil {
return nil, err
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return nil, fmt.Errorf("failed to create merge patch: %v", err)
}
nrt, err = a.craneClient.TopologyV1alpha1().NodeResourceTopologies().Patch(context.TODO(), a.nodeName, types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
klog.Errorf("Failed to update node resource topology %s: %v", a.nodeName, err)
return nil, err
}
return nrt, nil
} else {
nrt, err = a.craneClient.TopologyV1alpha1().NodeResourceTopologies().Create(context.TODO(), newNrt, metav1.CreateOptions{})
if err != nil {
klog.Errorf("Failed to create node resource topology %s: %v", a.nodeName, err)
return nil, err
}
return nrt, nil
if err = noderesourcetopology.CreateOrUpdateNodeResourceTopology(a.craneClient, nrt, newNrt); err != nil {
klog.Errorf("Failed to create or update node resource topology: %v", err)
return err
}
return nil
}

func (a *Agent) DeleteNodeResourceTsp() error {
Expand All @@ -349,29 +297,3 @@ func appendManagerIfNotNil(managers []manager.Manager, m manager.Manager) []mana
}
return managers
}

// parseResourceList parses the given configuration map into an API
// ResourceList or returns an error.
//
// Keys are resource names and values are quantity strings. Only CPU, memory,
// ephemeral storage and PID resources are accepted; any other key, a value
// that does not parse as a quantity, or a negative quantity yields an error
// and a nil list. An empty (or nil) map yields a nil list and a nil error.
func parseResourceList(m map[string]string) (corev1.ResourceList, error) {
	if len(m) == 0 {
		return nil, nil
	}
	rl := make(corev1.ResourceList, len(m))
	for k, v := range m {
		name := corev1.ResourceName(k)
		switch name {
		// CPU, memory, local storage, and PID resources are supported.
		case corev1.ResourceCPU, corev1.ResourceMemory, corev1.ResourceEphemeralStorage, pidlimit.PIDs:
			q, err := apiresource.ParseQuantity(v)
			if err != nil {
				// Name the offending entry; the bare parse error does not say
				// which resource it belongs to.
				return nil, fmt.Errorf("failed to parse quantity %q for %q resource: %w", v, k, err)
			}
			if q.Sign() == -1 {
				return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
			}
			rl[name] = q
		default:
			return nil, fmt.Errorf("cannot reserve %q resource", k)
		}
	}
	return rl, nil
}
Loading

0 comments on commit 1cbbb77

Please sign in to comment.