Skip to content

Commit

Permalink
Implement evpa-controller
Browse files Browse the repository at this point in the history
  • Loading branch information
qmhu committed Mar 17, 2022
1 parent 2023fa4 commit ba6b1c4
Show file tree
Hide file tree
Showing 14 changed files with 1,966 additions and 905 deletions.
148 changes: 89 additions & 59 deletions cmd/craned/app/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ import (
"github.com/gocrane/crane/pkg/controller/analytics"
"github.com/gocrane/crane/pkg/controller/cnp"
"github.com/gocrane/crane/pkg/controller/ehpa"
"github.com/gocrane/crane/pkg/controller/evpa"
"github.com/gocrane/crane/pkg/controller/recommendation"
"github.com/gocrane/crane/pkg/controller/timeseriesprediction"
"github.com/gocrane/crane/pkg/features"
"github.com/gocrane/crane/pkg/known"
"github.com/gocrane/crane/pkg/metrics"
"github.com/gocrane/crane/pkg/oom"
"github.com/gocrane/crane/pkg/prediction"
"github.com/gocrane/crane/pkg/prediction/dsp"
"github.com/gocrane/crane/pkg/prediction/percentile"
Expand All @@ -48,15 +50,6 @@ var (
scheme = runtime.NewScheme()
)

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(predictionapi.AddToScheme(scheme))
utilruntime.Must(analysisapi.AddToScheme(scheme))
utilruntime.Must(ensuranceapi.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme

}

// NewManagerCommand creates a *cobra.Command object with default parameters
func NewManagerCommand(ctx context.Context) *cobra.Command {
opts := options.NewOptions()
Expand Down Expand Up @@ -110,72 +103,57 @@ func Run(ctx context.Context, opts *options.Options) error {
return err
}

initializationScheme()
initializationWebhooks(mgr, opts)
initializationControllers(ctx, mgr, opts)
klog.Info("Starting crane manager")

if opts.WebhookConfig.Enabled {
initializationWebhooks(mgr)
}

// initialization custom collector metrics
initializationMetricCollector(mgr)
runAll(ctx, mgr, opts)

var eg errgroup.Group

eg.Go(func() error {
if err := mgr.Start(ctx); err != nil {
klog.Error(err, "problem running crane manager")
klog.Exit(err)
}
return nil
})

eg.Go(func() error {
// Start the craned web server
serverConfig := serverconfig.NewServerConfig()
if err := opts.ApplyTo(serverConfig); err != nil {
klog.Exit(err)
}
// use controller runtime rest config, we can not refer kubeconfig option directly because it is unexported variable in vendor/sigs.k8s.io/controller-runtime/pkg/client/config/config.go
serverConfig.KubeRestConfig = mgr.GetConfig()
craneServer, err := server.NewServer(serverConfig)
if err != nil {
klog.Exit(err)
}
craneServer.Run(ctx)
return nil
})
return nil
}

// wait for all components exit
if err := eg.Wait(); err != nil {
klog.Fatal(err)
func initializationScheme() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
if utilfeature.DefaultFeatureGate.Enabled(features.CraneAutoscaling) {
utilruntime.Must(autoscalingapi.AddToScheme(scheme))
}
if utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource) || utilfeature.DefaultFeatureGate.Enabled(features.CraneClusterNodePrediction) {
utilruntime.Must(ensuranceapi.AddToScheme(scheme))
}
if utilfeature.DefaultMutableFeatureGate.Enabled(features.CraneAnalysis) {
utilruntime.Must(analysisapi.AddToScheme(scheme))
}
if utilfeature.DefaultFeatureGate.Enabled(features.CraneTimeSeriesPrediction) {
utilruntime.Must(predictionapi.AddToScheme(scheme))
}

return nil
}

func initializationMetricCollector(mgr ctrl.Manager) {
// register as prometheus metric collector
metrics.CustomCollectorRegister(metrics.NewTspMetricCollector(mgr.GetClient()))
}

