Skip to content

Commit

Permalink
crane-agent only create tsp
Browse files Browse the repository at this point in the history
Change the units of ext-cpu
  • Loading branch information
shijieqin committed Apr 21, 2022
1 parent 7d1147e commit 6089ed3
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 71 deletions.
2 changes: 1 addition & 1 deletion cmd/crane-agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func Run(ctx context.Context, opts *options.Options) error {
tspInformer := craneInformerFactory.Prediction().V1alpha1().TimeSeriesPredictions()

newAgent, err := agent.NewAgent(ctx, hostname, opts.RuntimeEndpoint, kubeClient, craneClient,
podInformer, nodeInformer, nepInformer, actionInformer, tspInformer, opts.NodeResourceOptions, opts.Ifaces, healthCheck, opts.CollectInterval)
podInformer, nodeInformer, nepInformer, actionInformer, tspInformer, opts.NodeResourceReserved, opts.Ifaces, healthCheck, opts.CollectInterval)

if err != nil {
return err
Expand Down
18 changes: 5 additions & 13 deletions cmd/crane-agent/app/options/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/spf13/pflag"
cliflag "k8s.io/component-base/cli/flag"
)

// Options hold the command-line options about crane manager
Expand All @@ -21,21 +22,13 @@ type Options struct {
// MaxInactivity is the maximum time from last recorded activity before automatic restart
MaxInactivity time.Duration
// Ifaces is the network devices to collect metric
Ifaces []string
//NodeResourceOptions is the options of nodeResource
NodeResourceOptions NodeResourceOptions
}

type NodeResourceOptions struct {
ReserveCpuPercentStr string
ReserveMemoryPercentStr string
Ifaces []string
NodeResourceReserved map[string]string
}

// NewOptions builds an empty options.
func NewOptions() *Options {
return &Options{
NodeResourceOptions: NodeResourceOptions{},
}
return &Options{}
}

// Complete completes all the required options.
Expand All @@ -56,7 +49,6 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.StringVar(&o.BindAddr, "bind-address", "0.0.0.0:8081", "The address the agent binds to for metrics, health-check and pprof, default: 0.0.0.0:8081.")
flags.DurationVar(&o.CollectInterval, "collect-interval", 10*time.Second, "Period for the state collector to collect metrics, default: 10s")
flags.StringArrayVar(&o.Ifaces, "ifaces", []string{"eth0"}, "The network devices to collect metric, use comma to separated, default: eth0")
flags.StringVar(&o.NodeResourceOptions.ReserveCpuPercentStr, "reserve-cpu-percent", "", "reserve cpu percentage of node.")
flags.StringVar(&o.NodeResourceOptions.ReserveMemoryPercentStr, "reserve-memory-percent", "", "reserve memory percentage of node.")
flags.Var(cliflag.NewMapStringString(&o.NodeResourceReserved), "node-resource-reserved", "A set of ResourceName=Percent (e.g. cpu=40%,memory=40%)")
flags.DurationVar(&o.MaxInactivity, "max-inactivity", 5*time.Minute, "Maximum time from last recorded activity before automatic restart, default: 5min")
}
2 changes: 1 addition & 1 deletion examples/noderesource-tsp-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ data:
resourceIdentifier: cpu
type: ExpressionQuery
expressionQuery:
expression: 'node_cpu_cannot_be_reclaimed_seconds{node=~"({{nodename}})(:\\d+)?"}'
expression: 'node_cpu_cannot_be_reclaimed_seconds{node=~"({{ .metadata.name }})(:\\d+)?"}'
predictionWindowSeconds: 180
kind: ConfigMap
metadata:
Expand Down
109 changes: 72 additions & 37 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package agent

