Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support remote adapter for metric-adapter #108

Merged
merged 1 commit into from
Jan 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/craned/app/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@ func NewManagerCommand(ctx context.Context) *cobra.Command {

// Run runs the craned with options. This should never exit.
func Run(ctx context.Context, opts *options.Options) error {
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
config := ctrl.GetConfigOrDie()
config.QPS = float32(opts.ApiQps)
config.Burst = opts.ApiBurst

mgr, err := ctrl.NewManager(config, ctrl.Options{
Scheme: scheme,
MetricsBindAddress: opts.MetricsAddr,
Port: 9443,
Expand Down
6 changes: 6 additions & 0 deletions cmd/craned/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (

// Options hold the command-line options about crane manager
type Options struct {
// ApiQps for rest client
ApiQps int
// ApiBurst for rest client
ApiBurst int
// LeaderElection hold the configurations for manager leader election.
LeaderElection componentbaseconfig.LeaderElectionConfiguration
// MetricsAddr is The address the metric endpoint binds to.
Expand Down Expand Up @@ -56,6 +60,8 @@ func (o *Options) Validate() error {

// AddFlags adds flags to the specified FlagSet.
func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.IntVar(&o.ApiQps, "api-qps", 300, "QPS of rest config.")
flags.IntVar(&o.ApiBurst, "api-burst", 400, "Burst of rest config.")
flags.StringVar(&o.MetricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flags.StringVar(&o.BindAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flags.BoolVar(&o.LeaderElection.LeaderElect, "leader-elect", true, "Start a leader election client and gain leadership before executing the main loop. Enable this when running replicated components for high availability.")
Expand Down
55 changes: 40 additions & 15 deletions cmd/metric-adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/component-base/logs"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -49,18 +50,7 @@ type MetricAdapter struct {
Message string
}

func (a *MetricAdapter) makeProvider() (provider.CustomMetricsProvider, error) {
config, err := ctrl.GetConfig()
if err != nil {
return nil, fmt.Errorf("failed to get config: %v", err)
}

clientOptions := client.Options{Scheme: scheme}
client, err := client.New(config, clientOptions)
if err != nil {
return nil, fmt.Errorf("failed to new client: %v", err)
}

func (a *MetricAdapter) makeProvider(remoteAdapter *metricprovider.RemoteAdapter, config *rest.Config, client client.Client) (provider.CustomMetricsProvider, error) {
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("unable to create kube client: %v", err)
Expand All @@ -72,7 +62,7 @@ func (a *MetricAdapter) makeProvider() (provider.CustomMetricsProvider, error) {
})
recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: "crane-metric-adapter"})

return metricprovider.NewMetricProvider(client, recorder), nil
return metricprovider.NewMetricProvider(client, remoteAdapter, recorder), nil
}

func main() {
Expand All @@ -85,13 +75,49 @@ func main() {
cmd.OpenAPIConfig.Info.Title = "crane-metric-adapter"
cmd.OpenAPIConfig.Info.Version = "1.0.0"

var enableRemoteAdapter bool
var remoteAdapterServiceNamespace string
var remoteAdapterServiceName string
var remoteAdapterServicePort int
var apiQps int
var apiBurst int

cmd.Flags().StringVar(&cmd.Message, "msg", "Starting adapter...", "startup message")
cmd.Flags().BoolVar(&enableRemoteAdapter, "remote-adapter", false, "Enable a remote adapter to provide a set of custom metrics")
cmd.Flags().StringVar(&remoteAdapterServiceNamespace, "remote-adapter-service-namespace", "", "Namespace of remote adapter's service")
cmd.Flags().StringVar(&remoteAdapterServiceName, "remote-adapter-service-name", "", "Name of remote adapter's service")
cmd.Flags().IntVar(&remoteAdapterServicePort, "remote-adapter-service-port", 6443, "Port of remote adapter's service")
cmd.Flags().IntVar(&apiQps, "api-qps", 300, "QPS of rest config.")
cmd.Flags().IntVar(&apiBurst, "api-burst", 400, "Burst of rest config.")
cmd.Flags().AddGoFlagSet(flag.CommandLine) // make sure we get the klog flags
if err := cmd.Flags().Parse(os.Args); err != nil {
return
}

metricProvider, err := cmd.makeProvider()
config, err := ctrl.GetConfig()
if err != nil {
klog.Exitf("Failed to get config: %v", err)
}

config.QPS = float32(apiQps)
config.Burst = apiBurst

clientOptions := client.Options{Scheme: scheme}
client, err := client.New(config, clientOptions)
if err != nil {
klog.Exitf("Failed to get client: %v", err)
}

var remoteAdapter *metricprovider.RemoteAdapter
if enableRemoteAdapter {
klog.Infof("Enable remote adapter: %s/%s", remoteAdapterServiceNamespace, remoteAdapterServiceName)
remoteAdapter, err = metricprovider.NewRemoteAdapter(remoteAdapterServiceNamespace, remoteAdapterServiceName, remoteAdapterServicePort, config, client)
if err != nil {
klog.Exitf("Failed to create remote adapter: %v", err)
}
}

metricProvider, err := cmd.makeProvider(remoteAdapter, config, client)
if err != nil {
klog.Error(err, "Failed to make provider")
os.Exit(1)
Expand All @@ -103,5 +129,4 @@ func main() {
klog.Error(err, "Failed to run metrics adapter")
os.Exit(1)
}

}
6 changes: 6 additions & 0 deletions deploy/metric-adapter/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ rules:
- patch
- update
- watch
- apiGroups:
- "custom.metrics.k8s.io"
resources:
- '*'
verbs:
- '*'
---

apiVersion: rbac.authorization.k8s.io/v1
Expand Down
56 changes: 49 additions & 7 deletions pkg/metricprovider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,46 @@ var _ provider.CustomMetricsProvider = &MetricProvider{}

// MetricProvider is an implementation of provider.MetricsProvider which provide predictive metric for resource
type MetricProvider struct {
client client.Client
recorder record.EventRecorder
client client.Client
remoteAdapter *RemoteAdapter
recorder record.EventRecorder
}

// NewMetricProvider returns an instance of metricProvider
func NewMetricProvider(client client.Client, recorder record.EventRecorder) provider.CustomMetricsProvider {
func NewMetricProvider(client client.Client, remoteAdapter *RemoteAdapter, recorder record.EventRecorder) provider.CustomMetricsProvider {
provider := &MetricProvider{
client: client,
recorder: recorder,
client: client,
remoteAdapter: remoteAdapter,
recorder: recorder,
}
return provider
}

func (p *MetricProvider) GetMetricByName(ctx context.Context, name types.NamespacedName, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValue, error) {
klog.Info(fmt.Sprintf("Get metric by name for custom metric, GroupResource %s namespacedName %s metric %s metricSelector %s", info.GroupResource.String(), name.String(), info.Metric, metricSelector.String()))

if !IsLocalMetric(info) {
if p.remoteAdapter != nil {
return p.remoteAdapter.GetMetricByName(ctx, name, info, metricSelector)
} else {
return nil, apiErrors.NewServiceUnavailable("not supported")
}
}

return nil, apiErrors.NewServiceUnavailable("not supported")
}

// GetMetricBySelector fetches metric for pod resources, get predictive metric from giving selector
func (p *MetricProvider) GetMetricBySelector(ctx context.Context, namespace string, selector labels.Selector, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) {
klog.Info(fmt.Sprintf("Get metric for custom metric, GroupResource %s namespace %s metric %s selector %s metricSelector %s", info.GroupResource.String(), namespace, info.Metric, selector.String(), metricSelector.String()))
klog.Info(fmt.Sprintf("Get metric by selector for custom metric, Info %v namespace %s selector %s metricSelector %s", info, namespace, selector.String(), metricSelector.String()))

if !IsLocalMetric(info) {
if p.remoteAdapter != nil {
return p.remoteAdapter.GetMetricBySelector(ctx, namespace, selector, info, metricSelector)
} else {
return nil, apiErrors.NewServiceUnavailable("not supported")
}
}

var matchingMetrics []custom_metrics.MetricValue
prediction, err := p.GetPrediction(ctx, namespace, metricSelector)
Expand Down Expand Up @@ -119,7 +139,7 @@ func (p *MetricProvider) GetMetricBySelector(ctx context.Context, namespace stri

averageValue := int64(math.Round(largestMetricValue.value * 1000 / float64(len(readyPods))))

klog.Info("Provide custom metric %s average value %f.", info.Metric, float64(averageValue)/1000)
klog.Infof("Provide custom metric %s average value %f.", info.Metric, float64(averageValue)/1000)

for name := range readyPods {
metric := custom_metrics.MetricValue{
Expand Down Expand Up @@ -153,6 +173,16 @@ func (p *MetricProvider) GetMetricBySelector(ctx context.Context, namespace stri
func (p *MetricProvider) ListAllMetrics() []provider.CustomMetricInfo {
klog.Info("List all custom metrics")

metricInfos := ListAllLocalMetrics()

if p.remoteAdapter != nil {
metricInfos = append(metricInfos, p.remoteAdapter.ListAllMetrics()...)
}

return metricInfos
}

func ListAllLocalMetrics() []provider.CustomMetricInfo {
return []provider.CustomMetricInfo{
{
GroupResource: schema.GroupResource{Group: "", Resource: "pods"},
Expand All @@ -167,6 +197,18 @@ func (p *MetricProvider) ListAllMetrics() []provider.CustomMetricInfo {
}
}

func IsLocalMetric(metricInfo provider.CustomMetricInfo) bool {
for _, info := range ListAllLocalMetrics() {
if info.Namespaced == metricInfo.Namespaced &&
info.Metric == metricInfo.Metric &&
info.GroupResource.String() == metricInfo.GroupResource.String() {
return true
}
}

return false
}

func (p *MetricProvider) GetPrediction(ctx context.Context, namespace string, metricSelector labels.Selector) (*predictionapi.TimeSeriesPrediction, error) {
labelSelector, err := labels.ConvertSelectorToLabelsMap(metricSelector.String())
if err != nil {
Expand Down
Loading