func initializationWebhooks(mgr ctrl.Manager) {
func initializationWebhooks(mgr ctrl.Manager, opts *options.Options) {
if !opts.WebhookConfig.Enabled {
return
}

if certDir := os.Getenv("WEBHOOK_CERT_DIR"); len(certDir) > 0 {
mgr.GetWebhookServer().CertDir = certDir
}

if err := webhooks.SetupWebhookWithManager(mgr); err != nil {
if err := webhooks.SetupWebhookWithManager(mgr,
utilfeature.DefaultFeatureGate.Enabled(features.CraneAutoscaling),
utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource),
utilfeature.DefaultFeatureGate.Enabled(features.CraneClusterNodePrediction),
utilfeature.DefaultMutableFeatureGate.Enabled(features.CraneAnalysis),
utilfeature.DefaultFeatureGate.Enabled(features.CraneTimeSeriesPrediction)); err != nil {
klog.Exit(err, "unable to create webhook", "webhook", "TimeSeriesPrediction")
}
}

// initializationControllers setup controllers with manager
func initializationControllers(ctx context.Context, mgr ctrl.Manager, opts *options.Options) {
autoscaling := utilfeature.DefaultFeatureGate.Enabled(features.CraneAutoscaling)
clusterNodePrediction := utilfeature.DefaultFeatureGate.Enabled(features.CraneClusterNodePrediction)
analysis := utilfeature.DefaultMutableFeatureGate.Enabled(features.CraneAnalysis)
timeseriespredict := utilfeature.DefaultFeatureGate.Enabled(features.CraneTimeSeriesPrediction)

discoveryClientSet, err := discovery.NewDiscoveryClientForConfig(mgr.GetConfig())
if err != nil {
klog.Exit(err, "Unable to create discover client")
Expand All @@ -188,8 +166,19 @@ func initializationControllers(ctx context.Context, mgr ctrl.Manager, opts *opti
scaleKindResolver,
)

if autoscaling {
utilruntime.Must(autoscalingapi.AddToScheme(scheme))
podOOMRecorder := &oom.PodOOMRecorder{
Client: mgr.GetClient(),
}
if err := podOOMRecorder.SetupWithManager(mgr); err != nil {
klog.Exit(err, "unable to create controller", "controller", "PodOOMRecorder")
}
go func() {
if err := podOOMRecorder.Run(ctx.Done()); err != nil {
klog.Warningf("Run oom recorder failed: %v", err)
}
}()

if utilfeature.DefaultFeatureGate.Enabled(features.CraneAutoscaling) {
if err := (&ehpa.EffectiveHPAController{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand Down Expand Up @@ -219,6 +208,15 @@ func initializationControllers(ctx context.Context, mgr ctrl.Manager, opts *opti
}).SetupWithManager(mgr); err != nil {
klog.Exit(err, "unable to create controller", "controller", "HPAReplicasController")
}

if err := (&evpa.EffectiveVPAController{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("effective-vpa-controller"),
OOMRecorder: podOOMRecorder,
}).SetupWithManager(mgr); err != nil {
klog.Exit(err, "unable to create controller", "controller", "EffectiveVPAController")
}
}

var dataSource providers.Interface
Expand Down Expand Up @@ -259,7 +257,7 @@ func initializationControllers(ctx context.Context, mgr ctrl.Manager, opts *opti
}

// TspController
if timeseriespredict {
if utilfeature.DefaultFeatureGate.Enabled(features.CraneTimeSeriesPrediction) {
tspController := timeseriesprediction.NewController(
mgr.GetClient(),
mgr.GetEventRecorderFor("time-series-prediction-controller"),
Expand All @@ -269,10 +267,9 @@ func initializationControllers(ctx context.Context, mgr ctrl.Manager, opts *opti
if err := tspController.SetupWithManager(mgr); err != nil {
klog.Exit(err, "unable to create controller", "controller", "TspController")
}

}

if analysis {
if utilfeature.DefaultMutableFeatureGate.Enabled(features.CraneAnalysis) {
if err := (&analytics.Controller{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand Down Expand Up @@ -302,7 +299,7 @@ func initializationControllers(ctx context.Context, mgr ctrl.Manager, opts *opti
}

// CnpController
if clusterNodePrediction {
if utilfeature.DefaultFeatureGate.Enabled(features.CraneClusterNodePrediction) {
if err := (&cnp.ClusterNodePredictionController{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand All @@ -313,3 +310,36 @@ func initializationControllers(ctx context.Context, mgr ctrl.Manager, opts *opti
}
}
}

func runAll(ctx context.Context, mgr ctrl.Manager, opts *options.Options) {
var eg errgroup.Group

eg.Go(func() error {
if err := mgr.Start(ctx); err != nil {
klog.Error(err, "problem running crane manager")
klog.Exit(err)
}
return nil
})

eg.Go(func() error {
// Start the craned web server
serverConfig := serverconfig.NewServerConfig()
if err := opts.ApplyTo(serverConfig); err != nil {
klog.Exit(err)
}
// use controller runtime rest config, we can not refer kubeconfig option directly because it is unexported variable in vendor/sigs.k8s.io/controller-runtime/pkg/client/config/config.go
serverConfig.KubeRestConfig = mgr.GetConfig()
craneServer, err := server.NewServer(serverConfig)
if err != nil {
klog.Exit(err)
}
craneServer.Run(ctx)
return nil
})

// wait for all components exit
if err := eg.Wait(); err != nil {
klog.Fatal(err)
}
}
Loading

0 comments on commit ba6b1c4

Please sign in to comment.