import (
"bytes"
"context"
"encoding/json"
"fmt"
"html/template"
"net/http"
"strings"
"reflect"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid"
Expand All @@ -28,7 +32,6 @@ import (
"github.com/gocrane/api/pkg/generated/informers/externalversions/ensurance/v1alpha1"
predictionv1 "github.com/gocrane/api/pkg/generated/informers/externalversions/prediction/v1alpha1"
v1alpha12 "github.com/gocrane/api/prediction/v1alpha1"
"github.com/gocrane/crane/cmd/crane-agent/app/options"
"github.com/gocrane/crane/pkg/ensurance/analyzer"
"github.com/gocrane/crane/pkg/ensurance/cm"
"github.com/gocrane/crane/pkg/ensurance/collector"
Expand Down Expand Up @@ -58,7 +61,7 @@ func NewAgent(ctx context.Context,
nepInformer v1alpha1.NodeQOSEnsurancePolicyInformer,
actionInformer v1alpha1.AvoidanceActionInformer,
tspInformer predictionv1.TimeSeriesPredictionInformer,
nodeResourceOptions options.NodeResourceOptions,
nodeResourceReserved map[string]string,
ifaces []string,
healthCheck *metrics.HealthCheck,
CollectInterval time.Duration,
Expand Down Expand Up @@ -88,7 +91,14 @@ func NewAgent(ctx context.Context,
avoidanceManager := executor.NewActionExecutor(kubeClient, nodeName, podInformer, nodeInformer, noticeCh, runtimeEndpoint)
managers = appendManagerIfNotNil(managers, avoidanceManager)
if nodeResource := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResource {
nodeResourceManager := resource.NewNodeResourceManager(kubeClient, nodeName, nodeResourceOptions.ReserveCpuPercentStr, nodeResourceOptions.ReserveMemoryPercentStr, agent.CreateNodeResourceTsp(), nodeInformer, tspInformer, stateCollector.NodeResourceChann)
tspName, err := agent.CreateNodeResourceTsp()
if err != nil {
return agent, err
}
nodeResourceManager, err := resource.NewNodeResourceManager(kubeClient, nodeName, nodeResourceReserved, tspName, nodeInformer, tspInformer, stateCollector.NodeResourceChann)
if err != nil {
return agent, err
}
managers = appendManagerIfNotNil(managers, nodeResourceManager)
}

Expand Down Expand Up @@ -127,61 +137,93 @@ func (a *Agent) Run(healthCheck *metrics.HealthCheck, enableProfiling bool, bind
}()

<-a.ctx.Done()

_ = a.DeleteNodeResourceTsp()
}

// getAgentName builds a unique identity for this agent instance by joining
// the node name and a freshly generated UUID with a dot.
func getAgentName(nodeName string) string {
	return fmt.Sprintf("%s.%s", nodeName, uuid.NewUUID())
}

func (a *Agent) CreateNodeResourceTsp() string {
func (a *Agent) CreateNodeResourceTsp() (string, error) {
tsp, err := a.craneClient.PredictionV1alpha1().TimeSeriesPredictions(resource.TspNamespace).Get(context.TODO(), a.GenerateNodeResourceTspName(), metav1.GetOptions{})
if err == nil {
klog.V(4).Infof("Found old tsp %s in namespace %s", a.GenerateNodeResourceTspName(), resource.TspNamespace)
err := a.DeleteNodeResourceTsp()
if err != nil {
klog.Exitf("Delete old tsp %s with error: %v", a.GenerateNodeResourceTspName(), err)
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("Failed to get noderesource tsp : %v", err)
return "", err
}
}
config, err := a.kubeClient.CoreV1().ConfigMaps(resource.TspNamespace).Get(context.TODO(), "noderesource-tsp-template", metav1.GetOptions{})

if err != nil {
klog.Exitf("Get noderesource tsp configmap noderesource-tsp-template with error: %v", err)
klog.Errorf("Failed to get noderesource tsp configmap : %v", err)
}

if config == nil {
klog.Exitf("Can't get noderesource tsp configmap noderesource-tsp-template")
klog.Errorf("Can't get noderesource tsp configmap noderesource-tsp-template")
}

spec := v1alpha12.TimeSeriesPredictionSpec{}
err = yaml.Unmarshal([]byte(strings.Replace(config.Data["spec"], "{{nodename}}", a.nodeName, -1)), &spec)
n, err := a.kubeClient.CoreV1().Nodes().Get(context.TODO(), a.nodeName, metav1.GetOptions{})
if err != nil {
klog.Exitf("Convert spec template error: %v", err)
klog.Errorf("Failed to get node : %v", err)
return "", err
}

n, err := a.kubeClient.CoreV1().Nodes().Get(context.TODO(), a.nodeName, metav1.GetOptions{})
spec := v1alpha12.TimeSeriesPredictionSpec{}
tpl, err := template.New("").Parse(config.Data["spec"])
if err != nil {
klog.Exitf("Get node error: %v", err)
klog.Errorf("Failed to convert spec template : %v", err)
return "", err
}
var buf bytes.Buffer
raw, _ := json.Marshal(n)
var data interface{}
_ = json.Unmarshal(raw, &data)
err = tpl.Execute(&buf, data)
if err != nil {
klog.Errorf("Failed to convert spec template : %v", err)
return "", err
}
err = yaml.Unmarshal(buf.Bytes(), &spec)
if err != nil {
klog.Errorf("Failed to convert spec template : %v", err)
return "", err
}

tsp = &v1alpha12.TimeSeriesPrediction{}

tsp.Name = a.GenerateNodeResourceTspName()
tsp.Namespace = resource.TspNamespace
gvk, _ := apiutil.GVKForObject(n, scheme.Scheme)
spec.TargetRef = v1.ObjectReference{
Kind: gvk.Kind,
APIVersion: gvk.GroupVersion().String(),
Name: a.nodeName,
}
tsp.Spec = spec
_ = controllerutil.SetControllerReference(n, tsp, scheme.Scheme)
_, err = a.craneClient.PredictionV1alpha1().TimeSeriesPredictions(tsp.Namespace).Create(context.TODO(), tsp, metav1.CreateOptions{})
if err != nil {
klog.Exitf("Create noderesource tsp %s with error: %v", a.GenerateNodeResourceTspName(), err)

if tsp != nil {
klog.V(4).Infof("Discover the presence of old noderesource tsp and try to contrast the changes: %s", a.GenerateNodeResourceTspName())
if reflect.DeepEqual(tsp.Spec, spec) {
return a.GenerateNodeResourceTspName(), nil
}
klog.V(4).Infof("Discover the presence of old noderesource tsp and the Tsp rules have been changed: %s", a.GenerateNodeResourceTspName())
tsp.Spec = spec
_, err := a.craneClient.PredictionV1alpha1().TimeSeriesPredictions(tsp.Namespace).Update(context.TODO(), tsp, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update noderesource tsp %s : %v", a.GenerateNodeResourceTspName(), err)
return "", err
}
klog.V(4).Infof("The noderesource tsp is updated successfully: %s", a.GenerateNodeResourceTspName())
} else {
klog.V(4).Infof("The noderesource tsp does not exist, try to create a new one: %s", a.GenerateNodeResourceTspName())
tsp = &v1alpha12.TimeSeriesPrediction{}
tsp.Name = a.GenerateNodeResourceTspName()
tsp.Namespace = resource.TspNamespace
tsp.Spec = spec
_ = controllerutil.SetControllerReference(n, tsp, scheme.Scheme)
_, err = a.craneClient.PredictionV1alpha1().TimeSeriesPredictions(tsp.Namespace).Create(context.TODO(), tsp, metav1.CreateOptions{})
if err != nil {
klog.Errorf("Failed to create noderesource tsp %s : %v", a.GenerateNodeResourceTspName(), err)
return "", err
}
klog.V(4).Infof("The noderesource tsp is created successfully: %s", a.GenerateNodeResourceTspName())
}
return a.GenerateNodeResourceTspName()

return a.GenerateNodeResourceTspName(), nil
}

func (a *Agent) DeleteNodeResourceTsp() error {
Expand All @@ -193,14 +235,7 @@ func (a *Agent) DeleteNodeResourceTsp() error {
}

func (a *Agent) GenerateNodeResourceTspName() string {
return fmt.Sprintf("noderesource-%s", a.name)
}

func appendManagerIfNotNil(managers []manager.Manager, m manager.Manager) []manager.Manager {
if m != nil {
return append(managers, m)
}
return managers
return fmt.Sprintf("noderesource-%s", a.nodeName)
}

func appendManagerIfNotNil(managers []manager.Manager, m manager.Manager) []manager.Manager {
Expand Down
10 changes: 9 additions & 1 deletion pkg/ensurance/cm/advanced_cpu_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type AdvancedCpuManager struct {
// stateFileDirectory holds the directory where the state file for checkpoints is held.
stateFileDirectory string

exclusiveCPUSet cpuset.CPUSet

cadvisor.Manager
}

Expand Down Expand Up @@ -255,6 +257,8 @@ func (m *AdvancedCpuManager) syncState(doAllocate bool) {
}
}
}

m.exclusiveCPUSet = m.getExclusiveCpu()
}

func (m *AdvancedCpuManager) policyRemoveContainerByRef(podUID string, containerName string) error {
Expand Down Expand Up @@ -299,7 +303,7 @@ func (m *AdvancedCpuManager) getSharedCpu() cpuset.CPUSet {
return sharedCPUSet
}

func (m *AdvancedCpuManager) GetExclusiveCpu() cpuset.CPUSet {
func (m *AdvancedCpuManager) getExclusiveCpu() cpuset.CPUSet {
exclusiveCPUSet := cpuset.NewCPUSet()
for _, pod := range m.activepods() {
for _, container := range pod.Spec.Containers {
Expand All @@ -313,6 +317,10 @@ func (m *AdvancedCpuManager) GetExclusiveCpu() cpuset.CPUSet {
return exclusiveCPUSet
}

// GetExclusiveCpu returns the cached set of exclusively assigned CPUs.
// The cache (exclusiveCPUSet) is refreshed by syncState, which recomputes it
// via getExclusiveCpu over the active pods, so callers read a snapshot rather
// than triggering a recomputation on every call.
func (m *AdvancedCpuManager) GetExclusiveCpu() cpuset.CPUSet {
	return m.exclusiveCPUSet
}

func (m *AdvancedCpuManager) activepods() []*v1.Pod {
allPods, _ := m.podLister.List(labels.Everything())
activePods := make([]*v1.Pod, 0, len(allPods))
Expand Down
5 changes: 4 additions & 1 deletion pkg/ensurance/collector/cadvisor/cadvisor_unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,23 @@
package cadvisor

import (
"errors"

info "github.com/google/cadvisor/info/v1"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
corelisters "k8s.io/client-go/listers/core/v1"

"github.com/gocrane/crane/pkg/common"
"github.com/gocrane/crane/pkg/ensurance/collector/types"
)

var errUnsupported = errors.New("cAdvisor is unsupported in this build")

type CadvisorCollectorUnsupport struct {
Manager Manager
}

type CadvisorManagerUnsupport struct {}
type CadvisorManagerUnsupport struct{}

func NewCadvisorManager() Manager {
return &CadvisorManagerUnsupport{}
Expand Down
1 change: 0 additions & 1 deletion pkg/ensurance/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ func NewStateCollector(nodeName string, nepLister ensuranceListers.NodeQOSEnsura
analyzerChann := make(chan map[string][]common.TimeSeries)
nodeResourceChann := make(chan map[string][]common.TimeSeries)
podResourceChann := make(chan map[string][]common.TimeSeries)
c := cadvisor.NewCadvisorManager()
return &StateCollector{
nodeName: nodeName,
nepLister: nepLister,
Expand Down
6 changes: 5 additions & 1 deletion pkg/ensurance/collector/nodelocal/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/load"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"

"github.com/gocrane/crane/pkg/common"
"github.com/gocrane/crane/pkg/ensurance/collector/types"
Expand Down Expand Up @@ -85,7 +86,10 @@ func collectCPU(nodeLocalContext *nodeLocalContext) (map[string][]common.TimeSer
usagePercent := calculateBusy(nodeState.latestCpuState.stat, currentCpuState.stat)
usageCore := usagePercent * float64(nodeState.cpuCoreNumbers) * 1000 / types.MaxPercentage

cpuSet := nodeLocalContext.exclusiveCPUSet()
cpuSet := cpuset.NewCPUSet()
if nodeLocalContext.exclusiveCPUSet != nil {
cpuSet = nodeLocalContext.exclusiveCPUSet()
}
var exclusiveCPUIdle float64 = 0

for cpuId, stat := range currentCpuState.perStat {
Expand Down
Loading

0 comments on commit 6089ed3

Please sign in to comment.